In [48]:
# utils/silver_attributes.py

import os
import re
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, to_date, regexp_replace, when, lit
from pyspark.sql.types import StringType, IntegerType, DateType

# Configuration & rules
column_type_map = {
    "Customer_ID":   StringType(),
    "Name":          StringType(),
    "Age":           IntegerType(),
    "SSN":           StringType(),
    "Occupation":    StringType(),
    "snapshot_date": DateType(),
}
AGE_MIN = 15
AGE_MAX = 100
SSN_PATTERN = r"^\d{3}-\d{2}-\d{4}$"
OCCUPATION_PATTERN = r"^[A-Za-z ]+$"

def clean_ssn(df: DataFrame) -> DataFrame:
    df = df.withColumn("SSN", regexp_replace(col("SSN"), r"[^0-9\-]", ""))
    return df.withColumn(
        "SSN",
        when(col("SSN").rlike(SSN_PATTERN), col("SSN")).otherwise(lit(None))
    )

def enforce_age(df: DataFrame) -> DataFrame:
    return df.withColumn(
        "Age",
        when(
            (col("Age").cast(IntegerType()) < AGE_MIN) |
            (col("Age").cast(IntegerType()) > AGE_MAX),
            lit(0)
        ).otherwise(col("Age").cast(IntegerType()))
    )

def clean_occupation(df: DataFrame) -> DataFrame:
    return df.withColumn(
        "Occupation",
        when(col("Occupation").rlike(OCCUPATION_PATTERN), col("Occupation")).otherwise(lit(None))
    )

def cast_columns(df: DataFrame) -> DataFrame:
    for c, dtype in column_type_map.items():
        if c not in df.columns:
            continue
        if isinstance(dtype, DateType):
            df = df.withColumn(c, to_date(col(c), "yyyy-MM-dd"))
        else:
            df = df.withColumn(c, col(c).cast(dtype))
    return df

def process_silver_attributes(
    spark: SparkSession,
    csv_path: str,
    silver_base: str
) -> None:
    """
    Bronze→Silver for attributes:
      - Load Bronze CSV
      - Clean SSN, Age, Occupation
      - Cast all columns
      - Extract date from filename (bronze_feature_attributes_YYYY_MM_DD.csv)
      - Write to silver_base/attributes/silver_attributes_<YYYY_MM_DD>.parquet
    """
    df = spark.read.csv(csv_path, header=True, inferSchema=False)
    df = clean_ssn(df)
    df = enforce_age(df)
    df = clean_occupation(df)
    df = cast_columns(df)

    fname = os.path.basename(csv_path)
    m = re.search(r"(\d{4}_\d{2}_\d{2})", fname)
    if not m:
        raise ValueError(f"Cannot parse date from filename {fname}")
    date_str = m.group(1)

    out_dir = os.path.join(silver_base, "attributes", f"silver_attributes_{date_str}.parquet")
    df.repartition(1).write.mode("overwrite").parquet(out_dir)
    print(f"Wrote Attributes → {out_dir}")


In [49]:
# utils/silver_financials.py

import os
import re
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, to_date, regexp_replace, when, lit
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType

# Configuration & rules
column_type_map = {
    "Customer_ID":              StringType(),
    "Annual_Income":            FloatType(),
    "Monthly_Inhand_Salary":    FloatType(),
    "Num_Bank_Accounts":        IntegerType(),
    "Num_Credit_Card":          IntegerType(),
    "Interest_Rate":            IntegerType(),
    "Num_of_Loan":              IntegerType(),
    "Type_of_Loan":             StringType(),
    "Delay_from_due_date":      IntegerType(),
    "Num_of_Delayed_Payment":   IntegerType(),
    "Changed_Credit_Limit":     FloatType(),
    "Num_Credit_Inquiries":     IntegerType(),
    "Credit_Mix":               StringType(),
    "Outstanding_Debt":         FloatType(),
    "Credit_Utilization_Ratio": FloatType(),
    "Credit_History_Age":       StringType(),  # raw at Silver
    "Payment_of_Min_Amount":    StringType(),  # raw at Silver
    "Total_EMI_per_month":      FloatType(),
    "Amount_invested_monthly":  FloatType(),
    "Payment_Behaviour":        StringType(),
    "Monthly_Balance":          FloatType(),
    "snapshot_date":            DateType(),
}
INTEREST_MIN = 0
INTEREST_MAX = 100
PAYBEH_PATTERN = r"^[A-Za-z0-9 _]+$"

