In [2]:
"""
Silver Layer: Data Cleaning and Transformation
Purpose: Apply data quality checks and split into required tables
Requirements:
1. No null property_id, rooms, or price
2. Positive price only
3. Valid month format (YYYY-MM)
4. Split into: HousePriceTimeSeries and Houses (Type 2 SCD)
"""

from pyspark.sql.functions import col, when, trim, lag, sum, first, min, max, count, avg, current_timestamp
from pyspark.sql.window import Window
from pyspark.sql.types import DecimalType, IntegerType

# Step 1: Load the Bronze table and record the starting row count
# Per-check counts of removed rows are collected here by the quality checks.
quality_report = {}

df_bronze = spark.read.table("houses_price_bronze")

# Cleaning starts from the untouched Bronze rows; filters are applied to
# df_clean so df_bronze stays available for comparison.
df_clean = df_bronze

initial_count = df_bronze.count()
print(f"Reading from Bronze layer: {initial_count} rows")

# Step 2: Apply data quality checks
def _drop_invalid_rows(df, is_invalid, report_key, description, report):
    """Remove rows matching ``is_invalid`` and record how many were dropped.

    Args:
        df: DataFrame to clean.
        is_invalid: Boolean Column that is true for rows to remove. It must
            never evaluate to NULL; otherwise the reported count and the rows
            actually filtered out would disagree.
        report_key: Key under which the removed-row count is stored.
        description: Text appended to the "Removed N rows with ..." log line.
        report: Dict updated in place when at least one row is removed.

    Returns:
        The DataFrame without the invalid rows (the input DataFrame itself
        when nothing matched).
    """
    removed = df.filter(is_invalid).count()
    if removed > 0:
        df = df.filter(~is_invalid)
        report[report_key] = removed
        print(f"Removed {removed} rows with {description}")
    return df


def _is_blank(name):
    """Return a Column that is true when ``name`` is NULL or only whitespace."""
    # TRUE OR NULL is TRUE in SQL, so this never evaluates to NULL.
    return col(name).isNull() | (trim(col(name)) == "")


# Checks 1-3: no null/empty property_id, price, or room-related fields
room_fields = ["bedrooms", "bathrooms", "guestroom"]
for field in ["property_id", "price", *room_fields]:
    if field in df_clean.columns:
        df_clean = _drop_invalid_rows(
            df_clean,
            _is_blank(field),
            f"null_{field}",
            f"null/empty {field}",
            quality_report,
        )

# Check 4: Positive price only
if "price" in df_clean.columns:
    # Convert to decimal for numeric comparison.
    # NOTE(review): Step 3 later re-casts price to DecimalType(12, 2), which is
    # narrower than the type used here - confirm which precision is intended.
    df_clean = df_clean.withColumn("price_decimal", col("price").cast(DecimalType(15, 2)))

    # Blank prices are already gone, so a NULL here means the cast failed.
    # These rows must be dropped explicitly: NULL <= 0 is NULL, so the
    # non-positive check below would neither count nor reliably remove them.
    df_clean = _drop_invalid_rows(
        df_clean,
        col("price_decimal").isNull(),
        "non_numeric_price",
        "non-numeric price",
        quality_report,
    )
    df_clean = _drop_invalid_rows(
        df_clean,
        col("price_decimal") <= 0,
        "non_positive_price",
        "non-positive price",
        quality_report,
    )
    df_clean = df_clean.withColumn("price", col("price_decimal")).drop("price_decimal")

# Check 5: Valid month format (YYYY-MM)
if "month" in df_clean.columns:
    # Month part is restricted to 01-12 so values like "2023-13" are rejected.
    # NULL months are treated as invalid explicitly because rlike() on NULL
    # yields NULL and would otherwise escape the count.
    invalid_month = col("month").isNull() | ~col("month").rlike(r"^\d{4}-(0[1-9]|1[0-2])$")
    df_clean = _drop_invalid_rows(
        df_clean,
        invalid_month,
        "invalid_month",
        "invalid month format",
        quality_report,
    )

# Quality check summary
final_count = df_clean.count()
print("\nData quality summary:")
print(f"Initial rows: {initial_count}")
print(f"Rows removed: {initial_count - final_count}")
print(f"Clean rows: {final_count}")

# Step 3: Convert data types from string to proper types
# NOTE: the target types below were chosen by inspecting the raw data; the
# authoritative types depend on the upstream business system.
integer_columns = (
    "bedrooms",
    "bathrooms",
    "area",
    "stories",
    "parking",
    "mainroad",
    "guestroom",
    "basement",
    "hotwaterheating",
    "airconditioning",
    "prefarea",
    "furnishingstatus",
)
type_conversions = {
    "property_id": IntegerType(),
    "price": DecimalType(12, 2),
    **{name: IntegerType() for name in integer_columns},
}

# Only cast the columns that are actually present in the cleaned data.
castable_columns = [name for name in type_conversions if name in df_clean.columns]
for column in castable_columns:
    df_clean = df_clean.withColumn(column, col(column).cast(type_conversions[column]))

print("Data type conversions completed")

# Step 4: Create HousePriceTimeSeries table
print("Creating HousePriceTimeSeries table...")

# Columns expected in the time-series table, in output order
time_series_columns = [
    "property_id",
    "month",
    "price",
    "area",
    "bedrooms",
    "bathrooms",
    "stories",
    "mainroad",
    "guestroom",
    "basement",
    "hotwaterheating",
    "airconditioning",
    "parking",
    "prefarea",
    "furnishingstatus",
]

