# Load London Transport Network to Neo4j

This notebook loads the **complete London Transport Network** dataset from CSV files into Neo4j via Delta Lake tables.

## What This Notebook Does

1. **Loads CSVs** — Reads London stations and tube lines from Unity Catalog Volume
2. **Creates Delta Tables** — Stores data in Delta Lake for validation and transformation
3. **Creates Nodes** — Writes Station nodes to Neo4j
4. **Creates Relationships** — Establishes tube line connections with dynamic relationship types for each line
5. **Validates Data** — Provides comprehensive queries to verify successful loading

## Data Overview

- **302 London stations** with names, zones, postcodes, and coordinates
- **All tube line connections** with bidirectional relationships
- **Dynamic relationship types** for each tube line (e.g., :BAKERLOO, :CENTRAL, :CIRCLE, :DISTRICT, etc.)

## Implementation Strategy

- **Parts 1-2:** Load stations to Delta Lake and Neo4j
- **Parts 3-4:** Load the Bakerloo line as a proof of concept and validate it
- **Parts 5-6:** Load all remaining tube lines with dynamic relationship types, then run final validation

---

## Prerequisites

Before running this notebook:

1. Neo4j database created (Aura or self-hosted)
2. Databricks Secrets configured (`neo4j` scope with `password` key)
3. Databricks cluster configured with:
   - **Access mode**: Dedicated (formerly: Single user) - REQUIRED
   - **Runtime**: 13.3 LTS or higher (Spark 3.x)
   - **Neo4j Spark Connector** library installed (Maven)
      - **Maven**: `org.neo4j:neo4j-connector-apache-spark_2.12:5.3.1_for_spark_3`
   - **Neo4j Python Driver** library installed (PyPI)
      - **PyPI**: `neo4j==6.0.2`
   - **Important:** The Neo4j Spark Connector requires "Dedicated" access mode and will NOT work in Shared mode.
4. Unity Catalog Volume created with CSV files uploaded



---

## Section 1: Configure Connection and Paths

Use interactive widgets to configure Neo4j connection and Unity Catalog paths with sensible defaults.
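The default URL (`bolt://localhost:7687`) points at the cluster's own nodes, not a remote Neo4j server. Aura instances are normally reached through a `neo4j+s://` URI. The sketch below is a hypothetical pre-flight check and is not part of the notebook. `validate_settings` flags three things before anything is written: an unrecognised URI scheme, a localhost URL, and an empty catalog, schema or volume name.

```python
from urllib.parse import urlparse

# URI schemes used by the official Neo4j drivers: bolt/neo4j, optionally with
# +s (TLS, verified certificate) or +ssc (TLS, self-signed certificate)
VALID_SCHEMES = {"bolt", "bolt+s", "bolt+ssc", "neo4j", "neo4j+s", "neo4j+ssc"}


def validate_settings(url: str, catalog: str, schema: str, volume: str) -> list[str]:
    """Return a list of issues found in the widget values (empty list = OK).

    Hypothetical helper for illustration only.
    """
    issues = []
    parsed = urlparse(url)
    if parsed.scheme not in VALID_SCHEMES:
        issues.append(f"unsupported URI scheme: {parsed.scheme!r}")
    if not parsed.hostname:
        issues.append("URL has no host")
    elif parsed.hostname in {"localhost", "127.0.0.1"}:
        issues.append("localhost resolves to the cluster's own nodes, not a remote Neo4j server")
    for label, value in (("catalog", catalog), ("schema", schema), ("volume", volume)):
        if not value.strip():
            issues.append(f"{label} name is empty")
    return issues
```

Once the widgets exist, you could call it as `validate_settings(NEO4J_URL, CATALOG, SCHEMA, VOLUME)` and stop if the returned list is not empty.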

In [None]:
# Remove existing widgets if any
dbutils.widgets.removeAll()

# Neo4j connection widgets
dbutils.widgets.text("neo4j_url", "bolt://localhost:7687", "Neo4j URL")
dbutils.widgets.text("neo4j_username", "neo4j", "Neo4j Username")
dbutils.widgets.text("neo4j_database", "neo4j", "Neo4j Database")

# Unity Catalog and Delta Lake widgets
dbutils.widgets.text("catalog_name", "london_catalog", "Catalog Name")
dbutils.widgets.text("schema_name", "london_schema", "Schema Name")
dbutils.widgets.text("volume_name", "london_transport", "Volume Name")

print("✓ Widgets created successfully")
print("\nConfigure the widgets above with your specific values, then run the next cell.")

In [None]:
# Get widget values
NEO4J_URL = dbutils.widgets.get("neo4j_url")
NEO4J_USER = dbutils.widgets.get("neo4j_username")
NEO4J_DB = dbutils.widgets.get("neo4j_database")
CATALOG = dbutils.widgets.get("catalog_name")
SCHEMA = dbutils.widgets.get("schema_name")
VOLUME = dbutils.widgets.get("volume_name")

# Retrieve Neo4j password from Databricks Secrets
# Note: Password is kept in secrets for security
NEO4J_PASS = dbutils.secrets.get(scope="neo4j", key="password")

# Configure Spark session for Neo4j
spark.conf.set("neo4j.url", NEO4J_URL)
spark.conf.set("neo4j.authentication.basic.username", NEO4J_USER)
spark.conf.set("neo4j.authentication.basic.password", NEO4J_PASS)
spark.conf.set("neo4j.database", NEO4J_DB)

# Unity Catalog Volume path
BASE_PATH = f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}"

# Delta Lake table names
STATIONS_TABLE = f"{CATALOG}.{SCHEMA}.london_stations"
TUBE_LINES_TABLE = f"{CATALOG}.{SCHEMA}.london_tube_lines"

print("Configuration loaded from widgets:")
print(f"✓ Neo4j URL: {NEO4J_URL}")
print(f"✓ Neo4j User: {NEO4J_USER}")
print(f"✓ Database: {NEO4J_DB}")
print(f"✓ Catalog: {CATALOG}")
print(f"✓ Schema: {SCHEMA}")
print(f"✓ Volume: {VOLUME}")
print(f"✓ Data path: {BASE_PATH}")
print(f"✓ Stations table: {STATIONS_TABLE}")
print(f"✓ Tube lines table: {TUBE_LINES_TABLE}")

## Section 2: Test Neo4j Connection

Verify connectivity before proceeding with data loading.

In [None]:
# Test connection by attempting to read from Neo4j
try:
    test_df = (
        spark.read.format("org.neo4j.spark.DataSource")
        .option("url", NEO4J_URL)
        .option("authentication.basic.username", NEO4J_USER)
        .option("authentication.basic.password", NEO4J_PASS)
        .option("labels", "Station")
        .load()
        .limit(5)
    )
    
    count = test_df.count()
    print(f"✓ Connection successful! Read {count} sample Station nodes (read limited to 5).")
    if count > 0:
        print("\nSample data:")
        display(test_df)
    else:
        print("\n(No data loaded yet - this is expected on first run)")
        
except Exception as e:
    print(f"✗ Connection failed: {str(e)}")
    print("\nPlease verify:")
    print("  1. Neo4j database is running")
    print("  2. Connection URL is correct")
    print("  3. Credentials are valid")
    print("  4. Network/firewall allows connection")

---

# Part 1: Load CSVs to Delta Lake Tables

Read CSV files from Unity Catalog Volume and write to Delta Lake tables for intermediate storage and validation.

---

## Step 1: Verify CSV Files

Check that CSV files are accessible in the Unity Catalog Volume.
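Listing the volume shows only what is present. A small check can also report which of the two files this notebook reads are missing. The `missing_files` helper below is a hypothetical sketch. In the notebook you would pass it `[f.name for f in dbutils.fs.ls(BASE_PATH)]`.

```python
# The two files read later in this notebook
REQUIRED_FILES = ("London_stations.csv", "London_tube_lines.csv")


def missing_files(listed_names, required=REQUIRED_FILES):
    """Return the required file names absent from a directory listing.

    Matching is exact and case-sensitive.
    """
    present = set(listed_names)
    return [name for name in required if name not in present]
```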

In [None]:
# List CSV files in the volume
try:
    files = dbutils.fs.ls(BASE_PATH)
    print(f"Files in {BASE_PATH}:")
    for file in sorted(files, key=lambda f: f.name):
        print(f"  - {file.name}")
except Exception as e:
    print(f"✗ Error listing files: {str(e)}")
    print("\nPlease verify:")
    print(f"  1. Volume path is correct: {BASE_PATH}")
    print("  2. CSV files have been uploaded to the volume")

## Step 2: Load Stations CSV

Read London_stations.csv and preview the data.

In [None]:
from pyspark.sql import functions as F

# Read stations CSV with header
stations_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(f"{BASE_PATH}/London_stations.csv")
)

print(f"Stations loaded: {stations_df.count()} rows")
print("\nSchema:")
stations_df.printSchema()
print("\nSample data:")
stations_df.show(5, truncate=False)

## Step 3: Transform Stations Data

Select and rename columns to match Neo4j schema, and ensure proper data types.

In [None]:
# Select relevant columns and rename for Neo4j
stations_clean = (
    stations_df
    .select(
        F.col("ID").cast("integer").alias("station_id"),
        F.col("Station_Name").alias("name"),
        F.col("Latitude").cast("double").alias("latitude"),
        F.col("Longitude").cast("double").alias("longitude"),
        F.col("Zone").alias("zone"),
        F.col("Postcode").alias("postcode")
    )
)

print("Transformed schema:")
stations_clean.printSchema()
print("\nSample transformed data:")
stations_clean.show(5, truncate=False)

## Step 4: Write Stations to Delta Lake Table

Create Delta Lake table for stations data.

In [None]:
# Write to Delta Lake table
print(f"Writing to Delta table: {STATIONS_TABLE}")
(
    stations_clean
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(STATIONS_TABLE)
)

# Verify table creation
stations_delta = spark.table(STATIONS_TABLE)
print(f"\n✓ Delta table created: {stations_delta.count()} rows")
print("\nSample data from Delta table:")
stations_delta.show(5, truncate=False)

## Step 5: Load Tube Lines CSV

Read London_tube_lines.csv and preview the data.

In [None]:
# Read tube lines CSV with header
tube_lines_df = (
    spark.read
    .option("header", "true")
    .csv(f"{BASE_PATH}/London_tube_lines.csv")
)

print(f"Tube line connections loaded: {tube_lines_df.count()} rows")
print("\nSchema:")
tube_lines_df.printSchema()
print("\nSample data:")
tube_lines_df.show(10, truncate=False)

# Show distinct tube lines
print("\nDistinct tube lines:")
tube_lines_df.select("Tube_Line").distinct().orderBy("Tube_Line").show(truncate=False)

## Step 6: Write Tube Lines to Delta Lake Table

Create Delta Lake table for tube lines data.

In [None]:
# Write to Delta Lake table
print(f"Writing to Delta table: {TUBE_LINES_TABLE}")
(
    tube_lines_df
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(TUBE_LINES_TABLE)
)

# Verify table creation
tube_lines_delta = spark.table(TUBE_LINES_TABLE)
print(f"\n✓ Delta table created: {tube_lines_delta.count()} rows")
print("\nSample data from Delta table:")
tube_lines_delta.show(10, truncate=False)

---

# Part 2: Load Stations to Neo4j

Write Station nodes from Delta Lake to Neo4j graph database.

---

## Step 7: Write Station Nodes to Neo4j

Load stations from Delta table and write to Neo4j as Station nodes.
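With `node.keys` set to `station_id` and `Overwrite` mode, the connector merges nodes on that key. Any duplicate IDs in the source therefore collapse into a single node. The relationship steps later match stations by `name`, so a duplicated name would match several nodes and multiply relationships.

The sketch below is a plain-Python version of a pre-write uniqueness check, with a hypothetical `duplicate_values` helper. In Spark the same check is roughly `stations.groupBy("name").count().filter(F.col("count") > 1)`.

```python
from collections import Counter


def duplicate_values(rows, key):
    """Return {value: occurrences} for values of `key` seen more than once."""
    counts = Counter(row[key] for row in rows)
    return {value: n for value, n in counts.items() if n > 1}


# Illustrative rows only; in the notebook the rows would come from the stations Delta table
sample_rows = [
    {"station_id": 1, "name": "Station A"},
    {"station_id": 2, "name": "Station B"},
    {"station_id": 3, "name": "Station A"},
]

print(duplicate_values(sample_rows, "station_id"))  # {}
print(duplicate_values(sample_rows, "name"))        # {'Station A': 2}
```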

In [None]:
print("="*80)
print("WRITING STATION NODES TO NEO4J")
print("="*80)

# Read from Delta table
stations = spark.table(STATIONS_TABLE)

print(f"\nWriting {stations.count()} Station nodes to Neo4j...")

# Write to Neo4j
(
    stations
    .write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Station")
    .option("node.keys", "station_id")
    .save()
)

print("\n✓ Station nodes written successfully!")
print("="*80)

## Step 8: Create Index on Station ID

Create an index on `station_id` to speed up lookups by station ID. The schema command runs through the Neo4j Python Driver listed in the prerequisites, because the Spark Connector's `query` read option is designed for reading data.

In [None]:
from neo4j import GraphDatabase

# Create an index on station_id for faster lookups (no-op if it already exists)
index_query = "CREATE INDEX station_id IF NOT EXISTS FOR (s:Station) ON (s.station_id)"

print("Creating index on station_id...")
try:
    # Run the schema command directly with the Python driver
    with GraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USER, NEO4J_PASS)) as driver:
        driver.execute_query(index_query, database_=NEO4J_DB)
    print("✓ Index created (or already present)")
except Exception as e:
    print(f"✗ Index creation failed: {str(e)}")

## Step 9: Verify Station Nodes

Query Neo4j to confirm stations were loaded correctly.

In [None]:
# Count stations in Neo4j
count_query = "MATCH (s:Station) RETURN count(s) as station_count"

station_count_df = (
    spark.read.format("org.neo4j.spark.DataSource")
    .option("url", NEO4J_URL)
    .option("authentication.basic.username", NEO4J_USER)
    .option("authentication.basic.password", NEO4J_PASS)
    .option("query", count_query)
    .load()
)

print("Station count in Neo4j:")
station_count_df.show()

# Sample stations from Neo4j
sample_query = "MATCH (s:Station) RETURN s.station_id AS station_id, s.name AS name, s.zone AS zone, s.latitude AS latitude, s.longitude AS longitude"

sample_stations_df = (
    spark.read.format("org.neo4j.spark.DataSource")
    .option("url", NEO4J_URL)
    .option("authentication.basic.username", NEO4J_USER)
    .option("authentication.basic.password", NEO4J_PASS)
    .option("query", sample_query)
    .load()
)

print("\nSample stations from Neo4j:")
sample_stations_df.show(truncate=False)

---

# Part 3: Create Tube Line Relationships

Create bidirectional relationships between stations for each tube line.

**Note:** This part loads only the Bakerloo line as a proof of concept. The remaining lines are loaded in Part 5, after validation.
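Each CSV row becomes two directed relationships, one per direction. A line with n connection rows should therefore end up with 2n relationships of its type. If the CSV already lists some pairs in both directions, the reverse write doubles those too.

The plain-Python sketch below shows the expansion using a hypothetical `expand_bidirectional` helper. Unlike the notebook's writes, it also deduplicates repeated directed pairs.

```python
def expand_bidirectional(pairs):
    """Turn (from, to) pairs into a sorted list of unique directed edges, both ways."""
    edges = set()
    for a, b in pairs:
        edges.add((a, b))
        edges.add((b, a))
    return sorted(edges)


rows = [("A", "B"), ("B", "C")]
print(len(rows), "rows ->", len(expand_bidirectional(rows)), "directed relationships")
# 2 rows -> 4 directed relationships
```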

---

## Step 10: Prepare Relationship Data

Filter the tube lines data to the Bakerloo line connections.

In [None]:
# Read tube lines from Delta table
tube_lines = spark.table(TUBE_LINES_TABLE)

# Filter to Bakerloo line for proof-of-concept
bakerloo_lines = tube_lines.filter(F.col("Tube_Line") == "Bakerloo")

print(f"Bakerloo line connections: {bakerloo_lines.count()}")
print("\nSample connections:")
bakerloo_lines.show(10, truncate=False)

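## Step 11: Create Relationships with the Spark Connector

Write relationships with the Neo4j Spark Connector's `keys` strategy, which matches existing Station nodes by `name` and connects them. Each connection is written twice, once per direction. `Append` mode creates new relationships on every run, so re-running this cell duplicates them; `Overwrite` mode merges instead.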
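With `relationship.save.strategy` set to `keys` and both endpoint save modes set to `Match`, each row looks up its source and target Station by `name` and then writes one relationship. The function below builds a hand-written Cypher statement with roughly that behaviour, as an aid to reasoning. It approximates the connector's behaviour rather than reproducing the query the connector generates, and `relationship_cypher` is a hypothetical name.

The sketch makes one consequence visible. `MATCH` on a name that does not exist returns no rows, so such a row would be skipped without any error. Comparing the relationship count with the CSV row count is therefore worthwhile.

```python
def relationship_cypher(rel_type: str, mode: str = "Append") -> str:
    """Approximate Cypher for a keys-strategy relationship write (illustration only).

    Rows are expected as {"from_station": ..., "to_station": ...}.
    Append -> CREATE (a new relationship per row); Overwrite -> MERGE.
    """
    verb = {"Append": "CREATE", "Overwrite": "MERGE"}[mode]
    return (
        "UNWIND $rows AS row\n"
        "MATCH (source:Station {name: row.from_station})\n"
        "MATCH (target:Station {name: row.to_station})\n"
        f"{verb} (source)-[:`{rel_type}`]->(target)"
    )


print(relationship_cypher("BAKERLOO"))
```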

In [None]:
print("="*80)
print("CREATING BAKERLOO LINE RELATIONSHIPS")
print("="*80)

# Prepare data for relationship creation
bakerloo_data = bakerloo_lines.select(
    F.col("From_Station").alias("from_station"),
    F.col("To_Station").alias("to_station")
)

print(f"\nCreating relationships for {bakerloo_data.count()} connections...")

# Write relationships using Neo4j Spark Connector
# The connector will create relationships for each row in the DataFrame
(
    bakerloo_data
    .write
    .format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("relationship", "BAKERLOO")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Station")
    .option("relationship.source.save.mode", "Match")
    .option("relationship.source.node.keys", "from_station:name")
    .option("relationship.target.labels", ":Station")
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.node.keys", "to_station:name")
    .save()
)

# Create reverse relationships for bidirectional connections
(
    bakerloo_data
    .select(
        F.col("to_station").alias("from_station"),
        F.col("from_station").alias("to_station")
    )
    .write
    .format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("relationship", "BAKERLOO")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Station")
    .option("relationship.source.save.mode", "Match")
    .option("relationship.source.node.keys", "from_station:name")
    .option("relationship.target.labels", ":Station")
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.node.keys", "to_station:name")
    .save()
)

print("\n✓ Bakerloo line relationships created successfully!")
print("="*80)

## Step 12: Verify Relationships

Query Neo4j to confirm relationships were created correctly.

In [None]:
# Count Bakerloo relationships
rel_count_query = "MATCH ()-[r:BAKERLOO]->() RETURN count(r) as bakerloo_count"

rel_count_df = (
    spark.read.format("org.neo4j.spark.DataSource")
    .option("url", NEO4J_URL)
    .option("authentication.basic.username", NEO4J_USER)
    .option("authentication.basic.password", NEO4J_PASS)
    .option("query", rel_count_query)
    .load()
)

print("Bakerloo relationship count:")
rel_count_df.show()

# Sample relationship paths
path_query = """
MATCH (from:Station)-[:BAKERLOO]->(to:Station)
RETURN from.name as from_station, to.name as to_station
LIMIT 10
"""

path_df = (
    spark.read.format("org.neo4j.spark.DataSource")
    .option("url", NEO4J_URL)
    .option("authentication.basic.username", NEO4J_USER)
    .option("authentication.basic.password", NEO4J_PASS)
    .option("query", path_query)
    .load()
)

print("\nSample Bakerloo line connections:")
path_df.show(truncate=False)

---

# Part 4: Validation of Bakerloo Line Proof-of-Concept

Validate the Bakerloo line implementation before processing all tube lines.

---

## Bakerloo Line Validation

Verify the Bakerloo line was loaded correctly.
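Every connection is stored in both directions. An undirected pattern such as `(s)-[:BAKERLOO]-()` therefore sees each neighbour twice: once through the outgoing relationship and once through the incoming one.

The plain-Python sketch below uses a hypothetical three-station edge list. It shows that counting the outgoing pattern `(s)-[:BAKERLOO]->()` gives the number of adjacent stations, while the undirected pattern gives twice that.

```python
from collections import Counter

# Hypothetical line A - B - C, stored in both directions as this notebook does
directed = [("A", "B"), ("B", "A"), ("B", "C"), ("C", "B")]

# Undirected pattern (s)-[:LINE]-(): every relationship touching s counts once
undirected_degree = Counter()
for a, b in directed:
    undirected_degree[a] += 1
    undirected_degree[b] += 1

# Outgoing pattern (s)-[:LINE]->(): only relationships starting at s count
outgoing_degree = Counter(a for a, _ in directed)

print(dict(undirected_degree))  # {'A': 2, 'B': 4, 'C': 2}
print(dict(outgoing_degree))    # {'A': 1, 'B': 2, 'C': 1}
```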

In [None]:
print("="*80)
print("BAKERLOO LINE VALIDATION")
print("="*80)

# Overall graph statistics
# Independent COUNT {} subqueries avoid the Cartesian product that
# uncorrelated OPTIONAL MATCH patterns would produce
stats_query = """
RETURN
  count{MATCH (s:Station)} as total_stations,
  count{MATCH ()-[r:BAKERLOO]->()} as total_bakerloo_relationships
"""

stats_df = (
    spark.read.format("org.neo4j.spark.DataSource")
    .option("url", NEO4J_URL)
    .option("authentication.basic.username", NEO4J_USER)
    .option("authentication.basic.password", NEO4J_PASS)
    .option("query", stats_query)
    .load()
)

print("\nGraph Statistics:")
stats_df.show()

# Stations with the most Bakerloo neighbours
# Relationships are stored in both directions, so count outgoing ones only;
# an undirected pattern would count every neighbour twice
connections_query = """
MATCH (s:Station)
WITH s, count{(s)-[:BAKERLOO]->()} AS connections
WHERE connections > 0
RETURN s.name AS station, connections
ORDER BY connections DESC
LIMIT 5
"""

connections_df = (
    spark.read.format("org.neo4j.spark.DataSource")
    .option("url", NEO4J_URL)
    .option("authentication.basic.username", NEO4J_USER)
    .option("authentication.basic.password", NEO4J_PASS)
    .option("query", connections_query)
    .load()
)

print("\nStations with Most Bakerloo Connections:")
connections_df.show(truncate=False)

print("\n" + "="*80)
print("✅ BAKERLOO LINE VALIDATED!")
print("="*80)

---

# Part 5: Load All Tube Lines

Now that the Bakerloo line is validated, process the remaining tube lines with dynamic relationship types.

---

## Step 13: Get All Tube Lines

Retrieve all unique tube lines for processing.

In [None]:
# Get all unique tube lines (excluding Bakerloo since it's already done)
tube_lines_list = (
    tube_lines
    .select("Tube_Line")
    .distinct()
    .filter(F.col("Tube_Line") != "Bakerloo")  # Exclude Bakerloo - already loaded
    .rdd
    .flatMap(lambda x: x)
    .collect()
)

print(f"Found {len(tube_lines_list)} remaining tube lines to process:")
for line in sorted(tube_lines_list):
    print(f"  - {line}")

## Step 14: Create Dynamic Relationships for All Tube Lines

Process each tube line and create relationships with dynamic relationship types.
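The loop below derives each relationship type with `upper()` and two `replace()` calls. That handles a name such as `Hammersmith & City`, but any other punctuation passes through unchanged.

A stricter variant, sketched here as a hypothetical `to_rel_type` helper, collapses every run of characters outside A-Z and 0-9 into one underscore. The resulting types can then be used in hand-written Cypher without backtick-quoting.

```python
import re


def to_rel_type(line_name: str) -> str:
    """Map a tube line name to an upper-case relationship type (hypothetical helper).

    '&' becomes 'AND', as in the notebook; every other run of characters outside
    A-Z and 0-9 becomes a single underscore, trimmed from both ends.
    """
    upper = line_name.upper().replace("&", "AND")
    return re.sub(r"[^A-Z0-9]+", "_", upper).strip("_")
```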

In [None]:
print("="*80)
print("CREATING RELATIONSHIPS FOR ALL TUBE LINES")
print("="*80)

# Process each tube line
for line in sorted(tube_lines_list):
    print(f"\nProcessing {line} line...")
    
    # Filter data for this line and prepare columns
    line_data = (
        tube_lines
        .filter(F.col("Tube_Line") == line)
        .select(
            F.col("From_Station").alias("from_station"),
            F.col("To_Station").alias("to_station")
        )
    )
    
    connection_count = line_data.count()
    
    # Derive the relationship type from the line name:
    # upper-case it, replace spaces with underscores and "&" with "AND"
    rel_type = line.upper().replace(" ", "_").replace("&", "AND")
    
    # Write relationships using Neo4j Spark Connector
    (
        line_data
        .write
        .format("org.neo4j.spark.DataSource")
        .mode("Append")
        .option("relationship", rel_type)
        .option("relationship.save.strategy", "keys")
        .option("relationship.source.labels", ":Station")
        .option("relationship.source.save.mode", "Match")
        .option("relationship.source.node.keys", "from_station:name")
        .option("relationship.target.labels", ":Station")
        .option("relationship.target.save.mode", "Match")
        .option("relationship.target.node.keys", "to_station:name")
        .save()
    )
    
    # Create reverse relationships for bidirectional connections
    (
        line_data
        .select(
            F.col("to_station").alias("from_station"),
            F.col("from_station").alias("to_station")
        )
        .write
        .format("org.neo4j.spark.DataSource")
        .mode("Append")
        .option("relationship", rel_type)
        .option("relationship.save.strategy", "keys")
        .option("relationship.source.labels", ":Station")
        .option("relationship.source.save.mode", "Match")
        .option("relationship.source.node.keys", "from_station:name")
        .option("relationship.target.labels", ":Station")
        .option("relationship.target.save.mode", "Match")
        .option("relationship.target.node.keys", "to_station:name")
        .save()
    )
    
    print(f"✓ Created {connection_count} bidirectional connections for {line} line (:{rel_type})")

print("\n" + "="*80)
print(f"✅ ALL TUBE LINES LOADED ({len(tube_lines_list)} IN THIS STEP, PLUS BAKERLOO)")
print("="*80)

---

# Part 6: Final Validation

Validate the complete London Transport Network graph with all tube lines.

---

## Step 15: Complete Graph Validation

Run comprehensive validation queries across all tube lines.
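The following is one possible validation for this step, sketched under assumptions:

1. Count relationships per type in Neo4j using the Cypher string below.
2. Collect both sides into plain dicts keyed by relationship type.
3. Flag any line whose relationship count is not twice its CSV row count.

`compare_counts` and the sample numbers are hypothetical. In the notebook, the Neo4j side would come from a connector `query` read of `PER_TYPE_QUERY`. The CSV side would come from `tube_lines.groupBy("Tube_Line").count()`, keyed by the same relationship-type names the loading loop produced.

```python
# Cypher to run through the connector's `query` read option
PER_TYPE_QUERY = """
MATCH ()-[r]->()
RETURN type(r) AS rel_type, count(r) AS relationships
"""


def compare_counts(csv_rows_per_type, neo4j_rels_per_type):
    """Return {rel_type: (expected, actual)} where actual != 2 * CSV rows.

    Both arguments map relationship type -> count. Each CSV row is written in
    both directions, so the expected relationship count is twice the row count.
    """
    mismatches = {}
    for rel_type, rows in csv_rows_per_type.items():
        expected = 2 * rows
        actual = neo4j_rels_per_type.get(rel_type, 0)
        if actual != expected:
            mismatches[rel_type] = (expected, actual)
    return mismatches


# Hypothetical numbers, for illustration only
csv_counts = {"BAKERLOO": 10, "CENTRAL": 20}
neo4j_counts = {"BAKERLOO": 20, "CENTRAL": 38}
print(compare_counts(csv_counts, neo4j_counts))  # {'CENTRAL': (40, 38)}
```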

---

## Troubleshooting

**Connection errors:**
- Verify Neo4j is running
- Check firewall/network settings
- Confirm credentials in Databricks Secrets
- Ensure cluster access mode is "Dedicated"

**File not found errors:**
- Verify Unity Catalog volume path
- Confirm CSV files are uploaded
- Check catalog/schema names

**Relationship creation issues:**
- Ensure Station nodes exist before creating relationships
- Verify station names match exactly between CSV files
- Check for case sensitivity in station names
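The relationship writes match stations by exact `name`. A connection whose station name differs from the stations CSV in spelling, spacing or case is therefore skipped without an error.

The plain-Python sketch below checks for this using a hypothetical `unmatched_names` helper. It splits unmatched names into two groups: those that would match if case and surrounding whitespace were ignored, and those that match nothing.

```python
def unmatched_names(station_names, connections):
    """Split connection endpoints that match no station name into two groups.

    Returns (near_misses, unknown): near_misses maps each name to the station
    it matches after strip() + casefold(); unknown lists names with no match at all.
    """
    exact = set(station_names)
    normalized = {name.strip().casefold(): name for name in station_names}
    near_misses, unknown = {}, []
    for endpoint in sorted({n for pair in connections for n in pair}):
        if endpoint in exact:
            continue
        candidate = normalized.get(endpoint.strip().casefold())
        if candidate is not None:
            near_misses[endpoint] = candidate
        else:
            unknown.append(endpoint)
    return near_misses, unknown
```

In Spark, the exact-match part of this check is a left anti join. For example, `tube_lines.join(stations, tube_lines.From_Station == stations.name, "left_anti")`, repeated for `To_Station`.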

**Library issues:**
- Verify Neo4j Spark Connector is installed (Maven)
- Verify Neo4j Python Driver is installed (PyPI)
- Restart cluster if libraries were just installed

---

## Next Steps

To extend this implementation:

1. **Add relationship properties** - Include additional metadata on relationships
2. **Optimize performance** - Tune write batching for large relationship sets (for example, the connector's `batch.size` write option)
