In [1]:
from neo4j import GraphDatabase
import pandas as pd
import numpy as np
from tqdm import tqdm
import json
import time

## 前處理
以 arxiv 資料及為例

In [2]:
file = "data/arxiv_metadata.json"

metadata  = []

lines = 1000    # 1k for testing

with open(file, 'r') as f:
    
    for line in tqdm(f):
        metadata.append(json.loads(line))
        lines -= 1
        if lines == 0: break
            
df = pd.DataFrame(metadata)

999it [00:00, 26177.06it/s]


In [3]:
df.shape

(1000, 14)

In [4]:
def get_author_list(line):
    # Cleans author dataframe column, creating a list of authors in the row.
    return [e[1] + ' ' + e[0] for e in line]


def get_category_list(line):
    # Cleans category dataframe column, creating a list of categories in the row.
    return list(line.split(" "))


df['cleaned_authors_list'] = df['authors_parsed'].map(get_author_list)
df['category_list'] = df['categories'].map(get_category_list)
df = df.drop(['submitter', 'authors', 
             'comments', 'journal-ref', 
             'doi', 'report-no', 'license', 
             'versions', 'update_date', 
             'abstract', 'authors_parsed', 
             'categories'], axis=1)

In [5]:
df.head()

Unnamed: 0,id,title,cleaned_authors_list,category_list
0,704.0001,Calculation of prompt diphoton production cros...,"[C. Balázs, E. L. Berger, P. M. Nadolsky, C. -...",[hep-ph]
1,704.0002,Sparsity-certifying Graph Decompositions,"[Ileana Streinu, Louis Theran]","[math.CO, cs.CG]"
2,704.0003,The evolution of the Earth-Moon system based o...,[Hongjun Pan],[physics.gen-ph]
3,704.0004,A determinant of Stirling cycle numbers counts...,[David Callan],[math.CO]
4,704.0005,From dyadic $\Lambda_{\alpha}$ to $\Lambda_{\a...,"[Wael Abu-Shammala, Alberto Torchinsky]","[math.CA, math.FA]"


## Create neo4j connect
Neo4j 是用來建立從python 傳 cypher 的橋樑

並沒有 像 py2neo 有獨立的建立方式

---

Neo4j 是連接整個 driver （包含底下的所有 DB，所以下 query 的時候要指定 DB)。（不確定是否可以指定 system）

而 py2neo 是以 graph 為單位進行連接的


In [6]:

class Neo4jConnection:
    '''
    準備一個 connector
    + 負責建橋、砸橋（？
    + 傳入 cypher 到 neo4j 並接受 RETRUN 的東西
    '''
    def __init__(self, uri, user, pwd):
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
        except Exception as e:
            print("Failed to create the driver:", e)
        
    def close(self):
        if self.__driver is not None:
            self.__driver.close()
        
    def query(self, query, parameters=None, db='arXiv'):
        assert self.__driver is not None, "Driver not initialized!"
        session = None 
        response = None
        try: 
            session = self.__driver.session(database=db) if db is not None else self.__driver.session() 
            response = list(session.run(query, parameters))
        except Exception as e:
            print("Query failed:", e)
        finally: 
            if session is not None:
                session.close()
        return response


In [7]:
conn = Neo4jConnection(uri="bolt://localhost:7687", user="neo4j", pwd="123")

In [8]:
conn.query("CREATE CONSTRAINT unique_paper FOR (p:Paper) REQUIRE p.id IS UNIQUE", db='arXiv')
conn.query("CREATE CONSTRAINT unique_author FOR (a:Author) REQUIRE a.name IS UNIQUE", db='arXiv')
conn.query("CREATE CONSTRAINT unique_category FOR (c:Category) REQUIRE c.category IS UNIQUE", db='arXiv')

Query failed: {code: Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists} {message: An equivalent constraint already exists, 'Constraint( id=4, name='unique_paper', type='UNIQUENESS', schema=(:Paper {id}), ownedIndex=3 )'.}
Query failed: {code: Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists} {message: An equivalent constraint already exists, 'Constraint( id=6, name='unique_author', type='UNIQUENESS', schema=(:Author {name}), ownedIndex=5 )'.}
Query failed: {code: Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists} {message: An equivalent constraint already exists, 'Constraint( id=8, name='unique_category', type='UNIQUENESS', schema=(:Category {category}), ownedIndex=7 )'.}


In [9]:
def add_categories(categories):
    # Adds category nodes to the Neo4j graph.
    # $ 代表要傳入的參數
    # {} 是 cypher 跟外部無關
    # UNWIND 像是 for 迴圈，遍歷所有對象
    # MERGE = if not MATCH, then CREATE
    # to_dict = [{'category': 'name_1'}, {'category': 'name_2'}, .....]
    query = '''
            UNWIND $rows AS row 
            MERGE (c:Category {category: row.category})
            RETURN count(*) as total
            '''
    return conn.query(query, parameters = {'rows':categories.to_dict('records')})

