# 📊 Data Pipeline with Metrics Tracking

This notebook demonstrates a complete **Bronze → Silver → Gold** data pipeline with `dst_metrics` tracking at each layer.

## What You'll Learn

| Layer | Purpose | Metrics Logged |
|-------|---------|----------------|
| **Bronze** | Raw data ingestion | Files processed, rows loaded |
| **Silver** | Data quality & cleaning | Null counts, duplicate counts, valid rows |
| **Gold** | Business transformations | Join results, aggregations |

All metrics are stored in a Delta table for auditing and monitoring.


## 🔧 Setup


In [None]:
from shared.spark_connector import SparkConnector
from dst_metrics import df_count, df_avg, count_files
from pyspark.sql.functions import col, when, count as spark_count, sum as spark_sum, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Create Spark session with S3/MinIO connectivity
connector = SparkConnector(size="M")
spark = connector.session

# Define storage paths (using MinIO S3)
BRONZE_PATH = "s3a://polaris/demo/bronze"
SILVER_PATH = "s3a://polaris/demo/silver"
GOLD_PATH = "s3a://polaris/demo/gold"
# Metrics automatically write to s3a://<bucket>/system/metrics/activity_log

print("✅ Spark session created")
print(f"📁 Bronze: {BRONZE_PATH}")
print(f"📁 Silver: {SILVER_PATH}")
print(f"📁 Gold: {GOLD_PATH}")
print("📊 Metrics: automatic (via connector.metric_context)")


## 📦 Create Sample Raw Data

Let's create realistic sample data with some quality issues (nulls, duplicates) to demonstrate the pipeline.


In [None]:
# Sample Customers data (with some nulls and issues)
customers_data = [
    (1, "Alice", "alice@example.com", "Copenhagen", 28),
    (2, "Bob", "bob@example.com", "Aarhus", 35),
    (3, "Charlie", None, "Odense", 42),  # Missing email
    (4, "Diana", "diana@example.com", None, 31),  # Missing city
    (5, "Eve", "eve@example.com", "Aalborg", None),  # Missing age
    (6, "Frank", "frank@example.com", "Copenhagen", 29),
    (7, "Grace", "grace@example.com", "Aarhus", 45),
    (1, "Alice", "alice@example.com", "Copenhagen", 28),  # Duplicate!
    (8, "Henry", "henry@example.com", "Odense", 38),
    (9, None, "unknown@example.com", "Copenhagen", 25),  # Missing name
]

customers_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("city", StringType(), True),
    StructField("age", IntegerType(), True),
])

# Sample Orders data
orders_data = [
    (101, 1, "2024-01-15", 150.00, "completed"),
    (102, 2, "2024-01-16", 250.50, "completed"),
    (103, 1, "2024-01-17", 75.00, "completed"),
    (104, 3, "2024-01-18", 320.00, "pending"),
    (105, 4, "2024-01-19", 180.00, "completed"),
    (106, 5, "2024-01-20", 95.50, "cancelled"),
    (107, 6, "2024-01-21", 420.00, "completed"),
    (108, 2, "2024-01-22", 55.00, "completed"),
    (109, 7, "2024-01-23", 290.00, "pending"),
    (110, 8, "2024-01-24", 175.00, "completed"),
]

orders_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("order_date", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("status", StringType(), True),
])

# Create DataFrames
customers_raw = spark.createDataFrame(customers_data, customers_schema)
orders_raw = spark.createDataFrame(orders_data, orders_schema)

print("📊 Sample data created:")
print(f"   Customers: {customers_raw.count()} rows")
print(f"   Orders: {orders_raw.count()} rows")


---
# 🥉 Layer 1: Bronze (Raw Ingestion)

The Bronze layer stores raw data exactly as received. We track:
- Number of source files/records
- Total rows ingested
- Ingestion timestamp


In [None]:
with connector.metric_context(
    layer=1,  # Bronze = Layer 1
    project="demo",
    dataset_year=2024,
    description="Bronze layer: Raw data ingestion",
    job_name="bronze_ingest"
) as ctx:
    
    # ========== CUSTOMERS ==========
    customers_count = df_count(customers_raw)
    
    # Log: Rows ingested
    ctx.log_metric(
        layer=1, project="demo", dataset_year=2024,
        description="Customers rows ingested",
        value=customers_count,
        unit="rows",
        function="count",
        job_name="bronze_customers_ingest",
        table_name="bronze_customers",
        source_path="raw/customers.csv"
    )
    
    # Write to Bronze
    customers_raw.write.format("delta").mode("overwrite").save(f"{BRONZE_PATH}/customers")
    print(f"✅ Bronze customers: {customers_count} rows written")
    
    # ========== ORDERS ==========
    orders_count = df_count(orders_raw)
    
    # Log: Rows ingested
    ctx.log_metric(
        layer=1, project="demo", dataset_year=2024,
        description="Orders rows ingested",
        value=orders_count,
        unit="rows",
        function="count",
        job_name="bronze_orders_ingest",
        table_name="bronze_orders",
        source_path="raw/orders.csv"
    )
    
    # Write to Bronze
    orders_raw.write.format("delta").mode("overwrite").save(f"{BRONZE_PATH}/orders")
    print(f"✅ Bronze orders: {orders_count} rows written")

print("\n🥉 Bronze layer complete!")


---
# 🥈 Layer 2: Silver (Data Quality & Cleaning)

The Silver layer handles data quality:
- Remove duplicates
- Handle nulls
- Validate data types
- Track quality metrics
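
As a rough cross-check of the quality metrics this layer logs, the same null and duplicate counts can be reproduced in plain Python on the sample rows. This is only a sketch: it keeps the first row per `customer_id`, whereas Spark's `dropDuplicates` keeps an arbitrary row per key (equivalent here because the duplicate row is identical).

```python
# Same rows as customers_data in the sample-data cell.
customers_data = [
    (1, "Alice", "alice@example.com", "Copenhagen", 28),
    (2, "Bob", "bob@example.com", "Aarhus", 35),
    (3, "Charlie", None, "Odense", 42),
    (4, "Diana", "diana@example.com", None, 31),
    (5, "Eve", "eve@example.com", "Aalborg", None),
    (6, "Frank", "frank@example.com", "Copenhagen", 29),
    (7, "Grace", "grace@example.com", "Aarhus", 45),
    (1, "Alice", "alice@example.com", "Copenhagen", 28),
    (8, "Henry", "henry@example.com", "Odense", 38),
    (9, None, "unknown@example.com", "Copenhagen", 25),
]

# Count every None across all columns (the "null values found" metric).
total_nulls = sum(value is None for row in customers_data for value in row)

# Deduplicate on customer_id, keeping the first occurrence of each key.
seen_ids = set()
deduped = []
for row in customers_data:
    if row[0] not in seen_ids:
        seen_ids.add(row[0])
        deduped.append(row)

duplicate_count = len(customers_data) - len(deduped)
clean_count = len(deduped)

print(total_nulls, duplicate_count, clean_count)  # 4 1 9
```

These values match the figures reported in the Summary table: 4 nulls, 1 duplicate, 9 clean customers.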


In [None]:
with connector.metric_context(
    layer=2,  # Silver = Layer 2
    project="demo",
    dataset_year=2024,
    description="Silver layer: Data quality & cleaning",
    job_name="silver_clean"
) as ctx:
    
    # ========== CUSTOMERS QUALITY CHECK ==========
    customers_bronze = spark.read.format("delta").load(f"{BRONZE_PATH}/customers")
    
    # Count nulls per column
    null_counts = customers_bronze.select([
        spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) 
        for c in customers_bronze.columns
    ]).collect()[0]
    
    total_nulls = sum([null_counts[c] for c in customers_bronze.columns])
    
    # Log: Null count
    ctx.log_metric(
        layer=2, project="demo", dataset_year=2024,
        description="Customers null values found",
        value=total_nulls,
        unit="nulls",
        function="count",
        job_name="silver_customers_null_check",
        table_name="silver_customers"
    )
    print(f"⚠️  Found {total_nulls} null values in customers")
    
    # Count duplicates
    total_rows = df_count(customers_bronze)
    unique_rows = customers_bronze.dropDuplicates(["customer_id"]).count()
    duplicate_count = total_rows - unique_rows
    
    # Log: Duplicate count
    ctx.log_metric(
        layer=2, project="demo", dataset_year=2024,
        description="Customers duplicates removed",
        value=duplicate_count,
        unit="duplicates",
        function="count",
        job_name="silver_customers_dedup",
        table_name="silver_customers"
    )
    print(f"🔄 Removed {duplicate_count} duplicate rows")
    
    # Clean: Remove duplicates, fill nulls
    customers_silver = (
        customers_bronze
        .dropDuplicates(["customer_id"])
        .fillna({"name": "Unknown", "email": "no-email@unknown.com", "city": "Unknown", "age": 0})
    )
    
    # Log: Clean rows
    clean_count = df_count(customers_silver)
    ctx.log_metric(
        layer=2, project="demo", dataset_year=2024,
        description="Customers clean rows",
        value=clean_count,
        unit="rows",
        function="count",
        job_name="silver_customers_output",
        table_name="silver_customers"
    )
    
    # Write to Silver
    customers_silver.write.format("delta").mode("overwrite").save(f"{SILVER_PATH}/customers")
    print(f"✅ Silver customers: {clean_count} clean rows written")
    
    # ========== ORDERS QUALITY CHECK ==========
    orders_bronze = spark.read.format("delta").load(f"{BRONZE_PATH}/orders")
    
    # Orders are clean in this demo, just pass through
    orders_silver = orders_bronze
    orders_clean_count = df_count(orders_silver)
    
    ctx.log_metric(
        layer=2, project="demo", dataset_year=2024,
        description="Orders validated rows",
        value=orders_clean_count,
        unit="rows",
        function="count",
        job_name="silver_orders_output",
        table_name="silver_orders"
    )
    
    orders_silver.write.format("delta").mode("overwrite").save(f"{SILVER_PATH}/orders")
    print(f"✅ Silver orders: {orders_clean_count} rows written")

print("\n🥈 Silver layer complete!")


---
# 🥇 Layer 3: Gold (Business Transformations)

The Gold layer creates business-ready datasets:
- Join customers with orders
- Calculate aggregations
- Create summary tables
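
To make the expected Gold numbers concrete, here is a plain-Python sketch of the inner join and aggregation on the sample data. It assumes the Silver customers are the nine unique IDs 1 to 9 and keeps only the order columns needed for the arithmetic. Like the notebook's own aggregation, it does not filter on order status.

```python
from collections import defaultdict

# Unique customer IDs left after Silver deduplication (assumed: 1..9).
silver_customer_ids = set(range(1, 10))

# (order_id, customer_id, amount) taken from orders_data in the sample-data cell.
orders = [
    (101, 1, 150.00), (102, 2, 250.50), (103, 1, 75.00), (104, 3, 320.00),
    (105, 4, 180.00), (106, 5, 95.50), (107, 6, 420.00), (108, 2, 55.00),
    (109, 7, 290.00), (110, 8, 175.00),
]

# Inner join: keep only orders whose customer exists in Silver.
joined = [order for order in orders if order[1] in silver_customer_ids]

# Group amounts by customer, mirroring groupBy("customer_id").
amounts_by_customer = defaultdict(list)
for _, customer_id, amount in joined:
    amounts_by_customer[customer_id].append(amount)

total_revenue = sum(amount for _, _, amount in joined)
avg_order_value = total_revenue / len(joined)

print(len(joined), len(amounts_by_customer))         # 10 8
print(f"{total_revenue:.2f} {avg_order_value:.2f}")  # 2011.00 201.10
```

Customer 9 has no orders, so the summary holds 8 customers rather than 9. The join also shows why deduplication in Silver matters: with the duplicate row for `customer_id=1` still present, that customer's two orders would each match twice and the join would return 12 rows instead of 10.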


In [None]:
with connector.metric_context(
    layer=3,  # Gold = Layer 3
    project="demo",
    dataset_year=2024,
    description="Gold layer: Business transformations",
    job_name="gold_transform"
) as ctx:
    
    # Load Silver data
    customers = spark.read.format("delta").load(f"{SILVER_PATH}/customers")
    orders = spark.read.format("delta").load(f"{SILVER_PATH}/orders")
    
    # ========== JOIN: Customer Orders ==========
    customer_orders = customers.join(orders, "customer_id", "inner")
    join_count = df_count(customer_orders)
    
    ctx.log_metric(
        layer=3, project="demo", dataset_year=2024,
        description="Customer-Orders join result",
        value=join_count,
        unit="rows",
        function="count",
        job_name="gold_customer_orders_join",
        table_name="gold_customer_orders"
    )
    
    customer_orders.write.format("delta").mode("overwrite").save(f"{GOLD_PATH}/customer_orders")
    print(f"✅ Gold customer_orders: {join_count} rows")
    
    # ========== AGGREGATION: Customer Summary ==========
    # spark_sum and spark_count are already imported in Setup; only avg is new here
    from pyspark.sql.functions import avg as spark_avg
    
    customer_summary = (
        customer_orders
        .groupBy("customer_id", "name", "city")
        .agg(
            spark_count("order_id").alias("total_orders"),
            spark_sum("amount").alias("total_spent"),
            spark_avg("amount").alias("avg_order_value")
        )
    )
    
    summary_count = df_count(customer_summary)
    
    ctx.log_metric(
        layer=3, project="demo", dataset_year=2024,
        description="Customer summary records",
        value=summary_count,
        unit="customers",
        function="count",
        job_name="gold_customer_summary",
        table_name="gold_customer_summary"
    )
    
    # Log: Average order value (business metric)
    avg_order = df_avg(customer_orders, "amount")
    ctx.log_metric(
        layer=3, project="demo", dataset_year=2024,
        description="Average order value",
        value=avg_order,
        unit="DKK",
        function="avg",
        job_name="gold_avg_order_metric",
        table_name="gold_customer_orders"
    )
    
    # Log: Total revenue (business metric)
    total_revenue = customer_orders.agg(spark_sum("amount")).collect()[0][0]
    ctx.log_metric(
        layer=3, project="demo", dataset_year=2024,
        description="Total revenue",
        value=total_revenue,
        unit="DKK",
        function="sum",
        job_name="gold_revenue_metric",
        table_name="gold_customer_orders"
    )
    
    customer_summary.write.format("delta").mode("overwrite").save(f"{GOLD_PATH}/customer_summary")
    print(f"✅ Gold customer_summary: {summary_count} customers")
    print(f"\n💰 Total Revenue: {total_revenue:.2f} DKK")
    print(f"📊 Avg Order Value: {avg_order:.2f} DKK")

print("\n🥇 Gold layer complete!")


---
# 📈 Pipeline Data Summary

Let's see how the data transforms through each layer of the pipeline.


In [None]:
# ============================================
# 🥉 BRONZE LAYER - Raw Data (with quality issues)
# ============================================
print("🥉 BRONZE: Raw Customers (notice nulls and duplicate customer_id=1)")
print("=" * 80)
bronze_customers = spark.read.format("delta").load(f"{BRONZE_PATH}/customers")
bronze_customers.show(truncate=False)

print("\n🥉 BRONZE: Raw Orders")
print("=" * 80)
bronze_orders = spark.read.format("delta").load(f"{BRONZE_PATH}/orders")
bronze_orders.show(truncate=False)

# ============================================
# 🥈 SILVER LAYER - Cleaned Data
# ============================================
print("\n🥈 SILVER: Cleaned Customers (duplicates removed, nulls filled)")
print("=" * 80)
silver_customers = spark.read.format("delta").load(f"{SILVER_PATH}/customers")
silver_customers.show(truncate=False)

print("\n🥈 SILVER: Validated Orders")
print("=" * 80)
silver_orders = spark.read.format("delta").load(f"{SILVER_PATH}/orders")
silver_orders.show(truncate=False)

# ============================================
# 🥇 GOLD LAYER - Business-Ready Aggregations
# ============================================
print("\n🥇 GOLD: Customer Orders (joined)")
print("=" * 80)
gold_orders = spark.read.format("delta").load(f"{GOLD_PATH}/customer_orders")
gold_orders.show(truncate=False)

print("\n🥇 GOLD: Customer Summary (aggregated)")
print("=" * 80)
gold_summary = spark.read.format("delta").load(f"{GOLD_PATH}/customer_summary")
gold_summary.orderBy("total_spent", ascending=False).show(truncate=False)

# ============================================
# 📊 Layer Row Counts Summary
# ============================================
print("\n" + "=" * 80)
print("📊 PIPELINE SUMMARY - Row Counts")
print("=" * 80)
print(f"🥉 Bronze: {bronze_customers.count()} customers, {bronze_orders.count()} orders")
print(f"🥈 Silver: {silver_customers.count()} customers, {silver_orders.count()} orders")
print(f"🥇 Gold:   {gold_orders.count()} customer-orders, {gold_summary.count()} customer summaries")


---
# 🔍 Analytics: Query the Metrics Table

All metrics from the pipeline are stored in a Delta table. Let's query it!
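
The job history query later in this section filters on `metric_function == "completion"` and reads `status` and `duration_ms`, which suggests that the context manager writes one completion record when a `with` block exits. The sketch below shows one way such behaviour could be built. It is purely illustrative and is not the `dst_metrics` implementation: `metric_context_sketch`, the in-memory `activity_log` list, and the `"success"` / `"failed"` status values are all assumptions.

```python
import time
from contextlib import contextmanager

# Hypothetical in-memory stand-in for the metrics Delta table.
activity_log = []


@contextmanager
def metric_context_sketch(layer, job_name):
    """Illustrative only: records one completion row when the block exits."""
    start = time.perf_counter()
    status = "success"  # assumed status value
    try:
        yield activity_log
    except Exception:
        status = "failed"  # assumed status value
        raise
    finally:
        # Runs on both normal exit and error, so every job leaves a record.
        activity_log.append({
            "layer": layer,
            "job_name": job_name,
            "metric_function": "completion",
            "status": status,
            "duration_ms": int((time.perf_counter() - start) * 1000),
        })


with metric_context_sketch(layer=1, job_name="bronze_ingest"):
    pass  # pipeline work would go here

print(activity_log[-1])
```

Writing the record in a `finally` clause means a failing job is still logged with its duration before the exception propagates.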


In [None]:
print("📊 All Pipeline Metrics:")
print("=" * 80)

# Metrics are automatically stored by connector.metric_context()
METRICS_PATH = connector.metrics_path  # e.g., s3a://polaris/system/metrics/activity_log
print(f"📁 Reading from: {METRICS_PATH}")

metrics_df = spark.read.format("delta").load(METRICS_PATH)

metrics_df.select(
    "event_timestamp",
    "layer",
    "job_name",
    "description",
    "metric_value",
    "metric_unit",
    "status"
).orderBy("event_timestamp").show(50, truncate=False)


## 📊 Metrics by Layer


In [None]:
print("\n🥉 BRONZE Layer Metrics:")
metrics_df.filter(col("layer") == "1").select(
    "job_name", "description", "metric_value", "metric_unit", "table_name"
).show(truncate=False)

print("\n🥈 SILVER Layer Metrics:")
metrics_df.filter(col("layer") == "2").select(
    "job_name", "description", "metric_value", "metric_unit", "table_name"
).show(truncate=False)

print("\n🥇 GOLD Layer Metrics:")
metrics_df.filter(col("layer") == "3").select(
    "job_name", "description", "metric_value", "metric_unit", "table_name"
).show(truncate=False)


## ⏱️ Job Execution History


In [None]:
print("\n⏱️ Job Completion History:")

metrics_df.filter(col("metric_function") == "completion").select(
    "event_timestamp",
    "job_name",
    "layer",
    "status",
    "duration_ms"
).orderBy("event_timestamp").show(truncate=False)


---
# 🎯 Summary

This notebook demonstrated a complete **Bronze → Silver → Gold** data pipeline:

| Layer | What Happened | Key Metrics |
|-------|---------------|-------------|
| 🥉 **Bronze** | Raw data ingestion | 10 customers (with issues), 10 orders |
| 🥈 **Silver** | Data quality & cleaning | 4 nulls fixed, 1 duplicate removed → 9 clean customers |
| 🥇 **Gold** | Business transformations | Customer-order joins, revenue aggregations |

### Key `SparkConnector` Features Used:

```python
from shared.spark_connector import SparkConnector

connector = SparkConnector(size="M")
spark = connector.session

# Metrics write to s3a://<bucket>/system/metrics/activity_log automatically
with connector.metric_context(
    layer=3,
    project="demo",
    dataset_year=2024,
    description="Customer ETL",
    job_name="customer_etl"
) as ctx:
    df = spark.read.csv("/path/to/data")
    ctx.log_metric(
        layer=3, project="demo", dataset_year=2024,
        description="Rows loaded",
        value=df.count(),
        unit="rows",
        function="count"
    )
```

### Storage Locations (MinIO S3):

| Path | Contents |
|------|----------|
| `s3a://polaris/demo/bronze/` | Raw data (customers, orders) |
| `s3a://polaris/demo/silver/` | Cleaned data |
| `s3a://polaris/demo/gold/` | Business-ready aggregations |
| `s3a://<bucket>/system/metrics/activity_log` | Pipeline metrics (automatic) |


In [None]:
connector.stop()
print("🏁 Demo complete!")
