In [1]:
import os
from typing import Dict, List, Union

from neo4j import GraphDatabase
import pandas as pd
from dotenv import load_dotenv

# Runs before the os.environ lookups in the driver cell below; presumably this
# loads the NEO4J_* settings from a local .env file -- confirm where that file lives.
load_dotenv()

True

### Neo4j Driver

In [2]:
# Build the notebook-wide driver from connection settings held in the
# environment (NEO4J_URI, NEO4J_USERNAME, NEO4J_PASSWORD).
driver = GraphDatabase.driver(
    uri=os.environ.get("NEO4J_URI"),
    auth=(
        os.environ.get("NEO4J_USERNAME"),
        os.environ.get("NEO4J_PASSWORD"),
    ),
)

# Check credentials and reachability up front, before any query cell runs.
# NOTE(review): the value returned by verify_authentication() is discarded
# here -- confirm whether a failed check raises or only returns a falsy value.
driver.verify_authentication()
driver.verify_connectivity()

In [3]:
# Cypher template meant for str.format with two placeholders: {node_type}
# (spliced in as the node label) and {node_name} (compared against n.name).
# Cypher's own map-literal braces are doubled so that formatting renders them
# as single braces instead of treating them as replacement fields (with single
# braces, .format() fails with KeyError: ' node').
# The values are inserted into the query text verbatim, so only pass trusted
# values.
ravi_query: str = """
// Ravi's
MATCH (n:{node_type})
WHERE n.name ='{node_name}'
WITH n
MATCH p=()-[*0..]->(n)
WITH n, nodes(p) AS nodes, relationships(p) AS rels
with n, {{ node: head(nodes) , rel: head(rels) }} AS root
WITH n, COLLECT(DISTINCT root) AS roots
MATCH p=(n)-[*0..]->()
WITH n, roots, nodes(p) AS nodes, relationships(p) AS rels
with n, roots, {{ node: tail(nodes) , rel: CASE WHEN size(rels) > 1
THEN tail(rels) ELSE head(rels) END }} AS leaf
WITH {{node: n, rel: []}} as n1, roots, COLLECT(DISTINCT leaf)
AS leaves
RETURN n1 + roots + leaves as data
"""

# Cypher template rendered with str.format: {min_level} and {max_level} are the
# only placeholders, and the doubled braces come out as the single braces that
# Cypher needs for its CALL subqueries and map literals.
#
# What the query does, as written:
#   * matches a dbt_table_or_consumption_view node by a hard-coded name;
#   * first CALL block: apoc.path.expandConfig with relationshipFilter "<",
#     collecting the distinct (head(nodes), head(rels)) maps as `roots`;
#   * second CALL block: the same expansion with relationshipFilter ">",
#     collecting the distinct (tail(nodes), rel) maps as `leaves`;
#   * returns sourceId, rootIds and leafIds, all obtained through id().
# leafIds keeps only the first element of each non-empty tail(nodes) list.
#
# NOTE(review): if expandConfig yields paths that begin at the start node, then
# head(nodes) is always `n` and rootIds can only ever repeat the source id --
# confirm against the APOC documentation; last(nodes) may have been intended.
updated_query: str = """
MATCH (n:dbt_table_or_consumption_view{{name: 'prod_khc_sales.kroger.kroger_daily_point_of_sale_fact'}})
// Get Roots
CALL {{
    WITH n
    CALL apoc.path.expandConfig(n, {{
	    relationshipFilter: "<",
        minLevel: {min_level},
        maxLevel: {max_level},
        uniqueness: "RELATIONSHIP_GLOBAL"
    }})
    YIELD path AS p
    WITH nodes(p) AS nodes, 
         relationships(p) AS rels
    WITH {{ node: head(nodes) , rel: head(rels) }} AS root
    WITH COLLECT(DISTINCT root) AS roots
    RETURN roots
}}
WITH n, roots
// Get Leafs
CALL {{
    WITH n
    CALL apoc.path.expandConfig(n, {{
	    relationshipFilter: ">",
        minLevel: {min_level},
        maxLevel: {max_level},
        uniqueness: "RELATIONSHIP_GLOBAL"
    }})
    YIELD path AS p
    WITH n, nodes(p) AS nodes, relationships(p) AS rels
    WITH n, {{ node: tail(nodes) , rel: CASE WHEN size(rels) > 1
    THEN tail(rels) ELSE head(rels) END }} AS leaf
    RETURN {{node: n, rel: []}} as n1, COLLECT(DISTINCT leaf)
AS leaves
}}
WITH n1, roots, leaves
WITH id(n1['node']) AS sourceId, [n IN roots | id(n['node'])] AS rootIds, [n IN leaves WHERE isEmpty(n['node']) = false| n['node'][0]] AS leafNodeList
RETURN sourceId, rootIds, [n IN leafNodeList | id(n)] AS leafIds
"""

In [4]:
def run_query(query_template: str, min_level: int, max_level: int) -> List[Union[List[str], str]]:
    """Render a Cypher template with traversal bounds, run it, and return the first row.

    Args:
        query_template: Cypher text with ``{min_level}`` and ``{max_level}``
            ``str.format`` placeholders. Literal braces in the Cypher must be
            doubled (``{{`` / ``}}``).
        min_level: Value substituted for ``{min_level}``.
        max_level: Value substituted for ``{max_level}``.

    Returns:
        The values of the first record the query produced, in RETURN order.
        Any further records are ignored.

    Raises:
        KeyError: If the template contains a named placeholder other than
            ``min_level`` / ``max_level``.
        IndexError: If the query produced no records at all (for example the
            start node was not matched).
    """
    formatted_query = query_template.format(min_level=min_level, max_level=max_level)
    with driver.session() as session:
        # Materialise the rows while the session is still open.
        rows = session.run(formatted_query).values()

    if not rows:
        # Same exception type as the bare ``[][0]`` lookup would raise, but with
        # a message that says what actually went wrong.
        raise IndexError(
            f"Query returned no rows (min_level={min_level}, max_level={max_level})"
        )
    return rows[0]