In [1]:
"""
This notebook contains code that loads the data in batches to neo4j
It is dependent on the intermediate csv files created and stored in raw_data/nodes and raw_data/relationships
"""

'\nThis notebook contains code that loads the data in batches to neo4j\nIt is dependent on the intermediate csv files created and stored in raw_data/nodes and raw_data/relationships\n'

In [29]:
from neo4j import GraphDatabase
import os
import pandas as pd
import time
import json
import random
print("Import Successful")

Import Successful


In [30]:
# Gets the values from .env file
URI = os.environ["NEO4J_URI"]
USER=os.environ["NEO4J_USER_NAME"]
PASSWORD=os.environ["NEO4J_PASSWD"]
AUTH = (os.environ["NEO4J_USER_NAME"], os.environ["NEO4J_PASSWD"])

In [31]:
# Neo4J connect and Query Boilerplate

class Neo4jConnection:
    def __init__(self, uri, user, pwd):
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
        except Exception as e:
            print("Failed to create the driver:", e)
            
    def close(self):
        if self.__driver is not None:
            self.__driver.close()
            
    def query(self, query, parameters=None, db=None):
        assert self.__driver is not None, "Driver not initialized!"
        session = None
        response = None
        try: 
            session = self.__driver.session(database=db) if db is not None else self.__driver.session() 
            #response = (session.run(query, parameters))
            response = list(session.run(query, parameters))
        except Exception as e:
            print("Query failed:", e)
        finally: 
            if session is not None:
                session.close()
        
        #return pd.DataFrame([r.values() for r in response], columns=response.keys())
        return response
        
    def multi_query(self, multi_line_query, parameters=None, db=None):
        for li in multi_line_query.splitlines():
                print(li)
                result=self.query(li, parameters=None, db=None)
                print(result)

In [32]:
# Make a default connection and it should return `[<Record count(n)=0>]`for empty database
conn = Neo4jConnection(uri=URI, 
                       user=USER,
                       pwd=PASSWORD)

#if db is empty, then seed with init values 
res=conn.query('MATCH (n) RETURN count(n)')
print(res)

[<Record count(n)=0>]


In [6]:
# Set the constraint so that nodes with same IDs are merged
# This is to be done for each node types present
# NOTE: THIS NEEDS TO BE EXECUTED ONLY ONCE PER DATABASE
constraint_queries =[
"CREATE CONSTRAINT ingredient_id FOR (ing:Ingredient) REQUIRE ing.itemID IS UNIQUE;",
"CALL db.awaitIndexes();"
]

for cquery in constraint_queries:
    try:
        res = conn.query(cquery)
        print(f"Executed successfully: {cquery}")
    except Exception as e:
        print(f"Error executing query: {cquery}\n{e}")

Query failed: {code: Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists} {message: An equivalent constraint already exists, 'Constraint( id=4, name='ingredient_id', type='UNIQUENESS', schema=(:Ingredient {itemID}), ownedIndex=3 )'.}
Executed successfully: CREATE CONSTRAINT ingredient_id FOR (ing:Ingredient) REQUIRE ing.itemID IS UNIQUE;
Executed successfully: CALL db.awaitIndexes();


