# Neo4j integration
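This notebook loads the wine-harmonization datasets (molecules, wines, ingredients and recipes) from Databricks tables into a Neo4j graph. It reads the connection credentials, tests the connection, creates uniqueness constraints, and bulk-inserts the nodes and relationships in batches.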

### Initial configuration

In [0]:
!pip install python-dotenv neo4j

In [0]:
from neo4j import GraphDatabase
import os
from dotenv import load_dotenv
from pyspark.sql.functions import explode, col, lower, trim
from neo4j import GraphDatabase

## Connection

### Retrieve credentials

In [0]:
# Pull the variables declared in ".env" into the process environment, then
# read the three Neo4j settings (each is None when the variable is unset).
load_dotenv(".env")

NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD = (
    os.getenv(setting)
    for setting in ("NEO4J_URI", "NEO4J_USER", "NEO4J_PASSWORD")
)

### Test connection

In [0]:
def test_connection(driver):
    """Run a one-row smoke query through ``driver`` and print each record.

    Args:
        driver: Object whose ``session()`` returns a context-managed session
            exposing ``run(query)`` (here, the Neo4j driver).
    """
    smoke_query = "MATCH (n) RETURN n LIMIT 1"

    print("Testing Neo4j connection...")
    with driver.session() as neo_session:
        # An empty database yields no records; the query itself still has to
        # reach the server for this loop to finish without raising.
        for rec in neo_session.run(smoke_query):
            print("Record:", rec)
    print("Connection test completed.")

In [0]:

# Open a driver with the credentials read from the environment, run the
# smoke test, and always release the driver afterwards.
print("Creating Neo4j driver instance...")
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))

try:
    test_connection(driver)
finally:
    # Close in `finally` so a failed test does not leave the driver's
    # connections open for the rest of the notebook session.
    print("Closing Neo4j driver...")
    driver.close()
    print("Driver closed.")

In [0]:
# Wipe the graph before reloading it.
# A dedicated driver is opened here: the `driver` object created for the
# connection test has already been closed, and a closed Neo4j driver must not
# be reused.
print("Deleting all nodes and relationships in Neo4j...")
with GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD)) as wipe_driver:
    with wipe_driver.session() as session:
        # consume() waits for the server to finish the delete, so a failure is
        # raised here instead of being deferred to session shutdown.
        session.run("MATCH (n) DETACH DELETE n").consume()
print("Deletion completed.")

## Schema definition

In [0]:
# Source tables for the graph load.
molecules_df = spark.table("wine_harmonization.datasets.molecules")
recipes_df = spark.table("wine_harmonization.datasets.recipes")

# Keep a single row per wine title.
wines_df = (
    spark.table("wine_harmonization.datasets.wines")
    .dropDuplicates(["title"])
)

# Ingredient aliases are lower-cased so they can serve as node names.
ingredients_df = (
    spark.table("wine_harmonization.datasets.ingredients_molecules")
    .withColumn("entity_alias_readable", lower(col("entity_alias_readable")))
)

In [0]:
# Databricks → Neo4j bulk insert (batches of 100, minimal logging, compatible with DBX Free)
#
# Credentials come from the environment (.env) rather than being written in
# the notebook: a secret committed to source is readable by anyone with access
# to the file or its history.
# NOTE(review): the password that used to be hard-coded here should be rotated.
import os
from dotenv import load_dotenv

load_dotenv(".env")
uri      = os.getenv("NEO4J_URI")
user     = os.getenv("NEO4J_USER")
password = os.getenv("NEO4J_PASSWORD")
if not all((uri, user, password)):
    # Fail here with a clear message instead of deep inside a Spark task.
    raise RuntimeError(
        "NEO4J_URI, NEO4J_USER and NEO4J_PASSWORD must be set (e.g. in .env)"
    )

from pyspark.sql.functions import explode, col, lower, trim
from neo4j import GraphDatabase
from neo4j.exceptions import TransientError
import time

batch_size = 100   # items sent to Neo4j per query
max_retries = 3    # attempts per batch when a transient error occurs
retry_delay = 1.5  # seconds; multiplied by the attempt number between retries

def make_processor(build_item_fn, cypher):
    """Build a ``foreachPartition`` callback that writes rows to Neo4j in batches.

    Args:
        build_item_fn: Callable mapping one row to the item sent to Neo4j;
            returning ``None`` skips the row.
        cypher: Query run once per batch, with the items bound to ``$batch``.

    Returns:
        A function taking an iterable of rows. It opens its own driver and
        session, sends the items through ``_execute_with_retry`` in groups of
        ``batch_size`` and prints a running count of the items submitted.
    """
    def _proc(rows):
        # Context managers close the session and the driver even when
        # build_item_fn or a write raises, so no connections are leaked.
        with GraphDatabase.driver(uri, auth=(user, password)) as driver:
            with driver.session() as sess:
                batch = []
                inserted = 0
                for row in rows:
                    item = build_item_fn(row)
                    if item is None:
                        continue
                    batch.append(item)
                    if len(batch) >= batch_size:
                        _execute_with_retry(sess, cypher, batch)
                        inserted += len(batch)
                        print(f"  → {inserted} inseridos")
                        batch = []
                # Flush the last, partially filled batch.
                if batch:
                    _execute_with_retry(sess, cypher, batch)
                    inserted += len(batch)
                    print(f"  → {inserted} inseridos (final)")
    return _proc

def _execute_with_retry(sess, cypher, batch):
    """Run ``cypher`` for one batch, retrying when Neo4j reports a transient error.

    The write stays best-effort: failures are printed, never raised.

    Args:
        sess: Session whose ``run(cypher, batch=...)`` returns a result
            exposing ``consume()``.
        cypher: Query text expecting a ``$batch`` parameter.
        batch: List of items bound to ``$batch``.

    Returns:
        ``True`` when the batch was written; ``False`` when it was abandoned
        (unexpected error, or every attempt hit a transient error).
    """
    for attempt in range(max_retries):
        try:
            # Results are fetched lazily; consume() forces the query to finish
            # here so execution errors are raised inside this try block and
            # can actually be retried.
            sess.run(cypher, batch=batch).consume()
            return True
        except TransientError:
            print(f"⚠️  Deadlock detectado. Tentando novamente ({attempt + 1})...")
            # No point in waiting after the final attempt.
            if attempt + 1 < max_retries:
                time.sleep(retry_delay * (attempt + 1))
        except Exception as e:
            print(f"❌ Erro inesperado: {str(e)}")
            return False
    # Every attempt failed with a transient error: report the lost batch
    # instead of returning as if it had been written.
    print(f"❌ Lote descartado após {max_retries} tentativas ({len(batch)} itens)")
    return False

# ────────────────────────────────────────────────────────────────────────────────
# 1. Constraints (run once) ──────────────────────────────────────────────────────
# ────────────────────────────────────────────────────────────────────────────────
# One uniqueness constraint per node label, on the property used as its key.
constraint_statements = [
    "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Wine)       REQUIRE n.title       IS UNIQUE",
    "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Flavor)     REQUIRE n.name        IS UNIQUE",
    "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Molecule)   REQUIRE n.pubchem_id  IS UNIQUE",
    "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Ingredient) REQUIRE n.name        IS UNIQUE",
    "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Recipe)     REQUIRE n.id          IS UNIQUE",
    "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Grape)      REQUIRE n.name        IS UNIQUE",
    "CREATE CONSTRAINT IF NOT EXISTS FOR (n:PriceRange) REQUIRE n.name        IS UNIQUE",
]

with GraphDatabase.driver(uri, auth=(user, password)) as drv, drv.session() as s:
    for statement in constraint_statements:
        s.run(statement)
print("✅ Constraints criadas")

# ────────────────────────────────────────────────────────────────────────────────
# 2. Fixed PriceRange nodes ──────────────────────────────────────────────────────
# ────────────────────────────────────────────────────────────────────────────────
# The four price tiers that wines are later attached to.
price_range_names = ["Budget","Mid-range","Premium","Luxury"]

with GraphDatabase.driver(uri, auth=(user, password)) as drv, drv.session() as s:
    s.run(
        "UNWIND $x AS name MERGE (:PriceRange {name:name})",
        x=price_range_names,
    )
print("✅ PriceRange inseridos")


# ────────────────────────────────────────────────────────────────────────────────
# 3. Flavors ─────────────────────────────────────────────────────────────────────
# ────────────────────────────────────────────────────────────────────────────────
# Flavor names come from two places: molecule flavor arrays and wine
# characteristics. Both are exploded to one value per row before merging.
molecule_flavors = molecules_df.select(explode("flavor_array").alias("fl"))
wine_flavors = wines_df.select(explode("Characteristics").alias("fl"))

# Normalise (lower-case, trimmed), drop blanks, and de-duplicate.
normalized_flavors = molecule_flavors.union(wine_flavors).withColumn(
    "fl", trim(lower(col("fl")))
)
flavor_df = normalized_flavors.filter(col("fl") != "").distinct()

flavor_cypher = "UNWIND $batch AS name MERGE (:Flavor {name:name})"
write_flavors = make_processor(lambda r: r.fl, flavor_cypher)
flavor_df.foreachPartition(write_flavors)
print("✅ Flavors inseridos")

# ────────────────────────────────────────────────────────────────────────────────
# 4. Grapes ──────────────────────────────────────────────────────────────────────
# ────────────────────────────────────────────────────────────────────────────────
# Distinct, trimmed, non-blank grape names taken from the wines table.
trimmed_grapes = wines_df.select(trim(col("Grape")).alias("gr"))
grape_df = trimmed_grapes.filter(col("gr") != "").distinct()

grape_cypher = "UNWIND $batch AS name MERGE (:Grape {name:name})"
write_grapes = make_processor(lambda r: r.gr, grape_cypher)
grape_df.foreachPartition(write_grapes)
print("✅ Grapes inseridos")

# ────────────────────────────────────────────────────────────────────────────────
# 5. Molecules + HAS_FLAVOR_PROFILE ─────────────────────────────────────────────
# ────────────────────────────────────────────────────────────────────────────────

def mol_item(r):
    """Convert a molecules row into the parameter dict for the molecule query.

    Args:
        r: Row exposing ``pubchem_id``, ``common_name``, ``molecular_weight``
            and ``flavor_array``.

    Returns:
        ``None`` when the row has no ``pubchem_id``; otherwise a dict with
        ``pubchem_id`` (int), ``name``, ``mw`` (float or ``None``) and ``fl``
        (trimmed, lower-cased flavor names).
    """
    if r.pubchem_id is None:
        return None
    # Null entries inside the array are skipped: calling .strip() on them
    # would raise and abort the whole partition.
    flavors = [x.strip().lower() for x in (r.flavor_array or []) if x is not None]
    return {
        "pubchem_id": int(r.pubchem_id),
        "name": r.common_name,
        "mw": float(r.molecular_weight) if r.molecular_weight is not None else None,
        "fl": flavors,
    }

# Per batch item: merge the Molecule by pubchem_id, set its name and weight,
# then link it to each already-existing Flavor named in the item.
mol_cypher = """
UNWIND $batch AS row
MERGE (m:Molecule {pubchem_id:row.pubchem_id})
SET m.name = row.name, m.molecular_weight = row.mw
WITH m, row.fl AS flavors
UNWIND flavors AS f
MATCH (fl:Flavor {name:f})
MERGE (m)-[:HAS_FLAVOR_PROFILE]->(fl)
"""

molecule_rows = molecules_df.select(
    "common_name", "pubchem_id", "molecular_weight", "flavor_array"
)
molecule_rows.foreachPartition(make_processor(mol_item, mol_cypher))
print("✅ Molecules inseridos")

# ────────────────────────────────────────────────────────────────────────────────
# 6. Ingredients + CONTAINS_MOLECULE ─────────────────────────────────────────────
# ────────────────────────────────────────────────────────────────────────────────

def ing_item(r):
    """Convert an ingredient row into the parameter dict for the ingredient query.

    Args:
        r: Row exposing ``entity_alias_readable`` and ``pubchem_ids``.

    Returns:
        ``None`` when the name or the id list is missing or empty; otherwise
        ``{"name": <alias>, "ids": [<int>, ...]}``.
    """
    has_name_and_ids = r.entity_alias_readable and r.pubchem_ids
    if not has_name_and_ids:
        return None
    molecule_ids = list(map(int, r.pubchem_ids))
    return {"name": r.entity_alias_readable, "ids": molecule_ids}

# Per batch item: merge the Ingredient by name, then link it to each
# already-existing Molecule whose pubchem_id appears in the item.
ing_cypher = """
UNWIND $batch AS row
MERGE (i:Ingredient {name:row.name})
WITH i, row.ids AS ids
UNWIND ids AS pid
MATCH (m:Molecule {pubchem_id:pid})
MERGE (i)-[:CONTAINS_MOLECULE]->(m)
"""

ingredient_rows = ingredients_df.select("entity_alias_readable", "pubchem_ids")
ingredient_rows.foreachPartition(make_processor(ing_item, ing_cypher))
print("✅ Ingredients inseridos")

# ────────────────────────────────────────────────────────────────────────────────
# 7. Recipes + CONTAINS_INGREDIENT ───────────────────────────────────────────────
# ────────────────────────────────────────────────────────────────────────────────

def rec_item(r):
    """Convert a recipe row into the parameter dict for the recipe query.

    Args:
        r: Row exposing ``id``, ``title`` and ``ingredients_list``.

    Returns:
        ``None`` when ``id`` or ``title`` is missing; otherwise a dict with
        ``id`` (int), ``title`` and ``ings`` (lower-cased ingredient names,
        with null and blank entries removed).
    """
    if r.id is None or r.title is None:
        return None
    # Null/blank names are dropped because MERGE cannot create a node keyed on
    # a null property. Names are lower-cased because Ingredient nodes loaded
    # from the ingredients table use lower-cased names; a different casing
    # would create a second, unlinked Ingredient node.
    ingredient_names = [
        ing.lower()
        for ing in (r.ingredients_list or [])
        if ing is not None and ing.strip()
    ]
    return {"id": int(r.id), "title": r.title, "ings": ingredient_names}

# Per batch item: merge the Recipe by id, set its title, then merge an
# Ingredient node for every listed name and link the recipe to it.
rec_cypher = """
UNWIND $batch AS row
MERGE (r:Recipe {id: row.id})
SET r.title = row.title
WITH r, coalesce(row.ings, []) AS ings
UNWIND ings AS ing
MERGE (i:Ingredient {name: ing})
MERGE (r)-[:CONTAINS_INGREDIENT]->(i);
"""

recipe_rows = recipes_df.select("id", "title", "ingredients_list")
recipe_rows.foreachPartition(make_processor(rec_item, rec_cypher))
print("✅ Recipes inseridos")

# ────────────────────────────────────────────────────────────────────────────────
# 8. Wines + relationships ───────────────────────────────────────────────────────
# ────────────────────────────────────────────────────────────────────────────────

def wine_item(r):
    """Convert a wine row into the parameter dict for the wine query.

    Args:
        r: Row exposing ``Title``, ``Type``, ``Price``, ``Grape`` and
            ``Characteristics``.

    Returns:
        ``None`` when the row has no ``Title``; otherwise a dict with
        ``title``, ``type``, ``price`` (float, ``0.0`` when missing),
        ``grape`` (trimmed, or ``None`` when missing/blank), ``chars``
        (trimmed, lower-cased characteristics) and ``pr`` (price tier name).
    """
    if r.Title is None:
        return None

    # A missing price is treated as 0.0 and therefore lands in "Budget".
    price = float(r.Price) if r.Price is not None else 0.0
    if price <= 100:
        pr = "Budget"
    elif price <= 200:
        pr = "Mid-range"
    elif price <= 500:
        pr = "Premium"
    else:
        pr = "Luxury"

    # Grape nodes are created from the trimmed column value, so the name sent
    # here must be trimmed too or the lookup by name will not find the node.
    grape = r.Grape.strip() if r.Grape else None

    # Null entries inside the array are skipped instead of raising on .strip().
    chars = [x.strip().lower() for x in (r.Characteristics or []) if x is not None]

    return {
        "title": r.Title,
        "type": r.Type,
        "price": price,
        "grape": grape or None,
        "chars": chars,
        "pr": pr,
    }

# Per batch item: merge the Wine by title and set its type and price, then
# attach its price tier, grape and characteristics.
#
# Clause order matters. A plain MATCH that finds nothing, or an UNWIND over an
# empty list, removes the row from the rest of the query. So:
#   * the price tier is linked first, while every wine still has a row;
#   * the grape uses OPTIONAL MATCH + FOREACH, so a wine without a known grape
#     keeps its row and simply gets no MADE_FROM relationship;
#   * the characteristics UNWIND comes last, where dropping rows is harmless.
wine_cypher = """
UNWIND $batch AS row
MERGE (w:Wine {title:row.title})
SET w.type = row.type, w.price = row.price
WITH w, row
MATCH (pr:PriceRange {name:row.pr})
MERGE (w)-[:IN_PRICE_RANGE]->(pr)
WITH w, row
OPTIONAL MATCH (g:Grape {name: row.grape})
FOREACH (ignored IN CASE WHEN g IS NULL THEN [] ELSE [1] END |
  MERGE (w)-[:MADE_FROM]->(g)
)
WITH w, row
UNWIND row.chars AS ch
MATCH (f:Flavor {name:ch})
MERGE (w)-[:HAS_CHARACTERISTIC]->(f)
"""

# Send only the columns wine_item reads, one Neo4j writer per partition.
wine_rows = wines_df.select("Title", "Type", "Price", "Grape", "Characteristics")
wine_rows.foreachPartition(make_processor(wine_item, wine_cypher))
print("✅ Wines inseridos | Processo completo 🎉")

## Node insertion