# Map BigQuery to Neo4j Graph

![bq-to-neo4j](img/bq-to-neo4j.png)

## Setup

In [37]:
# Run the gcloud application-default login flow from the notebook.
# NOTE: stdout and stderr are both redirected to /dev/null, so any prompt or error
# message printed by gcloud is hidden from the cell output.
!gcloud auth application-default login > /dev/null 2>&1

In [1]:
from dotenv import load_dotenv
import os

# Load the Neo4j connection settings. Values from a local .env file (when present)
# override variables that are already set in the process environment.
env_file = '.env'
if os.path.exists(env_file):
    load_dotenv(env_file, override=True)
else:
    print(f"File {env_file} not found.")

# Always read the settings, even without a .env file: they may already be exported in
# the environment, and defining the names here avoids a NameError in later cells.
NEO4J_URI = os.getenv('NEO4J_URI')
NEO4J_USERNAME = os.getenv('NEO4J_USERNAME')
NEO4J_PASSWORD = os.getenv('NEO4J_PASSWORD')

# Report which settings are absent (names only, never the values).
_missing_settings = [
    name
    for name, value in (
        ('NEO4J_URI', NEO4J_URI),
        ('NEO4J_USERNAME', NEO4J_USERNAME),
        ('NEO4J_PASSWORD', NEO4J_PASSWORD),
    )
    if not value
]
if _missing_settings:
    print(f"Missing Neo4j settings: {', '.join(_missing_settings)}")

In [2]:
from neo4j import GraphDatabase
from google.cloud import bigquery

# Neo4j driver, authenticated with the username/password read from the environment
driver = GraphDatabase.driver(
    NEO4J_URI,
    auth=(NEO4J_USERNAME, NEO4J_PASSWORD),
)

# BigQuery client; no arguments are passed, so the client library defaults apply
bq_client = bigquery.Client()

In [3]:
# Helper functions for loading Neo4j data
from typing import List, Optional
from tqdm import tqdm
import pandas as pd

def df_to_node_records(df:pd.DataFrame, id_column:str, include_properties: Optional[List[str]]=None):
    """
    Converts a DataFrame into a list of node records.

    The frame is reduced to the id column plus the requested property columns and
    de-duplicated. Each remaining row becomes
    ``{"id": <id value>, "properties": {<column>: <value>, ...}}``.

    :param df: The source DataFrame. It is not modified.
    :param id_column: The name of the column holding the node id.
    :param include_properties: Optional column names to copy into each record's
        "properties". The id column is ignored if listed and repeated names are used once.
        When None, empty, or containing only the id column, every record gets an empty
        "properties" dictionary.
    :return: A list of dictionaries with "id" and "properties" keys, one per distinct row.
    :rtype: list
    """
    # dict.fromkeys removes repeated column names while keeping their order
    property_columns = [col for col in dict.fromkeys(include_properties or []) if col != id_column]

    # select relevant columns and drop duplicates
    distinct_rows = df[[id_column] + property_columns].drop_duplicates()
    ids = distinct_rows[id_column].tolist()

    if property_columns:
        property_records = distinct_rows[property_columns].to_dict(orient='records')
    else:
        # to_dict(orient='records') on a frame without columns cannot be relied on to
        # yield one dict per row, so the empty property maps are built explicitly.
        property_records = [dict() for _ in ids]

    # Combine "id" and "properties" into the desired format
    return [{"id": node_id, "properties": props} for node_id, props in zip(ids, property_records)]