# Keep only the expected columns that the cleaned data actually contains
available_columns = set(df_clean.columns)
existing_columns = [name for name in time_series_columns if name in available_columns]
df_time_series = df_clean.select(*existing_columns)

# Save to Silver layer, replacing any previous contents of the table
(
    df_time_series.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("HousePriceTimeSeries")
)

time_series_count = df_time_series.count()
print(f"HousePriceTimeSeries saved: {time_series_count} rows")

# Step 5: Create Houses table (Type 2 SCD)
# Builds one row per (property_id, scd_version): a new version starts at a
# property's first month and whenever any tracked attribute differs from the
# previous month's value.
from functools import reduce

from pyspark.sql.functions import row_number

print("Creating Houses table (Type 2 SCD)...")

# Define attributes to track for SCD
scd_attributes = [
    "bedrooms", "bathrooms", "guestroom",
    "stories", "area", "basement",
    "mainroad", "hotwaterheating", "airconditioning",
    "parking", "prefarea", "furnishingstatus"
]

existing_attributes = [attr for attr in scd_attributes if attr in df_clean.columns]

# Create window for detecting changes: each property's rows ordered by month.
# NOTE(review): assumes at most one row per (property_id, month); with
# duplicates the row order inside a month is arbitrary - confirm upstream.
window_spec = Window.partitionBy("property_id").orderBy("month")

# Add previous values for comparison
df_with_lag = df_clean
for attribute in existing_attributes:
    df_with_lag = df_with_lag.withColumn(
        f"prev_{attribute}",
        lag(attribute, 1).over(window_spec)
    )

# Detect when any attribute changes.
# The first row of each property always opens version 1; it is detected
# explicitly instead of relying on the lagged value being NULL, which cannot
# be told apart from a genuinely NULL attribute value.
is_first_row = row_number().over(window_spec) == 1

# eqNullSafe treats NULL == NULL as equal and NULL vs value as different, so
# value -> NULL transitions are detected and repeated NULLs do not start a
# new version every month.
change_conditions = [
    ~col(attribute).eqNullSafe(col(f"prev_{attribute}"))
    for attribute in existing_attributes
]

# Seeding reduce() with is_first_row also keeps this working when none of the
# tracked attributes exist (each property then gets a single version).
combined_condition = reduce(lambda a, b: a | b, change_conditions, is_first_row)
df_with_changes = df_with_lag.withColumn(
    "is_attribute_change",
    when(combined_condition, 1).otherwise(0)
)

# Create version numbers: running total of change flags within each property
df_with_version = df_with_changes.withColumn(
    "scd_version",
    sum("is_attribute_change").over(window_spec).cast("int")
)

# Aggregate to create SCD table. Attribute values are constant within a
# version by construction, so first() picks the version's value.
agg_expressions = [first(attr).alias(attr) for attr in existing_attributes]
agg_expressions.extend([
    min("month").alias("valid_from"),
    max("month").alias("valid_to"),
    count("*").alias("months_in_version"),
    avg("price").alias("avg_price_in_version")
])

df_houses_scd = df_with_version.groupBy("property_id", "scd_version").agg(*agg_expressions)

# Mark current version: a version is current when it extends to the latest
# month present in the whole dataset.
# NOTE(review): a property whose data ends before the global latest month has
# no current version under this rule - confirm that this is intended.
latest_month = df_clean.agg(max("month")).collect()[0][0]
df_houses_scd = df_houses_scd.withColumn(
    "is_current_version",
    when(col("valid_to") == latest_month, True).otherwise(False)
)

# Save Houses table to the Silver layer, replacing any previous contents
(
    df_houses_scd.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("Houses")
)

houses_count = df_houses_scd.count()
print(f"Houses (Type 2 SCD) saved: {houses_count} version records")

# Show SCD example: version history of one property
print("SCD Type 2 example - Property 6:")
example_columns = [
    "property_id",
    "scd_version",
    "bedrooms",
    "valid_from",
    "valid_to",
    "is_current_version",
]
df_example = (
    df_houses_scd
    .where(col("property_id") == 6)
    .select(*example_columns)
    .orderBy("scd_version")
)
df_example.show()

# Silver layer completion
summary_lines = [
    "Silver layer completed:",
    f"- HousePriceTimeSeries: {time_series_count} rows",
    f"- Houses (SCD Type 2): {houses_count} version records",
    f"- Data quality: {final_count}/{initial_count} rows passed checks",
]
for summary_line in summary_lines:
    print(summary_line)

StatementMeta(, 148bf40b-026e-4600-aa96-7eda182f3bbb, 4, Finished, Available, Finished)

Reading from Bronze layer: 19620 rows
Removed 89 rows with null/empty bedrooms

Data quality summary:
Initial rows: 19620
Rows removed: 89
Clean rows: 19531
Data type conversions completed
Creating HousePriceTimeSeries table...
HousePriceTimeSeries saved: 19531 rows
Creating Houses table (Type 2 SCD)...
Houses (Type 2 SCD) saved: 551 version records
SCD Type 2 example - Property 6:
+-----------+-----------+--------+----------+--------+------------------+
|property_id|scd_version|bedrooms|valid_from|valid_to|is_current_version|
+-----------+-----------+--------+----------+--------+------------------+
|          6|          1|       4|   2023-01| 2025-05|             false|
|          6|          2|       5|   2025-06| 2025-12|              true|
+-----------+-----------+--------+----------+--------+------------------+

Silver layer completed:
- HousePriceTimeSeries: 19531 rows
- Houses (SCD Type 2): 551 version records
- Data quality: 19531/19620 rows passed checks