def clean_numerics(df: DataFrame) -> DataFrame:
    cols = [
        "Annual_Income", "Monthly_Inhand_Salary", "Changed_Credit_Limit",
        "Outstanding_Debt", "Credit_Utilization_Ratio",
        "Total_EMI_per_month", "Amount_invested_monthly", "Monthly_Balance"
    ]
    for c in cols:
        if c in df.columns:
            df = df.withColumn(c, regexp_replace(col(c), r"[^0-9\.\-]", ""))
    return df

def enforce_interest_rate(df: DataFrame) -> DataFrame:
    if "Interest_Rate" in df.columns:
        df = df.withColumn(
            "Interest_Rate",
            when(
                (col("Interest_Rate").cast(IntegerType()) < INTEREST_MIN) |
                (col("Interest_Rate").cast(IntegerType()) > INTEREST_MAX),
                lit(0)
            ).otherwise(col("Interest_Rate").cast(IntegerType()))
        )
    return df

def clean_payment_behaviour(df: DataFrame) -> DataFrame:
    if "Payment_Behaviour" in df.columns:
        df = df.withColumn(
            "Payment_Behaviour",
            when(col("Payment_Behaviour").rlike(PAYBEH_PATTERN), col("Payment_Behaviour")).otherwise(lit(None))
        )
    return df

def cast_columns(df: DataFrame) -> DataFrame:
    for c, dtype in column_type_map.items():
        if c not in df.columns:
            continue
        if isinstance(dtype, DateType):
            df = df.withColumn(c, to_date(col(c), "yyyy-MM-dd"))
        else:
            df = df.withColumn(c, col(c).cast(dtype))
    return df

def process_silver_financials(
    spark: SparkSession,
    csv_path: str,
    silver_base: str
) -> None:
    """
    Bronze→Silver for financials:
      - Load Bronze CSV
      - Clean numerics, enforce interest rate, clean payment behaviour
      - Cast all columns
      - Extract date from filename (bronze_feature_financials_YYYY_MM_DD.csv)
      - Write to silver_base/financials/silver_financials_<YYYY_MM_DD>.parquet
    """
    df = spark.read.csv(csv_path, header=True, inferSchema=False)
    df = clean_numerics(df)
    df = enforce_interest_rate(df)
    df = clean_payment_behaviour(df)
    df = cast_columns(df)

    fname = os.path.basename(csv_path)
    m = re.search(r"(\d{4}_\d{2}_\d{2})", fname)
    if not m:
        raise ValueError(f"Cannot parse date from filename {fname}")
    date_str = m.group(1)

    out_dir = os.path.join(silver_base, "financials", f"silver_financials_{date_str}.parquet")
    df.repartition(1).write.mode("overwrite").parquet(out_dir)
    print(f"Wrote Financials → {out_dir}")


In [50]:
# utils/silver_clickstream.py

import os
import re
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, to_date, when, lit, floor
from pyspark.sql.types import IntegerType, StringType, DateType

# Configuration & rules
column_type_map = {
    **{f"fe_{i}": IntegerType() for i in range(1,21)},
    "Customer_ID":   StringType(),
    "snapshot_date": DateType(),
}

def cast_feature_columns(df: DataFrame) -> DataFrame:
    for c in column_type_map:
        if c.startswith("fe_") and c in df.columns:
            df = df.withColumn(c, col(c).cast(IntegerType()))
    return df

def cast_meta_columns(df: DataFrame) -> DataFrame:
    if "Customer_ID" in df.columns:
        df = df.withColumn("Customer_ID", col("Customer_ID").cast(StringType()))
    if "snapshot_date" in df.columns:
        df = df.withColumn("snapshot_date", to_date(col("snapshot_date"), "yyyy-MM-dd"))
    return df

