In [1]:
!pwd

/home/zxd/code/Chat/GraphragDemo


In [1]:
! ls 

api_test.py			    kg_visualized.ipynb  requirements.txt
demo				    kg_visualized.py	 settings.yaml
global_search.ipynb		    local_search.ipynb	 temp
global_search.py		    local_search.py	 template
grapg_api.py			    neo4j_test.py	 visualize-graphml.py
graphrag_import_neo4j_cypher.ipynb  neo4j_visualized.py  yfiles_graphs.py
graph-visualization.ipynb	    other
kg_3d.py			    README.md


In [6]:
# 导入相关的包
import time
import pandas as pd
from neo4j import GraphDatabase# 指定Parquet文件路径
GRAPHRAG_FOLDER = "/home/zxd/code/Chat/GraphragDemo/demo/test/output/artifacts"

# 数据库连接相关参数配置
NEO4J_URI = "bolt://192.168.0.245:7687"
NEO4J_USERNAME = "neo4j"
NEO4J_PASSWORD = "12345678"
NEO4J_DATABASE = "neo4j"

In [7]:
# 实例化一个图数据库实例
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))


In [9]:
# 在图数据库中创建约束  初始化
statements = """
create constraint document_id if not exists for (d:__Document__) require d.id is unique;
create constraint chunk_id if not exists for (c:__Chunk__) require c.id is unique;
create constraint entity_id if not exists for (e:__Entity__) require e.id is unique;
create constraint entity_title if not exists for (e:__Entity__) require e.name is unique;
create constraint entity_id if not exists for (c:__Community__) require c.community is unique;
create constraint entity_title if not exists for (e:__Covariate__) require e.title is unique;
create constraint related_id if not exists for ()-[rel:RELATED]->() require rel.id is unique;
""".split(";")
for statement in statements:
    if len((statement or "").strip()) > 0:
        print(statement)
        driver.execute_query(statement)


create constraint document_id if not exists for (d:__Document__) require d.id is unique

create constraint chunk_id if not exists for (c:__Chunk__) require c.id is unique

create constraint entity_id if not exists for (e:__Entity__) require e.id is unique

create constraint entity_title if not exists for (e:__Entity__) require e.name is unique

create constraint entity_id if not exists for (c:__Community__) require c.community is unique

create constraint entity_title if not exists for (e:__Covariate__) require e.title is unique

create constraint related_id if not exists for ()-[rel:RELATED]->() require rel.id is unique


In [14]:
# 使用批处理方式将数据帧导入Neo4j
# 参数：statement 是要执行的 Cypher 查询，df 是要导入的数据帧，batch_size 是每批要导入的行数
def batched_import(statement, df, batch_size=1000):
    # 计算数据帧df中的总行数，并将其存储在total变量中
    total = len(df)
    # 记录当前时间，以便后续计算导入操作所花费的总时间
    start_s = time.time()
    # 每次循环处理一批数据，步数为batch_size
    for start in range(0, total, batch_size):
        # 使用Pandas的iloc方法提取当前批次的数据子集
        # start是当前批次的起始行号
        # min(start + batch_size, total)是当前批次的结束行号，确保不会超过总行数
        batch = df.iloc[start: min(start + batch_size, total)]
        # "UNWIND $rows AS value "是Cypher中的一个操作，它将 $row中的每个元素逐个解包，并作为value传递给Cypher语句statement
        result = driver.execute_query("UNWIND $rows AS value " + statement,
                                      # 将当前批次的 DataFrame 转换为字典的列表
                                      # 每一行数据变成一个字典，columns 作为键
                                      rows=batch.to_dict('records'),
                                      database_=NEO4J_DATABASE)
        # 打印执行结果的摘要统计信息，包括创建的节点、关系等计数
        print(f'执行结果的摘要统计信息，包括创建的节点、关系等计数: {result.summary.counters}')
    # 计算并打印导入总行数和耗时
    print(f'{total} rows in {time.time() - start_s} s.')
    # 返回导入的总行数
    return total


In [11]:
# 按顺序依次执行如下步骤

