In [4]:
from easydict import EasyDict as edict
import pandas as pd
import pydgraph
import hashlib
import tqdm
import gzip
import csv
import os


filepath = './common-crawl/' + os.listdir('./common-crawl/')[0]

# Preview data
df = pd.read_csv(filepath, nrows=10, compression='gzip', usecols=['asn_num', 'domain', 'ip', 'country', 'asn_org', 'path'])
df.to_csv('sample.csv')
df.head(10)

Unnamed: 0,domain,ip,asn_num,country,asn_org,path
0,aice-interpretes.com,89.140.72.171,6739,ES,ONO-AS Cableuropa - ONO,/es/consejos-sonidos-eventos-traductores.php
1,aice-interpretes.com,89.140.72.171,6739,ES,ONO-AS Cableuropa - ONO,/es/contacto-interpretes-traductores-de-confer...
2,aice-interpretes.com,89.140.72.171,6739,ES,ONO-AS Cableuropa - ONO,/es/contratar-interprete-de-conferencia-aice.php
3,aice-interpretes.com,89.140.72.171,6739,ES,ONO-AS Cableuropa - ONO,/es/enlace-1-traductor-interprete.php
4,aice-interpretes.com,89.140.72.171,6739,ES,ONO-AS Cableuropa - ONO,/es/enlace-6-traductor-interprete.php
5,aice-interpretes.com,89.140.72.171,6739,ES,ONO-AS Cableuropa - ONO,/es/equipo-tecnico-eventos-traductores.php
6,aice-interpretes.com,89.140.72.171,6739,ES,ONO-AS Cableuropa - ONO,/es/estudiar-grado-traductor-interprete.php
7,aice-interpretes.com,89.140.72.171,6739,ES,ONO-AS Cableuropa - ONO,/es/estudiar-interpretacion.php
8,aice-interpretes.com,89.140.72.171,6739,ES,ONO-AS Cableuropa - ONO,/es/estudiar-postgrado-traductor-interprete.php
9,aice-interpretes.com,89.140.72.171,6739,ES,ONO-AS Cableuropa - ONO,/es/experiencia-especializacion.php


In [2]:
# Create a client.
def get_client():
    client_stub = pydgraph.DgraphClientStub('localhost:9080')
    return pydgraph.DgraphClient(client_stub)


# Drop All - discard all data and start from a clean slate.
def drop_all(client):
    print('Dropping all')
    return client.alter(pydgraph.Operation(drop_all=True))


# Set schema.
def set_schema(client):
    print('Setting Schema')
    schema = """
    asnnum: int @index(int) .
    org: string .
    domains: [uid] @reverse .
    country: [uid] @reverse .
    
    type ASN {
        asnnum
        org
        domains
    }
    
    domain: string @index(term,exact) .
    tld: string .
    ip: string .
    documents: [uid] @reverse .
    
    type Domain {
        domain
        tld
        ip
        documents
        country
    }
    
    path: string @index(term) .
    type Document {
        path
    }
    
    country_code: string @index(exact) .
    type Country {
        country_code
        domains
    }
    
    """
    return client.alter(pydgraph.Operation(schema=schema))

In [3]:
def insert(client, filename, batch_size=100, iterations=None):
    # Create caches
    domain_uids = dict() 
    asn_uids = dict()
    country_uids = dict()
    
    # Create file read streamer
    reader = csv.DictReader(gzip.open(filename, 'rt'))
    
    # Create a new transaction.
    txn = client.txn()
    try:
        n=0
        for index, row in tqdm.tqdm(enumerate(reader)):
            row = edict(row)
            
            # Create ASN if not exists
            if row.asn_num not in asn_uids:
                asn = {
                    'uid': '_:' + str(row.asn_num),
                    'dgraph.type': 'ASN',
                    'asnnum': row.asn_num,
                    'org': row.asn_org
                }
                response = txn.mutate(set_obj=asn)
                asn_uids[row.asn_num] = response.uids[str(row.asn_num)]
                
            # Create country if not exists
            if row.country not in country_uids:
                country = {
                    'uid': '_:' + row.country,
                    'dgraph.type': 'Country',
                    'country_code': row.country,
                }
                response = txn.mutate(set_obj=country)
                country_uids[row.country] = response.uids[row.country]
            
            # Create domain if not exists
            if row.domain not in domain_uids:
                domain = {
                    'uid': '_:' + row.domain,
                    'dgraph.type': 'Domain',
                    'domain': row.domain,
                    'tld': row.domain.split('.')[-1],
                    'ip': row.ip,
                }
                response = txn.mutate(set_obj=domain)
                domain_uids[row.domain] = response.uids[row.domain]
                
                # Draw edge from asn to domain
                edge = {
                    'uid': asn_uids[row.asn_num], 
                    'domains': [
                        {'uid': domain_uids[row.domain]},
                    ]
                }
                response = txn.mutate(set_obj=edge)
                
                # Draw edge from country to domain
                edge = {
                    'uid': country_uids[row.country],
                    'domains': [{'uid': domain_uids[row.domain]}]
                }
                response = txn.mutate(set_obj=edge)
                
                # Draw edge from domain to country
                edge = {
                    'uid': domain_uids[row.domain],
                    'country': [{'uid': country_uids[row.country]}]
                }
                response = txn.mutate(set_obj=edge)
            
            # Create document
            doc_uid = hashlib.md5(row.path.encode()).hexdigest()
            document = {
                'uid': '_:' + doc_uid,
                'dgraph.type': 'Document',
                'path': row.path,
            }
            response = txn.mutate(set_obj=document)
            
            # Draw edge to domain
            edge = {
                'uid': domain_uids[row.domain],
                'documents': [{'uid': response.uids[doc_uid]}]
            }
            response = txn.mutate(set_obj=edge)
            
            n+=1
            if n == batch_size:
                n=0
                txn.commit()
                
                # If max iterations exceded, return 
                if iterations is not None and index > iterations:
                    return
                
                # Help garbage collection
                del txn
                txn = client.txn()

        # Commit transaction.
        txn.commit()

    finally:
        # Clean up. Calling this after txn.commit() is a no-op and hence safe.
        txn.discard()
        
    reader.close()

client = get_client()
drop_all(client)
set_schema(client)
insert(client, filepath, batch_size=100, iterations=10000)

Dropping all
Setting Schema


10099it [01:28, 114.01it/s]
