# Silver Layer: Transform Bronze to Silver Delta Tables

## Purpose
This notebook transforms raw Bronze data into cleaned, normalized Silver layer tables following dimensional modeling principles.

## Inputs
- **Source**: `bronze_replays_raw` Delta table in `tm2020_bronze` Lakehouse
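- **Processing**: Read incremental data since the last run (watermark-based). *Not yet implemented: the notebook currently reads the full Bronze table on every run and overwrites the Silver tables.*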

## Outputs
- **Target Lakehouse**: `tm2020_silver`
- **Tables**:
  - `silver_replays`: Cleaned replay metadata (fact table)
  - `silver_ghost_samples`: Normalized telemetry samples (fact table)
  - `silver_maps`: Map dimension table (target: SCD Type 2 — not yet implemented)
  - `silver_players`: Player dimension table (target: SCD Type 2 — not yet implemented)

## Data Quality Rules
1. **Mandatory fields**: player_login, map_uid, race_time_ms must not be null
2. **Deduplication**: Keep latest record per replay_id
3. **Type validation**: Ensure numeric fields are valid (not yet implemented)
4. **Range checks**: race_time_ms > 0, speeds within reasonable bounds (speed bounds not yet implemented)
5. **Standardization**: Normalize player nicknames and map names (not yet implemented)

In [None]:
# Initialize Spark session and imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, explode, current_timestamp, coalesce, lit,
    row_number, size, monotonically_increasing_id, concat, md5, when,
    max as spark_max, min as spark_min,  # aliased to avoid shadowing builtins
)
from pyspark.sql.types import BooleanType
from pyspark.sql.window import Window

# Obtain the Spark session. Fabric notebooks come with one pre-configured;
# getOrCreate() reuses an active session if one exists.
spark = (
    SparkSession.builder
    .appName("Silver_Transformation")
    .getOrCreate()
)

for banner in ("Spark session initialized", f"Spark version: {spark.version}"):
    print(banner)

In [None]:
# Table identifiers used by the rest of this notebook.
# TODO: Ensure both Bronze and Silver Lakehouses are attached to this notebook
# The Bronze table is Lakehouse-qualified; the Silver names are unqualified and
# therefore resolve against the notebook's default Lakehouse (presumably
# tm2020_silver — confirm the attachment).
bronze_table = "tm2020_bronze.bronze_replays_raw"
(
    silver_replays_table,
    silver_ghost_samples_table,
    silver_maps_table,
    silver_players_table,
) = ("silver_replays", "silver_ghost_samples", "silver_maps", "silver_players")

print(f"Bronze source: {bronze_table}")
print("Silver tables: " + ", ".join((
    silver_replays_table,
    silver_ghost_samples_table,
    silver_maps_table,
    silver_players_table,
)))

In [None]:
# Read from Bronze layer
# TODO: Implement incremental processing using watermarks
# For now, read all data - optimize later with delta log processing

try:
    df_bronze = spark.table(bronze_table)
    print(f"Records read from Bronze: {df_bronze.count()}")

    # Show a sample of the nested metadata fields the Silver transforms rely on
    df_bronze.select(
        "replay_id",
        "ingestion_timestamp",
        "metadata.player_login",
        "metadata.map_uid",
        "metadata.race_time_ms",
    ).show(5, truncate=False)

except Exception as e:
    print(f"Error reading Bronze table: {e}")
    print("Ensure Bronze ingestion notebook has been run successfully")
    # Every later cell needs df_bronze. Re-raise so the run fails here instead of
    # cascading into NameErrors that the later cells merely print.
    raise

In [None]:
# Data quality validation and cleaning
# TODO: Implement comprehensive data quality rules

try:
    # Mandatory fields plus race_time_ms > 0. The expression is already boolean,
    # so no cast is needed. When race_time_ms is null its isNotNull() term is
    # False, so the whole AND is False (never null).
    df_validated = df_bronze.withColumn(
        "is_valid",
        col("metadata.player_login").isNotNull()
        & col("metadata.map_uid").isNotNull()
        & col("metadata.race_time_ms").isNotNull()
        & (col("metadata.race_time_ms") > 0),
    )

    # Report validation results
    total_records = df_validated.count()
    valid_records = df_validated.filter(col("is_valid")).count()
    invalid_records = total_records - valid_records

    print(f"Total records: {total_records}")
    if total_records:
        print(f"Valid records: {valid_records} ({valid_records/total_records*100:.2f}%)")
        print(f"Invalid records: {invalid_records} ({invalid_records/total_records*100:.2f}%)")
    else:
        # An empty Bronze table would otherwise raise ZeroDivisionError here and
        # leave df_clean undefined for the downstream cells.
        print(f"Valid records: {valid_records} (n/a)")
        print(f"Invalid records: {invalid_records} (n/a)")

    # Keep only valid records for Silver layer
    df_clean = df_validated.filter(col("is_valid"))

except Exception as e:
    print(f"Error in validation: {e}")
    raise

In [None]:
# Transform to Silver replays table
# TODO: Implement additional transformations (e.g. nickname standardization)

try:
    # Deduplication: keep the latest record per replay_id.
    # Order by the full ingestion timestamp, not the derived date. With
    # date-only ordering, two ingestions of one replay on the same day tie,
    # and row_number() then picks one of them arbitrarily.
    latest_first = Window.partitionBy("replay_id").orderBy(col("ingestion_timestamp").desc())
    df_clean_latest = (
        df_clean
        .withColumn("row_num", row_number().over(latest_first))
        .filter(col("row_num") == 1)
        .drop("row_num")
    )

    checkpoints = col("metadata.checkpoints")
    df_silver_replays = df_clean_latest.select(
        col("replay_id"),
        col("metadata.player_login").alias("player_login"),
        col("metadata.player_nickname").alias("player_nickname"),
        col("metadata.map_uid").alias("map_uid"),
        col("metadata.race_time_ms").alias("race_time_ms"),
        coalesce(col("metadata.num_respawns"), lit(0)).alias("num_respawns"),
        # size() returns -1 for a null array under default Spark settings.
        # Report 0 instead, mirroring the num_respawns default above.
        when(checkpoints.isNull(), lit(0)).otherwise(size(checkpoints)).alias("num_checkpoints"),
        col("metadata.game_version").alias("game_version"),
        col("metadata.title_id").alias("title_id"),
        col("ingestion_timestamp").cast("date").alias("ingestion_date"),
        col("is_valid"),
    )
    # Already deduplicated; the name is kept for the write cell.
    df_silver_replays_dedup = df_silver_replays

    print(f"Silver replays records: {df_silver_replays_dedup.count()}")
    df_silver_replays_dedup.show(5, truncate=False)

except Exception as e:
    print(f"Error creating silver_replays: {e}")
    raise

In [None]:
# Transform to Silver ghost samples table (normalized telemetry)
# TODO: Implement ghost samples normalization

try:
    # Keep only the latest Bronze record per replay_id before exploding.
    # Otherwise a replay ingested more than once contributes its samples once
    # per ingestion, producing duplicate rows and sample_id values.
    latest_first = Window.partitionBy("replay_id").orderBy(col("ingestion_timestamp").desc())
    df_latest_replays = (
        df_clean
        .withColumn("row_num", row_number().over(latest_first))
        .filter(col("row_num") == 1)
        .select("replay_id", "ghost_samples")
    )

    # explode() emits one row per array element. Replays whose ghost_samples
    # is null or empty produce no rows.
    df_silver_ghost_samples = df_latest_replays.select(
        col("replay_id"),
        explode(col("ghost_samples")).alias("sample"),
    ).select(
        concat(col("replay_id"), lit("_"), col("sample.time_ms")).alias("sample_id"),
        col("replay_id"),
        col("sample.time_ms").alias("time_ms"),
        col("sample.position.x").alias("position_x"),
        col("sample.position.y").alias("position_y"),
        col("sample.position.z").alias("position_z"),
        col("sample.velocity.x").alias("velocity_x"),
        col("sample.velocity.y").alias("velocity_y"),
        col("sample.velocity.z").alias("velocity_z"),
        col("sample.speed").alias("speed"),
    )

    print(f"Silver ghost samples records: {df_silver_ghost_samples.count()}")
    df_silver_ghost_samples.show(5, truncate=False)

except Exception as e:
    print(f"Error creating silver_ghost_samples: {e}")
    raise

In [None]:
# Create Maps dimension table
# TODO: Implement SCD Type 2 logic for slowly changing dimensions

try:
    df_silver_maps = df_clean.select(
        col("metadata.map_uid").alias("map_uid"),
        col("metadata.map_name").alias("map_name"),
        col("metadata.map_author").alias("map_author"),
        col("ingestion_timestamp"),
    )

    # One row per distinct (map_uid, map_name, map_author) combination, with the
    # earliest and latest ingestion time at which it was seen. The timestamp
    # must be aggregated: a bare column inside agg() is neither a grouping key
    # nor an aggregate, and Spark rejects such a query.
    df_silver_maps_agg = df_silver_maps.groupBy("map_uid", "map_name", "map_author").agg(
        spark_min("ingestion_timestamp").alias("first_seen"),
        spark_max("ingestion_timestamp").alias("last_seen"),
    )

    print(f"Silver maps records: {df_silver_maps_agg.count()}")
    df_silver_maps_agg.show(5, truncate=False)

except Exception as e:
    print(f"Error creating silver_maps: {e}")
    raise

In [None]:
# Create Players dimension table
# TODO: Implement SCD Type 2 logic for slowly changing dimensions

try:
    df_silver_players = df_clean.select(
        col("metadata.player_login").alias("player_login"),
        col("metadata.player_nickname").alias("player_nickname"),
        col("ingestion_timestamp"),
    )

    # One row per distinct (player_login, player_nickname) pair, with the
    # earliest and latest ingestion time at which it was seen. The timestamp
    # must be aggregated: a bare column inside agg() is neither a grouping key
    # nor an aggregate, and Spark rejects such a query.
    df_silver_players_agg = df_silver_players.groupBy("player_login", "player_nickname").agg(
        spark_min("ingestion_timestamp").alias("first_seen"),
        spark_max("ingestion_timestamp").alias("last_seen"),
    )

    print(f"Silver players records: {df_silver_players_agg.count()}")
    df_silver_players_agg.show(5, truncate=False)

except Exception as e:
    print(f"Error creating silver_players: {e}")
    raise

In [None]:
# Write to Silver Delta tables
# TODO: Implement merge logic for incremental updates
# For now using overwrite mode - optimize later with MERGE statements

try:
    # (DataFrame, target table) pairs, written in this order. The writes are
    # independent: if one fails, tables earlier in the list have already been
    # overwritten.
    silver_outputs = [
        (df_silver_replays_dedup, silver_replays_table),
        (df_silver_ghost_samples, silver_ghost_samples_table),
        (df_silver_maps_agg, silver_maps_table),
        (df_silver_players_agg, silver_players_table),
    ]

    for df_out, table_name in silver_outputs:
        (
            df_out.write
            .format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .saveAsTable(table_name)
        )
        print(f"Successfully wrote to {table_name}")

    print("\nAll Silver tables written successfully!")

except Exception as e:
    print(f"Error writing Silver tables: {e}")
    # Re-raise so a scheduled run is marked as failed rather than successful
    # with missing or partially written Silver tables.
    raise

In [None]:
# Data quality report for Silver layer
# TODO: Implement comprehensive data quality checks


def _distinct_count(df, column_name):
    """Return the number of distinct values of ``column_name`` in ``df``."""
    return df.select(column_name).distinct().count()


try:
    print("=== Silver Layer Data Quality Report ===")

    # Fact table: replay metadata
    replays_check = spark.table(silver_replays_table)
    print(f"\nsilver_replays: {replays_check.count()} records")
    print(f"  Unique players: {_distinct_count(replays_check, 'player_login')}")
    print(f"  Unique maps: {_distinct_count(replays_check, 'map_uid')}")

    # Fact table: telemetry samples
    samples_check = spark.table(silver_ghost_samples_table)
    print(f"\nsilver_ghost_samples: {samples_check.count()} records")
    print(f"  Unique replays: {_distinct_count(samples_check, 'replay_id')}")

    # Dimension tables: row counts only
    print(f"\nsilver_maps: {spark.table(silver_maps_table).count()} records")
    print(f"\nsilver_players: {spark.table(silver_players_table).count()} records")

except Exception as e:
    print(f"Error running data quality checks: {e}")