## Neo4j Import of GraphRAG Result Parquet files
This notebook imports the results of the GraphRAG indexing process into the Neo4j Graph database for further processing, analysis or visualization.

### How does it work?
The notebook reads the parquet files from the output folder of your indexing process into Pandas dataframes. It then uses a batching approach to send slices of the data to Neo4j, where nodes and relationships are created and the relevant properties are set. The id-arrays on most entities are turned into relationships.
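As a rough illustration of how an id-array becomes relationships, the sketch below expands each row's list of ids into (row id, referenced id) pairs in plain Python. This is the same expansion that `UNWIND` performs inside the Cypher statements used later in this notebook. The function name `explode_ids` and the sample rows are made up for illustration only.

```python
def explode_ids(rows, id_key, array_key):
    """Yield one (row id, referenced id) pair per element of the id-array."""
    for row in rows:
        refs = row.get(array_key)
        if refs is None:
            continue
        for ref in refs:
            yield row[id_key], ref

# Hypothetical sample rows, shaped like the text unit table
sample = [
    {"id": "chunk-1", "document_ids": ["doc-a"]},
    {"id": "chunk-2", "document_ids": ["doc-a", "doc-b"]},
    {"id": "chunk-3", "document_ids": []},
]

pairs = list(explode_ids(sample, "id", "document_ids"))
print(pairs)
```

Each pair corresponds to one relationship that a `MERGE` would create; a row with an empty array contributes none.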

All operations use `MERGE`, so they are idempotent, and you can run the script multiple times.

If you need to clean out the database, you can run the following statement:
```
MATCH (n)
CALL { WITH n DETACH DELETE n } IN TRANSACTIONS OF 25000 ROWS;
```

In [None]:
GRAPHRAG_FOLDER="output/20241002-135325/artifacts"

In [13]:
import pandas as pd
from neo4j import GraphDatabase
import time

In [14]:
NEO4J_URI="bolt://localhost"
NEO4J_USERNAME="neo4j"
NEO4J_PASSWORD="siat-mic"
NEO4J_DATABASE="pskdemo"

driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))

In [15]:
# def batched_import(statement, df, batch_size=1000):
#     """
#     Import a dataframe into Neo4j using a batched approach.
#     Parameters: statement is the Cypher query to execute, df is the dataframe to import, and batch_size is the number of rows to import in each batch.
#     """
#     total = len(df)
#     start_s = time.time()
#     for start in range(0,total, batch_size):
#         batch = df.iloc[start: min(start+batch_size,total)]
#         result = driver.execute_query("UNWIND $rows AS value " + statement, 
#                                       rows=batch.to_dict('records'),
#                                       database_=NEO4J_DATABASE)
#         print(result.summary.counters)
#     print(f'{total} rows in { time.time() - start_s} s.')    
#     return total

def batched_import(statement, df, batch_size=1000):
    total = len(df)
    start_s = time.time()
    successful_batches = 0
    for start in range(0, total, batch_size):
        batch = df.iloc[start: min(start+batch_size, total)]
        try:
            result = driver.execute_query("UNWIND $rows AS value " + statement,
                                          rows=batch.to_dict('records'),
                                          database_=NEO4J_DATABASE)
            successful_batches += 1
            print(f"Batch {successful_batches} processed, counters: {result.summary.counters}")
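        except Exception as e:
            # report the error instead of swallowing it silently
            print(f"Error processing batch starting at row {start}: {e}")
            # add fallback or correction logic here as needed
    print(f'{total} rows processed in {time.time() - start_s} seconds.')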
    return total
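The function above returns the total row count even when some batches fail. One possible refinement, sketched here without a database, is to collect the start offsets of failed batches so they can be retried later. The names `iter_batches`, `run_batches` and `send` are hypothetical; `send` stands in for the `driver.execute_query` call.

```python
def iter_batches(rows, batch_size=1000):
    """Yield (start offset, slice) for consecutive slices of rows."""
    for start in range(0, len(rows), batch_size):
        yield start, rows[start:start + batch_size]

def run_batches(rows, send, batch_size=1000):
    """Pass each batch to send() and return the start offsets of failed batches."""
    failed = []
    for start, batch in iter_batches(rows, batch_size):
        try:
            send(batch)
        except Exception as exc:
            print(f"Batch starting at row {start} failed: {exc}")
            failed.append(start)
    return failed

# Stand-in for the database call: just record the batch sizes
rows = [{"id": i} for i in range(2500)]
sizes = []
failed = run_batches(rows, lambda batch: sizes.append(len(batch)))
print(sizes, failed)
```

Because the statements use `MERGE`, re-sending a failed batch should be safe.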

### Indexes and Constraints
Indexes in Neo4j are only used to find the starting points for graph queries, e.g. quickly finding two nodes to connect. Constraints exist to avoid duplicates; we create them mostly on the ids of the entity types.

We use some labels as markers, with two underscores before and after, to distinguish them from the actual entity types:

* `__Entity__`
* `__Document__`
* `__Chunk__`
* `__Community__`
* `__Covariate__`

The default relationship type here is `RELATED`, but we could also infer a real relationship type from the description or from the types of the start and end nodes.

In [16]:
# create constraints, idempotent operation
# each constraint needs its own name, otherwise "if not exists" skips the later statement

statements = """
create constraint chunk_id if not exists for (c:__Chunk__) require c.id is unique;
create constraint document_id if not exists for (d:__Document__) require d.id is unique;
create constraint community_id if not exists for (c:__Community__) require c.community is unique;
create constraint entity_id if not exists for (e:__Entity__) require e.id is unique;
create constraint entity_title if not exists for (e:__Entity__) require e.name is unique;
create constraint covariate_title if not exists for (e:__Covariate__) require e.title is unique;
create constraint related_id if not exists for ()-[rel:RELATED]->() require rel.id is unique;
""".split(";")

for statement in statements:
    if len((statement or "").strip()) > 0:
        print(statement)
        # run against the same database that the import uses
        driver.execute_query(statement, database_=NEO4J_DATABASE)


create constraint chunk_id if not exists for (c:__Chunk__) require c.id is unique

create constraint document_id if not exists for (d:__Document__) require d.id is unique

create constraint community_id if not exists for (c:__Community__) require c.community is unique

create constraint entity_id if not exists for (e:__Entity__) require e.id is unique

create constraint entity_title if not exists for (e:__Entity__) require e.name is unique

create constraint covariate_title if not exists for (e:__Covariate__) require e.title is unique

create constraint related_id if not exists for ()-[rel:RELATED]->() require rel.id is unique
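Constraint names have to be unique within a database, and with `if not exists` a statement that reuses an existing name is skipped rather than raising an error. One way to avoid accidental collisions is to derive the name from the label and property. The helper below is only a sketch of that idea; `constraint_statements` and its naming scheme are assumptions, not part of GraphRAG or the Neo4j driver.

```python
def constraint_statements(node_keys):
    """Build one uniqueness constraint per (label, property) pair with a derived name."""
    statements = []
    for label, prop in node_keys:
        # "__Chunk__" + "id" -> "chunk_id"
        name = f"{label.strip('_').lower()}_{prop}"
        statements.append(
            f"CREATE CONSTRAINT {name} IF NOT EXISTS "
            f"FOR (n:{label}) REQUIRE n.{prop} IS UNIQUE"
        )
    return statements

node_keys = [
    ("__Chunk__", "id"),
    ("__Document__", "id"),
    ("__Community__", "community"),
    ("__Entity__", "id"),
    ("__Entity__", "name"),
    ("__Covariate__", "title"),
]

stmts = constraint_statements(node_keys)
names = [s.split()[2] for s in stmts]
print(names)
```

Since the name is built from both label and property, two different constraints cannot end up sharing a name.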


## Import Process
### Importing the Documents
We load the parquet file for the documents, create a node per document id and add the title property. We don't need to store text_unit_ids because we create relationships instead, and the text content is also contained in the chunks.

In [17]:
doc_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_documents.parquet', columns=["id", "title"])
doc_df.head(2)

Unnamed: 0,id,title
0,501b15313a5b7a866056af4b22ac5fc0,10.1002_adfm.201705094.md
1,a50ab37c5b0b5a68fecd1a8f2a80302e,10.1002_adma.201804547.md


In [18]:
# import documents
statement = """
MERGE (d:__Document__ {id:value.id})
SET d += value {.title}
"""

batched_import(statement, doc_df)

Batch 1 processed, counters: {'_contains_updates': True, 'labels_added': 102, 'nodes_created': 102, 'properties_set': 204}
102 rows processed in 0.33764171600341797 seconds.


102

### Loading Text Units
We load the text units, create a node per id and set the text and number of tokens. Then we connect them to the documents that we created before.

In [19]:
text_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_text_units.parquet',
                          columns=["id","text","n_tokens","document_ids"])
text_df.head(2)

Unnamed: 0,id,text,n_tokens,document_ids
0,bbce30e7dc572bce177cba82bc6037c0,# Strong Self-Trapped Exciton Emission and Hig...,2000,[041bf644e7ec23fa052505c1edb4fac8]
1,36fb9bc4eec29c1edde2cce78e0da6e5,the STE states and then to ${^2}\mathrm{F}_{...,2000,[041bf644e7ec23fa052505c1edb4fac8]


In [20]:
statement = """
MERGE (c:__Chunk__ {id:value.id})
SET c += value {.text, .n_tokens}
WITH c, value
UNWIND value.document_ids AS document
MATCH (d:__Document__ {id:document})
MERGE (c)-[:PART_OF]->(d)
"""

batched_import(statement, text_df)

Batch 1 processed, counters: {'_contains_updates': True, 'labels_added': 968, 'relationships_created': 968, 'nodes_created': 968, 'properties_set': 2904}
968 rows processed in 1.003819227218628 seconds.


968

### Loading Nodes
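For the entity nodes we store id, name, description, embedding (if available) and human-readable id. The entity type is added as an extra label, and each entity is connected to the chunks listed in its text_unit_ids.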

In [21]:
entity_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_entities.parquet',
                            columns=["name","type","description","human_readable_id","id","description_embedding","text_unit_ids"])

# Deduplicate on a specific column, assuming 'id' is unique
entity_df = entity_df.drop_duplicates(subset=['id'])

# Deduplicate on 'name' as well so the rows satisfy the uniqueness constraint
if 'name' in entity_df.columns:
    entity_df = entity_df.drop_duplicates(subset=['name'])

entity_df.head(2)

Unnamed: 0,name,type,description,human_readable_id,id,description_embedding,text_unit_ids
0,CS2AGINCL6:SB,FINAL SYNTHESIZED PRODUCT,Cesium silver indium chloride doped with antim...,0,039e68fa67444c90aefca8cfbe210b8b,"[0.01675415, -0.009048462, -0.02154541, 0.0342...",[a3e265300541958c6b4b8ccd864688b5]
1,"CS2AGINCL6:SB,YB",FINAL SYNTHESIZED PRODUCT,Cesium silver indium chloride co-doped with an...,1,550131f8da414dc8b37cd532e31bc8b0,"[0.02394104, -0.0068206787, -0.021896362, 0.03...",[a3e265300541958c6b4b8ccd864688b5]


In [22]:
# entity_statement = """
# MERGE (e:__Entity__ {id:value.id})
# SET e += value {.human_readable_id, .description, name:replace(value.name,'"','')}
# WITH e, value
# CALL db.create.setNodeVectorProperty(e, "description_embedding", value.description_embedding)
# CALL apoc.create.addLabels(e, case when coalesce(value.type,"") = "" then [] else [apoc.text.upperCamelCase(replace(value.type,'"',''))] end) yield node
# UNWIND value.text_unit_ids AS text_unit
# MATCH (c:__Chunk__ {id:text_unit})
# MERGE (c)-[:HAS_ENTITY]->(e)
# """

# batched_import(entity_statement, entity_df)

entity_statement = """
MERGE (e:__Entity__ {id:value.id})
ON CREATE SET e.name = replace(value.name,'"',''),
              e.human_readable_id = value.human_readable_id,
              e.description = value.description
ON MATCH SET e.human_readable_id = value.human_readable_id,
              e.description = value.description
WITH e, value
CALL db.create.setNodeVectorProperty(e, "description_embedding", value.description_embedding)
WITH e, value
CALL apoc.create.addLabels(e, 
    CASE 
        WHEN coalesce(value.type, "") = "" THEN [] 
        ELSE [apoc.text.upperCamelCase(replace(value.type, '"', ''))] 
    END) YIELD node
UNWIND value.text_unit_ids AS text_unit
MATCH (c:__Chunk__ {id:text_unit})
MERGE (c)-[:HAS_ENTITY]->(e)
"""

batched_import(entity_statement, entity_df)

Batch 1 processed, counters: {'_contains_updates': True, 'labels_added': 1000, 'relationships_created': 2536, 'nodes_created': 1000, 'properties_set': 4000}
Batch 2 processed, counters: {'_contains_updates': True, 'labels_added': 1000, 'relationships_created': 1479, 'nodes_created': 1000, 'properties_set': 4000}
Batch 3 processed, counters: {'_contains_updates': True, 'labels_added': 1000, 'relationships_created': 1196, 'nodes_created': 1000, 'properties_set': 4000}
Batch 4 processed, counters: {'_contains_updates': True, 'labels_added': 1000, 'relationships_created': 1096, 'nodes_created': 1000, 'properties_set': 4000}
Batch 5 processed, counters: {'_contains_updates': True, 'labels_added': 923, 'relationships_created': 974, 'nodes_created': 923, 'properties_set': 3692}
4923 rows processed in 25.526809453964233 seconds.


4923
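The entity statement above derives an additional label from the `type` column via `apoc.text.upperCamelCase`. To preview which labels to expect before importing, a rough Python approximation can help. The functions `upper_camel_case` and `labels_for` below are hypothetical stand-ins, and APOC may treat digits or special characters differently.

```python
import re

def upper_camel_case(text):
    """Approximate UpperCamelCase: drop quotes, split on non-alphanumerics, capitalize words."""
    words = re.findall(r"[A-Za-z0-9]+", text.replace('"', ''))
    return "".join(word[:1].upper() + word[1:].lower() for word in words)

def labels_for(entity_type):
    """Mirror the CASE expression: no extra label for a missing or empty type."""
    if not entity_type:
        return []
    return [upper_camel_case(entity_type)]

product_labels = labels_for("FINAL SYNTHESIZED PRODUCT")
empty_labels = labels_for("")
print(product_labels, empty_labels)
```

Under this approximation, the `FINAL SYNTHESIZED PRODUCT` type from the sample rows would yield the label `FinalSynthesizedProduct`.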

In [23]:
# entity_statement = """
# MERGE (e:__Entity__ {id:value.id})
# SET e.human_readable_id = value.human_readable_id,
#     e.description = value.description,
#     e.name = replace(value.name,'"','')
# WITH e, value
# CALL db.create.setNodeVectorProperty(e, "description_embedding", value.description_embedding)
# WITH e, value
# CALL apoc.create.addLabels(e, 
#     CASE 
#         WHEN coalesce(value.type, "") = "" THEN [] 
#         ELSE [apoc.text.upperCamelCase(replace(value.type, '"', ''))] 
#     END) YIELD node
# UNWIND value.text_unit_ids AS text_unit
# MATCH (c:__Chunk__ {id:text_unit})
# MERGE (c)-[:HAS_ENTITY]->(e)
# """

# batched_import(entity_statement, entity_df)

### Import Relationships
For the relationships we find the source and target nodes by name, using the base `__Entity__` label. After creating the `RELATED` relationships, we set rank, weight, human_readable_id, description and text_unit_ids as properties.

In [24]:
rel_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_relationships.parquet',
                         columns=["source","target","id","rank","weight","human_readable_id","description","text_unit_ids"])
rel_df.head(2)

Unnamed: 0,source,target,id,rank,weight,human_readable_id,description,text_unit_ids
0,CS2AGINCL6:SB,WARM WHITE-LED,6508b3fe0522487da99b64fc2726d1a7,15,5.0,0,Cs2AgInCl6:Sb is used to create warm white-LEDs,[a3e265300541958c6b4b8ccd864688b5]
1,CS2AGINCL6:SB,UV CHIP,55a84a0c0471424fa430fcf8331f0010,10,4.0,1,UV chip is used to pump Cs2AgInCl6:Sb,[a3e265300541958c6b4b8ccd864688b5]


In [25]:
rel_statement = """
    MATCH (source:__Entity__ {name:replace(value.source,'"','')})
    MATCH (target:__Entity__ {name:replace(value.target,'"','')})
    // merging on id is not strictly necessary as there is only one relationship per pair
    MERGE (source)-[rel:RELATED {id: value.id}]->(target)
    SET rel += value {.rank, .weight, .human_readable_id, .description, .text_unit_ids}
    RETURN count(*) as createdRels
"""

batched_import(rel_statement, rel_df)

Batch 1 processed, counters: {'_contains_updates': True, 'relationships_created': 1000, 'properties_set': 6000}
Batch 2 processed, counters: {'_contains_updates': True, 'relationships_created': 1000, 'properties_set': 6000}
Batch 3 processed, counters: {'_contains_updates': True, 'relationships_created': 1000, 'properties_set': 6000}
Batch 4 processed, counters: {'_contains_updates': True, 'relationships_created': 467, 'properties_set': 2802}
3467 rows processed in 1.0521461963653564 seconds.


3467
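Because the statement uses `MATCH` on entity names, a row whose source or target is not found creates no relationship and raises no error. A quick pre-check in plain Python can list such rows before the import. This is a sketch only: `missing_endpoints` is a hypothetical helper, and the relationship ids in the sample are invented.

```python
def missing_endpoints(relationships, entity_names):
    """Return (relationship id, side, name) for every endpoint without a matching entity."""
    # Apply the same quote stripping as the Cypher statement
    known = {name.replace('"', '') for name in entity_names}
    missing = []
    for rel in relationships:
        for side in ("source", "target"):
            if rel[side].replace('"', '') not in known:
                missing.append((rel["id"], side, rel[side]))
    return missing

entity_names = ["CS2AGINCL6:SB", "WARM WHITE-LED"]
relationships = [
    {"id": "r1", "source": "CS2AGINCL6:SB", "target": "WARM WHITE-LED"},
    {"id": "r2", "source": "CS2AGINCL6:SB", "target": "UV CHIP"},
]

missing = missing_endpoints(relationships, entity_names)
print(missing)
```

With the notebook's dataframes, the inputs could be `rel_df.to_dict('records')` and `entity_df["name"]`.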

### Importing Communities
For communities we import their id, title and level. We connect the `__Community__` nodes to the start and end nodes of the relationships they refer to.

Connecting them to the chunks they originate from is optional, as the entities are already connected to the chunks.

In [26]:
community_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_communities.parquet', 
                     columns=["id","level","title","text_unit_ids","relationship_ids"])

community_df.head(2)

Unnamed: 0,id,level,title,text_unit_ids,relationship_ids
0,11,0,Community 11,"[a3e265300541958c6b4b8ccd864688b5, 133f0e027ee...","[6508b3fe0522487da99b64fc2726d1a7, 55a84a0c047..."
1,4,0,Community 4,"[59bf230c16c9b748130643fa41c6cb7d,a3e265300541...","[162830008a1e4d91a122539aa73c8d65, 66d02d4cec0..."


In [27]:
statement = """
MERGE (c:__Community__ {community:value.id})
SET c += value {.level, .title}
/*
UNWIND value.text_unit_ids as text_unit_id
MATCH (t:__Chunk__ {id:text_unit_id})
MERGE (c)-[:HAS_CHUNK]->(t)
WITH distinct c, value
*/
WITH *
UNWIND value.relationship_ids as rel_id
MATCH (start:__Entity__)-[:RELATED {id:rel_id}]->(end:__Entity__)
MERGE (start)-[:IN_COMMUNITY]->(c)
MERGE (end)-[:IN_COMMUNITY]->(c)
RETURN count(distinct c) as createdCommunities
"""

batched_import(statement, community_df)

Batch 1 processed, counters: {'_contains_updates': True, 'labels_added': 162, 'relationships_created': 4966, 'nodes_created': 162, 'properties_set': 486}
162 rows processed in 10.953670978546143 seconds.


162

### Importing Community Reports
For the community reports we merge a node for each community and set level, title, summary, rank, rank_explanation and full_content on it. The communities are already connected to the entities they are about by the previous step. The findings are created as `Finding` nodes in the context of their community.

In [28]:
community_report_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_community_reports.parquet',
                               columns=["id","community","level","title","summary", "findings","rank","rank_explanation","full_content"])
community_report_df.head(2)

Unnamed: 0,id,community,level,title,summary,findings,rank,rank_explanation,full_content
0,3edb7330-f454-41bf-8dfa-cb7fa97334d2,100,1,Natural Light and Visible Camera Interaction,This community focuses on the interaction betw...,[{'explanation': 'The community has identified...,4.5,The rating reflects the moderate relevance of ...,# Natural Light and Visible Camera Interaction...
1,b76236e6-e529-4e9f-a7f9-ead069b759cd,101,1,SNO Transistor Synthesis and Performance Analysis,This community is centered around the synthesi...,[{'explanation': 'Silicon dioxide (SiO2) plays...,9.2,The high rating is due to the comprehensive na...,# SNO Transistor Synthesis and Performance Ana...


In [29]:
# import community reports
community_statement = """
MERGE (c:__Community__ {community:value.community})
SET c += value {.level, .title, .rank, .rank_explanation, .full_content, .summary}
WITH c, value
UNWIND range(0, size(value.findings)-1) AS finding_idx
WITH c, value, finding_idx, value.findings[finding_idx] as finding
MERGE (c)-[:HAS_FINDING]->(f:Finding {id:finding_idx})
SET f += finding
"""
batched_import(community_statement, community_report_df)

Batch 1 processed, counters: {'_contains_updates': True, 'labels_added': 830, 'relationships_created': 830, 'nodes_created': 830, 'properties_set': 3408}
153 rows processed in 0.311781644821167 seconds.


153
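In the statement above, each `Finding` is merged together with its `HAS_FINDING` relationship, so the `id` (the position in the findings list) only identifies a finding within its community. The sketch below flattens reports into one row per finding to make that pairing explicit. The name `finding_rows` and the sample data are hypothetical, and the `summary` key is an assumption about the shape of the findings.

```python
def finding_rows(reports):
    """Flatten reports into one row per finding, keyed by (community, finding_idx)."""
    rows = []
    for report in reports:
        findings = report.get("findings")
        if findings is None:
            continue
        for idx, finding in enumerate(findings):
            rows.append({"community": report["community"], "finding_idx": idx, **finding})
    return rows

# Invented sample data; real findings come from create_final_community_reports.parquet
reports = [
    {"community": 100, "findings": [
        {"summary": "s1", "explanation": "e1"},
        {"summary": "s2", "explanation": "e2"},
    ]},
    {"community": 101, "findings": [{"summary": "s3", "explanation": "e3"}]},
]

rows = finding_rows(reports)
keys = [(r["community"], r["finding_idx"]) for r in rows]
print(keys)
```

The index 0 appears once per community, which is why a finding is only unambiguous together with its community.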

### Importing Covariates
Covariates are, for instance, claims on entities. We connect them to the chunks they originate from.

**By default, covariates are not included in the output, so the file might not exist in your output if you didn't set the configuration to extract claims.**
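Since the covariates file is optional, the import can be guarded with a file check instead of commenting cells in and out. The following is a minimal sketch using only the standard library; `optional_artifact` is a hypothetical helper, and the temporary folder stands in for `GRAPHRAG_FOLDER`.

```python
import tempfile
from pathlib import Path

def optional_artifact(folder, filename):
    """Return the path to an artifact file, or None if it was not produced."""
    path = Path(folder) / filename
    return path if path.is_file() else None

# Demonstration in a throwaway folder
with tempfile.TemporaryDirectory() as folder:
    before = optional_artifact(folder, "create_final_covariates.parquet")
    (Path(folder) / "create_final_covariates.parquet").touch()
    after = optional_artifact(folder, "create_final_covariates.parquet")
    found_name = after.name if after else None

print(before, found_name)
```

In the notebook, the parquet file would then be read and passed to `batched_import` only when the helper returns a path.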

In [30]:
"""
# cov_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_covariates.parquet')
# cov_df.head(2)
"""

"\n# cov_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_covariates.parquet')\n# cov_df.head(2)\n"

In [31]:
cov_statement = """
MERGE (c:__Covariate__ {id:value.id})
SET c += apoc.map.clean(value, ["text_unit_id", "document_ids", "n_tokens"], [NULL, ""])
WITH c, value
MATCH (ch:__Chunk__ {id: value.text_unit_id})
MERGE (ch)-[:HAS_COVARIATE]->(c)
"""
# batched_import(cov_statement, cov_df)