def df_to_relationship_records(df:pd.DataFrame, start_node_id:str, end_node_id:str, rel_key: Optional[str]=None, include_properties: Optional[List[str]]=None):
    """
    Converts a DataFrame into a list of relationship records.

    The frame is reduced to the start/end id columns, the optional key column and the
    requested property columns, then de-duplicated. Each remaining row becomes a dictionary
    with "start_node_id", "end_node_id", "properties" and, when a key is used, "rel_key".

    :param df: The source DataFrame. It is not modified.
    :param start_node_id: The name of the column holding the start node id.
    :param end_node_id: The name of the column holding the end node id.
    :param rel_key: Optional name of the column holding a relationship key. A falsy value
        (None or "") means no key is used and no "rel_key" entry is emitted.
    :param include_properties: Optional column names to copy into each record's
        "properties". The id and key columns are ignored if listed and repeated names are
        used once. When no property column remains, every record gets an empty
        "properties" dictionary.
    :return: A list of relationship record dictionaries, one per distinct row.
    :rtype: list
    """
    # The same truthiness check decides both column selection and the output shape,
    # so an empty-string key can never select no column yet still be looked up later.
    use_rel_key = bool(rel_key)

    id_columns = [start_node_id, end_node_id]
    if use_rel_key:
        id_columns.append(rel_key)

    # dict.fromkeys removes repeated column names while keeping their order
    property_columns = [col for col in dict.fromkeys(include_properties or []) if col not in id_columns]

    # select relevant columns and drop duplicates
    distinct_rows = df[id_columns + property_columns].drop_duplicates()

    if property_columns:
        property_records = distinct_rows[property_columns].to_dict(orient='records')
    else:
        # to_dict(orient='records') on a frame without columns cannot be relied on to
        # yield one dict per row, so the empty property maps are built explicitly.
        property_records = [dict() for _ in range(len(distinct_rows))]

    # Assemble the output fields in a fixed order; "rel_key" only appears when a key is used
    fields = {
        "start_node_id": distinct_rows[start_node_id].tolist(),
        "end_node_id": distinct_rows[end_node_id].tolist(),
    }
    if use_rel_key:
        fields["rel_key"] = distinct_rows[rel_key].tolist()
    fields["properties"] = property_records

    field_names = list(fields)
    return [dict(zip(field_names, values)) for values in zip(*fields.values())]

def chunks(xs, n=5_000):
    """
    Splits a list into consecutive chunks of a given size.

    Every chunk holds `n` elements except possibly the last one, which holds the remainder.
    :param xs: The list to be split. It must support `len` and slicing.
    :param n: The size of each chunk. Defaults to 5,000. Values below 1 are treated as 1.
    :return: A list of chunks in their original order; empty when `xs` is empty.
    :rtype: list
    """
    # Clamp the size so that range() never receives a zero or negative step
    size = max(1, n)
    return [xs[start:start + size] for start in range(0, len(xs), size)]


def merge_nodes(label, id_name, records, chunk_size=5_000):
    """
    Merges node records into a Neo4j graph database.

    Records are sent in batches. For each record a node with the given label and id is
    merged, then the record's properties are added to it with `SET n += ...`.
    :param label: The label to assign to the node in the graph database.
    :param id_name: The name of the unique identifier property for the node i.e. the node id
    :param records: A list of node records to be merged into the graph database. Each record
        is read through its "id" and "properties" fields.
    :param chunk_size: How many node records to batch at once when merging. Default is 5,000,
        the same default used by merge_relationships.
    """
    # label and id_name are interpolated into the Cypher text; the record values themselves
    # are passed as the $records query parameter.
    query = f"""
    UNWIND $records AS rec
    MERGE(n:{label} {{{id_name}: rec.id}})
    SET n += rec.properties
    """
    for recs in tqdm(chunks(records, chunk_size), desc="merging nodes"):
        driver.execute_query(query, records=recs)

class IncidentNode:
    """
    Describes one endpoint (start or end) of a relationship.
    :ivar label: The node label.
    :ivar idName: The name of the property that identifies the node i.e. the node id
    """
    label: str
    idName: str

    def __init__(self, label, id_name):
        # the constructor argument is snake_case, the stored attribute is camelCase
        self.label, self.idName = label, id_name


def merge_relationships(start_node:IncidentNode, end_node:IncidentNode, rel_type, records, rel_key=None, chunk_size=5_000):
    """
    Merges relationship records into a Neo4j database.

    Records are sent in batches. For each record the start and end nodes are merged by
    label and id, the relationship between them is merged, and the record's properties are
    added to the relationship with `SET r += ...`.

    :param start_node:IncidentNode The starting node of the relationship. It must include the node's label and
        id name
    :param end_node: IncidentNode The ending node of the relationship. It must include the node's label and
        id name
    :param rel_type:str The type of relationship to be created between the nodes.
    :param records: A list of dictionaries containing the start node id, end node id, and relationship properties
        (plus the relationship key value under "rel_key" when `rel_key` is given).
    :param rel_key: Optional[str] The name of the relationship property to be used as a key for parallel
        relationships. A falsy value (None or "") means no key is used. Default is None
    :param chunk_size: how many rel records to batch at once when merging. This setting can affect ingest speed.
        Default is 5,000.
    """
    # Only put the key property into the MERGE pattern when a key name was given. The
    # truthiness check matches the `if rel_key:` test in df_to_relationship_records and
    # keeps an empty string from producing an invalid `{: rec.rel_key}` pattern.
    key_pattern = f" {{{rel_key}: rec.rel_key}}" if rel_key else ""

    # Labels, relationship type and property names are interpolated into the Cypher text;
    # the record values themselves are passed as the $records query parameter.
    query = f"""
    UNWIND $records AS rec
    MERGE(n1:{start_node.label} {{{start_node.idName}: rec.start_node_id}})
    MERGE(n2:{end_node.label} {{{end_node.idName}: rec.end_node_id}})
    MERGE(n1)-[r:{rel_type}{key_pattern}]->(n2)
    SET r += rec.properties
    """
    for recs in tqdm(chunks(records, chunk_size), desc="merging relationships"):
        driver.execute_query(query, records=recs)


