In [2]:
from neo4j import GraphDatabase


NEO4J_URI="bolt://localhost"
NEO4J_USERNAME="neo4j"
NEO4J_PASSWORD="Snowball2019"  # 这里替换成自己设置的密码
NEO4J_DATABASE="neo4j"         # 社区版仅能使用默认的neo4j数据库，不支持创建其他数据库

driver = GraphDatabase.driver(
    NEO4J_URI, 
    auth=(NEO4J_USERNAME, NEO4J_PASSWORD)
    )

In [3]:
driver = GraphDatabase.driver(
    NEO4J_URI, 
    auth=(NEO4J_USERNAME, NEO4J_PASSWORD)
    )

def test_connection():
    # 在这里创建会话
    with driver.session() as session:
        session.run("MATCH (n) RETURN n LIMIT 1")

try:
    test_connection()
    print("连接成功！")
except Exception as e:
    print("连接失败:", e)
finally:
    driver.close()  # 确保在所有操作完成后再关闭驱动程序

连接成功！


In [4]:
import time
import pandas as pd
import concurrent.futures
from neo4j import GraphDatabase


def parallel_batched_import(statement, df, batch_size=100, max_workers=8):
    """
    使用并行处理进行批量导入数据到Neo4j
    
    参数:
    - statement: Cypher查询语句，使用value作为每行数据的引用
    - df: 要导入的DataFrame
    - batch_size: 每批处理的行数
    - max_workers: 并行线程数
    
    返回:
    - 导入统计信息的字典
    """

    # 1. 初始化，计算总行数，批次数，并记录开始时间
    total = len(df)
    batches = (total + batch_size - 1) // batch_size  # 向上取整
    start_time = time.time()
    results = []
    
    print(f"开始并行导入 {total} 行数据，分为 {batches} 个批次，每批 {batch_size} 条")
    

    # 2. 定义批处理函数
    def process_batch(batch_idx):
        """
        批处理函数，用于处理每个批次的数据
        """

        # 1. 计算批次的起始和结束索引
        start = batch_idx * batch_size
        end = min(start + batch_size, total)
        batch = df.iloc[start:end]
        
        batch_start_time = time.time()
        
        try:
            with driver.session(database=NEO4J_DATABASE) as session:
                # NWIND 是 Cypher 查询语言中的一个关键字，用于将一个列表展开为多行。 $rows 是一个参数，表示将要传入的行数据
                # 完整意思是：将$rows参数（一个列表）中的每个元素展开，每个元素被赋值给变量value， 对每个value执行后续的Cypher语句
                #    id   name   age
                #    0    张三    25
                #    1    李四    30

                # 调用.to_dict("records")后，得到：
                # [
                #     {"id": 1, "name": "张三", "age": 25},
                #     {"id": 2, "name": "李四", "age": 30}
                # ]
                result = session.run(
                    "UNWIND $rows AS value " + statement,   
                    rows=batch.to_dict("records")
                )
                summary = result.consume()    # Neo4j 中用于处理查询结果的一个方法。它的主要作用是获取查询的摘要信息，包括执行统计、计数和其他相关信息
                batch_duration = time.time() - batch_start_time
                
                return {
                    "batch": batch_idx,
                    "rows": end - start,
                    "success": True,
                    "duration": batch_duration,
                    "counters": summary.counters   # summary.counters 是在执行 Cypher 查询后返回的统计信息
                }
        except Exception as e:
            batch_duration = time.time() - batch_start_time
            print(f"批次 {batch_idx} (行 {start}-{end-1}) 处理失败: {str(e)}")
            
            return {
                "batch": batch_idx,
                "rows": end - start,
                "success": False,
                "duration": batch_duration,
                "error": str(e)
            }
    
    # 使用线程池并行处理批次
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_batch, i) for i in range(batches)]
        
        # 处理完成的批次
        for i, future in enumerate(concurrent.futures.as_completed(futures)):
            result = future.result()
            results.append(result)
            
            if result["success"]:
                print(f"批次 {result['batch']} 完成: {result['rows']} 行, 耗时 {result['duration']:.2f}秒")
            else:
                print(f"批次 {result['batch']} 失败: {result['rows']} 行, 耗时 {result['duration']:.2f}秒, 错误: {result.get('error', '未知错误')}")
            
            # 显示进度
            print(f"进度: {i+1}/{batches} 批次完成 ({((i+1)/batches*100):.1f}%)")
    
    # 统计结果
    successful_rows = sum(r["rows"] for r in results if r["success"])
    failed_rows = sum(r["rows"] for r in results if not r["success"])
    
    duration = time.time() - start_time
    rows_per_second = successful_rows / duration if duration > 0 else 0
    
    print(f"导入完成: 总计 {total} 行, 成功 {successful_rows} 行, 失败 {failed_rows} 行")
    print(f"总耗时: {duration:.2f}秒, 平均速度: {rows_per_second:.2f} 行/秒")
    
    return {
        "total_rows": total,
        "successful_rows": successful_rows,
        "failed_rows": failed_rows,
        "duration_seconds": duration,
        "rows_per_second": rows_per_second,
        "batch_results": results
    }

