In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [0]:
# -------------------------------------------------------------------
# REUSABLE FUNCTION TO FILL NON-BUSINESS NULLS WITH DEFAULT VALUES
# -------------------------------------------------------------------
from pyspark.sql.types import (
    StringType, IntegerType, LongType,
    DoubleType, TimestampType, DateType
)
from pyspark.sql import functions as F

def fill_non_business_nulls(df, business_keys):
    """Replace NULLs in non-business-key columns with a default chosen by column type.

    Defaults applied, by Spark column type:
        StringType               -> "UNKNOWN"
        IntegerType / LongType   -> 0
        DoubleType               -> 0.0
        TimestampType            -> current_timestamp()
        DateType                 -> current_date()

    Columns of any other type, and every column whose name is in
    ``business_keys``, are left untouched.

    Args:
        df: Spark DataFrame to clean.
        business_keys: Column names that must never be filled.

    Returns:
        A DataFrame with the same columns and the same column types as ``df``.
    """
    fill_map = {}
    timestamp_cols = []
    date_cols = []

    for field in df.schema.fields:
        col_name = field.name
        col_type = field.dataType

        # Business keys are never defaulted.
        if col_name in business_keys:
            continue

        if isinstance(col_type, StringType):
            fill_map[col_name] = "UNKNOWN"
        elif isinstance(col_type, (IntegerType, LongType)):
            fill_map[col_name] = 0
        elif isinstance(col_type, DoubleType):
            fill_map[col_name] = 0.0
        elif isinstance(col_type, TimestampType):
            timestamp_cols.append(col_name)
        elif isinstance(col_type, DateType):
            date_cols.append(col_name)

    # ----------------------------
    # Fill NON-BUSINESS NULLS
    # ----------------------------
    # Skip the call entirely when there is nothing to fill.
    if fill_map:
        df = df.fillna(fill_map)

    # ------------------------------------------------------------
    # Fill TIMESTAMPS with CURRENT_TIMESTAMP, DATES with CURRENT_DATE
    # ------------------------------------------------------------
    # Dates get their own default: combining a DATE column with
    # current_timestamp() in a single expression makes Spark widen the
    # result to TIMESTAMP, which would silently change the column's type.
    # NOTE(review): a NULL temporal value is replaced by the time of the
    # run, so downstream readers cannot tell "missing" from "now" --
    # confirm this is the intended default.
    # NOTE(review): tables already written with a DATE column widened to
    # TIMESTAMP may need a one-off schema overwrite -- verify before deploying.
    for name in timestamp_cols:
        df = df.withColumn(name, F.coalesce(F.col(name), F.current_timestamp()))

    for name in date_cols:
        df = df.withColumn(name, F.coalesce(F.col(name), F.current_date()))

    return df


In [0]:
# ------------------------------------------
# ENFORCE SCHEMA AND DEFINE BUSINESS KEYS
# ------------------------------------------
# Each table gets two definitions:
#   <table>_keys   -- business-key columns (rows with a NULL key are dropped
#                     and rows are de-duplicated on these columns)
#   <table>_schema -- column name -> Spark type string used to cast the column
# Every schema key is looked up as a column of the bronze data, so the
# spelling must match the bronze column names.

customers_keys = ["customer_id"]
customers_schema = {
    "customer_id": "string",
    "customer_unique_id": "string",
    "customer_zip_code_prefix": "int",
    "customer_city": "string",
    "customer_state": "string"
}

# Single definition for products. A second, earlier definition that spelled
# the columns "product_name_length" / "product_description_length" was
# overwritten by this one and never took effect, so it has been removed.
# NOTE(review): the "lenght" spelling is kept as-is; presumably it matches the
# bronze column names -- verify against the source data.
products_keys = ["product_id"]
products_schema = {
    "product_id": "string",
    "product_category_name": "string",
    "product_name_lenght": "int",
    "product_description_lenght": "int",
    "product_photos_qty": "int",
    "product_weight_g": "integer",
    "product_length_cm": "integer",
    "product_height_cm": "integer",
    "product_width_cm": "integer"
}

order_items_keys = ["order_id","order_item_id"]
order_items_schema = {
    "order_id": "string",
    "order_item_id": "string",
    "product_id": "string",
    "seller_id": "string",
    "shipping_limit_date" : "timestamp",
    "price": "double",
    "freight_value": "double"
}

orders_keys = ["order_id", "customer_id"]
orders_schema = {
    "order_id": "string",
    "customer_id": "string",
    "order_status": "string",
    "order_purchase_timestamp": "timestamp",
    "order_approved_at": "timestamp",
    "order_delivered_carrier_date": "timestamp",
    "order_delivered_customer_date": "timestamp",
    "order_estimated_delivery_date": "timestamp"
}

orders_payments_keys = ["order_id", "payment_sequential"]
orders_payments_schema = {
    "order_id": "string",
    "payment_sequential": "int",
    "payment_type": "string",
    "payment_installments": "int",
    "payment_value": "double"
}

In [0]:
# --------------------------------------
# REUSABLE FUNCTION TO PROCESS TABLES
# --------------------------------------
from pyspark.sql import functions as F

def _count_nulls(df):
    """Return a one-row DataFrame holding a ``<column>_nulls`` count per column of ``df``."""
    return df.select([
        F.count(F.when(F.col(c).isNull(), c)).alias(c + "_nulls")
        for c in df.columns
    ])


def _cleanse_business_keys(df, business_keys):
    """Drop rows with a NULL business key, then keep one row per key combination."""
    # Drop rows where any business key is null
    for key in business_keys:
        df = df.filter(df[key].isNotNull())
    # Drop duplicate rows based on business keys. No ordering is applied
    # first, so which of the duplicate rows survives is not controlled here.
    return df.dropDuplicates(business_keys)