def bq_table_to_neo4j_node(bq_db_table:str, node_label:str, node_id_name:str, exclude_properties:Optional[List[str]]=None):
    """
    Loads one BigQuery table into Neo4j as nodes.

    The whole table is read into a DataFrame, converted to node records, a NODE KEY
    constraint is created for the label if it does not exist yet, and the records are
    merged into the graph.

    :param bq_db_table: The BigQuery table name as it should appear after FROM in the query.
    :param node_label: The label to be assigned to the nodes in the Neo4j graph.
    :param node_id_name: The table column (and node property) used as the node identifier.
    :param exclude_properties: Optional column names to leave out of the node properties.
        When None or empty, every column is passed on as a property candidate.
    :return: None
    """
    # read the full table
    print(f"Retrieving {bq_db_table} from BigQuery...")
    select_all = f'SELECT * FROM {bq_db_table}'
    df = bq_client.query_and_wait(select_all).to_dataframe()

    # build the node records, leaving out any excluded columns
    print("Processing data...")
    all_columns = df.columns.tolist()
    if exclude_properties:
        include_props = [column for column in all_columns if column not in exclude_properties]
    else:
        include_props = all_columns
    recs = df_to_node_records(df, node_id_name, include_props)

    # the constraint is created before merging
    print(f"Setting node constraint for {node_label}...")
    constraint_query = f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{node_label}) REQUIRE (n.{node_id_name}) IS NODE KEY"
    driver.execute_query(constraint_query)

    # write the nodes
    print(f"Merging {len(recs)} node records into Neo4j...")
    merge_nodes(node_label, node_id_name, recs)


In [4]:
# Prefix for every BigQuery table read in this notebook; table names are appended to it
# as f'{bq_data_id}.<table>'. Presumably "<project>.<dataset>" - TODO confirm.
bq_data_id = "neo4jeventdemos.bomv2"

## Load Nodes with Properties

In [44]:
# Supplier nodes, keyed by `code`; category and coordinates are not stored as node properties
bq_table_to_neo4j_node(
    f'{bq_data_id}.suppliers',
    node_label='Supplier',
    node_id_name='code',
    exclude_properties=['category', 'latitude', 'longitude'],
)

Retrieving neo4jeventdemos.bomv2.suppliers from BigQuery...
Processing data...
Setting node constraint for Supplier...
Merging 13890 node records into Neo4j...


merging nodes: 100%|██████████| 3/3 [00:01<00:00,  2.33it/s]


In [5]:
# Item nodes (components and products), keyed by `sku_id`; all columns become properties
bq_table_to_neo4j_node(
    f'{bq_data_id}.items',
    node_label='Item',
    node_id_name='sku_id',
)

Retrieving neo4jeventdemos.bomv2.items from BigQuery...




Processing data...
Setting node constraint for Item...
Merging 233609 node records into Neo4j...


merging nodes: 100%|██████████| 47/47 [00:08<00:00,  5.28it/s]


In [46]:
# Customer nodes, keyed by `code`; family code and coordinates are not stored as node properties
bq_table_to_neo4j_node(
    f'{bq_data_id}.customers',
    node_label='Customer',
    node_id_name='code',
    exclude_properties=['cust_family_code', 'latitude', 'longitude'],
)

Retrieving neo4jeventdemos.bomv2.customers from BigQuery...
Processing data...
Setting node constraint for Customer...
Merging 5447 node records into Neo4j...


merging nodes: 100%|██████████| 2/2 [00:00<00:00,  4.57it/s]


## Load Relationships

In [47]:
# (Item)-[:AT]->(Supplier) relationships, read from the supplier_items table
bq_table = 'supplier_items'
rel_type = 'AT'
start_node = IncidentNode(label='Item', id_name='sku_id')
end_node = IncidentNode(label='Supplier', id_name='code')
# table columns that hold the start and end node ids
start_node_id_table_name = 'sku_id'
end_node_id_table_name = 'supplier_code'

# read the full table from BigQuery
print(f"Retrieving {bq_data_id}.{bq_table} from BigQuery...")
df = bq_client.query_and_wait(f'SELECT * FROM {bq_data_id}.{bq_table}').to_dataframe()

