导入相关的库

In [97]:
import pandas as pd  
from neo4j import GraphDatabase  
import time  

neo4j配置

In [98]:
NEO4J_URI="bolt://localhost:7687"  
NEO4J_USERNAME="neo4j"  
NEO4J_PASSWORD="YH1223725"  
NEO4J_DATABASE="neo4j"  
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))  

定义一个批处理方法， 使用批处理方法将数据导入 Neo4j 。

参数：statement 是要执行的 Cypher 查询，df 是要导入的数据框，batch_size 是每批要导入的行数。

In [99]:
def batched_import(statement, df, batch_size=1000):  
    total = len(df)  
    start_s = time.time()  
    for start in range(0,total, batch_size):  
        batch = df.iloc[start: min(start+batch_size,total)]  
        result = driver.execute_query("UNWIND $rows AS value " + statement,  
                                      rows=batch.to_dict('records'),  
                                      database_=NEO4J_DATABASE)  
        print(result.summary.counters)  
    print(f'{total} rows in { time.time() - start_s} s.')  
    return total  


In [100]:
statements = """  
create constraint chunk_id if not exists for (c:__Chunk__) require c.id is unique;  
create constraint document_id if not exists for (d:__Document__) require d.id is unique;  
create constraint entity_id if not exists for (c:__Community__) require c.community is unique;  
create constraint entity_id if not exists for (e:__Entity__) require e.id is unique;  
create constraint entity_title if not exists for (e:__Entity__) require e.name is unique;  
create constraint entity_title if not exists for (e:__Covariate__) require e.title is unique;  
create constraint related_id if not exists for ()-[rel:RELATED]->() require rel.id is unique;  
""".split(";")  
for statement in statements:  
    if len((statement or "").strip()) > 0:  
        print(statement)  
        driver.execute_query(statement) 

  
create constraint chunk_id if not exists for (c:__Chunk__) require c.id is unique
  
create constraint document_id if not exists for (d:__Document__) require d.id is unique
  
create constraint entity_id if not exists for (c:__Community__) require c.community is unique
  
create constraint entity_id if not exists for (e:__Entity__) require e.id is unique
  
create constraint entity_title if not exists for (e:__Entity__) require e.name is unique
  
create constraint entity_title if not exists for (e:__Covariate__) require e.title is unique
  
create constraint related_id if not exists for ()-[rel:RELATED]->() require rel.id is unique


In [101]:
doc_df = pd.read_parquet(f'D:/GISERR/graduate_study/pytorch/graphrag/ragtest/output/create_final_documents.parquet', columns=["id", "title"])  
print(doc_df.head(2)  )

                                 id          title
0  8009aa9fb1bd7977f381d7f84afebb2e  emergency.txt
1  617ce69cd329921fdad91acf862e69dd      flood.txt


In [102]:
statement = """  
MERGE (d:__Document__ {id:value.id})  
SET d += value {.title}
"""  
batched_import(statement, doc_df) 

{'_contains_updates': True, 'labels_added': 3, 'nodes_created': 3, 'properties_set': 6}
3 rows in 0.17892837524414062 s.


3

In [103]:
text_df = pd.read_parquet(f'D:/GISERR/graduate_study/pytorch/graphrag/ragtest/output/create_final_text_units.parquet',  
                          columns=["id","text","n_tokens","document_ids"])  
text_df.head(2)  



Unnamed: 0,id,text,n_tokens,document_ids
0,990e23eb0b27c888fce4fa446cfdc2c8,"城市暴雨内涝综述：特征、机理、数据与方法\n黄华兵1,2,3\n，王先伟1,2,3\n，柳 ...",1200,[137a8b999a70d2d8969368ee2ab87297]
1,f18f9f23d9fd658e9ce600c56035755d,2 年 184 个，2013 年 234 个，而且在大江\n大河水势基本调控平稳的情况下 2...,1200,[137a8b999a70d2d8969368ee2ab87297]


In [104]:
statement = """  
MERGE (c:__Chunk__ {id:value.id})  
SET c += value {.text, .n_tokens}  
WITH c, value  
UNWIND value.document_ids AS document  
MATCH (d:__Document__ {id:document})  
MERGE (c)-[:PART_OF]->(d)  
"""  
batched_import(statement, text_df)  



{'_contains_updates': True, 'labels_added': 61, 'relationships_created': 61, 'nodes_created': 61, 'properties_set': 183}
61 rows in 0.3596787452697754 s.


61