def process_table(
    table_name,
    bronze_path,
    silver_table,
    schema_dict,
    business_keys,
    partition_col=None,
    zorder_cols=None
):
    """Load one bronze Parquet dataset, clean it and overwrite its silver Delta table.

    Steps, in order: read Parquet from ``bronze_path``; cast every column in
    ``schema_dict``; add ``ingestion_ts`` / ``ingestion_date``; display
    per-column NULL counts (taken before cleansing); drop rows with NULL
    business keys and de-duplicate on the business keys; fill remaining NULLs
    with ``fill_non_business_nulls``; overwrite ``silver_table``; optionally
    run ``OPTIMIZE ... ZORDER BY``.

    Relies on ``spark`` and ``display`` being available as globals (they are
    not defined in this notebook; presumably provided by the notebook runtime).

    Args:
        table_name: Label used only in the progress messages.
        bronze_path: Path of the Parquet data to read.
        silver_table: Name of the Delta table to overwrite.
        schema_dict: Mapping of column name -> Spark type string to cast to.
        business_keys: Non-empty list of key columns used for the NULL filter
            and for de-duplication.
        partition_col: Optional column (or columns) to partition the table by.
        zorder_cols: Optional list of columns for ``ZORDER BY``.

    Raises:
        ValueError: If ``business_keys`` is empty.
    """
    # Fail before any I/O: de-duplicating on an empty key list would treat
    # every row as a duplicate of every other row, and the overwrite below
    # would then replace the silver table with that collapsed result.
    if not business_keys:
        raise ValueError(
            f"process_table({table_name!r}): business_keys must contain at least one column"
        )

    print(f"===== Processing {table_name} =====")

    # ----------------------------
    # 1. Read Bronze (PARQUET)
    # ----------------------------
    df = spark.read.parquet(bronze_path)

    # -------------------
    # 2. Cast columns
    # -------------------
    # Each key of schema_dict is resolved as a column of the bronze data.
    for col, dtype in schema_dict.items():
        df = df.withColumn(col, F.col(col).cast(dtype))

    # -------------------------------
    # 3. Add ingestion timestamp
    # -------------------------------
    df = df.withColumn("ingestion_ts", F.current_timestamp())
    df = df.withColumn("ingestion_date", F.to_date("ingestion_ts"))

    # ------------------------
    # 4. NULL count BEFORE
    # ------------------------
    display(_count_nulls(df))

    # ----------------------
    # 5. Dedup & cleanse
    # ----------------------
    df = _cleanse_business_keys(df, business_keys)
    df = fill_non_business_nulls(df, business_keys)

    # ----------------------------------------
    # 6. Write Silver Delta (Partitioned)
    # ----------------------------------------
    writer = (
        df.write
          .format("delta")
          .mode("overwrite")
    )

    if partition_col:
        writer = writer.partitionBy(partition_col)

    writer.saveAsTable(silver_table)

    # --------------------------
    # 7. OPTIMIZE (Z-ORDER)
    # --------------------------
    if zorder_cols:
        spark.sql(
            f"OPTIMIZE {silver_table} ZORDER BY ({','.join(zorder_cols)})"
        )

    print(f"===== {table_name} Completed =====\n")


In [0]:
# PROCESS TABLES

# One entry per silver table, listed in processing order. Each entry holds
# the keyword arguments handed to process_table for that table.
_silver_jobs = [
    {
        "table_name": "customers",
        "bronze_path": "/Volumes/real_time_projects/ecommerce_historical/lakehouse_vol/bronze/customers",
        "silver_table": "real_time_projects.ecommerce_historical.customers",
        "schema_dict": customers_schema,
        "business_keys": customers_keys,
        "partition_col": "customer_state",
        "zorder_cols": customers_keys,
    },
    {
        "table_name": "orders",
        "bronze_path": "/Volumes/real_time_projects/ecommerce_historical/lakehouse_vol/bronze/orders",
        "silver_table": "real_time_projects.ecommerce_historical.orders",
        "schema_dict": orders_schema,
        "business_keys": orders_keys,
        "partition_col": "order_status",
        "zorder_cols": orders_keys,
    },
    {
        "table_name": "products",
        "bronze_path": "/Volumes/real_time_projects/ecommerce_historical/lakehouse_vol/bronze/products",
        "silver_table": "real_time_projects.ecommerce_historical.products",
        "schema_dict": products_schema,
        "business_keys": products_keys,
        "partition_col": "product_category_name",
        "zorder_cols": products_keys,
    },
    {
        "table_name": "order_items",
        "bronze_path": "/Volumes/real_time_projects/ecommerce_historical/lakehouse_vol/bronze/order_items",
        "silver_table": "real_time_projects.ecommerce_historical.order_items",
        "schema_dict": order_items_schema,
        "business_keys": order_items_keys,
        "partition_col": "ingestion_date",
        "zorder_cols": order_items_keys,
    },
    {
        "table_name": "order_payments",
        "bronze_path": "/Volumes/real_time_projects/ecommerce_historical/lakehouse_vol/bronze/payments",
        "silver_table": "real_time_projects.ecommerce_historical.order_payments",
        "schema_dict": orders_payments_schema,
        "business_keys": orders_payments_keys,
        "partition_col": "ingestion_date",
        "zorder_cols": orders_payments_keys,
    },
]

# Tables are processed one after another; an exception stops the run before
# the remaining tables are touched.
for _job in _silver_jobs:
    process_table(**_job)