In [33]:
# Query to load the nodes
load_ingredients = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/nodes/ingredient_nodes.csv') yield map as row", 
"MERGE (ing:Ingredient {itemID: row.id}) 
ON CREATE SET ing.name = row.name, ing.category = row.category, ing.add_info = row.ing_info", 
{batchSize:1000, iterateList:true, parallel:true})"""


# ------ NOTE: Breakdown of query elements -------
# apoc.periodic.iterate - internal neo4j library to help load data in batches
# {batchSize:1000, iterateList:true, parallel:true} - parameters to load data in parallel
# '/nodes/ingredient_nodes.csv' - path of the file from neo4j_raw_data folder as this folder is mounted in docker
# (ing:Ingredient {itemID: row.id}) - Ingredient:Node type, ing:variable, itemID:node property, row:csv file variable, row.id:id is the csv column name
# ing.name = row.name - name column in csv file is set as name property for nodes where ing is the node variable


In [34]:
# Run the query to ingest the nodes to neo4j
res_pipeline=conn.query(load_ingredients)
print("Ingredient nodes loaded. Time taken:" + str(res_pipeline[0][2]) + ' seconds. Committed Operations: ' + str(res_pipeline[0][3]) + '. Failed Operations:' + str(res_pipeline[0][4]))

# NOTE: If you see failed operations, just execute the same cell again and again untill all operations are successful.

Ingredient nodes loaded. Time taken:0 seconds. Committed Operations: 11. Failed Operations:0


In [35]:
# Query to load the relationships and set properties for relationships
load_ing_subs="""
CALL apoc.periodic.iterate("CALL apoc.load.csv('/relationships/pairs.csv') yield map as row", 
"MATCH (ing1:Ingredient {itemID: row.src_id})
MATCH (ing2:Ingredient {itemID: row.target_id})
MERGE (ing1)-[rel:hasSubstitute]->(ing2)
SET rel.source = row.source, 
    rel.prop1 = row.some_prop", 
{batchSize:10000, iterateList:true, parallel:true})
"""

In [36]:
# Run the query to ingest the relationships to neo4j
res_pipeline=conn.query(load_ing_subs)
print("Relationships loaded. Time taken:" + str(res_pipeline[0][2]) + ' seconds. Committed Operations: ' + str(res_pipeline[0][3]) + '. Failed Operations:' + str(res_pipeline[0][4]))


Relationships loaded. Time taken:0 seconds. Committed Operations: 9. Failed Operations:0


In [37]:
# To load image embeddings into nodes

# Helper functions
def convert_to_json(result):
    data_dict = {}
    for item in result:
        curr_dict = dict(item[0])
        item_id = curr_dict['itemID']
        data_dict[item_id] = curr_dict
    return data_dict
    
# 1. Get all the nodes
query_str = """MATCH (n:Ingredient) return properties(n)"""

# 2. Execute the query
response = conn.query(query_str)

# 3. Convert to json for processing
json_res = convert_to_json(response)
print(json_res)

# 4. Create embeddings for each item in the dict. We need the itemID to load the embeddings to those nodes
for itemID in json_res:
    # creating a random embedding, replace with your actual embedding
    embedding = [random.gauss(0, 1) for _ in range(300)]
    # convert this to a retrievable string, as it is one of the best formats to load to Neo4j
    serialized_embed = json.dumps(embedding)
    # Load it to Neo4j
    update_query = """MERGE (ing:Ingredient {itemID: $id})
                      SET ing.embedding = $embed"""
    parameters = {'id':itemID, 'embed':serialized_embed}
    res = conn.query(update_query, parameters)

print("\nEmbeddings loaded")

# To query embedding and convert it back,
for itemID in json_res:
    query_str = """MATCH (n:Ingredient {itemID:$id}) RETURN n.embedding"""
    parameters = {'id': itemID}
    res = conn.query(query_str, parameters)
    embedding = json.loads(res[0]['n.embedding'])
    # Convert to torch or numpy to do whatever you need to do

    
    

{'8599098409': {'category': 'Vegetables and Vegetable Products', 'itemID': '8599098409', 'name': 'amaranth greens', 'add_info': 'https://fdc.nal.usda.gov/food-details/168385/nutrients'}, '4948256787': {'category': 'Vegetables and Vegetable Products', 'itemID': '4948256787', 'name': 'beet greens', 'add_info': 'https://fdc.nal.usda.gov/food-details/170375/nutrients'}, '7418228164': {'category': 'Vegetables and Vegetable Products', 'itemID': '7418228164', 'name': 'turnip greens', 'add_info': 'https://fdc.nal.usda.gov/food-details/170061/nutrients'}, '3273737018': {'category': 'Vegetables and Vegetable Products', 'itemID': '3273737018', 'name': 'kale', 'add_info': 'https://fdc.nal.usda.gov/food-details/168421/nutrients'}, '1588096030': {'category': 'nan', 'itemID': '1588096030', 'name': 'sorrel', 'add_info': 'nan'}, '1202552441': {'category': 'Legumes and Legume Products', 'itemID': '1202552441', 'name': 'tofu', 'add_info': 'https://fdc.nal.usda.gov/food-details/174291/nutrients'}, '276432