# Complete Pipeline Test

This notebook demonstrates the complete data pipeline from ingestion to validation using sample data.

## Pipeline Overview
1. **Data Ingestion** (Bronze Layer)
2. **Data Transformation** (Silver Layer) 
3. **Data Aggregation** (Gold Layer)
4. **Data Quality Validation**

## Sample Data
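We'll use a small sample dataset to test the entire pipeline. Input is read from the first source that loads successfully: `/FileStore/sample_airlines.csv`, then `/databricks-datasets/airlines`, and finally a five-row in-memory sample created by the notebook itself.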


## Step 1: Data Ingestion (Bronze Layer)


In [None]:
# Load sample data (Bronze Layer)
from pyspark.sql.functions import current_timestamp, lit

# Sources are tried in order of preference; the first one that loads wins.
# Only `Exception` is caught (never a bare `except:`), so KeyboardInterrupt /
# SystemExit still stop the notebook. Each failure is reported so that a broken
# path is visible instead of silently falling through to the next source.
try:
    df = spark.read.csv("/FileStore/sample_airlines.csv", header=True, inferSchema=True)
    source_name = "/FileStore/sample_airlines.csv"
    print("✅ Loaded sample data from FileStore")
except Exception as filestore_error:
    print(f"⚠️ FileStore sample not loaded ({type(filestore_error).__name__}); trying databricks-datasets")
    try:
        df = spark.read.csv("/databricks-datasets/airlines", header=True, inferSchema=True)
        source_name = "/databricks-datasets/airlines"
        print("✅ Loaded data from databricks-datasets")
    except Exception as datasets_error:
        print(f"⚠️ databricks-datasets not loaded ({type(datasets_error).__name__}); creating in-memory sample")
        # Create sample data if neither source works
        sample_data = [
            (2001, 1, 1, 848, 8, 923, 3, "AA", 1, "N319AA", "JFK", "LAX", 339, 2475, 14, 8),
            (2001, 1, 1, 850, 10, 1006, 6, "AA", 2, "N319AA", "JFK", "LAX", 336, 2475, 14, 10),
            (2001, 1, 1, 923, 23, 1004, 4, "AA", 3, "N319AA", "JFK", "LAX", 321, 2475, 15, 23),
            (2001, 1, 1, 1007, 7, 1130, 0, "AA", 4, "N319AA", "JFK", "LAX", 323, 2475, 16, 7),
            (2001, 1, 1, 1249, 9, 1518, 8, "AA", 5, "N319AA", "JFK", "LAX", 329, 2475, 20, 49)
        ]
        columns = ["year", "month", "day", "dep_time", "dep_delay", "arr_time", "arr_delay",
                   "carrier", "flight", "tailnum", "origin", "dest", "air_time", "distance", "hour", "minute"]
        df = spark.createDataFrame(sample_data, columns)
        source_name = "sample_data"
        print("✅ Created sample data")

# Add metadata columns for the Bronze layer. `data_source` records which source
# actually loaded, so the lineage column is accurate for every branch above.
bronze_df = df.withColumn("ingestion_timestamp", current_timestamp())
bronze_df = bronze_df.withColumn("data_source", lit(source_name))
bronze_df = bronze_df.withColumn("layer", lit("bronze"))

print("Bronze Layer - Raw Data:")
print(f"Records: {bronze_df.count()}")
print(f"Columns: {len(bronze_df.columns)}")
bronze_df.show(5)


In [None]:
# Persist the Bronze DataFrame as a Delta table, replacing any earlier run's output.
(bronze_df.write
    .format("delta")
    .mode("overwrite")
    .save("/delta/airlines_bronze"))
print("✅ Bronze layer saved successfully!")


## Step 2: Data Transformation (Silver Layer)


In [None]:
# Load Bronze data and clean it (Silver Layer)
# current_timestamp / lit are imported here too so this cell does not depend on
# the Bronze cell having been run in the same session.
from pyspark.sql.functions import col, current_timestamp, lit, when, isnan, isnull, trim, upper

bronze_df = spark.read.format("delta").load("/delta/airlines_bronze")

# Columns a record must have for it to be kept in the Silver layer.
required_columns = ["carrier", "flight", "origin", "dest"]

print("Data Quality Checks:")
print(f"Total records: {bronze_df.count()}")
# Report nulls for every column the filter below depends on, not just two of them.
for required_column in required_columns:
    print(f"Records with null {required_column}: {bronze_df.filter(col(required_column).isNull()).count()}")

# Normalise the string keys BEFORE filtering. Filtering first (as before) let
# whitespace-only values through, which then became "" after trim.
normalized_df = (bronze_df
    .withColumn("carrier", upper(trim(col("carrier"))))
    .withColumn("origin", upper(trim(col("origin"))))
    .withColumn("dest", upper(trim(col("dest")))))

# Clean and validate data: drop rows whose keys are null or blank.
silver_df = (normalized_df.filter(
    col("carrier").isNotNull() & (col("carrier") != "") &
    col("flight").isNotNull() &
    col("origin").isNotNull() & (col("origin") != "") &
    col("dest").isNotNull() & (col("dest") != "")
)
.withColumn("layer", lit("silver"))
.withColumn("processing_timestamp", current_timestamp()))

print(f"\nSilver Layer - Cleaned Data:")
print(f"Records after cleaning: {silver_df.count()}")
silver_df.show(5)


In [None]:
# Write the cleaned Silver DataFrame to Delta, overwriting the previous version.
silver_writer = silver_df.write.format("delta").mode("overwrite")
silver_writer.save("/delta/airlines_silver")
print("✅ Silver layer saved successfully!")


## Step 3: Data Aggregation (Gold Layer)


In [None]:
# Load Silver data and create business metrics (Gold Layer)
# Import only what this cell uses. Importing pyspark's `sum`, `max` and `min`
# by name shadowed Python's builtins for every later cell in the notebook.
# current_timestamp / lit are imported here so the cell is self-contained.
from pyspark.sql.functions import avg, count, current_timestamp, lit

silver_df = spark.read.format("delta").load("/delta/airlines_silver")

# Create carrier performance metrics: one row per carrier.
carrier_metrics = (silver_df.groupBy("carrier").agg(
    count("*").alias("total_flights"),
    avg("dep_delay").alias("avg_departure_delay"),
    avg("arr_delay").alias("avg_arrival_delay"),
    avg("distance").alias("avg_distance")
)
.withColumn("layer", lit("gold"))
.withColumn("aggregation_timestamp", current_timestamp()))

print("Gold Layer - Carrier Metrics:")
carrier_metrics.show()

# Save Gold layer
carrier_metrics.write.format("delta").mode("overwrite").save("/delta/airlines_gold")
print("✅ Gold layer saved successfully!")


## Step 4: Data Quality Validation


In [None]:
# Data Quality Validation
# `F` gives access to pyspark's max/when without shadowing Python builtins.
from pyspark.sql import functions as F
from pyspark.sql.functions import col, count, isnan, isnull

gold_path = "/delta/airlines_gold"

# Load Gold data for validation
gold_df = spark.read.format("delta").load(gold_path)

# Row count is reused by several checks; count once rather than re-scanning
# the table for every column and every check.
gold_total = gold_df.count()

print("=== DATA QUALITY REPORT ===")
print(f"Table: {gold_path}")
print(f"Total records: {gold_total}")
print(f"Columns: {len(gold_df.columns)}")

# Check for null values. A single aggregation counts nulls in every column
# (count() ignores the nulls that when() yields for non-null rows), instead of
# launching one filter+count job per column.
print("\n1. NULL VALUE ANALYSIS:")
null_row = gold_df.select(
    [count(F.when(col(c).isNull(), 1)) for c in gold_df.columns]
).first()
for column, null_count in zip(gold_df.columns, null_row):
    null_percentage = (null_count / gold_total) * 100 if gold_total > 0 else 0
    print(f"  {column}: {null_count} nulls ({null_percentage:.2f}%)")

# Check for duplicates
print("\n2. DUPLICATE ANALYSIS:")
duplicate_count = gold_total - gold_df.dropDuplicates().count()
print(f"  Duplicate records: {duplicate_count}")

# Data freshness check. An aggregate max() returns a single row even on an empty
# table, whereas orderBy(...).first() returned None and then failed on [0].
print("\n3. DATA FRESHNESS:")
if "aggregation_timestamp" in gold_df.columns:
    latest_timestamp = gold_df.agg(F.max("aggregation_timestamp")).first()[0]
    if latest_timestamp is None:
        print("  No aggregation timestamp available (table is empty or all values are null)")
    else:
        print(f"  Latest aggregation time: {latest_timestamp}")
else:
    print("  No timestamp column found")

# Show final results
print("\n4. FINAL DATA:")
gold_df.show()

print("\n✅ Pipeline completed successfully!")
print("📊 Summary:")
print(f"  - Bronze records: {spark.read.format('delta').load('/delta/airlines_bronze').count()}")
print(f"  - Silver records: {spark.read.format('delta').load('/delta/airlines_silver').count()}")
print(f"  - Gold records: {gold_total}")