In [10]:
def insert_data(query, rows, batch_size = 100):
    # Function to handle the updating the Neo4j database in batch mode.
    
    total = 0
    batch = 0
    start = time.time()
    result = None
    
    while batch * batch_size < len(rows):

        res = conn.query(query, 
                         parameters = {'rows': rows[batch*batch_size:(batch+1)*batch_size].to_dict('records')})
        total += res[0]['total'] # 回傳的東西會是 list
        batch += 1
        result = {"total":total, 
                  "batches":batch, 
                  "time":time.time()-start}
        print(result)
        
    return result

In [11]:
def add_authors(rows, batch_size=100):
    # Adds author nodes to the Neo4j graph as a batch job.
    query = '''
            UNWIND $rows AS row
            MERGE (:Author {name: row.author})
            RETURN count(*) as total
            '''
    return insert_data(query, rows, batch_size)

In [12]:
def add_papers(rows, batch_size=50):
   # Adds paper nodes and (:Author)--(:Paper) and 
   # (:Paper)--(:Category) relationships to the Neo4j graph as a 
   # batch job.
   # MERGE ON CREATE SET : Merge a node and set properties if the node needs to be created.
   # MERGE ON MATCH SET : Merge a node and set properties if the node are found.
   
   query = '''
   UNWIND $rows as row
   MERGE (p:Paper {id:row.id}) ON CREATE SET p.title = row.title
 
   // connect categories
   WITH row, p
   UNWIND row.category_list AS category_name
   MATCH (c:Category {category: category_name})
   MERGE (p)-[:IN_CATEGORY]->(c)
 
   // connect authors
   WITH distinct row, p // reduce cardinality
   UNWIND row.cleaned_authors_list AS author
   MATCH (a:Author {name: author})
   MERGE (a)-[:AUTHORED]->(p)
   RETURN count(distinct p) as total
   '''
 
   return insert_data(query, rows, batch_size)

In [13]:
categories = pd.DataFrame(df[['category_list']])
categories.rename(columns={'category_list':'category'},
                  inplace=True)
categories = categories.explode('category') \
                       .drop_duplicates(subset=['category'])

authors = pd.DataFrame(df[['cleaned_authors_list']])
authors.rename(columns={'cleaned_authors_list':'author'},
               inplace=True)
authors=authors.explode('author').drop_duplicates(subset=['author'])

add_categories(categories)
add_authors(authors)
add_papers(df)

{'total': 100, 'batches': 1, 'time': 0.30388498306274414}
{'total': 200, 'batches': 2, 'time': 0.4838831424713135}
{'total': 300, 'batches': 3, 'time': 0.6468002796173096}
{'total': 400, 'batches': 4, 'time': 0.8205969333648682}
{'total': 500, 'batches': 5, 'time': 0.9042003154754639}
{'total': 600, 'batches': 6, 'time': 0.983361005783081}
{'total': 700, 'batches': 7, 'time': 1.0516211986541748}
{'total': 800, 'batches': 8, 'time': 1.1373450756072998}
{'total': 900, 'batches': 9, 'time': 1.2101860046386719}
{'total': 1000, 'batches': 10, 'time': 1.2697420120239258}
{'total': 1100, 'batches': 11, 'time': 1.4041051864624023}
{'total': 1200, 'batches': 12, 'time': 1.4376680850982666}
{'total': 1300, 'batches': 13, 'time': 1.476639986038208}
{'total': 1400, 'batches': 14, 'time': 1.5192251205444336}
{'total': 1500, 'batches': 15, 'time': 1.5577671527862549}
{'total': 1600, 'batches': 16, 'time': 1.599647045135498}
{'total': 1700, 'batches': 17, 'time': 1.65061616897583}
{'total': 1800, 'ba

{'total': 1000, 'batches': 20, 'time': 2.788080930709839}

喔我的電腦要燒起來了

In [14]:

# SIZE 計算數量
query_string = '''
MATCH (c:Category) 
RETURN c.category, SIZE(()-[:IN_CATEGORY]->(c)) AS inDegree 
ORDER BY inDegree DESC LIMIT 20
'''
top_cat_df = pd.DataFrame([dict(_) for _ in conn.query(query_string)])
top_cat_df.head(20)

Unnamed: 0,c.category,inDegree
0,astro-ph,208
1,hep-th,89
2,hep-ph,80
3,quant-ph,67
4,cond-mat.mtrl-sci,56
5,cond-mat.str-el,55
6,gr-qc,55
7,cond-mat.mes-hall,50
8,cond-mat.stat-mech,47
9,math.MP,35
