# Metadata Sync: Materialized Delta Tables

This notebook materializes Neo4j node labels and relationship types as **managed Delta tables**
in Unity Catalog (UC). When a DataFrame is written with `saveAsTable()`, UC registers the table's
schema metadata (column names, types, and nullability), making it browsable in
**Catalog Explorer** and queryable via `INFORMATION_SCHEMA`. Row counts and file-level statistics
live in the Delta transaction log rather than in `INFORMATION_SCHEMA`.

**What this proves:** Neo4j graph schema can be synchronized into Unity Catalog with zero
custom API calls. The Spark Connector infers the schema, and `saveAsTable()` does the rest.

### Steps

Configuration is loaded from Databricks Secrets first. The numbered steps then match the
section headings below.

1. Verify Neo4j connectivity
2. Discover all node labels and their properties
3. Create target UC schemas (catalog must already exist; see README)
4. Materialize a single label as a Delta table (test)
5. Verify metadata in `INFORMATION_SCHEMA`
6. Materialize all discovered labels
7. Materialize relationship types
8. Final verification and summary

### Prerequisites

- **Target catalog `neo4j_metadata` must already exist** — see `METADATA_SYNC_README.md`
- `neo4j-uc-creds` secret scope configured via `setup.sh`
- Neo4j Spark Connector installed on cluster (`org.neo4j:neo4j-connector-apache-spark_2.12:5.4.0_for_spark_3`)
- Neo4j Python driver installed (`neo4j`)
- **Single user** access mode cluster (required by Spark Connector)

---

## Configuration
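
The configuration cell below reads one optional secret (`database`) and falls back to `"neo4j"` when the lookup fails. That fallback pattern can be sketched as a small helper. This is only an illustration: `get_secret_or_default` and `fake_get` are hypothetical names, the host value is a placeholder, and the getter is injected so the sketch runs without `dbutils`.

```python
from typing import Callable


def get_secret_or_default(
    getter: Callable[[str, str], str],
    scope: str,
    key: str,
    default: str,
) -> str:
    """Return the secret for (scope, key), or `default` if the lookup fails."""
    try:
        value = getter(scope, key)
    except Exception:
        # Mirrors the broad except in the cell below: any lookup failure
        # (missing key, missing scope, permission error) yields the default.
        return default
    # Assumption of this sketch: an empty secret is treated as unset.
    return value or default


# Stand-in for dbutils.secrets.get, used only to exercise the helper.
_fake_store = {("neo4j-uc-creds", "host"): "example.databases.neo4j.io"}


def fake_get(scope: str, key: str) -> str:
    return _fake_store[(scope, key)]


database = get_secret_or_default(fake_get, "neo4j-uc-creds", "database", "neo4j")
host = get_secret_or_default(fake_get, "neo4j-uc-creds", "host", "localhost")
print(database, host)
```

In the notebook itself the getter would be `dbutils.secrets.get`. Note that a broad `except` also hides permission errors, which may or may not be what you want.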

In [None]:
# =============================================================================
# CONFIGURATION — Loaded from Databricks Secrets
# =============================================================================

import time
import re
from collections import defaultdict
from neo4j import GraphDatabase

SCOPE_NAME = "neo4j-uc-creds"

# Neo4j credentials
NEO4J_HOST = dbutils.secrets.get(SCOPE_NAME, "host")
NEO4J_USER = dbutils.secrets.get(SCOPE_NAME, "user")
NEO4J_PASSWORD = dbutils.secrets.get(SCOPE_NAME, "password")

try:
    NEO4J_DATABASE = dbutils.secrets.get(SCOPE_NAME, "database")
except Exception:
    NEO4J_DATABASE = "neo4j"

# Derived URLs
NEO4J_BOLT_URI = f"neo4j+s://{NEO4J_HOST}"

# Target catalog and schema for materialized tables
# Change these to match your environment
TARGET_CATALOG = "neo4j_metadata"
NODES_SCHEMA = "nodes"
RELATIONSHIPS_SCHEMA = "relationships"

# Initialize cross-cell variables to avoid NameError if cells are skipped
discovered_labels = defaultdict(list)
discovered_relationships = defaultdict(list)
label_results = []
rel_results = []
table_name = None
full_table = None

# Set Neo4j credentials at session level so they don't appear in Spark UI query plans
spark.conf.set("neo4j.url", NEO4J_BOLT_URI)
spark.conf.set("neo4j.authentication.type", "basic")
spark.conf.set("neo4j.authentication.basic.username", NEO4J_USER)
spark.conf.set("neo4j.authentication.basic.password", NEO4J_PASSWORD)
spark.conf.set("neo4j.database", NEO4J_DATABASE)

print("Configuration loaded from Databricks Secrets:")
print(f"  Secret Scope: {SCOPE_NAME}")
print(f"  Neo4j Host: {NEO4J_HOST}")
print(f"  Bolt URI: {NEO4J_BOLT_URI}")
print(f"  Database: {NEO4J_DATABASE}")
print(f"  Target Catalog: {TARGET_CATALOG}")
print(f"  Nodes Schema: {NODES_SCHEMA}")
print(f"  Relationships Schema: {RELATIONSHIPS_SCHEMA}")
print(f"  Neo4j credentials: set at session level (not per-query)")

---

## Step 1: Verify Neo4j Connectivity

In [None]:
# =============================================================================
# VERIFY NEO4J CONNECTIVITY
# =============================================================================

print("=" * 60)
print("VERIFY NEO4J CONNECTIVITY")
print("=" * 60)

try:
    with GraphDatabase.driver(NEO4J_BOLT_URI, auth=(NEO4J_USER, NEO4J_PASSWORD)) as driver:
        driver.verify_connectivity()
        print("\n[PASS] Driver connectivity verified")

        with driver.session(database=NEO4J_DATABASE) as session:
            result = session.run("RETURN 1 AS test")
            record = result.single()
            print(f"[PASS] Query executed: RETURN 1 = {record['test']}")

            result = session.run("CALL dbms.components() YIELD name, versions RETURN name, versions")
            for record in result:
                print(f"[INFO] Connected to: {record['name']} {record['versions']}")

    print("\nStatus: PASS")

except Exception as e:
    print(f"\n[FAIL] Connection failed: {e}")
    print("\nStatus: FAIL")

---

## Step 2: Discover Node Labels and Properties
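
The discovery cell below does two pieces of pure data handling: it groups property rows by label (skipping multi-label entries) and strips the backtick wrapper from `relType`. A minimal sketch of that logic, run against hand-written rows instead of a live Neo4j session, looks roughly like this. The function names and the sample rows are hypothetical.

```python
import re
from collections import defaultdict


def strip_rel_type(raw: str) -> str:
    """Remove the leading ':`' and the trailing '`' from a relType string."""
    return re.sub(r"^:`|`$", "", raw)


def group_single_label_properties(records):
    """Group property names by label and count skipped multi-label rows."""
    grouped = defaultdict(list)
    skipped = 0
    for rec in records:
        # Rows with no property name describe labels without properties.
        if rec["propertyName"] is None:
            continue
        labels = rec["nodeLabels"]
        if len(labels) != 1:
            skipped += 1
            continue
        grouped[labels[0]].append(rec["propertyName"])
    return dict(grouped), skipped


# Hand-written rows shaped like db.schema.nodeTypeProperties() output.
sample_rows = [
    {"nodeLabels": ["Person"], "propertyName": "name"},
    {"nodeLabels": ["Person"], "propertyName": "born"},
    {"nodeLabels": ["Person", "Employee"], "propertyName": "badge"},
    {"nodeLabels": ["Empty"], "propertyName": None},
]

grouped, skipped = group_single_label_properties(sample_rows)
print(grouped, skipped)
print(strip_rel_type(":`ACTED_IN`"))
```

The sample makes two consequences visible: the `Empty` label never reaches the result, and `Employee` is not discovered because it only occurs in a combination.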

In [None]:
# =============================================================================
# DISCOVER NEO4J SCHEMA
# =============================================================================
# Uses db.schema.nodeTypeProperties() and db.schema.relTypeProperties()
# Built-in procedures, no APOC required.

print("=" * 60)
print("DISCOVER NEO4J SCHEMA")
print("=" * 60)

multi_label_skipped = 0

try:
    with GraphDatabase.driver(NEO4J_BOLT_URI, auth=(NEO4J_USER, NEO4J_PASSWORD)) as driver:
        with driver.session(database=NEO4J_DATABASE) as session:
            # Discover node label properties
            print("\n[INFO] Running CALL db.schema.nodeTypeProperties()...")
            result = session.run("CALL db.schema.nodeTypeProperties()")
            for record in result:
                # Rows with a null propertyName describe labels that have no
                # properties. Skipping them means such labels are not materialized.
                if record["propertyName"] is None:
                    continue
                labels = record["nodeLabels"]
                if len(labels) == 1:
                    label = labels[0]
                    discovered_labels[label].append({
                        "name": record["propertyName"],
                        "types": record["propertyTypes"],
                        "mandatory": record["mandatory"]
                    })
                else:
                    multi_label_skipped += 1

            # Discover relationship type properties
            print("[INFO] Running CALL db.schema.relTypeProperties()...")
            result = session.run("CALL db.schema.relTypeProperties()")
            for record in result:
                # Parse relType: format is `:`REL_TYPE`` — remove leading :` and trailing `
                raw = record["relType"]
                rel_type = re.sub(r'^:`|`$', '', raw)
                discovered_relationships[rel_type].append({
                    "name": record["propertyName"],
                    "types": record["propertyTypes"],
                    "mandatory": record["mandatory"]
                })

    # Display discovered schema
    print(f"\nNODE LABELS ({len(discovered_labels)} discovered):")
    print("-" * 50)
    for label, props in sorted(discovered_labels.items()):
        print(f"  {label}: {len(props)} properties")
        for p in props[:5]:
            types_str = ", ".join(p["types"])
            mandatory_str = " [mandatory]" if p["mandatory"] else ""
            print(f"    - {p['name']}: {types_str}{mandatory_str}")
        if len(props) > 5:
            print(f"    ... and {len(props) - 5} more")

    if multi_label_skipped > 0:
        print(f"\n[WARN] Skipped {multi_label_skipped} multi-label node type entries")
        print("  Multi-label combinations (e.g., :Person:Employee) are not materialized separately.")
        print("  Nodes carrying a discovered label are still read under that label;")
        print("  labels that occur only in combinations are not discovered.")

    print(f"\nRELATIONSHIP TYPES ({len(discovered_relationships)} discovered):")
    print("-" * 50)
    for rel_type, props in sorted(discovered_relationships.items()):
        prop_count = len([p for p in props if p["name"] is not None])
        print(f"  {rel_type}: {prop_count} properties")

    print(f"\n[PASS] Schema discovery complete")

except Exception as e:
    print(f"\n[FAIL] Schema discovery failed: {e}")
    import traceback
    traceback.print_exc()

---

## Step 3: Create Target Schemas

The target catalog (`neo4j_metadata`) must already exist. See the README for setup instructions.
This step creates the `nodes` and `relationships` schemas within that catalog.

In [None]:
# =============================================================================
# CREATE TARGET SCHEMAS
# =============================================================================
# The catalog must already exist — see METADATA_SYNC_README.md for setup.

print("=" * 60)
print("CREATE TARGET SCHEMAS")
print("=" * 60)

# Verify catalog exists
try:
    spark.sql(f"USE CATALOG `{TARGET_CATALOG}`")
    print(f"\n[PASS] Catalog '{TARGET_CATALOG}' exists")
except Exception as e:
    print(f"\n[FAIL] Catalog '{TARGET_CATALOG}' not found: {e}")
    print("[INFO] Create it first — see METADATA_SYNC_README.md for instructions.")

try:
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS `{TARGET_CATALOG}`.`{NODES_SCHEMA}`")
    print(f"[PASS] Schema '{TARGET_CATALOG}.{NODES_SCHEMA}' exists")
except Exception as e:
    print(f"[FAIL] Could not create schema: {e}")

try:
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS `{TARGET_CATALOG}`.`{RELATIONSHIPS_SCHEMA}`")
    print(f"[PASS] Schema '{TARGET_CATALOG}.{RELATIONSHIPS_SCHEMA}' exists")
except Exception as e:
    print(f"[FAIL] Could not create schema: {e}")

# Verify
print(f"\n[INFO] Target structure:")
print(f"  {TARGET_CATALOG}")
print(f"  ├── {NODES_SCHEMA}")
print(f"  └── {RELATIONSHIPS_SCHEMA}")

---

## Step 4: Materialize One Label (Single Label Test)

Read the first discovered label from Neo4j via the Spark Connector and write it as a
managed Delta table. This validates the full pipeline before running it for all labels.
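
The cell below derives the table name with `label.lower()`. Two labels that differ only by case would then target the same table, and labels containing spaces or punctuation may not be valid table names. One way to check for this before writing is sketched here. `to_table_name` and `find_collisions` are hypothetical helpers, and the character rule is a deliberately conservative assumption of the sketch, not a statement of Unity Catalog's exact naming rules.

```python
import re
from collections import defaultdict


def to_table_name(label: str) -> str:
    """Lowercase a label and replace anything outside [a-z0-9_] with '_'."""
    return re.sub(r"[^a-z0-9_]", "_", label.lower())


def find_collisions(labels):
    """Return {table_name: [labels]} for names claimed by more than one label."""
    by_name = defaultdict(list)
    for label in labels:
        by_name[to_table_name(label)].append(label)
    return {name: sorted(group) for name, group in by_name.items() if len(group) > 1}


collisions = find_collisions(["Person", "PERSON", "Movie", "Order Item"])
print(to_table_name("Order Item"))
print(collisions)
```

If `find_collisions` returns anything, the colliding labels need distinct names before the bulk run, since each write uses `mode("overwrite")`.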

In [None]:
# =============================================================================
# MATERIALIZE ONE LABEL AS A DELTA TABLE
# =============================================================================

print("=" * 60)
print("MATERIALIZE SINGLE LABEL (TEST)")
print("=" * 60)

if not discovered_labels:
    print("\n[FAIL] No labels discovered — cannot proceed.")
    print("[INFO] Check the schema discovery cell above for errors.")
else:
    test_label = sorted(discovered_labels.keys())[0]
    table_name = test_label.lower()
    full_table = f"`{TARGET_CATALOG}`.`{NODES_SCHEMA}`.`{table_name}`"

    print(f"\n[INFO] Label: {test_label}")
    print(f"[INFO] Target table: {full_table}")

    try:
        # Read from Neo4j via Spark Connector
        # Credentials are set at session level in the config cell
        df = spark.read.format("org.neo4j.spark.DataSource") \
            .option("labels", f":{test_label}") \
            .load()

        print(f"\n[INFO] Inferred schema for :{test_label}:")
        df.printSchema()

        print(f"[INFO] Sample data (5 rows):")
        df.show(5, truncate=False)

        col_count = len(df.columns)

        # Write as managed Delta table with overwriteSchema for idempotent reruns
        df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(full_table)

        # Get row count from the written table (avoids double-scanning Neo4j)
        row_count = spark.sql(f"SELECT COUNT(*) AS cnt FROM {full_table}").collect()[0]["cnt"]

        print(f"\n[PASS] Materialized :{test_label} → {full_table}")
        print(f"  Rows: {row_count}")
        print(f"  Columns: {col_count}")

    except Exception as e:
        print(f"\n[FAIL] Materialization failed: {e}")
        import traceback
        traceback.print_exc()

---

## Step 5: Verify Metadata in INFORMATION_SCHEMA

Confirm that the materialized table and its columns are now visible in Unity Catalog's
`INFORMATION_SCHEMA`. This is the key proof that metadata sync worked — the table appears
in Catalog Explorer with full column definitions.
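
Beyond eyeballing the output, the column list from `INFORMATION_SCHEMA` can be compared with the properties found during discovery. The sketch below works on plain lists so it runs without Spark. `compare_columns` is a hypothetical helper, and it assumes the connector adds `<id>` and `<labels>` columns to node reads, which is worth confirming against the `printSchema()` output from Step 4.

```python
# Columns the connector is assumed to add to every node DataFrame.
CONNECTOR_COLUMNS = {"<id>", "<labels>"}


def compare_columns(discovered_props, table_columns):
    """Return (missing, extra) property names relative to the table's columns."""
    expected = set(discovered_props)
    actual = set(table_columns) - CONNECTOR_COLUMNS
    missing = sorted(expected - actual)  # discovered but not in the table
    extra = sorted(actual - expected)    # in the table but not discovered
    return missing, extra


missing, extra = compare_columns(
    discovered_props=["name", "born"],
    table_columns=["<id>", "<labels>", "name"],
)
print(missing, extra)
```

In the notebook, `discovered_props` would come from `discovered_labels[test_label]` and `table_columns` from the `column_name` values returned by the columns query.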

In [None]:
# =============================================================================
# VERIFY METADATA IN INFORMATION_SCHEMA
# =============================================================================

print("=" * 60)
print("VERIFY METADATA IN INFORMATION_SCHEMA")
print("=" * 60)

if table_name is None:
    print("\n[SKIP] No table was materialized — skipping verification.")
else:
    # Check tables
    print(f"\n[INFO] Tables in {TARGET_CATALOG}.{NODES_SCHEMA}:")
    spark.sql(f"""
        SELECT table_name, table_type, comment, created
        FROM `{TARGET_CATALOG}`.information_schema.tables
        WHERE table_schema = '{NODES_SCHEMA}'
        ORDER BY table_name
    """).show(truncate=False)

    # Check columns for the test table
    print(f"\n[INFO] Columns in {TARGET_CATALOG}.{NODES_SCHEMA}.{table_name}:")
    spark.sql(f"""
        SELECT ordinal_position, column_name, data_type, is_nullable
        FROM `{TARGET_CATALOG}`.information_schema.columns
        WHERE table_schema = '{NODES_SCHEMA}'
          AND table_name = '{table_name}'
        ORDER BY ordinal_position
    """).show(truncate=False)

    # Verify row count via SQL; this fails if the Step 4 write did not succeed
    try:
        count_result = spark.sql(f"SELECT COUNT(*) AS cnt FROM {full_table}").collect()[0]["cnt"]
        print(f"[PASS] Table {full_table} has {count_result} rows")
        print("[PASS] Metadata is visible in INFORMATION_SCHEMA and Catalog Explorer")
    except Exception as e:
        print(f"[FAIL] Could not query {full_table}: {e}")

---

## Step 6: Materialize All Discovered Labels

Loop through all discovered node labels and materialize each one as a managed Delta table.

In [None]:
# =============================================================================
# MATERIALIZE ALL NODE LABELS
# =============================================================================

print("=" * 60)
print(f"MATERIALIZE ALL NODE LABELS ({len(discovered_labels)} labels)")
print("=" * 60)

label_results = []

for label in sorted(discovered_labels.keys()):
    tbl_name = label.lower()
    full_tbl = f"`{TARGET_CATALOG}`.`{NODES_SCHEMA}`.`{tbl_name}`"

    start = time.time()
    try:
        df = spark.read.format("org.neo4j.spark.DataSource") \
            .option("labels", f":{label}") \
            .load()

        col_count = len(df.columns)

        df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(full_tbl)

        # Get row count from written table (avoids double Neo4j scan)
        row_count = spark.sql(f"SELECT COUNT(*) AS cnt FROM {full_tbl}").collect()[0]["cnt"]

        elapsed = time.time() - start
        label_results.append({
            "label": label,
            "table": tbl_name,
            "rows": row_count,
            "columns": col_count,
            "time_s": round(elapsed, 1),
            "status": "PASS"
        })
        print(f"  [PASS] :{label} → {full_tbl} ({row_count} rows, {col_count} cols, {elapsed:.1f}s)")

    except Exception as e:
        elapsed = time.time() - start
        error_msg = str(e).split('\n')[0][:80]
        label_results.append({
            "label": label,
            "table": tbl_name,
            "rows": 0,
            "columns": 0,
            "time_s": round(elapsed, 1),
            "status": f"FAIL: {error_msg}"
        })
        print(f"  [FAIL] :{label} — {error_msg}")

# Summary
passed = [r for r in label_results if r["status"] == "PASS"]
failed = [r for r in label_results if r["status"] != "PASS"]
total_rows = sum(r["rows"] for r in passed)
total_cols = sum(r["columns"] for r in passed)

print("\n" + "=" * 60)
print("NODE LABELS SUMMARY")
print(f"  Passed: {len(passed)}/{len(label_results)}")
print(f"  Total rows: {total_rows:,}")
print(f"  Total columns: {total_cols}")
if failed:
    print(f"  Failed: {', '.join(r['label'] for r in failed)}")

---

## Step 7: Materialize Relationship Types

Read relationship types from Neo4j and write them as Delta tables in the `relationships` schema.
The Spark Connector's `relationship` option requires `relationship.source.labels` and
`relationship.target.labels`, so we first discover the actual patterns via a `MATCH` query.
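
Because one relationship type can connect several label pairs, each discovered pattern needs its own set of reader options and, arguably, its own table name. The sketch below builds both from a pattern. The helper names are hypothetical, the option keys are the ones named above, and the `source_type_target` naming scheme is just one possible convention.

```python
def relationship_read_options(rel_type, source_label, target_label):
    """Build Spark Connector reader options for one relationship pattern."""
    if source_label is None or target_label is None:
        # Unlabeled endpoints cannot be expressed as a label filter.
        raise ValueError(f"Pattern for {rel_type} has an unlabeled endpoint")
    return {
        "relationship": rel_type,
        "relationship.source.labels": f":{source_label}",
        "relationship.target.labels": f":{target_label}",
    }


def pattern_table_name(rel_type, source_label, target_label):
    """Name a table after the whole pattern so patterns sharing a type stay apart."""
    return f"{source_label}_{rel_type}_{target_label}".lower()


opts = relationship_read_options("ACTED_IN", "Person", "Movie")
name_a = pattern_table_name("ACTED_IN", "Person", "Movie")
name_b = pattern_table_name("ACTED_IN", "Robot", "Movie")
print(opts)
print(name_a, name_b)
```

With a live session, the dictionary could be passed to the reader as `spark.read.format("org.neo4j.spark.DataSource").options(**opts).load()`.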

In [None]:
# =============================================================================
# MATERIALIZE RELATIONSHIP TYPES
# =============================================================================

print("=" * 60)
print("MATERIALIZE RELATIONSHIP TYPES")
print("=" * 60)

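# Discover relationship patterns (source label, type, target label).
# db.schema.relTypeProperties() does NOT return source/target labels,
# so we use a MATCH query to discover actual patterns from the data.
# Note: the query scans every relationship and keeps only the first label of
# each endpoint, so multi-label endpoints are reduced to one label and
# unlabeled endpoints yield null.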
rel_patterns = []
try:
    with GraphDatabase.driver(NEO4J_BOLT_URI, auth=(NEO4J_USER, NEO4J_PASSWORD)) as driver:
        with driver.session(database=NEO4J_DATABASE) as session:
            result = session.run("""
                MATCH (src)-[r]->(tgt)
                WITH type(r) AS relType, labels(src) AS srcLabels, labels(tgt) AS tgtLabels
                RETURN DISTINCT relType, srcLabels[0] AS sourceLabel, tgtLabels[0] AS targetLabel
                ORDER BY relType
            """)
            for record in result:
                rel_patterns.append({
                    "type": record["relType"],
                    "source": record["sourceLabel"],
                    "target": record["targetLabel"]
                })

    print(f"\n[INFO] Found {len(rel_patterns)} relationship patterns")
    for p in rel_patterns:
        print(f"  (:{p['source']})-[:{p['type']}]->(:{p['target']})")

except Exception as e:
    print(f"\n[FAIL] Could not discover relationship patterns: {e}")
    print("[INFO] Cannot proceed without source/target labels — the Spark Connector requires them.")
    import traceback
    traceback.print_exc()

rel_results = []

for pattern in rel_patterns:
    rel_type = pattern["type"]
    source_label = pattern["source"]
    target_label = pattern["target"]
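    # Include the endpoint labels so patterns that share a relationship type
    # do not overwrite each other's table.
    tbl_name = f"{source_label}_{rel_type}_{target_label}".lower()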
    full_tbl = f"`{TARGET_CATALOG}`.`{RELATIONSHIPS_SCHEMA}`.`{tbl_name}`"

    start = time.time()
    try:
        df = spark.read.format("org.neo4j.spark.DataSource") \
            .option("relationship", rel_type) \
            .option("relationship.source.labels", f":{source_label}") \
            .option("relationship.target.labels", f":{target_label}") \
            .load()

        col_count = len(df.columns)

        df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(full_tbl)

        # Get row count from written table
        row_count = spark.sql(f"SELECT COUNT(*) AS cnt FROM {full_tbl}").collect()[0]["cnt"]

        elapsed = time.time() - start
        rel_results.append({
            "type": rel_type,
            "pattern": f"(:{source_label})-[:{rel_type}]->(:{target_label})",
            "table": tbl_name,
            "rows": row_count,
            "columns": col_count,
            "time_s": round(elapsed, 1),
            "status": "PASS"
        })
        print(f"  [PASS] [:{rel_type}] → {full_tbl} ({row_count} rows, {col_count} cols, {elapsed:.1f}s)")

    except Exception as e:
        elapsed = time.time() - start
        error_msg = str(e).split('\n')[0][:80]
        rel_results.append({
            "type": rel_type,
            "pattern": f"(:{source_label})-[:{rel_type}]->(:{target_label})",
            "table": tbl_name,
            "rows": 0,
            "columns": 0,
            "time_s": round(elapsed, 1),
            "status": f"FAIL: {error_msg}"
        })
        print(f"  [FAIL] [:{rel_type}] — {error_msg}")

# Summary
passed_rels = [r for r in rel_results if r["status"] == "PASS"]
failed_rels = [r for r in rel_results if r["status"] != "PASS"]

print("\n" + "=" * 60)
print("RELATIONSHIP TYPES SUMMARY")
print(f"  Passed: {len(passed_rels)}/{len(rel_results)}")
print(f"  Total rows: {sum(r['rows'] for r in passed_rels):,}")
if failed_rels:
    print(f"  Failed: {', '.join(r['type'] for r in failed_rels)}")

---

## Step 8: Final Verification and Summary

Query `INFORMATION_SCHEMA` to show all tables created across both schemas. This is the
final proof that Neo4j metadata is synchronized into Unity Catalog.
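
The listing in the cell below is a visual check. To turn it into an explicit one, the tables the notebook reported as written can be compared with the tables the catalog actually lists. A minimal sketch on plain `(schema, table)` tuples follows. `reconcile_tables` and the sample values are hypothetical.

```python
def reconcile_tables(expected, actual):
    """Compare expected (schema, table) pairs with those found in the catalog."""
    expected_set, actual_set = set(expected), set(actual)
    return {
        "missing": sorted(expected_set - actual_set),     # written but not listed
        "unexpected": sorted(actual_set - expected_set),  # listed but not written now
    }


expected = [("nodes", "movie"), ("nodes", "person"), ("relationships", "acted_in")]
actual = [("nodes", "movie"), ("nodes", "person"), ("nodes", "old_label")]

report = reconcile_tables(expected, actual)
print(report)
```

In the notebook, `expected` would be built from the `PASS` entries in `label_results` and `rel_results`, and `actual` from the rows of the `information_schema.tables` query. Entries under `unexpected` are often leftovers from earlier runs.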

In [None]:
# =============================================================================
# FINAL VERIFICATION AND SUMMARY
# =============================================================================

print("=" * 60)
print("FINAL VERIFICATION")
print("=" * 60)

# All tables in the catalog
print(f"\n[INFO] All tables in {TARGET_CATALOG}:")
all_tables_df = spark.sql(f"""
    SELECT table_schema, table_name, table_type
    FROM `{TARGET_CATALOG}`.information_schema.tables
    WHERE table_schema IN ('{NODES_SCHEMA}', '{RELATIONSHIPS_SCHEMA}')
    ORDER BY table_schema, table_name
""")
all_tables_df.show(50, truncate=False)

# Column counts per table
print(f"\n[INFO] Column counts per table:")
spark.sql(f"""
    SELECT table_schema, table_name, COUNT(*) AS column_count
    FROM `{TARGET_CATALOG}`.information_schema.columns
    WHERE table_schema IN ('{NODES_SCHEMA}', '{RELATIONSHIPS_SCHEMA}')
    GROUP BY table_schema, table_name
    ORDER BY table_schema, table_name
""").show(50, truncate=False)

# Overall summary
total_tables = all_tables_df.count()
node_tables = len([r for r in label_results if r["status"] == "PASS"])
rel_tables_count = len([r for r in rel_results if r["status"] == "PASS"])
total_data_rows = (sum(r['rows'] for r in label_results if r['status'] == 'PASS') +
                   sum(r['rows'] for r in rel_results if r['status'] == 'PASS'))

print("\n" + "=" * 60)
print("METADATA SYNC SUMMARY")
print("=" * 60)
print(f"  Target Catalog: {TARGET_CATALOG}")
print(f"  Node label tables: {node_tables} (in {NODES_SCHEMA})")
print(f"  Relationship tables: {rel_tables_count} (in {RELATIONSHIPS_SCHEMA})")
print(f"  Total tables: {total_tables}")
print(f"  Total data rows: {total_data_rows:,}")
print(f"\n  All tables are:")
print(f"    - Browsable in Catalog Explorer")
print(f"    - Visible in INFORMATION_SCHEMA")
print(f"    - Queryable via standard SQL")
print(f"    - Governed by UC permissions")
print(f"\n[PASS] Metadata synchronization complete")