# Scalable E-commerce Data Pipeline (PySpark)

**Author:** Ivy Maina  
**Focus:** Data Engineering · Distributed Processing · Analytics Pipelines

This notebook demonstrates an end-to-end data engineering pipeline for processing e-commerce event data using a Bronze–Silver–Gold architecture. The pipeline ingests raw event logs, applies data quality and transformation logic, and produces analytics-ready datasets for downstream reporting and decision-making.


## Pipeline Architecture Overview

The pipeline follows a standard **Bronze–Silver–Gold** design pattern:

- **Bronze:** Raw event ingestion with enforced schemas and partitioned storage.
- **Silver:** Data cleaning, validation, and business rule enforcement.
- **Gold:** Aggregated, analytics-ready metrics for business insights.

This layered approach improves data reliability, scalability, and maintainability.


## Raw Data Generation

Synthetic event data is used to simulate large-scale user activity in lieu of production logs.

In [44]:
# Install PySpark and start a SparkSession

!pip -q install pyspark==3.5.3 pyarrow pandas

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("ScalableEventPipeline")
         .getOrCreate())

spark


Environment Setup:

Initialized a PySpark environment to support scalable data processing.

In [45]:
# Generate synthetic raw event logs

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import uuid
import os

# Config
NUM_DAYS = 3
EVENTS_PER_DAY = 50_000   # start smaller for speed; later we can increase
SEED = 42

np.random.seed(SEED)

EVENT_TYPES = ["view", "add_to_cart", "purchase", "search"]
EVENT_PROBS = [0.60, 0.20, 0.10, 0.10]
DEVICES = ["mobile", "desktop", "tablet"]
COUNTRIES = ["US", "CA", "UK", "DE", "FR", "IN"]
TRAFFIC_SOURCES = ["organic", "paid", "email", "social", "referral"]

def random_timestamp(day_start: datetime) -> datetime:
    seconds = np.random.randint(0, 86400)
    return day_start + timedelta(seconds=int(seconds))

def generate_events_for_day(day: datetime, n: int) -> pd.DataFrame:
    rows = []
    for _ in range(n):
        event_time = random_timestamp(day)
        event_type = np.random.choice(EVENT_TYPES, p=EVENT_PROBS)

        price = 0.0
        quantity = 0
        if event_type == "purchase":
            price = float(np.round(np.random.uniform(10, 500), 2))
            quantity = int(np.random.randint(1, 5))

        rows.append({
            "event_id": str(uuid.uuid4()),
            "event_time": event_time,
            "user_id": int(np.random.randint(1, 1_000_000)),
            # NOTE: a fresh session_id per event means every session contains exactly one event
            "session_id": str(uuid.uuid4()),
            "event_type": event_type,
            "product_id": int(np.random.randint(1, 50_000)),
            "device_type": str(np.random.choice(DEVICES)),
            "country": str(np.random.choice(COUNTRIES)),
            "traffic_source": str(np.random.choice(TRAFFIC_SOURCES)),
            "price": price,
            "quantity": quantity,
        })
    return pd.DataFrame(rows)

start_date = datetime(2025, 1, 1)
raw_dir = "/content/data/source"
os.makedirs(raw_dir, exist_ok=True)

for i in range(NUM_DAYS):
    day = start_date + timedelta(days=i)
    df_day = generate_events_for_day(day, EVENTS_PER_DAY)
    out_path = f"{raw_dir}/events_{day.strftime('%Y-%m-%d')}.csv"
    df_day.to_csv(out_path, index=False)
    print(f"Wrote {len(df_day):,} rows -> {out_path}")


Wrote 50,000 rows -> /content/data/source/events_2025-01-01.csv
Wrote 50,000 rows -> /content/data/source/events_2025-01-02.csv
Wrote 50,000 rows -> /content/data/source/events_2025-01-03.csv


Raw Event Data Generation:

Generated synthetic e-commerce event logs to simulate real-world user behavior, including views, cart actions, and purchases.
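The generator above draws one row at a time in a Python loop. Its weighted event-type draw can be sketched with the standard library's random.choices. This is a plain-Python illustration, not the notebook's NumPy code, and sample_event_types and observed_shares are hypothetical helper names. It checks that EVENT_PROBS yields roughly a 60/20/10/10 split, which is also what the event_type counts in the Silver layer show.

```python
import random
from collections import Counter
from typing import Dict, List

EVENT_TYPES = ["view", "add_to_cart", "purchase", "search"]
EVENT_PROBS = [0.60, 0.20, 0.10, 0.10]

def sample_event_types(n: int, seed: int = 42) -> List[str]:
    """Draw n event types using the configured weights (hypothetical helper)."""
    rng = random.Random(seed)  # local, seeded RNG so runs are reproducible
    return rng.choices(EVENT_TYPES, weights=EVENT_PROBS, k=n)

def observed_shares(events: List[str]) -> Dict[str, float]:
    """Fraction of draws that landed on each event type."""
    counts = Counter(events)
    return {t: counts[t] / len(events) for t in EVENT_TYPES}

shares = observed_shares(sample_event_types(50_000))
for event_type, share in shares.items():
    print(f"{event_type:12s} {share:.3f}")
```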

In [46]:
# Confirm files exist
!ls -lh /content/data/source | head


total 20M
-rw-r--r-- 1 root root 6.6M Jan  6 09:31 events_2025-01-01.csv
-rw-r--r-- 1 root root 6.6M Jan  6 09:31 events_2025-01-02.csv
-rw-r--r-- 1 root root 6.6M Jan  6 09:31 events_2025-01-03.csv


In [47]:
# Quick Spark read sanity check

raw_df = (spark.read
          .option("header", "true")
          .csv("/content/data/source/*.csv"))

print("Rows:", raw_df.count())
raw_df.show(5, truncate=False)


Rows: 150000
+------------------------------------+-------------------+-------+------------------------------------+-----------+----------+-----------+-------+--------------+-----+--------+
|event_id                            |event_time         |user_id|session_id                          |event_type |product_id|device_type|country|traffic_source|price|quantity|
+------------------------------------+-------------------+-------+------------------------------------+-----------+----------+-----------+-------+--------------+-----+--------+
|40a6697c-2512-4f0e-b2bd-1be29321bd88|2025-01-01 04:23:15|259179 |513c376c-4c64-4357-ab6b-ed73480d6f67|search     |44733     |mobile     |CA     |email         |0.0  |0       |
|841c927c-faa3-4265-a905-55ae816d86b6|2025-01-01 10:19:54|912757 |ebb1e6a7-a67e-4ac6-9b71-0d5250a7d944|view       |44132     |tablet     |IN     |referral      |0.0  |0       |
|2732d3a5-7d16-45e0-ad18-82a319246e6f|2025-01-01 00:12:49|327070 |60c03cbb-ec4f-40f9-9146-fccb2d64dc04

Raw Data Validation:

Loaded the raw CSV files into Spark to verify data volume (150,000 events) and inspect sample records, confirming realistic event timestamps, user actions, and session-level attributes.
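When debugging ingestion, a Spark-free cross-check of the row count can help isolate reader issues. The sketch below uses only the standard library to count data rows across every CSV matching a glob and to confirm that all files share one header. count_csv_rows is a hypothetical helper. For the source files above it would be expected to agree with Spark's count of 150,000.

```python
import csv
import glob
from typing import List, Optional, Tuple

def count_csv_rows(pattern: str) -> Tuple[int, List[str]]:
    """Count data rows across all CSVs matching `pattern`; return (rows, header).

    Raises ValueError if the files disagree on the header line.
    """
    total = 0
    header: Optional[List[str]] = None
    for path in sorted(glob.glob(pattern)):
        with open(path, newline="") as f:
            reader = csv.reader(f)
            file_header = next(reader)  # first line holds the column names
            if header is None:
                header = file_header
            elif file_header != header:
                raise ValueError(f"Header mismatch in {path}")
            total += sum(1 for _ in reader)  # remaining lines are data rows
    if header is None:
        raise FileNotFoundError(f"No files match {pattern}")
    return total, header
```

Usage in this notebook would be rows, header = count_csv_rows("/content/data/source/*.csv").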

## Bronze Layer (Raw Ingestion)

In [48]:
# Define schema and read raw CSV

from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType, DoubleType, TimestampType
)
from pyspark.sql.functions import col, to_date, current_timestamp

bronze_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("event_time", TimestampType(), False),
    StructField("user_id", IntegerType(), False),
    StructField("session_id", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("product_id", IntegerType(), True),
    StructField("device_type", StringType(), True),
    StructField("country", StringType(), True),
    StructField("traffic_source", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("quantity", IntegerType(), True),
])

raw_path = "/content/data/source/*.csv"

bronze_df = (spark.read
             .option("header", "true")
             .schema(bronze_schema)
             .csv(raw_path))

bronze_df.printSchema()
bronze_df.show(5, truncate=False)
print("Rows:", bronze_df.count())


root
 |-- event_id: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- session_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- device_type: string (nullable = true)
 |-- country: string (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: integer (nullable = true)

+------------------------------------+-------------------+-------+------------------------------------+-----------+----------+-----------+-------+--------------+-----+--------+
|event_id                            |event_time         |user_id|session_id                          |event_type |product_id|device_type|country|traffic_source|price|quantity|
+------------------------------------+-------------------+-------+------------------------------------+-----------+----------+-----------+-------+--------------+-----+--------+
|40a6

Bronze Layer - Schema Enforcement:

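Applied an explicit schema to the raw event logs to ensure consistent data types and prevent downstream parsing errors. The nullable=False flags in bronze_schema are not enforced, however: Spark relaxes a user-supplied schema to nullable when reading from files, which is why printSchema reports every column as nullable = true.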
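Spark's CSV reader runs in PERMISSIVE mode by default, so a value that cannot be parsed into its declared type becomes null instead of failing the read; .option("mode", "FAILFAST") raises instead, and DROPMALFORMED drops the whole record. The plain-Python sketch below mimics those three behaviors for a subset of the Bronze schema. SCHEMA and parse_row are illustrative assumptions, not Spark's implementation.

```python
from datetime import datetime
from typing import Callable, Dict, Optional

# Illustrative subset of the Bronze schema: column name -> parser for the raw string
SCHEMA: Dict[str, Callable[[str], object]] = {
    "event_id": str,
    "event_time": lambda s: datetime.strptime(s, "%Y-%m-%d %H:%M:%S"),
    "user_id": int,
    "price": float,
    "quantity": int,
}

def parse_row(raw: Dict[str, str], mode: str = "PERMISSIVE") -> Optional[Dict[str, object]]:
    """Cast one raw CSV row to typed values.

    PERMISSIVE: unparseable fields become None.
    DROPMALFORMED: return None for the whole row.
    FAILFAST: raise on the first bad field.
    """
    typed: Dict[str, object] = {}
    for column, parser in SCHEMA.items():
        try:
            typed[column] = parser(raw[column])
        except (ValueError, KeyError):
            if mode == "FAILFAST":
                raise ValueError(f"Malformed value for {column!r}: {raw.get(column)!r}")
            if mode == "DROPMALFORMED":
                return None
            typed[column] = None  # PERMISSIVE: null out only the bad field
    return typed

row = {"event_id": "abc", "event_time": "2025-01-01 04:23:15",
       "user_id": "259179", "price": "not-a-number", "quantity": "0"}
print(parse_row(row))                   # price becomes None, other fields are typed
print(parse_row(row, "DROPMALFORMED"))  # None
```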

In [49]:
# Metadata columns

bronze_df = (bronze_df
             .withColumn("event_date", to_date(col("event_time")))
             .withColumn("ingest_time", current_timestamp()))

bronze_df.select("event_time", "event_date", "ingest_time").show(5, truncate=False)


+-------------------+----------+--------------------------+
|event_time         |event_date|ingest_time               |
+-------------------+----------+--------------------------+
|2025-01-01 04:23:15|2025-01-01|2026-01-06 09:31:45.791495|
|2025-01-01 10:19:54|2025-01-01|2026-01-06 09:31:45.791495|
|2025-01-01 00:12:49|2025-01-01|2026-01-06 09:31:45.791495|
|2025-01-01 23:41:45|2025-01-01|2026-01-06 09:31:45.791495|
|2025-01-01 23:27:58|2025-01-01|2026-01-06 09:31:45.791495|
+-------------------+----------+--------------------------+
only showing top 5 rows



Metadata Enrichment:

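Added ingestion metadata (ingest_time) and derived event_date to support auditing and partitioning. Because current_timestamp() is evaluated once per query, every row produced by the same query shares one ingest_time; the value persisted by the Parquet write (visible when Bronze is reloaded) can therefore differ slightly from the one displayed here.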
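The enrichment step can be sketched in plain Python to make the batch semantics explicit: event_date is the calendar date of event_time, and ingest_time is captured once for the whole batch rather than per row. add_metadata is a hypothetical helper, not part of the notebook.

```python
from datetime import datetime
from typing import Dict, List, Optional

def add_metadata(rows: List[Dict], ingest_time: Optional[datetime] = None) -> List[Dict]:
    """Return copies of rows with a derived event_date and a batch-level ingest_time."""
    # One timestamp for the whole batch, mirroring how current_timestamp()
    # stays fixed for the duration of a single Spark query
    batch_time = ingest_time if ingest_time is not None else datetime.now()
    return [
        {**row, "event_date": row["event_time"].date(), "ingest_time": batch_time}
        for row in rows
    ]
```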

In [50]:
# Bronze as partitioned Parquet

bronze_out = "/content/data/bronze/events"

(bronze_df.write
 .mode("overwrite")
 .partitionBy("event_date")
 .parquet(bronze_out))

print("✅ Bronze layer written to:", bronze_out)


✅ Bronze layer written to: /content/data/bronze/events


Bronze Storage:

Persisted raw events as partitioned Parquet files by event_date for scalable and efficient access.
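With partitionBy("event_date"), Spark encodes the partition value in a Hive-style directory name such as event_date=2025-01-01 and leaves that column out of the data files; on reload it re-attaches the column at the end of the schema, which is why event_date follows ingest_time in the reloaded output. The plain-Python sketch below reproduces that layout logic; partition_rows is a hypothetical helper.

```python
from collections import defaultdict
from datetime import date
from typing import Dict, List

def partition_rows(rows: List[Dict], base: str, key: str = "event_date") -> Dict[str, List[Dict]]:
    """Group rows under Hive-style directories such as base/event_date=2025-01-01."""
    groups: Dict[str, List[Dict]] = defaultdict(list)
    for row in rows:
        value = row[key]
        label = value.isoformat() if isinstance(value, date) else str(value)
        # The partition value lives in the path, so it is dropped from the stored row
        payload = {k: v for k, v in row.items() if k != key}
        groups[f"{base}/{key}={label}"].append(payload)
    return dict(groups)

rows = [
    {"event_id": "a", "event_date": date(2025, 1, 1)},
    {"event_id": "b", "event_date": date(2025, 1, 2)},
    {"event_id": "c", "event_date": date(2025, 1, 1)},
]
for path, payload in partition_rows(rows, "/content/data/bronze/events").items():
    print(path, len(payload))
```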

In [51]:
# Verify partitions exist
!ls -lh /content/data/bronze/events | head -n 20


total 12K
drwxr-xr-x 2 root root 4.0K Jan  6 09:31 event_date=2025-01-01
drwxr-xr-x 2 root root 4.0K Jan  6 09:31 event_date=2025-01-02
drwxr-xr-x 2 root root 4.0K Jan  6 09:31 event_date=2025-01-03
-rw-r--r-- 1 root root    0 Jan  6 09:31 _SUCCESS


Bronze Layer Validation:

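Confirmed the write by listing the output directory: one event_date partition per day plus the _SUCCESS marker. The record count (150,000) is re-checked when the Bronze dataset is reloaded at the start of the Silver layer.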
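The directory listing can be turned into an automated check. As in the listing above, Spark's default committer leaves a _SUCCESS marker after a completed write, so the sketch below requires that marker and then reports any expected dates that have no partition directory. missing_partitions is a hypothetical helper.

```python
import os
from typing import Iterable, List

def missing_partitions(table_dir: str, expected_dates: Iterable[str]) -> List[str]:
    """Return expected event_date values that have no partition directory.

    Raises RuntimeError if the _SUCCESS marker is absent, since that suggests
    the write job did not complete.
    """
    if not os.path.exists(os.path.join(table_dir, "_SUCCESS")):
        raise RuntimeError(f"No _SUCCESS marker in {table_dir}")
    present = {
        name.split("=", 1)[1]
        for name in os.listdir(table_dir)
        if name.startswith("event_date=") and os.path.isdir(os.path.join(table_dir, name))
    }
    return sorted(set(expected_dates) - present)
```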

## Silver Layer (Clean & Trusted)

In [52]:
# Read Bronze data

bronze_path = "/content/data/bronze/events"

bronze_df = spark.read.parquet(bronze_path)
print("Bronze rows:", bronze_df.count())

bronze_df.show(5, truncate=False)


Bronze rows: 150000
+------------------------------------+-------------------+-------+------------------------------------+-----------+----------+-----------+-------+--------------+-----+--------+--------------------------+----------+
|event_id                            |event_time         |user_id|session_id                          |event_type |product_id|device_type|country|traffic_source|price|quantity|ingest_time               |event_date|
+------------------------------------+-------------------+-------+------------------------------------+-----------+----------+-----------+-------+--------------+-----+--------+--------------------------+----------+
|40a6697c-2512-4f0e-b2bd-1be29321bd88|2025-01-01 04:23:15|259179 |513c376c-4c64-4357-ab6b-ed73480d6f67|search     |44733     |mobile     |CA     |email         |0.0  |0       |2026-01-06 09:31:46.301469|2025-01-01|
|841c927c-faa3-4265-a905-55ae816d86b6|2025-01-01 10:19:54|912757 |ebb1e6a7-a67e-4ac6-9b71-0d5250a7d944|view       |44132

Silver Layer Initialization:

Loaded Bronze data as the foundation for data quality checks and cleaning.

In [53]:
# Basic data quality checks

from pyspark.sql.functions import sum as spark_sum

null_counts = bronze_df.select([
    spark_sum(col(c).isNull().cast("int")).alias(c)
    for c in bronze_df.columns
])

null_counts.show(truncate=False)



+--------+----------+-------+----------+----------+----------+-----------+-------+--------------+-----+--------+-----------+----------+
|event_id|event_time|user_id|session_id|event_type|product_id|device_type|country|traffic_source|price|quantity|ingest_time|event_date|
+--------+----------+-------+----------+----------+----------+-----------+-------+--------------+-----+--------+-----------+----------+
|0       |0         |0      |0         |0         |0         |0          |0      |0             |0    |0       |0          |0         |
+--------+----------+-------+----------+----------+----------+-----------+-------+--------------+-----+--------+-----------+----------+



Data Quality Checks:

Evaluated the dataset for missing values and structural issues before applying cleaning rules. The null check shows no missing values in any column, as expected for a synthetically generated dataset.
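The Spark expression sums isNull() cast to an integer for every column. The same per-column tally can be sketched in plain Python, which is handy for unit-testing quality rules on small fixtures; null_counts is a hypothetical helper.

```python
from typing import Dict, Iterable, List

def null_counts(rows: Iterable[Dict], columns: List[str]) -> Dict[str, int]:
    """Count missing (None) values per column, like summing isNull() casts in Spark."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            # An absent key counts as missing, just like an explicit None
            if row.get(c) is None:
                counts[c] += 1
    return counts
```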

In [54]:
from pyspark.sql.functions import count

bronze_df.groupBy("event_type").agg(
    count("*").alias("rows")
).show()


+-----------+-----+
| event_type| rows|
+-----------+-----+
|   purchase|15035|
|add_to_cart|29842|
|       view|89977|
|     search|15146|
+-----------+-----+



In [55]:
# Non-purchase events should have price = 0 and quantity = 0; expect no rows
bronze_df.filter((col("event_type") != "purchase") & ((col("price") != 0) | (col("quantity") != 0))).show(5, truncate=False)

+--------+----------+-------+----------+----------+----------+-----------+-------+--------------+-----+--------+-----------+----------+
|event_id|event_time|user_id|session_id|event_type|product_id|device_type|country|traffic_source|price|quantity|ingest_time|event_date|
+--------+----------+-------+----------+----------+----------+-----------+-------+--------------+-----+--------+-----------+----------+
+--------+----------+-------+----------+----------+----------+-----------+-------+--------------+-----+--------+-----------+----------+



Confirmed that non-purchase events carry price = 0 and quantity = 0: the query above returns no rows.

In [56]:
# Clean and validate (Silver logic)

from pyspark.sql.functions import lower

valid_event_types = ["view", "add_to_cart", "purchase", "search"]

silver_df = (
    bronze_df
    # Drop rows missing critical fields
    .dropna(subset=["event_time", "user_id", "event_type"])

    # Normalize strings
    .withColumn("event_type", lower(col("event_type")))
    .withColumn("device_type", lower(col("device_type")))

    # Keep only valid events
    .filter(col("event_type").isin(valid_event_types))

    # Business rules for purchases
    .filter(
        (col("event_type") != "purchase") |
        ((col("price") > 0) & (col("quantity") > 0))
    )
)

print("Silver rows:", silver_df.count())


Silver rows: 150000


Silver Layer - Cleaning & Validation:

Applied business validation rules, standardized categorical fields, and filtered invalid records to create a trusted dataset. All 150,000 Bronze rows pass these rules, so no records are removed in this run.
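The Silver rules can be expressed as a per-row function, which makes each rule easy to unit-test on hand-built records. The plain-Python sketch below follows the same order as the Spark chain; clean_row and the constant names are hypothetical. A purchase with a missing price or quantity is rejected, consistent with Spark's filter dropping rows where the predicate evaluates to null.

```python
from typing import Dict, Optional

VALID_EVENT_TYPES = {"view", "add_to_cart", "purchase", "search"}
CRITICAL_FIELDS = ("event_time", "user_id", "event_type")

def clean_row(row: Dict) -> Optional[Dict]:
    """Apply the Silver rules to one row; return the cleaned copy or None if rejected."""
    # Drop rows missing critical fields
    if any(row.get(f) is None for f in CRITICAL_FIELDS):
        return None
    cleaned = dict(row)
    # Normalize strings (None stays None, as lower() returns null for null in Spark)
    for f in ("event_type", "device_type"):
        if cleaned.get(f) is not None:
            cleaned[f] = cleaned[f].lower()
    # Keep only valid event types
    if cleaned["event_type"] not in VALID_EVENT_TYPES:
        return None
    # Purchases must have a positive price and quantity
    if cleaned["event_type"] == "purchase":
        price, qty = cleaned.get("price"), cleaned.get("quantity")
        if price is None or qty is None or price <= 0 or qty <= 0:
            return None
    return cleaned
```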

In [57]:
# Write Silver layer

silver_out = "/content/data/silver/events"

(silver_df.write
 .mode("overwrite")
 .partitionBy("event_date")
 .parquet(silver_out))

print("✅ Silver layer written to:", silver_out)

✅ Silver layer written to: /content/data/silver/events


Silver Storage:

Saved the cleaned dataset as partitioned Parquet files for downstream analytics.

In [58]:
# Verify Silver data

silver_check = spark.read.parquet(silver_out)

silver_check.show(5, truncate=False)
print("Silver rows:", silver_check.count())


+------------------------------------+-------------------+-------+------------------------------------+-----------+----------+-----------+-------+--------------+-----+--------+--------------------------+----------+
|event_id                            |event_time         |user_id|session_id                          |event_type |product_id|device_type|country|traffic_source|price|quantity|ingest_time               |event_date|
+------------------------------------+-------------------+-------+------------------------------------+-----------+----------+-----------+-------+--------------+-----+--------+--------------------------+----------+
|40a6697c-2512-4f0e-b2bd-1be29321bd88|2025-01-01 04:23:15|259179 |513c376c-4c64-4357-ab6b-ed73480d6f67|search     |44733     |mobile     |CA     |email         |0.0  |0       |2026-01-06 09:31:46.301469|2025-01-01|
|841c927c-faa3-4265-a905-55ae816d86b6|2025-01-01 10:19:54|912757 |ebb1e6a7-a67e-4ac6-9b71-0d5250a7d944|view       |44132     |tablet     |IN

## Gold Layer (Analytics-Ready Business Metrics)

Aggregated the Silver dataset into business-facing metrics to support reporting and decision-making.

In [59]:
# Load Silver data

silver_path = "/content/data/silver/events"

silver_df = spark.read.parquet(silver_path)
print("Silver rows:", silver_df.count())
silver_df.show(5, truncate=False)


Silver rows: 150000
+------------------------------------+-------------------+-------+------------------------------------+-----------+----------+-----------+-------+--------------+-----+--------+--------------------------+----------+
|event_id                            |event_time         |user_id|session_id                          |event_type |product_id|device_type|country|traffic_source|price|quantity|ingest_time               |event_date|
+------------------------------------+-------------------+-------+------------------------------------+-----------+----------+-----------+-------+--------------+-----+--------+--------------------------+----------+
|40a6697c-2512-4f0e-b2bd-1be29321bd88|2025-01-01 04:23:15|259179 |513c376c-4c64-4357-ab6b-ed73480d6f67|search     |44733     |mobile     |CA     |email         |0.0  |0       |2026-01-06 09:31:46.301469|2025-01-01|
|841c927c-faa3-4265-a905-55ae816d86b6|2025-01-01 10:19:54|912757 |ebb1e6a7-a67e-4ac6-9b71-0d5250a7d944|view       |44132

In [60]:
# Daily revenue

from pyspark.sql.functions import sum as spark_sum

daily_revenue = (
    silver_df
    .filter(silver_df.event_type == "purchase")
    .groupBy("event_date")
    .agg(
        spark_sum(silver_df.price * silver_df.quantity).alias("daily_revenue")
    )
    .orderBy("event_date")
)

daily_revenue.show()


+----------+------------------+
|event_date|     daily_revenue|
+----------+------------------+
|2025-01-01| 3237903.660000016|
|2025-01-02|3141487.8099999996|
|2025-01-03|3143526.4500000067|
+----------+------------------+



Daily Revenue:

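Aggregated purchase events to compute total daily revenue (price * quantity).
Revenue stays within about 3% across the three days (roughly 3.14M to 3.24M), consistent with the fixed purchase probability used by the generator. The long trailing decimals are floating-point accumulation error from summing DoubleType values.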
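The trailing digits in daily_revenue come from binary floating-point arithmetic, not from the data. In Spark, casting price to a DecimalType such as DecimalType(10, 2) before summing, or rounding the result with round(..., 2), would avoid them. The plain-Python sketch below shows the same effect with the decimal module; daily_revenue here is a hypothetical stand-in for the Spark aggregation.

```python
from collections import defaultdict
from decimal import Decimal
from typing import Dict, Iterable

def daily_revenue(rows: Iterable[Dict]) -> Dict[str, Decimal]:
    """Sum price * quantity of purchase events per event_date with exact decimals."""
    totals: Dict[str, Decimal] = defaultdict(Decimal)  # Decimal() == Decimal("0")
    for row in rows:
        if row["event_type"] != "purchase":
            continue
        # str() turns a 2-decimal float like 0.1 into Decimal("0.1"), not its binary expansion
        totals[row["event_date"]] += Decimal(str(row["price"])) * row["quantity"]
    return dict(sorted(totals.items()))

rows = [
    {"event_date": "2025-01-01", "event_type": "purchase", "price": 0.1, "quantity": 1},
    {"event_date": "2025-01-01", "event_type": "purchase", "price": 0.2, "quantity": 1},
    {"event_date": "2025-01-01", "event_type": "purchase", "price": 0.3, "quantity": 1},
    {"event_date": "2025-01-01", "event_type": "view", "price": 0.0, "quantity": 0},
]
print(sum(r["price"] * r["quantity"] for r in rows))  # float sum: 0.6000000000000001
print(daily_revenue(rows))                            # {'2025-01-01': Decimal('0.6')}
```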

In [61]:
# Daily conversion rate

from pyspark.sql.functions import countDistinct, count

daily_conversion = (
    silver_df
    .groupBy("event_date")
    .agg(
        countDistinct("session_id").alias("total_sessions"),
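        # NOTE: indexing a filtered DataFrame returns the same session_id column as
        # silver_df, so the filter is ignored and every session is counted here
        countDistinct(
            silver_df.filter(silver_df.event_type == "purchase")["session_id"]
        ).alias("converted_sessions")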
    )
    .withColumn(
        "conversion_rate",
        col("converted_sessions") / col("total_sessions")
    )
    .orderBy("event_date")
)

daily_conversion.show()


+----------+--------------+------------------+---------------+
|event_date|total_sessions|converted_sessions|conversion_rate|
+----------+--------------+------------------+---------------+
|2025-01-01|         50000|             50000|            1.0|
|2025-01-02|         50000|             50000|            1.0|
|2025-01-03|         50000|             50000|            1.0|
+----------+--------------+------------------+---------------+



Daily Conversion Rate:

Computed daily session-to-purchase conversion rates based on unique sessions.
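The 100% rate is an artifact of the aggregation, not of the data: silver_df.filter(...)["session_id"] resolves to the same session_id column as the unfiltered DataFrame, so converted_sessions simply repeats total_sessions. Counting only purchasing sessions requires a conditional expression such as countDistinct(when(col("event_type") == "purchase", col("session_id"))). Because the generator assigns a new session_id to every event (hence 50,000 sessions for 50,000 daily events), a correct rate would equal the share of purchase events, about 10% overall (15,035 of 150,000 events).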
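A correct session conversion rate counts, per group, the distinct sessions containing at least one purchase and divides by all distinct sessions. In PySpark this is typically written as countDistinct(when(col("event_type") == "purchase", col("session_id"))): when without otherwise yields null for other events, and countDistinct ignores nulls. The plain-Python sketch below implements the same logic for any grouping key, so it covers both the daily and the traffic-source tables; conversion_by is a hypothetical helper.

```python
from collections import defaultdict
from typing import Dict, Iterable, Set

def conversion_by(rows: Iterable[Dict], key: str) -> Dict[str, float]:
    """Per-group conversion rate: distinct purchasing sessions / distinct sessions."""
    sessions: Dict[str, Set[str]] = defaultdict(set)
    converted: Dict[str, Set[str]] = defaultdict(set)
    for row in rows:
        group = row[key]
        sessions[group].add(row["session_id"])
        # Only sessions with at least one purchase enter the converted set
        if row["event_type"] == "purchase":
            converted[group].add(row["session_id"])
    return {g: len(converted[g]) / len(ids) for g, ids in sorted(sessions.items())}

events = [
    {"event_date": "2025-01-01", "session_id": "s1", "event_type": "view"},
    {"event_date": "2025-01-01", "session_id": "s1", "event_type": "purchase"},
    {"event_date": "2025-01-01", "session_id": "s2", "event_type": "view"},
    {"event_date": "2025-01-02", "session_id": "s3", "event_type": "search"},
]
print(conversion_by(events, "event_date"))  # {'2025-01-01': 0.5, '2025-01-02': 0.0}
```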

In [62]:
# Revenue by device

revenue_by_device = (
    silver_df
    .filter(silver_df.event_type == "purchase")
    .groupBy("device_type")
    .agg(
        spark_sum(silver_df.price * silver_df.quantity).alias("revenue")
    )
    .orderBy(col("revenue").desc())
)

revenue_by_device.show()


+-----------+------------------+
|device_type|           revenue|
+-----------+------------------+
|     tablet| 3213153.719999999|
|    desktop|3183652.1099999947|
|     mobile|3126112.0900000012|
+-----------+------------------+



Revenue by Device:

Aggregated purchase revenue by device type to compare platform performance.
Tablet revenue exceeds mobile revenue by under 3% in the simulated data. Because the generator assigns device_type uniformly at random, independently of price and quantity, this gap reflects sampling variation rather than a real difference in purchasing behavior.
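Expressing each device's revenue as a share of the total makes the size of the gap explicit. The figures below are copied from the output above, rounded to cents, and their total matches the sum of the three daily revenue values (about 9.52M). With uniform device assignment, each share would be expected to sit near one third.

```python
# Revenue per device from the output above, rounded to cents
revenue_by_device = {"tablet": 3213153.72, "desktop": 3183652.11, "mobile": 3126112.09}

total = sum(revenue_by_device.values())
shares = {device: rev / total for device, rev in revenue_by_device.items()}
# Relative gap between the highest- and lowest-revenue device
gap = max(revenue_by_device.values()) / min(revenue_by_device.values()) - 1

for device, share in shares.items():
    print(f"{device:8s} {share:.1%}")
print(f"max/min gap: {gap:.1%}")
```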

In [63]:
# Conversion by traffic source

conversion_by_source = (
    silver_df
    .groupBy("traffic_source")
    .agg(
        countDistinct("session_id").alias("total_sessions"),
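        # NOTE: indexing a filtered DataFrame returns the same session_id column as
        # silver_df, so the filter is ignored and every session is counted here
        countDistinct(
            silver_df.filter(silver_df.event_type == "purchase")["session_id"]
        ).alias("converted_sessions")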
    )
    .withColumn(
        "conversion_rate",
        col("converted_sessions") / col("total_sessions")
    )
    .orderBy(col("conversion_rate").desc())
)

conversion_by_source.show()


+--------------+--------------+------------------+---------------+
|traffic_source|total_sessions|converted_sessions|conversion_rate|
+--------------+--------------+------------------+---------------+
|      referral|         30005|             30005|            1.0|
|         email|         30012|             30012|            1.0|
|       organic|         30125|             30125|            1.0|
|        social|         30083|             30083|            1.0|
|          paid|         29775|             29775|            1.0|
+--------------+--------------+------------------+---------------+



Conversion by Traffic Source:

Computed session-level conversion rates by traffic source to evaluate acquisition channel performance.
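All channels show a 100% conversion rate for the same reason as the daily metric: the filtered session_id column resolves back to the unfiltered one, so converted_sessions equals total_sessions. The session totals across sources sum to 150,000, again reflecting one session per event.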

In [64]:
gold_path = "/content/data/gold"

daily_revenue.write.mode("overwrite").parquet(f"{gold_path}/daily_revenue")
daily_conversion.write.mode("overwrite").parquet(f"{gold_path}/daily_conversion")
revenue_by_device.write.mode("overwrite").parquet(f"{gold_path}/revenue_by_device")
conversion_by_source.write.mode("overwrite").parquet(f"{gold_path}/conversion_by_source")

print("Gold tables written")


Gold tables written


Gold Layer Summary:

Produced analytics-ready tables including daily revenue, conversion rates, and performance breakdowns by device and traffic source. These outputs demonstrate how cleaned event data can be transformed into business-facing metrics suitable for reporting and decision-making.

## Key Insights

**User Activity & Revenue Stability:**

Daily revenue stays within about 3% across the observed period, consistent with the fixed purchase probability used to simulate events.

**Device-Level Performance:**

Tablet and desktop users generate slightly more revenue than mobile users, but the gap is under 3% and device type is assigned uniformly at random, so it reflects sampling noise rather than a behavioral difference.

**Traffic Source Performance:**

All traffic sources report a conversion rate of 1.0 because the aggregation counts every session as converted, not because of the synthetic data itself. With a conditional distinct count, this analysis would help identify high-performing acquisition channels.

**Pipeline Reliability:**

The Bronze–Silver–Gold pipeline successfully processes raw event logs into analytics-ready tables while enforcing schema consistency, data quality rules, and partitioning strategies.

## Recommendations & Next Steps

**Use Realistic Conversion Modeling:**

Generate multi-event sessions (the current generator gives every event its own session_id) and compute converted sessions with a conditional aggregation, so conversion metrics reflect realistic user funnels.
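One way to model funnels is to generate events per session rather than per row. The plain-Python sketch below is hypothetical: the funnel shape (one to five views, then an optional add_to_cart, then an optional purchase) and the cart_prob and purchase_prob values are illustrative assumptions, not tuned to any real traffic. Every event in a session shares one session_id, and a purchase can only follow an add_to_cart.

```python
import random
import uuid
from datetime import datetime, timedelta
from typing import Dict, List

def generate_session(rng: random.Random, user_id: int, start: datetime,
                     cart_prob: float = 0.3, purchase_prob: float = 0.3) -> List[Dict]:
    """Hypothetical funnel: 1-5 views, then maybe add_to_cart, then maybe purchase."""
    session_id = str(uuid.UUID(int=rng.getrandbits(128)))  # reproducible with a seeded rng
    events: List[Dict] = []
    t = start

    def emit(event_type: str) -> None:
        nonlocal t
        events.append({"session_id": session_id, "user_id": user_id,
                       "event_type": event_type, "event_time": t})
        t += timedelta(seconds=rng.randint(5, 120))  # gap before the next event

    for _ in range(rng.randint(1, 5)):
        emit("view")
    if rng.random() < cart_prob:
        emit("add_to_cart")
        if rng.random() < purchase_prob:  # purchase only after a cart add
            emit("purchase")
    return events

rng = random.Random(42)
sessions = [generate_session(rng, rng.randint(1, 1_000_000), datetime(2025, 1, 1))
            for _ in range(1_000)]
converted = sum(any(e["event_type"] == "purchase" for e in s) for s in sessions)
print(f"conversion rate: {converted / len(sessions):.1%}")
```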

**Extend Data Quality Checks:**

Add anomaly detection, late-arriving data handling, and record quarantining to strengthen pipeline robustness.
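Quarantining means routing failed records to a separate location with the reason attached, instead of silently filtering them out. In Spark this would typically be two filtered writes, one to Silver and one to a quarantine path. The plain-Python sketch below shows the routing logic; RULES, its rule names, and split_quarantine are hypothetical.

```python
from typing import Callable, Dict, List, Tuple

Rule = Callable[[Dict], bool]

# Hypothetical rule set; each rule returns True when the row passes
RULES: Dict[str, Rule] = {
    "has_user_id": lambda r: r.get("user_id") is not None,
    "known_event_type": lambda r: r.get("event_type") in {"view", "add_to_cart", "purchase", "search"},
    "purchase_has_amount": lambda r: r.get("event_type") != "purchase"
        or ((r.get("price") or 0) > 0 and (r.get("quantity") or 0) > 0),
}

def split_quarantine(rows: List[Dict], rules: Dict[str, Rule]) -> Tuple[List[Dict], List[Dict]]:
    """Route rows to (valid, quarantine); quarantined rows record which rules failed."""
    valid, quarantine = [], []
    for row in rows:
        failed = [name for name, check in rules.items() if not check(row)]
        if failed:
            quarantine.append({**row, "failed_rules": failed})
        else:
            valid.append(row)
    return valid, quarantine
```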

**Support Incremental Processing:**

Modify the pipeline to support incremental daily ingestion rather than full overwrites, improving scalability and efficiency.
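An incremental run needs to know which days are new. Spark can then limit an overwrite to the partitions being written by setting spark.sql.sources.partitionOverwriteMode to dynamic. The plain-Python sketch below finds dates that have a source file but no Bronze partition yet, assuming the events_YYYY-MM-DD.csv naming used earlier; pending_dates is a hypothetical helper.

```python
import os
import re
from typing import List

# Matches the source naming used above, e.g. events_2025-01-01.csv
SOURCE_FILE = re.compile(r"^events_(\d{4}-\d{2}-\d{2})\.csv$")

def pending_dates(source_dir: str, bronze_dir: str) -> List[str]:
    """Dates that have a source CSV but no event_date= partition in Bronze yet."""
    source = {
        m.group(1)
        for name in os.listdir(source_dir)
        if (m := SOURCE_FILE.match(name))
    }
    done = set()
    if os.path.isdir(bronze_dir):
        done = {
            name.split("=", 1)[1]
            for name in os.listdir(bronze_dir)
            if name.startswith("event_date=")
        }
    return sorted(source - done)
```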

**Production Integration:**

In a production setting, raw events would be ingested from streaming platforms (e.g., Kafka) or cloud storage, with orchestration handled by tools like Airflow.