# one record per distinct (start id, end id) pair; no relationship properties are requested
print("Processing data...")
recs = df_to_relationship_records(df, start_node_id_table_name, end_node_id_table_name)

# make sure both endpoint labels have a NODE KEY constraint on their id property
print(f"Setting node constraint for {start_node.label} and {end_node.label}...")
for incident_node in (start_node, end_node):
    driver.execute_query(f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{incident_node.label}) REQUIRE (n.{incident_node.idName}) IS NODE KEY")

# write the relationships
print(f"Merging {len(recs)} relationship records into Neo4j...")
merge_relationships(start_node, end_node, rel_type, recs)

Retrieving neo4jeventdemos.bomv2.supplier_items from BigQuery...




Processing data...
Setting node constraint for Item and Supplier...
Merging 280463 relationship records into Neo4j...


merging relationships: 100%|██████████| 57/57 [00:14<00:00,  3.96it/s]


In [48]:
# (Item)-[:AT]->(Customer) relationships, read from the customer_items table
bq_table = 'customer_items'
rel_type = 'AT'
start_node = IncidentNode(label='Item', id_name='sku_id')
end_node = IncidentNode(label='Customer', id_name='code')
# table columns that hold the start and end node ids
start_node_id_table_name = 'sku_id'
end_node_id_table_name = 'customer_code'

# read the full table from BigQuery
print(f"Retrieving {bq_data_id}.{bq_table} from BigQuery...")
df = bq_client.query_and_wait(f'SELECT * FROM {bq_data_id}.{bq_table}').to_dataframe()

# one record per distinct (start id, end id) pair; no relationship properties are requested
print("Processing data...")
recs = df_to_relationship_records(df, start_node_id_table_name, end_node_id_table_name)

# make sure both endpoint labels have a NODE KEY constraint on their id property
print(f"Setting node constraint for {start_node.label} and {end_node.label}...")
for incident_node in (start_node, end_node):
    driver.execute_query(f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{incident_node.label}) REQUIRE (n.{incident_node.idName}) IS NODE KEY")

# write the relationships
print(f"Merging {len(recs)} relationship records into Neo4j...")
merge_relationships(start_node, end_node, rel_type, recs)

Retrieving neo4jeventdemos.bomv2.customer_items from BigQuery...




Processing data...
Setting node constraint for Item and Customer...
Merging 337810 relationship records into Neo4j...


merging relationships: 100%|██████████| 68/68 [00:16<00:00,  4.19it/s]


In [52]:
# (Item)-[:BOM]->(Item) relationships, read from bom_table (parent sku -> child sku)
bq_table = 'bom_table'
rel_type = 'BOM'
# both endpoints are Item nodes, so the same IncidentNode object serves as start and end
start_node = IncidentNode(label='Item', id_name='sku_id')
end_node = start_node
# table columns that hold the start and end node ids
start_node_id_table_name = 'parent_sku_id'
end_node_id_table_name = 'child_sku_id'

# read the full table from BigQuery
print(f"Retrieving {bq_data_id}.{bq_table} from BigQuery...")
df = bq_client.query_and_wait(f'SELECT * FROM {bq_data_id}.{bq_table}').to_dataframe()

# one record per distinct (start id, end id) pair; no relationship properties are requested
print("Processing data...")
recs = df_to_relationship_records(df, start_node_id_table_name, end_node_id_table_name)

# make sure the endpoint label has a NODE KEY constraint on its id property
# (issued once per endpoint; IF NOT EXISTS makes the repeat for the shared label harmless)
print(f"Setting node constraint for {start_node.label} and {end_node.label}...")
for incident_node in (start_node, end_node):
    driver.execute_query(f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{incident_node.label}) REQUIRE (n.{incident_node.idName}) IS NODE KEY")

# write the relationships, using a larger batch size than the default
print(f"Merging {len(recs)} relationship records into Neo4j...")
merge_relationships(start_node, end_node, rel_type, recs, chunk_size=20_000)

Retrieving neo4jeventdemos.bomv2.bom_table from BigQuery...




Processing data...
Setting node constraint for Item and Item...
Merging 7478476 relationship records into Neo4j...


merging relationships: 100%|██████████| 374/374 [06:39<00:00,  1.07s/it]


## Load GeoLocations

In [49]:
# Create the GeoLocation schema: a NODE KEY constraint on geo_point plus a point index on it.
# Both statements use IF NOT EXISTS so the cell can be re-run without failing on schema
# objects that already exist.
print("Setting GeoLocation node constraint and geo_point index....")
driver.execute_query("CREATE CONSTRAINT IF NOT EXISTS FOR (n:GeoLocation) REQUIRE (n.geo_point) IS NODE KEY")
driver.execute_query("CREATE POINT INDEX geo_location IF NOT EXISTS FOR (n:GeoLocation) ON (n.geo_point)")