In [5]:
import pandas as pd
from tabulate import tabulate

df_documents = pd.read_parquet('../data/output/documents.parquet')  # 替换为实际路径

print(tabulate(df_documents, headers='keys', tablefmt='pretty', showindex=False, stralign='left', maxcolwidths=[20, 20, 20, 20, 20]))

+----------------------+-------------------+--------------------+----------------------+----------------------+---------------------------+----------+
| id                   | human_readable_id | title              | text                 | text_unit_ids        | creation_date             | metadata |
+----------------------+-------------------+--------------------+----------------------+----------------------+---------------------------+----------+
| 62107c728acaff87838b | 1                 | merged_reviews.csv | 非常满意这次购买，保 | ['6481820a1520a8b564 | 2025-03-20 12:34:12 +0800 |          |
| 33dd0602b27a2b7aa492 |                   |                    | 鲜功能出色，送货也很 | d5179d2b86ea6e448ffa |                           |          |
| 3343199e30c25c498c0f |                   |                    | 快。<ROW_SEP>使用了  | 9f90a132745fece1dc8b |                           |          |
| 9394476d9d2d35b792aa |                   |                    | 一个月，容量大，但是 | fb8ea3daa0c8c47891af |                  

In [4]:
import pandas as pd
from tabulate import tabulate

df_entities = pd.read_parquet('../data/output/text_units.parquet')  # 替换为实际路径

print(tabulate(df_entities, headers='keys', tablefmt='pretty', showindex=False, stralign='left', maxcolwidths=[20, 20, 20, 20, 20]))

+----------------------+----------------------+----------------------+----------+
| id                   | text                 | document_ids         | n_tokens |
+----------------------+----------------------+----------------------+----------+
| 6481820a1520a8b564d5 | 非常满意这次购买，保 | ['62107c728acaff8783 | 45       |
| 179d2b86ea6e448ffa9f | 鲜功能出色，送货也很 | 8b33dd0602b27a2b7aa4 |          |
| 90a132745fece1dc8bfb | 快。 使用了一个月，  | 923343199e30c25c498c |          |
| 8ea3daa0c8c47891af51 | 容量大，但是维修成本 | 0f9394476d9d2d35b792 |          |
| 9d90463daab9da36b4be | 高。                 | aa2f48893a4d325e6cab |          |
| a23ec40f99958b742e8c |                      | c25d20102ce69810c22d |          |
| e1a5c2ec             |                      | 8b3c577e25']         |          |
| f0f293939d6eee9c2700 | 智能冰箱的功能很全面 | ['62107c728acaff8783 | 33       |
| faa7a7868a7b5434280a | ，智能管理食材功能实 | 8b33dd0602b27a2b7aa4 |          |
| fb6f6c11cfcfa0f73bec | 用，适合家庭使用。   | 923343199e30c25c498c |          |
| 70

In [8]:
# 创建Document节点
def create_document_nodes(df_documents):
    # 首先创建唯一性约束
    with driver.session(database=NEO4J_DATABASE) as session:
        try:
            session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (d:__Document__) REQUIRE d.id IS UNIQUE")
        except Exception as e:
            print(f"创建约束时出错 (可能已存在): {e}")
    
    # 导入文档
    # MERGE与ON CREATE SET组合，是一种条件性属性设置方式：特点是：
    # 1. 只在节点首次创建时设置属性
    # 2. 如果节点已存在，不会修改现有属性
    # 适合初始导入场景，避免覆盖已有数据
    cypher_statement = """
    MERGE (d:__Document__ {id: value.id})
    ON CREATE SET 
        d.human_readable_id = value.human_readable_id,
        d.title = value.title,
        d.text = value.text,
        d.creation_date = value.creation_date,
        d.import_timestamp = timestamp()
    """
    
    return parallel_batched_import(cypher_statement, df_documents)

In [9]:
NEO4J_URI="bolt://localhost"
NEO4J_USERNAME="neo4j"
NEO4J_PASSWORD="Snowball2019"
NEO4J_DATABASE="neo4j"

driver = GraphDatabase.driver(
    NEO4J_URI, 
    auth=(NEO4J_USERNAME, NEO4J_PASSWORD)
    )

# 执行 document.parquet 文件的导入
create_document_nodes(df_documents)

开始并行导入 5000 行数据，分为 50 个批次，每批 100 条
批次 0 完成: 100 行, 耗时 0.52秒
进度: 1/50 批次完成 (2.0%)
批次 8 完成: 100 行, 耗时 0.09秒
进度: 2/50 批次完成 (4.0%)
批次 9 完成: 100 行, 耗时 0.06秒
进度: 3/50 批次完成 (6.0%)
批次 10 完成: 100 行, 耗时 0.04秒
进度: 4/50 批次完成 (8.0%)
批次 11 完成: 100 行, 耗时 0.04秒
进度: 5/50 批次完成 (10.0%)
批次 12 完成: 100 行, 耗时 0.04秒
进度: 6/50 批次完成 (12.0%)
批次 13 完成: 100 行, 耗时 0.04秒
进度: 7/50 批次完成 (14.0%)
批次 14 完成: 100 行, 耗时 0.03秒
进度: 8/50 批次完成 (16.0%)
批次 15 完成: 100 行, 耗时 0.03秒
进度: 9/50 批次完成 (18.0%)
批次 16 完成: 100 行, 耗时 0.03秒
进度: 10/50 批次完成 (20.0%)
批次 17 完成: 100 行, 耗时 0.03秒
进度: 11/50 批次完成 (22.0%)
批次 18 完成: 100 行, 耗时 0.02秒
进度: 12/50 批次完成 (24.0%)
批次 19 完成: 100 行, 耗时 0.03秒
进度: 13/50 批次完成 (26.0%)
批次 20 完成: 100 行, 耗时 0.02秒
进度: 14/50 批次完成 (28.0%)
批次 21 完成: 100 行, 耗时 0.02秒
进度: 15/50 批次完成 (30.0%)
批次 22 完成: 100 行, 耗时 0.02秒
进度: 16/50 批次完成 (32.0%)
批次 23 完成: 100 行, 耗时 0.02秒
进度: 17/50 批次完成 (34.0%)
批次 24 完成: 100 行, 耗时 0.02秒
进度: 18/50 批次完成 (36.0%)
批次 25 完成: 100 行, 耗时 0.03秒
进度: 19/50 批次完成 (38.0%)
批次 26 完成: 100 行, 耗时 0.02秒
进度: 20/50 批次完成 (40.0%)
批

{'total_rows': 5000,
 'successful_rows': 5000,
 'failed_rows': 0,
 'duration_seconds': 2.109909772872925,
 'rows_per_second': 2369.7695817541194,
 'batch_results': [{'batch': 0,
   'rows': 100,
   'success': True,
   'duration': 0.5203673839569092,
   'counters': {'_contains_updates': True, 'labels_added': 100, 'nodes_created': 100, 'properties_set': 600}},
  {'batch': 8,
   'rows': 100,
   'success': True,
   'duration': 0.09414362907409668,
   'counters': {'_contains_updates': True, 'labels_added': 100, 'nodes_created': 100, 'properties_set': 600}},
  {'batch': 9,
   'rows': 100,
   'success': True,
   'duration': 0.05940580368041992,
   'counters': {'_contains_updates': True, 'labels_added': 100, 'nodes_created': 100, 'properties_set': 600}},
  {'batch': 10,
   'rows': 100,
   'success': True,
   'duration': 0.04066181182861328,
   'counters': {'_contains_updates': True, 'labels_added': 100, 'nodes_created': 100, 'properties_set': 600}},
  {'batch': 11,
   'rows': 100,
   'success':

In [12]:
import pandas as pd
from tabulate import tabulate

df_entities = pd.read_parquet('../data/output/entities.parquet')  # 替换为实际路径

print(tabulate(df_entities, headers='keys', tablefmt='pretty', showindex=False, stralign='left', maxcolwidths=[20, 20, 20, 20, 20]))

+----------------------+-------------------+----------------------+----------+----------------------+--------------------------------------------------------------------------------------------------------------------------------------+-----------+--------+---+---+
| id                   | human_readable_id | title                | type     | description          | text_unit_ids                                                                                                                        | frequency | degree | x | y |
+----------------------+-------------------+----------------------+----------+----------------------+--------------------------------------------------------------------------------------------------------------------------------------+-----------+--------+---+---+
| 2b8eb51f-f8af-4188-9 | 0                 | 良诺传媒有限公司电子 | CUSTOMER | 良诺传媒有限公司电子 | ['c66ccf6c63d7fc82b22070dd8569386db5259594f63a803d98026bd89213f0e354b02226762cf3c3524a825d87d005419b6250ae9009f57be548ca