# Neo4j Data Loading w/ Drivers

A quick example of best-practice principles for loading data into Neo4j with drivers.

We will use CSV tables from the `data/` folder to create a graph with the following schema:

![data model](img/data-model.png)

In [1]:
%%capture
%pip install pandas neo4j python-dotenv tqdm

## Naive First Try (This Will Be Super Slow)
As a first try we will attempt a naive approach of merging node and relationship records row by row.

In [2]:
from dotenv import load_dotenv
import os

env_file = '.env'
if os.path.exists(env_file):
    load_dotenv(env_file, override=True)

    HOST = os.getenv('NEO4J_URI')
    USERNAME = os.getenv('NEO4J_USERNAME')
    PASSWORD = os.getenv('NEO4J_PASSWORD')

else:
    print(f"File {env_file} not found.")

In [3]:
from neo4j import Query, GraphDatabase, RoutingControl, Result

driver = GraphDatabase.driver(HOST, auth=(USERNAME, PASSWORD))

Read article records

In [4]:
import csv

articles = []
with open('data/article.csv', mode="r", encoding="utf-8") as file:
    reader = csv.DictReader(file)
    for row in reader:
        articles.append(row)
articles[:3]

[{'articleId': '108775015',
  'productCode': '108775',
  'departmentNo': '1676',
  'prodName': 'Strap top',
  'productTypeName': 'Vest top',
  'graphicalAppearanceNo': '1010016',
  'graphicalAppearanceName': 'Solid',
  'colourGroupCode': '9',
  'colourGroupName': 'Black'},
 {'articleId': '108775044',
  'productCode': '108775',
  'departmentNo': '1676',
  'prodName': 'Strap top',
  'productTypeName': 'Vest top',
  'graphicalAppearanceNo': '1010016',
  'graphicalAppearanceName': 'Solid',
  'colourGroupCode': '10',
  'colourGroupName': 'White'},
 {'articleId': '110065001',
  'productCode': '110065',
  'departmentNo': '1339',
  'prodName': 'OP T-shirt (Idro)',
  'productTypeName': 'Bra',
  'graphicalAppearanceNo': '1010016',
  'graphicalAppearanceName': 'Solid',
  'colourGroupCode': '9',
  'colourGroupName': 'Black'}]

Attempt to Create Article Nodes

In [5]:
from tqdm import tqdm

query = """
   CREATE (a:Article {articleId: $articleId})
   SET a.graphicalAppearanceNo = $graphicalAppearanceNo,
       a.graphicalAppearanceName = $graphicalAppearanceName,
       a.colourGroupCode = $colourGroupCode,
       a.colourGroupName = $colourGroupName
"""

with driver.session() as session:
    for row in tqdm(articles):
        session.run(query,
                articleId=row["articleId"],
                graphicalAppearanceNo=row["graphicalAppearanceNo"],
                graphicalAppearanceName=row["graphicalAppearanceName"],
                colourGroupCode=row["colourGroupCode"],
                colourGroupName=row["colourGroupName"]
            )
print(f"Loaded {len(articles)} articles into Neo4j")


  5%|▌         | 691/13351 [00:55<16:55, 12.46it/s]


KeyboardInterrupt: 

Attempt to Create VARIANT_OF Relationships

In [6]:
#Similarly for writing relationships
from tqdm import tqdm

query = """
   MERGE (a:Article {articleId: $articleId}) //merge will create if doesn't exist
   MERGE (p:Product {productCode: $productCode})
   CREATE (a)-[r:VARIANT_OF]->(p)
"""

with driver.session() as session:
    for row in tqdm(articles):
        session.run(query,
                articleId=row["articleId"],
                productCode=row["productCode"],
            )
print(f"Loaded {len(articles)} VARIANT_OF relationships into Neo4j")

 10%|█         | 1358/13351 [01:40<14:49, 13.48it/s]


KeyboardInterrupt: 

The above takes forever because of two common pitfalls:
1. Records are not batched into transactions. Each record is sent on its own, with a separate transaction for each.
2. There are no node constraints or indexes, which makes `MERGE` and `MATCH` statements very inefficient.

Below we show how to easily optimize this so you can load all the records in `data/` in a matter of seconds.
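
To get a feel for the first pitfall, the arithmetic below counts how many transactions each approach needs for the 13,351 article rows shown in the progress bar above. This is only a back-of-the-envelope sketch: `transactions_needed` is a hypothetical helper, and the batch size of 1,000 is an assumption chosen for illustration.

```python
import math


def transactions_needed(n_records: int, batch_size: int) -> int:
    """Number of transactions required to send n_records in batches of batch_size."""
    return math.ceil(n_records / max(1, batch_size))


n_articles = 13_351  # row count taken from the progress bar above

row_by_row = transactions_needed(n_articles, 1)     # one transaction per row
batched = transactions_needed(n_articles, 1_000)    # assumed batch size of 1,000

print(row_by_row)  # 13351
print(batched)     # 14
```

Each transaction carries network and commit overhead, so cutting the count by roughly three orders of magnitude is where most of the speedup is expected to come from.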

### Cleanup
Let's remove the data we just loaded and try again with an optimized ingest.

In [7]:
# remove all data
delete_chunk_size = 500
with driver.session() as session:
    # Delete all nodes and relationships
    session.run(f'''
    MATCH (n)
    CALL (n){{
      DETACH DELETE n
    }} IN TRANSACTIONS OF {delete_chunk_size} ROWS;
    ''')

driver.execute_query("CALL apoc.schema.assert({},{},true) YIELD label, key RETURN *")

EagerResult(records=[], summary=<neo4j._work.summary.ResultSummary object at 0x14b6fb850>, keys=['key', 'label'])

## Optimizing Neo4j Ingests

### Understanding the types of queries we will run

We will use `MERGE` patterns to load both node and relationship data. This has the benefit of being less error prone and also *idempotent* - meaning that no matter how many times these commands are executed, the resulting data in the graph will remain the same, without creating duplicate nodes, relationships, or altering existing data unnecessarily.
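
The idempotence property can be illustrated without a database. The sketch below is a toy, plain-Python model of `MERGE ... SET n += properties`, not Neo4j itself: `merge_node` and the dictionary-based `graph` are hypothetical stand-ins, and the model ignores concurrency and constraints. It only shows that replaying the same records leaves the end state unchanged.

```python
def merge_node(graph: dict, label: str, node_id, properties: dict) -> None:
    """Toy stand-in for: MERGE (n:Label {id: node_id}) SET n += properties."""
    # setdefault creates the node only if it does not exist yet
    node = graph.setdefault((label, node_id), {})
    # update mirrors "SET n += map": add or overwrite the given properties
    node.update(properties)


records = [
    {"id": 108775015, "properties": {"colourGroupName": "Black"}},
    {"id": 108775044, "properties": {"colourGroupName": "White"}},
]

graph = {}
for _ in range(3):  # replay the same load three times
    for rec in records:
        merge_node(graph, "Article", rec["id"], rec["properties"])

print(len(graph))  # 2
```

A `CREATE`-based load replayed three times would instead leave six nodes behind, which is why the naive example above is not safe to re-run.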

Specifically:

__for nodes:__

```cypher
UNWIND $records AS rec
MERGE(n:<Label> {<id>: rec.id})
SET n += rec.properties
```

__for relationships:__

```cypher
UNWIND $records AS rec
MERGE(n1:<StartNodeLabel> {<StartNodeId>: rec.start_node_id})
MERGE(n2:<EndNodeLabel> {<EndNodeId>: rec.end_node_id})
MERGE(n1)-[r:<RelType>]->(n2)
SET r += rec.properties
```

__for parallel relationships:__
Where multiple relationships of the same type can exist between the same two start and end nodes:

```cypher
UNWIND $records AS rec
MERGE(n1:<StartNodeLabel> {<StartNodeId>: rec.start_node_id})
MERGE(n2:<EndNodeLabel> {<EndNodeId>: rec.end_node_id})
MERGE(n1)-[r:<RelType> {<relKeyProp>: rec.rel_key}]->(n2)
SET r += rec.properties
```

This ingest makes some assumptions:

1. __Grouping Nodes and Relationship Records by Type:__ `MERGE(n:<Label> {<id>: rec.id})` and the similar merge statements for relationships assume a single node label or relationship type at a time. Constructing queries this way enables us to batch effectively (covered below) while also ensuring efficient use of the Neo4j query planner, so ingestion is fast.
2. __Unique Id Keys for Every Node:__ `MERGE(n:<Label> {<id>: rec.id})` and the similar merge statements for relationships assume a unique id property for every node. These will only run efficiently if we first set node constraints, as shown below.
3. __Batching:__ `UNWIND $records AS rec` assumes we are batching records into a `records` parameter that is sent with each query. This is done for efficiency.
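
As a small illustration of the first assumption, the sketch below splits a mixed list of rows into one group per node label, so that each group can be sent with a single query shape. The `label` field and the `group_by_label` helper are hypothetical; the CSV files used in this notebook are already separated by type, so this only applies if your source data arrives mixed.

```python
from collections import defaultdict

# Hypothetical mixed input: every row carries its own label
mixed_rows = [
    {"label": "Article", "id": 108775015},
    {"label": "Product", "id": 108775},
    {"label": "Article", "id": 108775044},
]


def group_by_label(rows):
    """Group rows by label so each group maps to exactly one MERGE query."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row["label"]].append({"id": row["id"], "properties": {}})
    return dict(grouped)


groups = group_by_label(mixed_rows)
for label, recs in groups.items():
    print(label, len(recs))
```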

Let's set this up below and then run the resulting code.


### Step 1: Grouping Nodes and Relationships by Type
You're going to want to decide on your graph data model prior to loading data, then organize your data into groups of node and relationship records: nodes grouped by label, and relationships grouped by type plus start and end node labels.

To best fit the query patterns above we will use the following data structures:

__for nodes:__
Map each node record into the below format

```
{
    "id": <node_id_field>,
    "properties": {"prop1":<val1>, "prop2":<val2>,...}
}
```

__for relationships (non-parallel):__
Map each record into the below format

```
{
    "start_node_id": <id value>,
    "end_node_id": <id value>,
    "properties": {"prop1":<val1>, "prop2":<val2>,...}
}
```

__for parallel relationships:__
Map each record into the below format

```
{
    "start_node_id": <id value>,
    "end_node_id": <id value>,
    "rel_key": <value>,
    "properties": {"prop1":<val1>, "prop2":<val2>,...}
}
```
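
To make these formats concrete, here is a minimal standard-library sketch that maps CSV rows into the node and relationship shapes above. The helper names `to_node_record` and `to_relationship_record` are hypothetical, and the in-memory CSV reuses a few values from the article sample shown earlier. Note that `csv.DictReader` yields every value as a string, so ids stay strings unless you convert them.

```python
import csv
import io

# Small in-memory CSV standing in for a file in data/
sample_csv = io.StringIO(
    "articleId,productCode,colourGroupName\n"
    "108775015,108775,Black\n"
    "108775044,108775,White\n"
)


def to_node_record(row, id_column, property_columns):
    """Map one CSV row to the node record format."""
    return {
        "id": row[id_column],
        "properties": {col: row[col] for col in property_columns},
    }


def to_relationship_record(row, start_column, end_column, property_columns=()):
    """Map one CSV row to the (non-parallel) relationship record format."""
    return {
        "start_node_id": row[start_column],
        "end_node_id": row[end_column],
        "properties": {col: row[col] for col in property_columns},
    }


rows = list(csv.DictReader(sample_csv))
node_recs = [to_node_record(r, "articleId", ["colourGroupName"]) for r in rows]
rel_recs = [to_relationship_record(r, "articleId", "productCode") for r in rows]

print(node_recs[0])
print(rel_recs[0])
```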


Let's create some helper functions for this.

In [8]:
from typing import List, Optional

import pandas as pd


def csv_to_node_records(file_path:str, id_column:str, include_properties: Optional[List[str]]=None):
    #read the csv
    df = pd.read_csv(file_path)
    #select relevant columns and drop duplicates
    properties = [id_column]
    if include_properties:
        properties = properties + [col for col in include_properties if col != id_column]
    df = df[properties].drop_duplicates()

    # Create the "properties" by dropping the id column, then converting rows to dictionaries
    property_records = df.drop(columns=[id_column]).to_dict(orient='records') if include_properties else [dict() for i in range(df.shape[0])]

    # Combine "id" and "properties" into the desired format
    return pd.DataFrame({
        "id": df[id_column],
        "properties": property_records
    }).to_dict(orient='records')

def csv_to_relationship_records(file_path:str, start_node_id:str, end_node_id:str, rel_key: Optional[str]=None, include_properties: Optional[List[str]]=None):
    #read the csv
    df = pd.read_csv(file_path)

    #select relevant columns and drop duplicates
    id_props = [start_node_id, end_node_id]
    if rel_key:
        id_props.append(rel_key)

    properties = id_props.copy()
    if include_properties:
        properties = properties + [col for col in include_properties if col != start_node_id and col != end_node_id and col != rel_key]
    df = df[properties].drop_duplicates()

    # Create the "properties" by dropping the id columns, then converting rows to dictionaries
    property_records = df.drop(columns=id_props).to_dict(orient='records') if include_properties else [dict() for i in range(df.shape[0])]

    # Combine "id" and "properties" into the desired format
    if rel_key is None:
        res = pd.DataFrame({
            "start_node_id": df[start_node_id],
            "end_node_id": df[end_node_id],
            "properties": property_records
        }).to_dict(orient='records')
    else:
        res = pd.DataFrame({
            "start_node_id": df[start_node_id],
            "end_node_id": df[end_node_id],
            "rel_key": df[rel_key],
            "properties": property_records
        }).to_dict(orient='records')
    return res
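
One thing to watch for with helpers like these: pandas represents missing CSV values as `NaN`, and those values end up in the `properties` dictionaries unchanged. Whether that matters depends on your data. The sketch below shows one possible way to strip them before loading; `drop_missing` is a hypothetical helper that is not part of the code above, and the sample record is made up.

```python
import math


def drop_missing(properties: dict) -> dict:
    """Return a copy of properties without None or NaN values."""
    return {
        key: value
        for key, value in properties.items()
        if value is not None
        and not (isinstance(value, float) and math.isnan(value))
    }


# Made-up record with a missing age, as pandas would produce it
record = {"id": "c1", "properties": {"age": float("nan"), "postalCode": "abc123"}}
record["properties"] = drop_missing(record["properties"])

print(record)  # {'id': 'c1', 'properties': {'postalCode': 'abc123'}}
```

Dropping the key means `SET n += rec.properties` simply leaves that property untouched on the node, rather than storing a `NaN` float.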

We can test on some examples

In [9]:
import pandas as pd

id_column = 'articleId'
include_properties = ['graphicalAppearanceNo', 'graphicalAppearanceName', 'colourGroupCode', 'colourGroupName']

article_node_records = csv_to_node_records('data/article.csv', id_column, include_properties)

article_node_records[:3]


[{'id': 108775015,
  'properties': {'graphicalAppearanceNo': 1010016,
   'graphicalAppearanceName': 'Solid',
   'colourGroupCode': 9,
   'colourGroupName': 'Black'}},
 {'id': 108775044,
  'properties': {'graphicalAppearanceNo': 1010016,
   'graphicalAppearanceName': 'Solid',
   'colourGroupCode': 10,
   'colourGroupName': 'White'}},
 {'id': 110065001,
  'properties': {'graphicalAppearanceNo': 1010016,
   'graphicalAppearanceName': 'Solid',
   'colourGroupCode': 9,
   'colourGroupName': 'Black'}}]

In [10]:
start_node_id = 'articleId'
end_node_id = "productCode"
article_variant_of_prod_records = csv_to_relationship_records('data/article.csv', start_node_id, end_node_id)

article_variant_of_prod_records[:3]

[{'start_node_id': 108775015, 'end_node_id': 108775, 'properties': {}},
 {'start_node_id': 108775044, 'end_node_id': 108775, 'properties': {}},
 {'start_node_id': 110065001, 'end_node_id': 110065, 'properties': {}}]

Now let's create records for each node and relationship pattern.

In [11]:
import json

node_records = {}

# Articles
id_column = 'articleId'
include_properties = ['graphicalAppearanceNo', 'graphicalAppearanceName', 'colourGroupCode', 'colourGroupName']
node_records['Article'] = {
        'idName':id_column,
        'records': csv_to_node_records('data/article.csv', id_column, include_properties)
}

# Products
id_column = 'productCode'
include_properties = ['prodName','productTypeNo','productTypeName','productGroupName','garmentGroupNo','garmentGroupName','detailDesc']
node_records['Product'] = {
        'idName':id_column,
        'records': csv_to_node_records('data/product.csv', id_column, include_properties)
}

# Departments
id_column = 'departmentNo'
include_properties = ['departmentName','sectionNo','sectionName']
node_records['Department'] = {
        'idName':id_column,
        'records': csv_to_node_records('data/department.csv', id_column, include_properties)
}

# Customers
id_column = 'customerId'
include_properties = ['age', 'postalCode']
node_records['Customer'] = {
        'idName':id_column,
        'records': csv_to_node_records('data/customer.csv', id_column, include_properties)
}

In [12]:
relationship_records = {}

# (Article)-[VARIANT_OF]->(Product)
start_node_id = 'articleId'
end_node_id = "productCode"
relationship_records[('Article','VARIANT_OF', 'Product')] = {
    'startNodeId':start_node_id,
    'endNodeId':end_node_id,
    'records': csv_to_relationship_records('data/article.csv', start_node_id, end_node_id)
}

# (Article)-[FROM_DEPARTMENT]->(Department)
start_node_id = 'articleId'
end_node_id = "departmentNo"
relationship_records[('Article','FROM_DEPARTMENT', 'Department')] = {
    'startNodeId':start_node_id,
    'endNodeId':end_node_id,
    'records': csv_to_relationship_records('data/article.csv', start_node_id, end_node_id)
}

# (Customer)-[PURCHASED]->(Article)
# here we have parallel relationships and relationship properties
start_node_id = 'customerId'
end_node_id = 'articleId'
rel_key = 'txId'
include_properties = ['tDat', 'price', 'salesChannelId']
relationship_records[('Customer','PURCHASED', 'Article')] = {
    'startNodeId':start_node_id,
    'endNodeId':end_node_id,
    'relKey': rel_key,
    'records': csv_to_relationship_records('data/transaction.csv', start_node_id, end_node_id, rel_key, include_properties)
}

In [63]:
#relationship_records[('Customer','PURCHASED', 'Article')]

{'startNodeId': 'customerId',
 'endNodeId': 'articleId',
 'relKey': 'txId',
 'records': [{'start_node_id': '0ddcd6055c5830c1fda493843d051edb04ce1bf888aa4becf5b839628396541d',
   'end_node_id': 653428002,
   'rel_key': 2445,
   'properties': {'tDat': '2018-09-20',
    'price': 0.1355762711864406,
    'salesChannelId': 1}},
  {'start_node_id': '210f113fe87db5d6391e986dc06b8e4369e46284e3b98964bf41ced4199a551f',
   'end_node_id': 636587001,
   'rel_key': 6182,
   'properties': {'tDat': '2018-09-20',
    'price': 0.008457627118644,
    'salesChannelId': 1}},
  {'start_node_id': '210f113fe87db5d6391e986dc06b8e4369e46284e3b98964bf41ced4199a551f',
   'end_node_id': 640462002,
   'rel_key': 6183,
   'properties': {'tDat': '2018-09-20',
    'price': 0.0321864406779661,
    'salesChannelId': 1}},
  {'start_node_id': '211a2ef477fcfc8fc40a63ffa70bb41086dd06ca85d4af83875485dbdf3419e6',
   'end_node_id': 645422002,
   'rel_key': 6188,
   'properties': {'tDat': '2018-09-20',
    'price': 0.01438983050

### Step 2: Creating Constraints (with Indexes)
This will make for efficient `MATCH` and `MERGE` operations.

In [13]:
# Create node key: https://neo4j.com/docs/cypher-manual/current/constraints/managing-constraints/#create-key-constraints
# On Neo4j Community Edition you need to use uniqueness constraints instead: https://neo4j.com/docs/cypher-manual/current/constraints/managing-constraints/#create-property-uniqueness-constraints

for label, data in node_records.items():
    driver.execute_query(f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{label}) REQUIRE (n.{data['idName']}) IS NODE KEY")
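
For Community Edition, the comment above points to uniqueness constraints as the alternative. A minimal sketch of a statement builder that covers both cases follows; `constraint_statement` and its `node_key` flag are hypothetical names introduced here, and the block only builds strings, so it runs without a database.

```python
def constraint_statement(label: str, id_name: str, node_key: bool = True) -> str:
    """Build a CREATE CONSTRAINT statement for a single-property node id.

    node_key=True  -> node key constraint (Enterprise Edition)
    node_key=False -> property uniqueness constraint (also works on Community Edition)
    """
    kind = "IS NODE KEY" if node_key else "IS UNIQUE"
    return (
        f"CREATE CONSTRAINT IF NOT EXISTS "
        f"FOR (n:{label}) REQUIRE n.{id_name} {kind}"
    )


print(constraint_statement("Article", "articleId"))
print(constraint_statement("Article", "articleId", node_key=False))

# Usage with the loop above (assumes the same `driver` and `node_records`):
# for label, data in node_records.items():
#     driver.execute_query(constraint_statement(label, data["idName"], node_key=False))
```

Both constraint types are backed by an index, which is what lets `MERGE` look up existing nodes by id instead of scanning every node with that label.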

### Step 3: Chunking (A.K.A Batching)
We create a few helper functions here to help us chunk records into batches and generate the query logic by node and relationship record group.

In [14]:
# helper function for chunking a list: takes a list and splits it into chunks of the specified size
def chunks(xs, n=1_000):
    """
    Splits a list into smaller chunks of a given size.

    This function takes a list `xs` and divides it into smaller list chunks, each
    of size `n` (except possibly the last chunk, which may have fewer elements).
    :param xs: The list to be split into smaller chunks.
    :param n: The size of each chunk. Defaults to 1,000.
    :return: A list of smaller list chunks, each containing `n` or fewer elements.
    :rtype: list
    """
    n = max(1, n)
    return [xs[i:i + n] for i in range(0, len(xs), n)]


def merge_nodes(label, id_name, records):
    """
    Merges node records into a Neo4j graph database.
    :param label: The label to assign to the node in the graph database.
    :param id_name: The name of the unique identifier property for the node i.e. the node id
    :param records: A list of node records to be merged into the graph database.
    """
    query = f"""
    UNWIND $records AS rec
    MERGE(n:{label} {{{id_name}: rec.id}})
    SET n += rec.properties
    """
    for recs in chunks(records):
        driver.execute_query(query, records=recs)

class IncidentNode():
    """
    Represents a start or end node for a relationship.
    :ivar label: The label of the node.
    :ivar idName: The unique identifier for the node  i.e. the node id
    """
    label:str
    idName:str
    def __init__(self, label, id_name):
        self.label = label
        self.idName = id_name


def merge_relationships(start_node:IncidentNode, end_node:IncidentNode, rel_type, records, rel_key=None):
    """
    Merges relationship records into a Neo4j graph database, creating or updating relationships between nodes in batches.

    :param start_node:IncidentNode The starting node of the relationship. It must include the node's label and
        id name
    :param end_node: IncidentNode The ending node of the relationship. It must include the node's label and
        id name
    :param rel_type:str The type of relationship to be created between the nodes.
    :param records: A list of dictionaries containing the start node id, end node id, and relationship properties.
    :param rel_key: Optional[str] The name of the relationship property to be used as a key for parallel relationships. Default is None
    """

    if rel_key is None:
        query = f"""
        UNWIND $records AS rec
        MERGE(n1:{start_node.label} {{{start_node.idName}: rec.start_node_id}})
        MERGE(n2:{end_node.label} {{{end_node.idName}: rec.end_node_id}})
        MERGE(n1)-[r:{rel_type}]->(n2)
        SET r += rec.properties
        """
    else:
        query = f"""
        UNWIND $records AS rec
        MERGE(n1:{start_node.label} {{{start_node.idName}: rec.start_node_id}})
        MERGE(n2:{end_node.label} {{{end_node.idName}: rec.end_node_id}})
        MERGE(n1)-[r:{rel_type} {{{rel_key}: rec.rel_key}}]->(n2)
        SET r += rec.properties
        """
    for recs in chunks(records):
        driver.execute_query(query, records=recs)
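
The helpers above interpolate labels, relationship types, and property names into the query text with f-strings, while the records themselves travel as a query parameter. If those names ever come from outside your own code, it may be worth validating them first. The sketch below is one conservative option, assuming simple alphanumeric identifiers are enough for your model; `checked_identifier` is a hypothetical helper, not part of the Neo4j driver.

```python
import re

# Assumption: identifiers are plain letters, digits, and underscores
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def checked_identifier(name: str) -> str:
    """Return name unchanged if it is a simple identifier, else raise ValueError."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"Unsafe Cypher identifier: {name!r}")
    return name


label = checked_identifier("Article")
id_name = checked_identifier("articleId")

# Same query shape as merge_nodes, built only from validated names
query = f"""
UNWIND $records AS rec
MERGE (n:{label} {{{id_name}: rec.id}})
SET n += rec.properties
"""
print(query)
```

A name such as `"Article) DETACH DELETE n //"` would be rejected with a `ValueError` before it could reach the query string.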



### Now Let's Run The Optimized Code!
You will note that this runs in a matter of seconds.

In [15]:
# create nodes
from tqdm import tqdm

for label, data in tqdm(node_records.items(), desc="Merging Nodes", unit="node"):
    merge_nodes(label, data['idName'], data['records'])

for rel_pattern, data in tqdm(relationship_records.items(), desc="Merging Relationships", unit="relationship"):
    start_node = IncidentNode(rel_pattern[0], data['startNodeId'])
    end_node = IncidentNode(rel_pattern[2], data['endNodeId'])
    merge_relationships(start_node,
                        end_node,
                        rel_pattern[1],
                        data['records'],
                        rel_key=data['relKey'] if 'relKey' in data else None)

Merging Nodes: 100%|██████████| 4/4 [00:05<00:00,  1.34s/node]
Merging Relationships: 100%|██████████| 3/3 [00:12<00:00,  4.18s/relationship]


Once the data has loaded, you can navigate to the Neo4j Console to view the resulting graph.

![resulting graph](img/result.png)

## Cleanup

In [16]:
# remove all data
delete_chunk_size = 500
with driver.session() as session:
    # Delete all nodes and relationships
    session.run(f'''
    MATCH (n)
    CALL (n){{
      DETACH DELETE n
    }} IN TRANSACTIONS OF {delete_chunk_size} ROWS;
    ''')
# remove constraints
driver.execute_query("CALL apoc.schema.assert({},{},true) YIELD label, key RETURN *")

EagerResult(records=[<Record key='articleId' label='Article'>, <Record key='productCode' label='Product'>, <Record key='departmentNo' label='Department'>, <Record key='customerId' label='Customer'>], summary=<neo4j._work.summary.ResultSummary object at 0x14e899650>, keys=['key', 'label'])