# Aircraft ETL to Neo4j

This notebook loads Aircraft, System, and Component data from Databricks Unity Catalog into Neo4j Aura using the Neo4j Spark Connector.

## What You'll Learn
- How to read CSV data from Unity Catalog Volumes
- How to transform tabular data for graph loading
- How to write nodes and relationships using the Neo4j Spark Connector
- How to verify data with Cypher queries from Databricks

## Prerequisites
- Neo4j Aura credentials from Lab 1
- Access to the workshop Databricks cluster (with Neo4j Spark Connector installed)

## Instructions
1. Clone this notebook to your personal folder
2. Enter your Neo4j credentials in the Configuration cell below
3. Run all cells in order (Shift+Enter or Run All)
4. Verify the results in the final cells

## Section 1: Configuration

Enter your Neo4j Aura connection details below. You received these credentials when you created your Neo4j Aura instance in Lab 1.

In [None]:
# ============================================
# CONFIGURATION - Enter your Neo4j credentials
# ============================================

NEO4J_URI = ""  # e.g., "neo4j+s://xxxxxxxx.databases.neo4j.io"
NEO4J_USERNAME = "neo4j"
NEO4J_PASSWORD = ""  # Your password from Lab 1

# Unity Catalog Volume path (pre-configured by workshop admin)
DATA_PATH = "/Volumes/aws-databricks-neo4j-lab/lab-schema/lab-volume"

# Validate configuration
if not NEO4J_URI or not NEO4J_PASSWORD:
    print("WARNING: Please enter your Neo4j credentials above before running the notebook!")
else:
    print("Configuration ready!")
    print(f"Neo4j URI: {NEO4J_URI}")
    print(f"Data Path: {DATA_PATH}")
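The check above only catches empty values. As a minimal sketch, the URI could also be checked for a recognised scheme and a host name before anything tries to connect. The helper name `check_neo4j_uri` is hypothetical and not part of the workshop code; the scheme list is the set of URI schemes accepted by Neo4j drivers.

```python
from urllib.parse import urlparse

# URI schemes accepted by Neo4j drivers; Aura instances normally use "neo4j+s".
KNOWN_SCHEMES = {"neo4j", "neo4j+s", "neo4j+ssc", "bolt", "bolt+s", "bolt+ssc"}

def check_neo4j_uri(uri):
    """Return a list of problems found in the URI (empty if it looks fine)."""
    problems = []
    parsed = urlparse(uri.strip())
    if parsed.scheme not in KNOWN_SCHEMES:
        problems.append(f"unexpected scheme: {parsed.scheme!r}")
    if not parsed.hostname:
        problems.append("missing host name")
    if uri != uri.strip():
        problems.append("leading or trailing whitespace")
    return problems

# The placeholder URI from the configuration cell passes the check.
print(check_neo4j_uri("neo4j+s://xxxxxxxx.databases.neo4j.io"))
# A URI pasted with the wrong scheme is reported.
print(check_neo4j_uri("https://xxxxxxxx.databases.neo4j.io"))
```

This only validates the shape of the URI. It does not confirm that the instance is reachable or that the password is correct.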

In [None]:
# Configure Neo4j Spark Connector
spark.conf.set("neo4j.url", NEO4J_URI)
spark.conf.set("neo4j.authentication.basic.username", NEO4J_USERNAME)
spark.conf.set("neo4j.authentication.basic.password", NEO4J_PASSWORD)
spark.conf.set("neo4j.database", "neo4j")

print("Spark configured for Neo4j connection")

## Section 2: Data Preview

Let's examine the CSV files that we'll load into Neo4j. The data represents an Aircraft Digital Twin with three entity types:

- **Aircraft**: Fleet of 20 aircraft with tail numbers, models, and operators
- **System**: Major systems on each aircraft (engines, avionics, hydraulics)
- **Component**: Parts within each system (fans, compressors, turbines, etc.)

The graph structure will be:
```
(Aircraft) -[:HAS_SYSTEM]-> (System) -[:HAS_COMPONENT]-> (Component)
```
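To make the mapping from tabular rows to graph edges concrete, the sketch below turns ID columns into edges using plain Python. The rows are hypothetical (the IDs and types are invented for illustration and are not taken from the workshop CSVs), and `edges_from_foreign_key` is an illustrative helper, not part of the notebook.

```python
# Hypothetical sample rows; the real CSV files contain different values.
systems = [
    {"system_id": "S1", "aircraft_id": "A1", "type": "Engine"},
    {"system_id": "S2", "aircraft_id": "A1", "type": "Avionics"},
]
components = [
    {"component_id": "C1", "system_id": "S1", "type": "Fan"},
    {"component_id": "C2", "system_id": "S1", "type": "Turbine"},
]

def edges_from_foreign_key(rows, parent_key, child_key, rel_type):
    """Turn each row's parent reference into a (parent, rel_type, child) edge."""
    return [(row[parent_key], rel_type, row[child_key]) for row in rows]

has_system = edges_from_foreign_key(systems, "aircraft_id", "system_id", "HAS_SYSTEM")
has_component = edges_from_foreign_key(components, "system_id", "component_id", "HAS_COMPONENT")

# Print the edges in the same arrow notation used above.
for source, rel, target in has_system + has_component:
    print(f"({source}) -[:{rel}]-> ({target})")
```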

In [None]:
# Helper function to read CSV files
def read_csv(filename):
    """Read a CSV file from the Unity Catalog Volume.

    Only the header option is set, so Spark reads every column as a string.
    """
    path = f"{DATA_PATH}/{filename}"
    return spark.read.option("header", "true").csv(path)

In [None]:
# Read all CSV files
aircraft_df = read_csv("nodes_aircraft.csv")
systems_df = read_csv("nodes_systems.csv")
components_df = read_csv("nodes_components.csv")

print("=" * 60)
print("DATA LOADED FROM UNITY CATALOG")
print("=" * 60)
print(f"Aircraft:   {aircraft_df.count()} rows")
print(f"Systems:    {systems_df.count()} rows")
print(f"Components: {components_df.count()} rows")
print("=" * 60)

### Aircraft Data

Each aircraft has a unique ID, tail number, model, manufacturer, and operator.

In [None]:
display(aircraft_df)

### Systems Data

Each system belongs to an aircraft (via `aircraft_id`) and has a type (Engine, Avionics, Hydraulics).

In [None]:
display(systems_df)

### Components Data

Each component belongs to a system (via `system_id`) and has a type (Fan, Compressor, Turbine, etc.).

In [None]:
display(components_df)

## Section 3: Load Nodes to Neo4j

Now we'll write the data to Neo4j as graph nodes. The Neo4j Spark Connector writes DataFrames directly to Neo4j.

**Key Concepts:**
- Each DataFrame row becomes a node
- Column values become node properties
- The `labels` option sets the node label (e.g., `:Aircraft`)
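- The `node.keys` option names the property that uniquely identifies each node; in `Overwrite` mode the connector merges on this key, so re-running a cell updates existing nodes instead of duplicating them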
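The effect of writing by key can be pictured with a toy in-memory model. This is only a sketch of merge-by-key semantics, not the connector's implementation: `merge_nodes` is a hypothetical helper, and the rows are invented for illustration.

```python
def merge_nodes(store, rows, key):
    """Upsert each row into `store` by its key, loosely mimicking MERGE + SET."""
    for row in rows:
        node = store.setdefault(row[key], {})  # create the node if the key is new
        node.update(row)                        # then set or overwrite its properties
    return store

# Hypothetical rows; the IDs and values are invented for illustration.
rows = [
    {"aircraft_id": "A1", "model": "737-800"},
    {"aircraft_id": "A2", "model": "A320"},
]

graph = {}
merge_nodes(graph, rows, "aircraft_id")
merge_nodes(graph, rows, "aircraft_id")  # second run: same keys, so no duplicates
print(len(graph))
```

Because each row is matched on its key, loading the same rows twice leaves the node count unchanged.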

In [None]:
# Helper function to write nodes
def write_nodes(df, label, id_column):
    """Write a DataFrame as nodes to Neo4j."""
    (df
     .write
     .format("org.neo4j.spark.DataSource")
     .mode("Overwrite")
     .option("labels", f":{label}")
     .option("node.keys", id_column)
     .save())
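    # This is the row count of the input DataFrame, not a count read back
    # from Neo4j; Section 5 checks the totals actually stored in the graph.
    count = df.count()
    print(f"Wrote {count} {label} nodes to Neo4j")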
    return count

### Transform and Load Aircraft Nodes

In [None]:
# Transform: Rename the ID column from Neo4j import format to standard name
aircraft_clean = aircraft_df.withColumnRenamed(":ID(Aircraft)", "aircraft_id")

# Show the transformed schema
print("Aircraft schema:")
aircraft_clean.printSchema()
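The `:ID(Aircraft)` header follows the Neo4j bulk import convention, which also uses `:START_ID(...)` and `:END_ID(...)` for relationship files. As a sketch, the rename targets used in this notebook could be derived from the headers instead of being typed by hand. `import_header_renames` is a hypothetical helper, and the `<label>_id` naming simply mirrors the names chosen in this notebook.

```python
import re

# Matches import-format ID headers such as ":ID(Aircraft)",
# ":START_ID(System)" or ":END_ID(Component)".
ID_HEADER = re.compile(r"^:(?:START_|END_)?ID\((\w+)\)$")

def import_header_renames(columns):
    """Map each import-format ID header to '<label>_id'; skip other columns."""
    renames = {}
    for column in columns:
        match = ID_HEADER.match(column)
        if match:
            renames[column] = f"{match.group(1).lower()}_id"
    return renames

print(import_header_renames([":ID(Aircraft)", "tail_number", "model"]))
print(import_header_renames([":START_ID(Aircraft)", ":END_ID(System)"]))
```

In the notebook, each `(old, new)` pair from the returned dictionary could then be passed to `withColumnRenamed`, as the cells in this section do explicitly.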

In [None]:
# Write Aircraft nodes to Neo4j
aircraft_count = write_nodes(aircraft_clean, "Aircraft", "aircraft_id")

### Transform and Load System Nodes

In [None]:
# Transform System data
systems_clean = systems_df.withColumnRenamed(":ID(System)", "system_id")

# Write System nodes to Neo4j
systems_count = write_nodes(systems_clean, "System", "system_id")

### Transform and Load Component Nodes

In [None]:
# Transform Component data
components_clean = components_df.withColumnRenamed(":ID(Component)", "component_id")

# Write Component nodes to Neo4j
components_count = write_nodes(components_clean, "Component", "component_id")

## Section 4: Load Relationships to Neo4j

Now we'll create the relationships that connect our nodes:
- `HAS_SYSTEM`: Connects Aircraft to their Systems
- `HAS_COMPONENT`: Connects Systems to their Components

**Key Concepts:**
- The `relationship` option specifies the relationship type
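- The `relationship.save.strategy` of `keys` looks up the source and target nodes by their key properties, so the nodes written in Section 3 should already exist before these cells run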
- Source and target labels/keys identify which nodes to connect
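Since relationships are attached to nodes found by key, a relationship row that references an ID with no matching node may not produce a relationship. The sketch below shows a pre-flight check for such rows in plain Python. It assumes endpoint nodes are matched rather than created, and both `find_orphans` and the sample data are hypothetical.

```python
def find_orphans(rel_rows, source_ids, target_ids, source_key, target_key):
    """Return relationship rows whose source or target ID has no matching node."""
    return [
        row for row in rel_rows
        if row[source_key] not in source_ids or row[target_key] not in target_ids
    ]

# Hypothetical IDs, invented for illustration.
aircraft_ids = {"A1", "A2"}
system_ids = {"S1", "S2"}
rel_rows = [
    {"aircraft_id": "A1", "system_id": "S1"},
    {"aircraft_id": "A2", "system_id": "S9"},  # S9 has no System node
]

orphans = find_orphans(rel_rows, aircraft_ids, system_ids, "aircraft_id", "system_id")
print(orphans)
```

On Spark DataFrames, the same idea can be expressed with a `left_anti` join between the relationship DataFrame and each node DataFrame.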

In [None]:
# Helper function to write relationships
def write_relationships(df, rel_type, source_label, source_key, target_label, target_key):
    """Write relationships to Neo4j using keys strategy."""
    (df
     .write
     .format("org.neo4j.spark.DataSource")
     .mode("Overwrite")
     .option("relationship", rel_type)
     .option("relationship.save.strategy", "keys")
     .option("relationship.source.labels", f":{source_label}")
     .option("relationship.source.node.keys", source_key)
     .option("relationship.target.labels", f":{target_label}")
     .option("relationship.target.node.keys", target_key)
     .save())
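    # This is the row count of the input DataFrame, not a count read back
    # from Neo4j; Section 5 checks the totals actually stored in the graph.
    count = df.count()
    print(f"Wrote {count} {rel_type} relationships to Neo4j")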
    return count

### Load HAS_SYSTEM Relationships

Connect each Aircraft to its Systems.

In [None]:
# Read and transform relationship data
aircraft_system_df = read_csv("rels_aircraft_system.csv")

# Rename columns to match our node keys
aircraft_system_clean = (aircraft_system_df
    .withColumnRenamed(":START_ID(Aircraft)", "aircraft_id")
    .withColumnRenamed(":END_ID(System)", "system_id"))

print("HAS_SYSTEM relationship data:")
display(aircraft_system_clean.limit(5))

In [None]:
# Write HAS_SYSTEM relationships
has_system_count = write_relationships(
    aircraft_system_clean,
    "HAS_SYSTEM",
    "Aircraft", "aircraft_id",
    "System", "system_id"
)

### Load HAS_COMPONENT Relationships

Connect each System to its Components.

In [None]:
# Read and transform relationship data
system_component_df = read_csv("rels_system_component.csv")

# Rename columns to match our node keys
system_component_clean = (system_component_df
    .withColumnRenamed(":START_ID(System)", "system_id")
    .withColumnRenamed(":END_ID(Component)", "component_id"))

print("HAS_COMPONENT relationship data:")
display(system_component_clean.limit(5))

In [None]:
# Write HAS_COMPONENT relationships
has_component_count = write_relationships(
    system_component_clean,
    "HAS_COMPONENT",
    "System", "system_id",
    "Component", "component_id"
)

## ETL Complete!

Summary of data loaded to Neo4j:

In [None]:
print("=" * 60)
print("ETL COMPLETE!")
print("=" * 60)
print()
print("NODES LOADED:")
print(f"  Aircraft:   {aircraft_count}")
print(f"  System:     {systems_count}")
print(f"  Component:  {components_count}")
print("  ---------------------")
print(f"  Total:      {aircraft_count + systems_count + components_count}")
print()
print("RELATIONSHIPS LOADED:")
print(f"  HAS_SYSTEM:    {has_system_count}")
print(f"  HAS_COMPONENT: {has_component_count}")
print("  ---------------------")
print(f"  Total:         {has_system_count + has_component_count}")
print()
print("=" * 60)

## Section 5: Verification Queries

Let's verify the data loaded correctly by running Cypher queries from Databricks.

In [None]:
# Helper function to run Cypher queries
def run_cypher(query):
    """Execute a Cypher query and return results as DataFrame."""
    return (spark.read
        .format("org.neo4j.spark.DataSource")
        .option("query", query)
        .load())

### Verify Node Counts

In [None]:
print("Node counts by label:")
result = run_cypher("""
    MATCH (n)
    RETURN labels(n)[0] AS NodeType, count(*) AS Count
    ORDER BY NodeType
""")
display(result)

### Verify Relationship Counts

In [None]:
print("Relationship counts by type:")
result = run_cypher("""
    MATCH ()-[r]->()
    RETURN type(r) AS RelType, count(*) AS Count
    ORDER BY RelType
""")
display(result)
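The counts displayed above can also be compared against the counts recorded during loading rather than checked by eye. The following is a sketch under stated assumptions: `compare_counts` is a hypothetical helper, and apart from the 20 aircraft mentioned in Section 2, the numbers are invented for illustration.

```python
def compare_counts(expected, actual):
    """Return {name: (expected, actual)} for every entry that differs."""
    names = set(expected) | set(actual)
    return {
        name: (expected.get(name, 0), actual.get(name, 0))
        for name in sorted(names)
        if expected.get(name, 0) != actual.get(name, 0)
    }

# Hypothetical counts; only the 20 aircraft figure comes from this notebook.
expected = {"Aircraft": 20, "System": 80, "Component": 320}
actual = {"Aircraft": 20, "System": 80, "Component": 318}

mismatches = compare_counts(expected, actual)
print(mismatches if mismatches else "All counts match")
```

In the notebook, `expected` could be built from `aircraft_count`, `systems_count`, and `components_count`, and `actual` from the query result, for example `{row["NodeType"]: row["Count"] for row in result.collect()}`.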

### Sample Query: Aircraft Hierarchy

View the complete hierarchy for aircraft N95040A (a Boeing 737-800).

In [None]:
result = run_cypher("""
    MATCH (a:Aircraft {tail_number: 'N95040A'})-[:HAS_SYSTEM]->(s:System)
    OPTIONAL MATCH (s)-[:HAS_COMPONENT]->(c:Component)
    RETURN a.tail_number AS Aircraft,
           a.model AS Model,
           s.name AS System,
           s.type AS SystemType,
           collect(c.name) AS Components
    ORDER BY s.type, s.name
""")
display(result)

### Sample Query: Fleet by Manufacturer

In [None]:
result = run_cypher("""
    MATCH (a:Aircraft)
    RETURN a.manufacturer AS Manufacturer,
           count(a) AS AircraftCount,
           collect(a.model) AS Models
    ORDER BY AircraftCount DESC
""")
display(result)

### Sample Query: Component Distribution

In [None]:
result = run_cypher("""
    MATCH (c:Component)
    RETURN c.type AS ComponentType, count(c) AS Count
    ORDER BY Count DESC
""")
display(result)

## Section 6: Next Steps - Explore in Neo4j Aura

Now that the data is loaded, open your Neo4j Aura console to visualize the graph!

### How to Access Neo4j Aura
1. Go to [console.neo4j.io](https://console.neo4j.io)
2. Sign in with your Neo4j account
3. Click on your instance
4. Click **Query** to open the query interface

### Visualization Queries to Try

**See one aircraft's complete hierarchy:**
```cypher
MATCH (a:Aircraft {tail_number: 'N95040A'})-[r1:HAS_SYSTEM]->(s:System)-[r2:HAS_COMPONENT]->(c:Component)
RETURN a, r1, s, r2, c
```

**Compare aircraft by operator:**
```cypher
MATCH (a:Aircraft)
RETURN a.operator AS Operator, count(a) AS Count
```

**Find all engine components:**
```cypher
MATCH (s:System {type: 'Engine'})-[:HAS_COMPONENT]->(c:Component)
RETURN c.type AS ComponentType, count(c) AS Count
ORDER BY Count DESC
```

### Exploration Tips
- Click on nodes in the visualization to see their properties
- Double-click to expand connected nodes
- Use the styling panel to color nodes by property (e.g., manufacturer)

## Congratulations!

You have successfully:
- Read CSV data from Databricks Unity Catalog
- Transformed tabular data for graph loading
- Written nodes and relationships to Neo4j using the Spark Connector
- Verified the data with Cypher queries

**Next:** Open `02_load_neo4j_full` to load the complete dataset (Sensors, Flights, Airports, Delays, Maintenance Events, Removals) required for Lab 7.