# 1、创建或更新documents
# 从指定的 Parquet 文件 create_final_documents.parquet 中读取 id、title 和 raw_content 这三列
# 并将它们加载到一个名为 doc_df 的 Pandas 数据帧中。这个数据帧可以进一步用于数据处理、分析或导入操作
doc_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_documents.parquet', columns=["id", "title", "raw_content"])
# # 打印输出数据帧 doc_df 的前 30 行内容
# print(doc_df.head(30))
# MERGE (d:__Document__ {id:value.id})尝试在数据库中找到一个具有 id 属性值为 value.id 的 __Document__ 节点
# 如果找到，则匹配这个节点；如果找不到，则创建一个新的 __Document__ 节点，并将 id 属性设置为 value.id
# SET d += value {.title, .raw_content}将外部 value 对象的 title 属性值赋给节点 d 的 title 属性
# 如果 d 节点已经存在 title 属性，它将被更新为新值；如果 d 节点没有 title 属性，则会新建一个
statement = """
MERGE (d:__Document__ {id:value.id})
SET d += value {.title, .raw_content}
"""
total = batched_import(statement, doc_df)
print("返回的结果：", total)

{'_contains_updates': True, 'labels_added': 2, 'nodes_created': 2, 'properties_set': 6}
2 rows in 0.06768298149108887 s.
返回的结果： 2


In [12]:
# 2、创建或更新chunks与documents之间的关系
# 从指定的 Parquet 文件 create_final_text_units.parquet 中读取列
# 并将它们加载到一个名为 text_df 的 Pandas 数据帧中。这个数据帧可以进一步用于数据处理、分析或导入操作
text_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_text_units.parquet',
                          columns=["id", "text", "n_tokens", "document_ids", "entity_ids", "relationship_ids",
                                   "covariate_ids"])
# 打印输出数据帧 text_df 的前 30 行内容
print(text_df.head(30))

                                 id  \
0  96b5175e26a388cf1d740d7bcf8ef400   
1  5e4e3114ee75f88c1004f89f68145f8b   
2  56f04fab27a00a8ecbc7a35e3e44dfb2   
3  54009af597c936daf9d211f69d1d89bf   

                                                text  n_tokens  \
0  \n广州众易用智能科技有限公司，亦称为众易用AI，是一家定位于“AI行业+应用解决方案专家”...      1086   
1  广州众易用智能科技有限公司 Guangzhou Zhongyiyong Intelligen...      1200   
2  多宝阁、虚拟展厅、移动端藏品交互等多种形式。合作单位包括广东省工艺美术珍品馆、广东民间工艺博...       897   
3  �目RGB 动捕l 光谱视觉与高通量检测l AI 数字人l AI 教育培训l AI 模型工具...      1200   

                         document_ids  \
0  [885f185e5c2ef4809265808340501996]   
1  [9d6382cd85267b4d441c0cf6e2635aba]   
2  [9d6382cd85267b4d441c0cf6e2635aba]   
3  [9d6382cd85267b4d441c0cf6e2635aba]   

                                          entity_ids  \
