In [0]:
# -------------------------------
# Import Libraries
# --------------------------------
import traceback
import uuid
from datetime import datetime, timezone

from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# --------------------------------
# Spark session for the Silver pipeline
# --------------------------------
spark = (
    SparkSession.builder
    .appName("EnergyConsumptionSilverPipeline")
    .getOrCreate()
)

# --------------------------------
# Random UUID that tags every log row written during this run
# --------------------------------
job_id = str(uuid.uuid4())

# --------------------------------
# Logging function
# --------------------------------
def log_step(step_name, status="INFO", message=""):
    """Append one log record for the current run to the Silver log table.

    Each call builds a single-row DataFrame and appends it to the Delta
    table ``project_logs.silver_log.log``.

    Args:
        step_name: Value stored in the ``step`` column.
        status: Value stored in the ``status`` column. Defaults to "INFO".
        message: Free-text detail stored in the ``message`` column.
            Defaults to an empty string.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Taking an aware UTC
    # "now" and dropping tzinfo yields the same naive ISO-8601 string (no
    # "+00:00" suffix), so rows stay comparable with those already logged.
    logged_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    log_df = spark.createDataFrame([
        Row(
            job_id=job_id,
            timestamp=logged_at,
            step=step_name,
            status=status,
            message=message
        )
    ])
    (
        log_df.write
        .format("delta")
        .mode("append")
        .saveAsTable("project_logs.silver_log.log")   # Silver log table
    )

# --------------------------------
# Config
# --------------------------------
# Bronze source. The value is a table identifier: it is handed to
# spark.read.table() by the ETL step.
source_catalog_s3 = "aws_dataingestion.bronze_aws.household"
bronze_s3_path = source_catalog_s3

# Dataset label interpolated into the START log message.
table_name = "usage_records"

# Silver target, assembled as <catalog>.<schema>.<table>.
target_catalog = "aws_refinement"
target_schema = "default"
target_table = "usage_cleaned"
silver_table_full_name = ".".join((target_catalog, target_schema, target_table))

# --------------------------------
# Silver ETL
# --------------------------------
try:
    log_step("START", "INFO", f"Starting Silver processing for {table_name} from {bronze_s3_path}")

    # 1. Read the Bronze table. bronze_s3_path holds a table identifier
    #    (it goes to spark.read.table), not a filesystem path.
    df = spark.read.table(bronze_s3_path)
    initial_count = df.count()
    log_step("LOAD", "INFO", f"Loaded {initial_count} rows from Bronze path")

    # 2. Combine Date + Time into a single "timestamp" column.
    #    Rows whose combined string does not parse end up with a NULL
    #    timestamp and are removed in step 5.
    df_transformed = df.withColumn(
        "timestamp",
        F.to_timestamp(F.concat_ws(" ", F.col("Date"), F.col("Time")), "d/M/yyyy H:m:s")
    )

    # 3. Cast the measurement columns to double. A literal '?' is turned
    #    into NULL before the cast.
    numeric_cols = [
        "Global_active_power", "Global_reactive_power", "Voltage",
        "Global_intensity", "Sub_metering_1", "Sub_metering_2", "Sub_metering_3"
    ]
    for col_name in numeric_cols:
        df_transformed = df_transformed.withColumn(
            col_name,
            F.when(F.col(col_name) == "?", None).otherwise(F.col(col_name)).cast(DoubleType())
        )
    log_step("CAST_NUMERIC", "INFO", "Numeric columns casted with '?' handled")

    # 4. Replace NULL measurements with 0.0. The defaults cover exactly the
    #    columns cast above, so the mapping is derived from numeric_cols.
    #    NOTE(review): a filled 0.0 cannot be told apart from a genuine zero
    #    reading downstream - confirm this is the intended treatment.
    defaults = dict.fromkeys(numeric_cols, 0.0)
    df_filled = df_transformed.fillna(defaults)

    # 5. Drop rows with null timestamp
    df_cleaned = df_filled.filter(F.col("timestamp").isNotNull())

    # 6. Keep one row per timestamp. No ordering is applied first, so which
    #    of several duplicate rows survives is not controlled here.
    df_final = df_cleaned.dropDuplicates(["timestamp"])
    final_count = df_final.count()
    log_step("DEDUP", "INFO", f"Row count after cleaning/deduplication: {final_count}")

    # 7. Write the Silver table, replacing both data and schema.
    df_final.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(silver_table_full_name)
    log_step("SAVE", "SUCCESS", f"Saved Silver table {silver_table_full_name}")

    log_step("END", "SUCCESS", f"Silver processing completed ✅. Rows: {final_count}")

except Exception:
    # Record the failure in the log table on a best-effort basis. If the log
    # write itself fails, report that and still surface the pipeline's own
    # exception instead of letting the logging error replace it.
    try:
        log_step("ERROR", "FAIL", traceback.format_exc())
    except Exception as log_error:
        print(f"Could not write ERROR entry to the log table: {log_error!r}")
    # Bare raise: the inner handler has finished, so this re-raises the
    # original pipeline exception.
    raise


In [0]:
from pyspark.sql.functions import year, month

# Derive the partition columns from the event timestamp. df_final is
# reassigned so later cells see the year/month columns as well.
df_final = (
    df_final
    .withColumn("year", year(F.col("timestamp")))
    .withColumn("month", month(F.col("timestamp")))
)

# Rewrite the Silver Delta table partitioned by year/month. This replaces
# the unpartitioned table (data and schema) written by the ETL cell.
(
    df_final.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("year", "month")
    .saveAsTable(silver_table_full_name)
)

# No `logger` object is defined in this notebook (the original call raised
# NameError after the write); report through the log_step helper instead.
log_step(
    "SAVE_PARTITIONED",
    "SUCCESS",
    f" Successfully saved Silver table with partitioning by year/month: {silver_table_full_name}",
)

In [0]:
def test_silver_vacuum(spark_session):
    silver_table_full_name = "default.silver_test"

    # Create table if not exists
    data = [("2025-08-23 10:00:00", 100.0)]
    df = spark_session.createDataFrame(data, ["timestamp", "value"])
    df.write.format("delta").mode("overwrite").saveAsTable(silver_table_full_name)

    # Run VACUUM
    spark_session.sql(f"VACUUM {silver_table_full_name} RETAIN 168 HOURS")

    # Validate table still exists
    count = spark_session.table(silver_table_full_name).count()
    assert count > 0
    print("VACUUM test passed")

# Run the VACUUM check by hand against the notebook's Spark session
test_silver_vacuum(spark_session=spark)


In [0]:
# Manually call the function outside pytest
from pyspark.sql import SparkSession

# Obtain the Spark session for the OPTIMIZE check; getOrCreate() hands back
# the already-running session when one exists.
spark = (
    SparkSession.builder
    .appName("TestSilverOptimize")
    .getOrCreate()
)

# Define the function
def test_silver_optimize(spark_session):
    silver_table_full_name = "silver_test"

    # Run OPTIMIZE
    spark_session.sql(f"OPTIMIZE {silver_table_full_name} ZORDER BY (timestamp)")

    # Just check table is still accessible after optimize
    count = spark_session.table(silver_table_full_name).count()
    assert count > 0
    print(f" OPTIMIZE successful. Row count: {count}")

# Run the OPTIMIZE check by hand against the session obtained above
test_silver_optimize(spark_session=spark)


In [0]:
# Preview the first five rows of the final Silver DataFrame
df_final.show(n=5)