Spark session

In [None]:
from pyspark.sql import SparkSession

# Obtain the session for this pipeline: getOrCreate() hands back an already
# running session if there is one, otherwise it starts a new one under the
# application name given here.
spark = (
    SparkSession.builder
    .appName("Bronze to Gold Complex Sales Pipeline")
    .getOrCreate()
)


Imports

In [None]:
from pyspark.sql.functions import (
    col, when, to_timestamp, row_number, sum, count,
    avg, max, min, current_timestamp, datediff
)
from pyspark.sql.window import Window


Read BRONZE data

In [None]:
# Location of the raw (bronze) sales CSV files.
bronze_path = "s3://dataplatform-raw-data-prod/sales/"

# Column names come from the header row; column types are inferred from
# the data rather than declared in an explicit schema.
bronze_df = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv(bronze_path)
)

# Print a sample of the rows as a quick visual check of the load.
bronze_df.show()


Standardize schema

In [None]:
# Bring the raw columns to the types the later steps rely on:
#   amount              -> cast to double
#   order_timestamp     -> order_time converted with to_timestamp
#   ingestion_timestamp -> current timestamp when the query runs
bronze_std_df = (
    bronze_df
    .withColumn("amount", col("amount").cast("double"))
    .withColumn("order_timestamp", to_timestamp(col("order_time")))
    .withColumn("ingestion_timestamp", current_timestamp())
)


Data quality filtering

In [None]:
# Keep a row only if it has an order id, a customer id, and a non-null,
# strictly positive amount. Every condition must hold, so they are applied
# as successive filters.
quality_df = (
    bronze_std_df
    .where(col("order_id").isNotNull())
    .where(col("customer_id").isNotNull())
    .where(col("amount").isNotNull())
    .where(col("amount") > 0)
)


Deduplication (window function)

In [None]:
# Keep exactly one row per order_id: the one with the most recent
# order_timestamp.
#
# row_number() chooses arbitrarily between rows that tie on the window
# ordering. dedup_df is re-evaluated by several downstream actions (silver
# write, gold aggregation, row counts), so ordering by order_timestamp alone
# could let different evaluations keep different duplicates. The extra sort
# keys make the choice repeatable; they only decide between rows that share
# both order_id and order_timestamp, so the "latest wins" rule is unchanged.
# NOTE(review): preferring the larger amount on a tie is an arbitrary but
# deterministic choice -- confirm it matches the business rule.
dedup_window = Window.partitionBy("order_id").orderBy(
    col("order_timestamp").desc(),
    col("amount").desc(),
    col("customer_id").asc(),
)

# Number the rows inside each order_id group, keep the first, and drop the
# helper column so the output schema matches the input.
dedup_df = (
    quality_df
    .withColumn("row_num", row_number().over(dedup_window))
    .filter(col("row_num") == 1)
    .drop("row_num")
)


Business enrichment

In [None]:
# Bucket each order by its amount: >= 500 is HIGH, >= 200 is MEDIUM,
# everything else is LOW. The branches are tested top to bottom.
value_category = (
    when(col("amount") >= 500, "HIGH")
    .when(col("amount") >= 200, "MEDIUM")
    .otherwise("LOW")
)

# Number of days between the current timestamp and the order timestamp.
age_in_days = datediff(current_timestamp(), col("order_timestamp"))

# Attach both derived columns to the deduplicated orders.
enriched_df = (
    dedup_df
    .withColumn("order_value_category", value_category)
    .withColumn("order_age_days", age_in_days)
)


Write SILVER

In [None]:
# Destination of the cleaned, enriched (silver) orders.
silver_path = "s3://dataplatform-processed-data-prod/sales/"

# Persist as Parquet; "overwrite" replaces any data already at the path.
enriched_df.write.parquet(silver_path, mode="overwrite")


GOLD aggregations

In [None]:
# Per-customer order statistics. Note that sum/max/min here are the
# pyspark.sql.functions versions imported at the top of the notebook, not
# the Python builtins of the same name.
customer_metrics = [
    count("order_id").alias("total_orders"),
    sum("amount").alias("total_spend"),
    avg("amount").alias("avg_order_value"),
    max("amount").alias("max_order_value"),
    min("amount").alias("min_order_value"),
]

# One output row per customer_id carrying all of the metrics above.
gold_df = enriched_df.groupBy("customer_id").agg(*customer_metrics)


Customer segmentation

In [None]:
# Label each customer by lifetime spend. Thresholds are checked from the
# highest down, so a customer lands in the first tier they qualify for:
#   >= 2000 PLATINUM, >= 1000 GOLD, >= 500 SILVER, otherwise BRONZE.
total_spend = col("total_spend")
segment_label = (
    when(total_spend >= 2000, "PLATINUM")
    .when(total_spend >= 1000, "GOLD")
    .when(total_spend >= 500, "SILVER")
    .otherwise("BRONZE")
)

segmented_df = gold_df.withColumn("customer_segment", segment_label)


Write GOLD

In [None]:
# Destination of the curated (gold) per-customer summary.
gold_path = "s3://dataplatform-curated-data-prod/customer_sales_summary/"

# Persist as Parquet; "overwrite" replaces any data already at the path.
segmented_df.write.parquet(gold_path, mode="overwrite")


Data quality metrics

In [None]:
# Row count at each stage of the pipeline, gathered into a small
# two-column DataFrame. Every count() below is its own Spark action, and
# they run in the order listed.
metrics_df = spark.createDataFrame(
    [
        (stage_name, stage_df.count())
        for stage_name, stage_df in (
            ("bronze_rows", bronze_df),
            ("after_quality_filter", quality_df),
            ("after_deduplication", dedup_df),
            ("gold_customers", segmented_df),
        )
    ],
    ["metric_name", "metric_value"],
)

# Print the metrics table.
metrics_df.show()
