In [0]:
# Install required packages
%pip install ucimlrepo

Collecting ucimlrepo
  Downloading ucimlrepo-0.0.7-py3-none-any.whl.metadata (5.5 kB)
Downloading ucimlrepo-0.0.7-py3-none-any.whl (8.0 kB)
Installing collected packages: ucimlrepo
Successfully installed ucimlrepo-0.0.7
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *

CATALOG_NAME = "main"
SCHEMA_NAME = "retail_pipeline"
SOURCE_PATH = "uci_retail_data"

print("🏗️  INITIALIZING DATA PIPELINE")
print(f"Catalog: {CATALOG_NAME}")
print(f"Schema: {SCHEMA_NAME}")

🏗️  INITIALIZING DATA PIPELINE
Catalog: main
Schema: retail_pipeline


Data Ingestion Layer

In [0]:
def ingest_raw_data():
    """
    Ingest raw data from source system
    In production: This would connect to APIs, databases, file systems etc
    """
    print("📥 BRONZE LAYER: Raw Data Ingestion")

    from ucimlrepo import fetch_ucirepo

    online_retail = fetch_ucirepo(id=352)
    raw_df = spark.createDataFrame(online_retail.data.features)

    spark.sql(f"CREATE DATABASE IF NOT EXISTS {CATALOG_NAME}.{SCHEMA_NAME}")
    print(f"✅ Database {CATALOG_NAME}.{SCHEMA_NAME} ready")

    # get data from uci
    from ucimlrepo import fetch_ucirepo

    online_retail = fetch_ucirepo(id=352)

    raw_df = spark.createDataFrame(online_retail.data.features)
    

    print("Raw data structures")
    display(raw_df)
    print("Raw data sample")
    display(raw_df.limit(10))


    # Add metadata
    ingestion_df = raw_df \
        .withColumn("ingestion_timestamp", current_timestamp()) \
        .withColumn("source_system", lit("UCI")) \
        .withColumn("batch_id", lit("batch_001"))
                    
    # Write data to brone table
    ingestion_df.write \
        .format("delta") \
        .mode("overwrite") \
        .option(mergeSchema=True) \
        .saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.bronze_raw_transactions")


    record_count = ingestion_df.count()
    print(f"✅ {record_count} records ingested to {CATALOG_NAME}.{SCHEMA_NAME}.bronze_raw_transactions")
    return record_count

# Execute
print("Starting Data Ingestion...")
bronze_ingestion_count = ingest_raw_data()
print(f"Data Ingestion Complete: {bronze_ingestion_count} records ingested")