In [105]:
entity_df = pd.read_parquet(f'D:/GISERR/graduate_study/pytorch/graphrag/ragtest/output/create_final_entities.parquet',  
                            columns=["name","type","description","human_readable_id","id","text_unit_ids"])  
entity_df.head(2)  



Unnamed: 0,name,type,description,human_readable_id,id,text_unit_ids
0,黄华兵,PERSON,Huang Huabing is the first author of the artic...,0,d94a9924432343c782cb3e85a8dd6403,"[8cd8d68773216fe3d60265af9b8405b9, 990e23eb0b2..."
1,王先伟,PERSON,Wang Xianwei is a co-author of the article tha...,1,fd82935686be4562ad30241efbbd3c48,"[990e23eb0b27c888fce4fa446cfdc2c8, f18f9f23d9f..."


In [106]:
entity_statement = """  
MERGE (e:__Entity__ {id:value.id})  
SET e += value {.human_readable_id, .description, name:replace(value.name,'"','')}  
WITH e, value  
  
UNWIND value.text_unit_ids AS text_unit  
MATCH (c:__Chunk__ {id:text_unit})  
MERGE (c)-[:HAS_ENTITY]->(e)  
"""  
batched_import(entity_statement, entity_df)  



{'_contains_updates': True, 'labels_added': 558, 'relationships_created': 699, 'nodes_created': 558, 'properties_set': 2232}
558 rows in 1.1159262657165527 s.


558

In [107]:
rel_df = pd.read_parquet(f'D:/GISERR/graduate_study/pytorch/graphrag/ragtest/output/create_final_relationships.parquet',  
                         columns=["source","target","id","rank","weight","human_readable_id","description","text_unit_ids"])  
rel_df.head(2)  



Unnamed: 0,source,target,id,rank,weight,human_readable_id,description,text_unit_ids
0,黄华兵,中山大学地理科学与规划学院,45ac7a6a1d9a449d8b1f019d2ad20121,10,8.0,0,黄华兵在中山大学地理科学与规划学院工作,[990e23eb0b27c888fce4fa446cfdc2c8]
1,黄华兵,广东省公共安全与灾害工程技术研究中心,584ab7150fb74552a40a266d51645ffa,9,8.0,1,黄华兵在广东省公共安全与灾害工程技术研究中心工作,[990e23eb0b27c888fce4fa446cfdc2c8]


In [108]:
rel_statement = """  
    MATCH (source:__Entity__ {name:replace(value.source,'"','')})  
    MATCH (target:__Entity__ {name:replace(value.target,'"','')})  
    // not necessary to merge on id as there is only one relationship per pair  
    MERGE (source)-[rel:RELATED {id: value.id}]->(target)  
    SET rel += value {.rank, .weight, .human_readable_id, .description, .text_unit_ids}  
    RETURN count(*) as createdRels  
"""  
batched_import(rel_statement, rel_df)


{'_contains_updates': True, 'relationships_created': 517, 'properties_set': 3102}
517 rows in 0.5728490352630615 s.


517

In [109]:
community_df = pd.read_parquet(f'D:/GISERR/graduate_study/pytorch/graphrag/ragtest/output/create_final_communities.parquet',  
                     columns=["id","level","title","text_unit_ids","relationship_ids"])  
community_df.head(2)  



Unnamed: 0,id,level,title,text_unit_ids,relationship_ids
0,10,0,Community 10,"[1741a115769bf8f5939be7ddabffd710, 1149517f738...","[8d3bbdef620148619d2496175f3c290a, 3e3478a4526..."
1,5,0,Community 5,"[7d88ce694c885e6ebcfc2aa4797648ad, 7d88ce694c8...","[0280c199824b45bb9e0a1fcf88fde693, 379fb0ac9fb..."


In [110]:
statement = """  
MERGE (c:__Community__ {community:value.id})  
SET c += value {.level, .title}  
/*  
UNWIND value.text_unit_ids as text_unit_id  
MATCH (t:__Chunk__ {id:text_unit_id})  
MERGE (c)-[:HAS_CHUNK]->(t)  
WITH distinct c, value  
*/  
WITH *  
UNWIND value.relationship_ids as rel_id  
MATCH (start:__Entity__)-[:RELATED {id:rel_id}]->(end:__Entity__)  
MERGE (start)-[:IN_COMMUNITY]->(c)  
MERGE (end)-[:IN_COMMUNITY]->(c)  
RETURN count(distinct c) as createdCommunities  
"""  
batched_import(statement, community_df)  



{'_contains_updates': True, 'labels_added': 41, 'relationships_created': 462, 'nodes_created': 41, 'properties_set': 123}
41 rows in 1.1223158836364746 s.


41