0  [b45241d70f0e43fca764df95b2b81f77, 4119fd06010...   
1  [b45241d70f0e43fca764df95b2b81f77, d3835bf3dda...   
2  [4119fd06010c494caa07f439b333f4c5, 1745a2485a9...   
3  [27f9fbe6ad8c4a8b9acee0d3596ed57c, 94a96

In [13]:
# MERGE (c:__Chunk__ {id:value.id})尝试匹配或创建一个节点。如果具有指定属性的节点存在，则返回该节点；否则，创建一个新的节点
# SET c += value {.text, .n_tokens}从 value 对象中提取属性，并将它们赋值给节点 c 的同名属性
# WITH c, value用于将当前查询上下文中的变量传递给接下来的查询部分。在这里，c 和 value 被传递到下一步的查询中
# UNWIND value.document_ids AS document将列表 value.document_ids 中的每个元素依次展开为单独的记录,并将每个元素命名为 document，进行单独处理
# MATCH (d:__Document__ {id:document})查找 __Document__ 标签的节点，并且 id 属性值等于 document
# MERGE (c)-[:PART_OF]->(d)在__Chunk__节点与 __Document__ 节点之间创建一个 PART_OF 类型的关系。如果关系已经存在，则不创建新的关系，表示 c 是 d 的一部分
statement = """
MERGE (c:__Chunk__ {id:value.id})
SET c += value {.text, .n_tokens}
WITH c, value
UNWIND value.document_ids AS document
MATCH (d:__Document__ {id:document})
MERGE (c)-[:PART_OF]->(d)
"""
batched_import(statement, text_df)

{'_contains_updates': True, 'labels_added': 4, 'relationships_created': 4, 'nodes_created': 4, 'properties_set': 12}
4 rows in 0.06248807907104492 s.


4

In [16]:
# 3、创建或更新entities与chunks之间的关系
# 从指定的 Parquet 文件 create_final_entities.parquet 中读取列
# 并将它们加载到一个名为 entity_df 的 Pandas 数据帧中。这个数据帧可以进一步用于数据处理、分析或导入操作
entity_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_entities.parquet',
                            columns=["name", "type", "description", "human_readable_id", "id", "description_embedding",
                                     "text_unit_ids"])
# 打印输出数据帧 entity_df 的前 30 行内容
print(entity_df.head(30))
# MERGE (e:__Entity__ {id:value.id})尝试匹配或创建一个节点。如果具有指定属性的节点存在，则返回该节点；否则，创建一个新的节点
# SET e += value {.name, .type, .description, .human_readable_id, .id, .description_embedding, .text_unit_ids}从 value 对象中提取属性，并将它们赋值给节点 e 的同名属性
# WITH e, value用于将当前查询上下文中的变量传递给接下来的查询部分。在这里，e 和 value 被传递到下一步的查询中
# CALL db.create.setNodeVectorProperty(e, "description_embedding", value.description_embedding)调用 Neo4j 中的自定义过程，设置 e 节点的 description_embedding 属性,将 value.description_embedding 的值作为向量属性存储在 e 节点的 description_embedding 属性中
# CALL apoc.create.addLabels()使用 APOC 库中的方法为节点 e 添加标签,根据 value.type 的值决定要添加的标签即将entity的类型均设置为标签
# UNWIND value.text_unit_ids AS text_unit将列表 value.text_unit_ids 中的每个元素依次展开为单独的记录,并将每个元素命名为 text_unit，进行单独处理
# MATCH (c:__Chunk__ {id:text_unit})查找 __Chunk__ 标签的节点，并且 id 属性值等于 text_unit
# MERGE (c)-[:HAS_ENTITY]->(e)在__Chunk__节点与 __Entity__ 节点之间创建一个 HAS_ENTITY 类型的关系。如果关系已经存在，则不创建新的关系，表示表示该文本块包含该实体
entity_statement = """
MERGE (e:__Entity__ {id:value.id})
SET e += value {.name, .type, .description, .human_readable_id, .id, .description_embedding, .text_unit_ids}
WITH e, value
CALL db.create.setNodeVectorProperty(e, "description_embedding", value.description_embedding)
CALL apoc.create.addLabels(e, case when coalesce(value.type,"") = "" then [] else [apoc.text.upperCamelCase(replace(value.type,'"',''))] end) yield node
UNWIND value.text_unit_ids AS text_unit
MATCH (c:__Chunk__ {id:text_unit})
MERGE (c)-[:HAS_ENTITY]->(e)
"""
batched_import(entity_statement, entity_df)

             name          type  \
0   广州众易用智能科技有限公司  ORGANIZATION   
1           众易用AI  ORGANIZATION   
2             黄志青        PERSON   
3       中山大学南昌研究院           GEO   
4      暨南大学新闻传播学院           GEO   
5           985高校           GEO   
6              网易  ORGANIZATION   
7              腾讯  ORGANIZATION   
8            动漫游戏           GEO   
9            教育培训           GEO   
10           室内设计           GEO   
11          电力新能源           GEO   
12             文博           GEO   
13             医疗           GEO   
14           智能检测           GEO   
15           发明专利  ORGANIZATION   
16          软件著作权  ORGANIZATION   
17          作品著作权  ORGANIZATION   
18           商标注册  ORGANIZATION   
19       人工智能软件开发  ORGANIZATION   
20           技术服务  ORGANIZATION   
21         数据处理服务  ORGANIZATION   
22       信息系统集成服务  ORGANIZATION   
23         网络技术服务  ORGANIZATION   
24      光学仪器制造与销售  ORGANIZATION   
25      机械设备研发与销售  ORGANIZATION   
26           广告设计  ORGANIZATION   
27         动漫游戏开发  O

ConstraintError: {code: Neo.ClientError.Schema.ConstraintValidationFailed} {message: Node(151) already exists with label `__Entity__` and property `name` = '广州众易用智能科技有限公司'}