Setting GeoLocation node constraint and geo_point index....


EagerResult(records=[], summary=<neo4j._work.summary.ResultSummary object at 0x11e91f190>, keys=[])

In [50]:
# Customer GeoLocations: link each Customer to a GeoLocation node built from its coordinates

# read the customers that have both coordinates
print(f"Retrieving {bq_data_id}.customers from BigQuery...")
customer_geo_sql = f'''
SELECT code, latitude, longitude
FROM {bq_data_id}.customers
WHERE latitude IS NOT NULL AND longitude IS NOT NULL
'''
recs = bq_client.query_and_wait(customer_geo_sql).to_dataframe().to_dict(orient='records')
print(f"Sample of records: \n{recs[:5]}...")

# MATCH (not MERGE) on Customer: rows whose code has no Customer node produce nothing.
# GeoLocation nodes are merged on the point value, so customers at the same point share one.
customer_geo_cypher = """
    UNWIND $records AS rec
    WITH rec.code AS code,
        rec.latitude AS latitude,
        rec.longitude AS longitude,
        point({latitude: rec.latitude, longitude: rec.longitude}) AS geo_point

    MATCH(c:Customer {code:code})
    MERGE(n:GeoLocation {geo_point: geo_point})
    MERGE (c)-[:LOCATED_AT]->(n)

    SET n.latitude = latitude,
        n.longitude = longitude
    """

# send the records in batches
for rec_chunks in tqdm(chunks(recs), desc="Loading Customer GeoLocations"):
    driver.execute_query(customer_geo_cypher, records=rec_chunks)


Retrieving neo4jeventdemos.bomv2.customers from BigQuery...
Sample of records: 
[{'code': 103055, 'latitude': 89.3055799, 'longitude': -179.9253201}, {'code': 102737, 'latitude': -89.1069501, 'longitude': -179.91195}, {'code': 98610, 'latitude': -89.7258501, 'longitude': -179.84485}, {'code': 98650, 'latitude': -89.7258501, 'longitude': -179.84485}, {'code': 81170, 'latitude': -86.6504901, 'longitude': -179.83836}]...


Loading Customer GeoLocations: 100%|██████████| 2/2 [00:00<00:00,  2.63it/s]


In [51]:
# Supplier GeoLocations: link each Supplier to a GeoLocation node built from its coordinates

# read the suppliers that have both coordinates
print(f"Retrieving {bq_data_id}.suppliers from BigQuery...")
supplier_geo_sql = f'''
SELECT code, latitude, longitude
FROM {bq_data_id}.suppliers
WHERE latitude IS NOT NULL AND longitude IS NOT NULL
'''
recs = bq_client.query_and_wait(supplier_geo_sql).to_dataframe().to_dict(orient='records')
print(f"Sample of records: \n{recs[:5]}...")

# MATCH (not MERGE) on Supplier: rows whose code has no Supplier node produce nothing.
# GeoLocation nodes are merged on the point value, so suppliers at the same point share one.
supplier_geo_cypher = """
    UNWIND $records AS rec
    WITH rec.code AS code,
        rec.latitude AS latitude,
        rec.longitude AS longitude,
        point({latitude: rec.latitude, longitude: rec.longitude}) AS geo_point

    MATCH(s:Supplier {code:code})
    MERGE(n:GeoLocation {geo_point: geo_point})
    MERGE (s)-[:LOCATED_AT]->(n)

    SET n.latitude = latitude,
        n.longitude = longitude
    """

# send the records in batches
for rec_chunks in tqdm(chunks(recs), desc="Loading Supplier GeoLocations"):
    driver.execute_query(supplier_geo_cypher, records=rec_chunks)

Retrieving neo4jeventdemos.bomv2.suppliers from BigQuery...
Sample of records: 
[{'code': '1BYKA5', 'latitude': -71.85468, 'longitude': 71.681478}, {'code': '0Q6W9M', 'latitude': -71.85468, 'longitude': 71.681478}, {'code': 'U5TLHT', 'latitude': -71.85468, 'longitude': 71.681478}, {'code': '1QZ9ME', 'latitude': -71.85468, 'longitude': 71.681478}, {'code': 'WH3ZVG', 'latitude': -71.6294551, 'longitude': 70.89779}]...


Loading Supplier GeoLocations: 100%|██████████| 3/3 [00:01<00:00,  2.44it/s]