Starting Data Ingestion...
📥 BRONZE LAYER: Raw Data Ingestion


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-8970861071262534>, line 50[0m
[1;32m     48[0m [38;5;66;03m# Execute[39;00m
[1;32m     49[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mStarting Data Ingestion...[39m[38;5;124m"[39m)
[0;32m---> 50[0m bronze_ingestion_count [38;5;241m=[39m ingest_raw_data()
[1;32m     51[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mData Ingestion Complete: [39m[38;5;132;01m{[39;00mbronze_ingestion_count[38;5;132;01m}[39;00m[38;5;124m records ingested[39m[38;5;124m"[39m)

File [0;32m<command-8970861071262534>, line 13[0m, in [0;36mingest_raw_data[0;34m()[0m
[1;32m     10[0m online_retail [38;5;241m=[39m fetch_ucirepo([38;5;28mid[39m[38;5;241m=[39m[38;5;241m352[39m)
[1;32m     11[0m raw_df [38;5;241m=[39m spark[38;5;241m.[39mcreateDataFram

Silver Layer Data Quality and Transformation

In [0]:
def create_silver_layer():
    """
    Clean and validate
    """
    print("🧹 SILVER LAYER: Data Cleansing and Validation")
    
    # Pull data in from the Bronze layer

    bronze_df = spark.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.bronze_raw_transactions")

    print(f"Bronze records: {bronze_df.count()}")
    
    # Data Quality
    print("Applying rules")

    silver_df = bronze_df \
        .filter(col("InvoiceNo").cast("int") > 0) \
        .filter(col("Quantity")) \
        .filter(col("CustomerID").isNotNull()) \
        .filter(col("Quantity") > 0) \
        .filter(col("UnitPrice") > 0) \
        .filter(col("Description").isNotNull()) \
        .filter(col("InvoiceNo").isNotNull()) \
        .withColumn("InvoiceDateTime", to_timestamp(col("InvoiceDate"), "M/d/yyyy H:mm")) \
        .withColumn("Revenue", col("Quantity") * col("UnitPrice")) \
        .withColumn("Year", year("InvoiceDateTime")) \
        .withColumn("Month", month("InvoiceDateTime")) \
        .withColumn("DayOfWeek", dayofweek("InvoiceDateTime")) \
        .withColumn("Quarter", quarter("InvoiceDateTime")) \
        .withColumn("processed_timestamp", current_timestamp()) \
        .withColumn("data_quality_score", lit(100))

    # Additional business logic
    silver_enriched = silver_df \
        .withColumn("revenue_category",
                    when(col("Revenue") >= 1000, "High") \
                    .when(col("Revenue") >= 500, "Medium") \
                    .otherwise("Low")) \
        .withColumn("quantity_category",
                    when(col("Quantity") >= 100, "High") \
                    .when(col("Quantity") >= 50, "Medium") \
                    .otherwise("Low")) \
        .withColumn("season",
                    when(col("Month").isin(12, 1, 2), "Winter") 
                    .when(col("Month").isin(3, 4, 5), "Spring") 
                    .when(col("Month").isin(6, 7, 8), "Summer") 
                    .when(col("Month").isin(9, 10, 11), "Autumn")) 
        
    # write silver to the delta table
    silver_enriched.write \
        .format("delta") \
        .mode("overwrite") \
        .option(mergeSchema=True) \
        .saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.silver_clean_transactions")

    # Data Quality Metrics
    bronze_count = bronze_df.count()
    silver_count = silver_enriched.count()
    quality_pass_rate = (silver_count / bronze_count) * 100

    print(f"✅ Silver layer created: {silver_count:,} records")
    print(f"📊 Data Quality Pass Rate: {quality_pass_rate:.1f}%")
    print(f"❌ Records filtered out: {bronze_count - silver_count:,}")
    
    # Show sample of cleaned data
    print("📋 Sample of Silver data:")
    silver_enriched.select("InvoiceNo", "Description", "Quantity", "Revenue", 
                          "revenue_category", "season", "Year", "Month").show(10)
    
    return silver_count, quality_pass_rate

# Execute Silver layer transformation
print("🚀 STARTING SILVER LAYER TRANSFORMATION...")
silver_count, quality_rate = create_silver_layer()
print(f"🎉 SILVER LAYER COMPLETE: {silver_count:,} clean records ({quality_rate:.1f}% pass rate)")


🚀 STARTING SILVER LAYER TRANSFORMATION...
🧹 SILVER LAYER: Data Cleansing and Validation


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-8765916674596184>, line 74[0m
[1;32m     72[0m [38;5;66;03m# Execute Silver layer transformation[39;00m
[1;32m     73[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124m🚀 STARTING SILVER LAYER TRANSFORMATION...[39m[38;5;124m"[39m)
[0;32m---> 74[0m silver_count, quality_rate [38;5;241m=[39m create_silver_layer()
[1;32m     75[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124m🎉 SILVER LAYER COMPLETE: [39m[38;5;132;01m{[39;00msilver_count[38;5;132;01m:[39;00m[38;5;124m,[39m[38;5;132;01m}[39;00m[38;5;124m clean records ([39m[38;5;132;01m{[39;00mquality_rate[38;5;132;01m:[39;00m[38;5;124m.1f[39m[38;5;132;01m}[39;00m[38;5;124m% pass rate)[39m[38;5;124m"[39m)

File [0;32m<command-8765916674596184>, line 9[0m, in [0;36mcreate_silver_layer[0

Data Quality Report

In [0]:
def gen_dq_report():
    """
    Generate data quality report
    """
    print("📊 GENERATING DATA QUALITY REPORT")  

    bronze_df = spark.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.bronze_raw_transactions")
    silver_df = spark.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.silver_clean_transactions")

    bronze_count = bronze_df.count()
    silver_count = silver_df.count()
    quality_pass_rate = (silver_count / bronze_count) * 100

    # Detailed quality checks on Bronze data
    quality_checks = {
        "null_customer_ids": bronze_df.filter(col("CustomerID").isNull()).count(),
        "negative_quantities": bronze_df.filter(col("Quantity") <= 0).count(),
        "zero_prices": bronze_df.filter(col("UnitPrice") <= 0).count(),
        "null_descriptions": bronze_df.filter(col("Description").isNull()).count(),
        "null_invoice_numbers": bronze_df.filter(col("InvoiceNo").isNull()).count()
    }

    print(f"\n📋 DATA QUALITY ISSUES IN BRONZE:")
    total_issues = 0
    for issue, count in quality_checks.items():
        percentage = (count / bronze_count) * 100
        print(f"  • {issue.replace('_', ' ').title()}: {count:,} ({percentage:.2f}%)")
        total_issues += count

        # Business metrics from Silver
    print(f"\n💰 BUSINESS METRICS FROM SILVER:")
    
    revenue_stats = silver_df.agg(
        sum("Revenue").alias("total_revenue"),
        avg("Revenue").alias("avg_revenue"),
        max("Revenue").alias("max_revenue"),
        min("Revenue").alias("min_revenue")
    ).collect()[0]
    
    print(f"  • Total Revenue: £{revenue_stats['total_revenue']:,.2f}")
    print(f"  • Average Transaction: £{revenue_stats['avg_revenue']:.2f}")
    print(f"  • Largest Transaction: £{revenue_stats['max_revenue']:.2f}")
    print(f"  • Smallest Transaction: £{revenue_stats['min_revenue']:.2f}")
    
    # Customer and product diversity
    unique_customers = silver_df.select("CustomerID").distinct().count()
    unique_products = silver_df.select("StockCode").distinct().count()
    unique_countries = silver_df.select("Country").distinct().count()
    
    print(f"\n🌍 DATA DIVERSITY:")
    print(f"  • Unique Customers: {unique_customers:,}")
    print(f"  • Unique Products: {unique_products:,}")
    print(f"  • Countries: {unique_countries:,}")
    
    # Revenue categories distribution
    print(f"\n📊 REVENUE DISTRIBUTION:")
    revenue_dist = silver_df.groupBy("revenue_category").count().orderBy(desc("count"))
    revenue_dist.show()
    
    return quality_checks, revenue_stats

# Generate the report
print("📋 GENERATING DATA QUALITY REPORT...")
dq_issues, business_metrics = gen_dq_report()



Gold Layer

In [0]:
def create_gold_layer():
    """
    Create business-ready aggregated tables for analytics and reporting
    """
    print("🏆 GOLD LAYER: Business Metrics & Aggregations")
    
    # Read from Silver layer
    silver_df = spark.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.silver_clean_transactions")
    
    print(f"📊 Processing {silver_df.count():,} silver records...")
    
    print("📈 Creating monthly sales metrics...")
    
    monthly_metrics = silver_df \
        .groupBy("Year", "Month", "season") \
        .agg(
            sum("Revenue").alias("total_revenue"),
            sum("Quantity").alias("total_items_sold"),
            count("InvoiceNo").alias("total_transactions"),
            countDistinct("CustomerID").alias("unique_customers"),
            countDistinct("StockCode").alias("unique_products_sold"),
            avg("Revenue").alias("avg_transaction_value"),
            max("Revenue").alias("max_transaction"),
            min("Revenue").alias("min_transaction")
        ) \
        .withColumn("revenue_per_customer", col("total_revenue") / col("unique_customers")) \
        .withColumn("items_per_transaction", col("total_items_sold") / col("total_transactions")) \
        .withColumn("created_at", current_timestamp()) \
        .orderBy("Year", "Month")
    
    # Write monthly metrics
    monthly_metrics.write \
        .format("delta") \
        .mode("overwrite") \
        .option("mergeSchema", "true") \
        .saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.gold_monthly_metrics")
    
    print("✅ Monthly metrics table created")
    
    print("👥 Creating customer segmentation...")
    
    customer_segments = silver_df \
        .groupBy("CustomerID", "Country") \
        .agg(
            sum("Revenue").alias("total_spent"),
            sum("Quantity").alias("total_items_purchased"),
            count("InvoiceNo").alias("total_orders"),
            countDistinct("StockCode").alias("unique_products_bought"),
            max("InvoiceDateTime").alias("last_purchase_date"),
            min("InvoiceDateTime").alias("first_purchase_date"),
            avg("Revenue").alias("avg_order_value")
        ) \
        .withColumn("customer_lifetime_days", 
                   datediff(col("last_purchase_date"), col("first_purchase_date"))) \
        .withColumn("customer_segment", 
                   when(col("total_spent") >= 2000, "VIP")
                   .when(col("total_spent") >= 1000, "High Value")
                   .when(col("total_spent") >= 500, "Medium Value")
                   .otherwise("Low Value")) \
        .withColumn("purchase_frequency_segment",
                   when(col("total_orders") >= 50, "Frequent")
                   .when(col("total_orders") >= 20, "Regular")
                   .when(col("total_orders") >= 10, "Occasional")
                   .otherwise("Rare")) \
        .withColumn("created_at", current_timestamp())
    
    # Write customer segments
    customer_segments.write \
        .format("delta") \
        .mode("overwrite") \
        .option("mergeSchema", "true") \
        .saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.gold_customer_segments")
    
    print("✅ Customer segmentation table created")
    
    print("📦 Creating product performance metrics...")
    
    product_metrics = silver_df \
        .groupBy("StockCode", "Description") \
        .agg(
            sum("Revenue").alias("total_revenue"),
            sum("Quantity").alias("total_quantity_sold"),
            count("InvoiceNo").alias("times_ordered"),
            countDistinct("CustomerID").alias("unique_buyers"),
            avg("UnitPrice").alias("avg_unit_price"),
            max("UnitPrice").alias("max_unit_price"),
            min("UnitPrice").alias("min_unit_price")
        ) \
        .withColumn("revenue_per_unit", col("total_revenue") / col("total_quantity_sold")) \
        .withColumn("avg_quantity_per_order", col("total_quantity_sold") / col("times_ordered")) \
        .withColumn("product_popularity_score", 
                   (col("unique_buyers") * 0.4) + (col("times_ordered") * 0.6)) \
        .withColumn("product_performance_tier",
                   when(col("total_revenue") >= 10000, "Top Performer")
                   .when(col("total_revenue") >= 5000, "High Performer")
                   .when(col("total_revenue") >= 1000, "Medium Performer")
                   .otherwise("Low Performer")) \
        .withColumn("created_at", current_timestamp()) \
        .orderBy(desc("total_revenue"))
    
    # Write product metrics
    product_metrics.write \
        .format("delta") \
        .mode("overwrite") \
        .option("mergeSchema", "true") \
        .saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.gold_product_metrics")
    
    print("✅ Product performance table created")
    
    print("🌍 Creating geographic performance metrics...")
    
    country_metrics = silver_df \
        .groupBy("Country") \
        .agg(
            sum("Revenue").alias("total_revenue"),
            sum("Quantity").alias("total_items_sold"),
            count("InvoiceNo").alias("total_transactions"),
            countDistinct("CustomerID").alias("unique_customers"),
            countDistinct("StockCode").alias("unique_products_sold"),
            avg("Revenue").alias("avg_transaction_value")
        ) \
        .withColumn("revenue_per_customer", col("total_revenue") / col("unique_customers")) \
        .withColumn("market_share_rank", row_number().over(Window.orderBy(desc("total_revenue")))) \
        .withColumn("created_at", current_timestamp()) \
        .orderBy(desc("total_revenue"))
    
    # Write country metrics
    country_metrics.write \
        .format("delta") \
        .mode("overwrite") \
        .option("mergeSchema", "true") \
        .saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.gold_country_metrics")
    
    print("✅ Geographic metrics table created")
    
    # Summary of Gold layer
    tables_created = [
        "gold_monthly_metrics",
        "gold_customer_segments", 
        "gold_product_metrics",
        "gold_country_metrics"
    ]
    
    print(f"\n🎉 GOLD LAYER COMPLETE!")
    print("📊 Business-ready tables created:")
    for table in tables_created:
        table_count = spark.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.{table}").count()
        print(f"  • {table}: {table_count:,} records")
    
    return tables_created

# Execute Gold layer creation
print("🚀 STARTING GOLD LAYER CREATION...")
gold_tables = create_gold_layer()
print(f"🏆 GOLD LAYER COMPLETE: {len(gold_tables)} business tables created!")

BI Views

In [0]:
# =============================================================================
# BUSINESS INTELLIGENCE VIEWS & INSIGHTS
# =============================================================================

def create_business_views():
    """
    Create views for executives, analysts, and dashboard consumption
    """
    print("📊 CREATING BUSINESS INTELLIGENCE VIEWS")
    
    # =================================================================
    # EXECUTIVE DASHBOARD VIEW
    # =================================================================
    spark.sql(f"""
    CREATE OR REPLACE VIEW {CATALOG_NAME}.{SCHEMA_NAME}.executive_dashboard AS
    SELECT 
        Year,
        Month,
        season,
        total_revenue,
        unique_customers,
        total_transactions,
        avg_transaction_value,
        revenue_per_customer,
        ROUND((total_revenue - LAG(total_revenue) OVER (ORDER BY Year, Month)) / 
              LAG(total_revenue) OVER (ORDER BY Year, Month) * 100, 2) as revenue_growth_pct,
        ROUND((unique_customers - LAG(unique_customers) OVER (ORDER BY Year, Month)) / 
              LAG(unique_customers) OVER (ORDER BY Year, Month) * 100, 2) as customer_growth_pct
    FROM {CATALOG_NAME}.{SCHEMA_NAME}.gold_monthly_metrics
    ORDER BY Year, Month
    """)
    
    # =================================================================
    # TOP PERFORMING CUSTOMERS VIEW
    # =================================================================
    spark.sql(f"""
    CREATE OR REPLACE VIEW {CATALOG_NAME}.{SCHEMA_NAME}.vip_customers AS
    SELECT 
        CustomerID,
        Country,
        total_spent,
        total_orders,
        avg_order_value,
        customer_segment,
        purchase_frequency_segment,
        unique_products_bought,
        DATEDIFF(CURRENT_DATE(), last_purchase_date) as days_since_last_purchase,
        CASE 
            WHEN DATEDIFF(CURRENT_DATE(), last_purchase_date) <= 30 THEN 'Active'
            WHEN DATEDIFF(CURRENT_DATE(), last_purchase_date) <= 90 THEN 'At Risk'
            ELSE 'Inactive'
        END as customer_status
    FROM {CATALOG_NAME}.{SCHEMA_NAME}.gold_customer_segments
    WHERE customer_segment IN ('VIP', 'High Value')
    ORDER BY total_spent DESC
    """)
    
    # =================================================================
    # PRODUCT PERFORMANCE DASHBOARD
    # =================================================================
    spark.sql(f"""
    CREATE OR REPLACE VIEW {CATALOG_NAME}.{SCHEMA_NAME}.top_products AS
    SELECT 
        StockCode,
        Description,
        total_revenue,
        total_quantity_sold,
        unique_buyers,
        product_performance_tier,
        ROUND(product_popularity_score, 2) as popularity_score,
        ROUND(revenue_per_unit, 2) as revenue_per_unit,
        ROUND(avg_quantity_per_order, 1) as avg_qty_per_order
    FROM {CATALOG_NAME}.{SCHEMA_NAME}.gold_product_metrics
    WHERE product_performance_tier IN ('Top Performer', 'High Performer')
    ORDER BY total_revenue DESC
    """)
    
    print("✅ Business Intelligence views created:")
    print("  • executive_dashboard")
    print("  • vip_customers") 
    print("  • top_products")

# Create BI views
create_business_views()