def validate_features(df: DataFrame) -> None:
    nulls = {c: df.filter(col(c).isNull()).count() for c in column_type_map if c.startswith("fe_")}
    floats= {c: df.filter(col(c).isNotNull() & (col(c)!=floor(col(c)))).count() for c in column_type_map if c.startswith("fe_")}
    print("Feature nulls:", nulls)
    print("Non-integer floats:", floats)

def process_silver_clickstream(
    spark: SparkSession,
    csv_path: str,
    silver_base: str
) -> None:
    """
    Bronze→Silver for clickstream:
      - Load Bronze CSV
      - Cast features & meta
      - Validate feature integrity
      - Extract date from filename (bronze_feature_clickstream_YYYY_MM_DD.csv)
      - Write to silver_base/clickstream/silver_clickstream_<YYYY_MM_DD>.parquet
    """
    df = spark.read.csv(csv_path, header=True, inferSchema=False)
    df = cast_feature_columns(df)
    df = cast_meta_columns(df)
    validate_features(df)

    fname = os.path.basename(csv_path)
    m = re.search(r"(\d{4}_\d{2}_\d{2})", fname)
    if not m:
        raise ValueError(f"Cannot parse date from filename {fname}")
    date_str = m.group(1)

    out_dir = os.path.join(silver_base, "clickstream", f"silver_clickstream_{date_str}.parquet")
    df.repartition(1).write.mode("overwrite").parquet(out_dir)
    print(f"Wrote Clickstream → {out_dir}")


In [None]:
# orchestrate_bronze_to_silver.py

import os
import sys
from pyspark.sql import SparkSession

# make sure utils/ is importable
sys.path.append("utils")
from silver_attributes import process_silver_attributes
from silver_financials import process_silver_financials
from silver_clickstream import process_silver_clickstream

BRONZE = "/app/datamart/bronze"
SILVER = "/app/datamart/silver"

spark = SparkSession.builder.appName("BronzeToSilverOrchestrator").getOrCreate()

# Attributes
for fname in sorted(os.listdir(f"{BRONZE}/attributes")):
    # print(fname)
    if fname.endswith(".csv") and "bronze_features_attributes" in fname:
        print('hello')
        path = f"{BRONZE}/attributes/{fname}"
        print("→ Attr:", fname)
        print('attri', path , SILVER)
        process_silver_attributes(spark, path, SILVER)

# Financials
for fname in sorted(os.listdir(f"{BRONZE}/financials")):
    if fname.endswith(".csv") and "bronze_features_financials" in fname: 
        path = f"{BRONZE}/financials/{fname}"
        print("→ Fin:", fname)
        print('fin', path , SILVER)

        process_silver_financials(spark, path, SILVER)

# Clickstream
for fname in sorted(os.listdir(f"{BRONZE}/clickstream")):
    if fname.endswith(".csv") and "bronze_feature_clickstream" in fname:
        path = f"{BRONZE}/clickstream/{fname}"
        print("→ Click:", fname)
        process_silver_clickstream(spark, path, SILVER)

spark.stop()
print("✅ Bronze→Silver pipeline finished.")


hello
→ Attr: bronze_features_attributes_2023_01_01.csv
attri /app/datamart/bronze/attributes/bronze_features_attributes_2023_01_01.csv /app/datamart/silver
Wrote Attributes → /app/datamart/silver/attributes/silver_attributes_2023_01_01.parquet
hello
→ Attr: bronze_features_attributes_2023_02_01.csv
attri /app/datamart/bronze/attributes/bronze_features_attributes_2023_02_01.csv /app/datamart/silver
Wrote Attributes → /app/datamart/silver/attributes/silver_attributes_2023_02_01.parquet
hello
→ Attr: bronze_features_attributes_2023_03_01.csv
attri /app/datamart/bronze/attributes/bronze_features_attributes_2023_03_01.csv /app/datamart/silver
Wrote Attributes → /app/datamart/silver/attributes/silver_attributes_2023_03_01.parquet
hello
→ Attr: bronze_features_attributes_2023_04_01.csv
attri /app/datamart/bronze/attributes/bronze_features_attributes_2023_04_01.csv /app/datamart/silver
Wrote Attributes → /app/datamart/silver/attributes/silver_attributes_2023_04_01.parquet
hello
→ Attr: bronze