## The Look E-commerce Data Warehouse Project

### Project Presentation
**Google Slides:** https://docs.google.com/presentation/d/14zteju0_U0E5X23eHGvP6SyPlDVBveSqmVO--eiN6kM/edit?usp=sharing

### Project Overview
This notebook contains the complete ETL pipeline for The Look e-commerce data warehouse implementation.

**A. Dataset Access**

The dataset is sourced from Google BigQuery's public dataset `bigquery-public-data.thelook_ecommerce`.

**B. Data Modeling**

Business Process Analysis

Core Business Story:

The Look is an e-commerce platform selling fashion products globally. The business needs to analyze:

- Sales performance across products, customers, and time periods
- Customer purchasing behavior and demographics
- Product performance and inventory insights
- Geographic sales distribution
- Order fulfillment and delivery patterns

Business Process Flow

Customer browses products → Adds to cart → Places order → Order fulfillment → Delivery

Each step generates valuable data points.

Key Business Questions:

- What are our top-selling products by revenue and quantity?
- Which customer segments generate the most revenue?
- How do sales trend over time (daily, monthly, yearly)?
- What's our order fulfillment performance?
- Which geographic regions perform best?

Data Warehouse Design

**Schema Type: Star Schema**

1 Fact Table: fact_sales

5 Dimension Tables: dim_products, dim_users, dim_orders, dim_date, dim_distribution_centers

**Fact Table:**

fact_sales - Grain: One row per order item

Dimension Tables:

- dim_products - Product information
- dim_users - Customer information
- dim_orders - Orders information
- dim_date - Time dimension
- dim_distribution_centers - Distribution center details

fact_sales structure:

- Keys: order_id (FK), user_id (FK), product_id (FK), distribution_center_id (FK), date_id (FK)
- Measures: quantity, revenue, total_cost, profit
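
To make the star schema concrete, here is a minimal sketch that uses Python's built-in `sqlite3` module instead of the project's PostgreSQL target. It builds a cut-down `fact_sales` and `dim_products` with only a few of the columns listed in this section, then answers the first business question (top-selling products by revenue). The sample rows and the helper names `build_demo_warehouse` and `top_products` are invented for illustration and are not part of the project's DDL.

```python
import sqlite3


def build_demo_warehouse() -> sqlite3.Connection:
    """Create a tiny in-memory star schema with made-up rows (illustration only)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE dim_products (
            product_id   INTEGER PRIMARY KEY,
            product_name TEXT NOT NULL,
            category     TEXT
        );
        CREATE TABLE fact_sales (
            order_id   INTEGER NOT NULL,
            product_id INTEGER NOT NULL REFERENCES dim_products (product_id),
            quantity   INTEGER NOT NULL,
            revenue    REAL NOT NULL
        );
        INSERT INTO dim_products VALUES
            (1, 'Twill Cap', 'Accessories'),
            (2, 'Canvas Ivy Cap', 'Accessories');
        INSERT INTO fact_sales VALUES
            (100, 1, 1, 6.25),
            (101, 1, 1, 6.25),
            (102, 2, 1, 15.99);
        """
    )
    return conn


def top_products(conn: sqlite3.Connection, limit: int = 5) -> list:
    """Return (product_name, units, total_revenue), highest revenue first."""
    query = """
        SELECT p.product_name,
               SUM(f.quantity)          AS units,
               ROUND(SUM(f.revenue), 2) AS total_revenue
        FROM fact_sales AS f
        JOIN dim_products AS p ON p.product_id = f.product_id
        GROUP BY p.product_id, p.product_name
        ORDER BY total_revenue DESC
        LIMIT ?
    """
    return conn.execute(query, (limit,)).fetchall()


if __name__ == "__main__":
    for row in top_products(build_demo_warehouse()):
        print(row)
```

The same join-then-aggregate shape applies to the other dimensions: swap `dim_products` for `dim_users` or `dim_date` to group revenue by customer segment or by period.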
  
**Dimension Tables Structure**

- dim_products:
  - product_id (PK)
  - product_name
  - category
  - brand
  - department
  - product_price
  - cost

- dim_users:
  - user_id (PK)
  - first_name
  - last_name
  - email
  - age
  - gender
  - city
  - state
  - country
  - postal_code
  - traffic_source
  - created_at

- dim_orders:
  - order_id (PK)
  - user_id
  - order_date
  - quantity
  - status
  - shipped_at
  - delivered_at
  - returned_at

- dim_distribution_centers:
  - distribution_center_id (PK)
  - name
  - latitude
  - longitude

- dim_date:
  - date_id (PK)
  - date
  - year
  - quarter
  - month
  - week
  - day
  - day_of_week
  - day_name
  - month_name
  - is_weekend


The remaining sections of the notebook cover:

**C. Extract - BigQuery to PySpark DataFrames** 

**D. Transform - PySpark Data Processing** 

**E. Load - Load Transformed Data to PostgreSQL (DDL Script linked in folder)**

### 0. SETUP AND INITIALIZATION

### I. EXTRACT: BigQuery TO PYSPARK DATAFRAMES 

In [1]:
# Import Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
from google.cloud import bigquery
import os

In [2]:
# Initialize Spark Session 
spark = (
    SparkSession.builder
    .appName("TheLook_DataWarehouse_ETL")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config(
        "spark.jars.packages",
        "org.postgresql:postgresql:42.7.3"
    )
    .getOrCreate()
)

print(f"✓ Spark Session initialized | Spark Version: {spark.version}")

✓ Spark Session initialized | Spark Version: 4.1.1


In [3]:
# Install Dependencies (on a fresh environment, run this cell before the import cell)
!pip install pyspark
!pip install google-cloud-bigquery
!pip install pandas-gbq



In [4]:
# Setup Google Cloud Credentials
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/jovyan/service_account.json.json'
client = bigquery.Client()

In [5]:
import os
os.path.exists("/home/jovyan/service_account.json.json")

True

#### I. 1 fact_sales table

In [6]:
# Fact Table
fact_sales = """
SELECT
  -- Keys (order_item_id identifies the row; the others are foreign keys)
  oi.id AS order_item_id,                     
  oi.order_id,
  o.user_id,
  oi.product_id,
  ii.product_distribution_center_id AS distribution_center_id,

  -- Date Key (surrogate)
  CAST(FORMAT_DATE('%Y%m%d', DATE(o.created_at)) AS INT64) AS date_id,

  -- MEASURES
  oi.sale_price,
  p.cost,
  1 AS quantity,
  oi.sale_price * 1 AS revenue,
  p.cost * 1 AS total_cost,
  (oi.sale_price - p.cost) AS profit

FROM `bigquery-public-data.thelook_ecommerce.order_items` oi
JOIN `bigquery-public-data.thelook_ecommerce.orders` o
  ON oi.order_id = o.order_id
JOIN `bigquery-public-data.thelook_ecommerce.inventory_items` ii
  ON oi.inventory_item_id = ii.id
JOIN `bigquery-public-data.thelook_ecommerce.products` p
  ON oi.product_id = p.id
"""

# BigQuery → Pandas
df_pd = client.query(fact_sales).to_dataframe()
df_pd.head()
df_pd.info()



<class 'pandas.core.frame.DataFrame'>
RangeIndex: 181367 entries, 0 to 181366
Data columns (total 12 columns):
 #   Column                  Non-Null Count   Dtype  
---  ------                  --------------   -----  
 0   order_item_id           181367 non-null  Int64  
 1   order_id                181367 non-null  Int64  
 2   user_id                 181367 non-null  Int64  
 3   product_id              181367 non-null  Int64  
 4   distribution_center_id  181367 non-null  Int64  
 5   date_id                 181367 non-null  Int64  
 6   sale_price              181367 non-null  float64
 7   cost                    181367 non-null  float64
 8   quantity                181367 non-null  Int64  
 9   revenue                 181367 non-null  float64
 10  total_cost              181367 non-null  float64
 11  profit                  181367 non-null  float64
dtypes: Int64(7), float64(5)
memory usage: 17.8 MB


In [7]:
# Pandas → Spark
fact_sales_raw = spark.createDataFrame(df_pd)

fact_sales_raw.show(5)
fact_sales_raw.printSchema()

+-------------+--------+-------+----------+----------------------+--------+------------------+------------------+--------+------------------+------------------+--------------------+
|order_item_id|order_id|user_id|product_id|distribution_center_id| date_id|        sale_price|              cost|quantity|           revenue|        total_cost|              profit|
+-------------+--------+-------+----------+----------------------+--------+------------------+------------------+--------+------------------+------------------+--------------------+
|       114428|   78746|  63006|     14235|                     1|20250519|0.0199999995529651|0.0082999997779726|       1|0.0199999995529651|0.0082999997779726|0.011699999774992502|
|        78796|   54222|  43125|     14235|                     1|20211007|0.0199999995529651|0.0082999997779726|       1|0.0199999995529651|0.0082999997779726|0.011699999774992502|
|       105519|   72603|  57952|     14235|                     1|20240713|0.0199999995529
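
The sample rows above carry binary floating-point noise (for example `0.0199999995529651` for a price of 0.02). As a rough sketch of one way to handle this, the snippet below rounds monetary values to two decimal places with Python's `decimal` module before deriving the measures. The helper names `to_money` and `line_measures` are hypothetical, and rounding the inputs before computing profit is an assumption made for this example; the notebook itself keeps these measures as doubles.

```python
from decimal import Decimal, ROUND_HALF_UP

CENT = Decimal("0.01")


def to_money(value: float) -> Decimal:
    """Round a float to two decimal places using half-up rounding."""
    # str() gives the shortest decimal text that round-trips the float.
    return Decimal(str(value)).quantize(CENT, rounding=ROUND_HALF_UP)


def line_measures(sale_price: float, cost: float, quantity: int = 1) -> dict:
    """Derive revenue, total_cost and profit for one order item."""
    revenue = to_money(sale_price) * quantity
    total_cost = to_money(cost) * quantity
    return {
        "quantity": quantity,
        "revenue": revenue,
        "total_cost": total_cost,
        "profit": revenue - total_cost,
    }


if __name__ == "__main__":
    print(line_measures(0.0199999995529651, 0.0082999997779726))
```

Rounding first keeps `profit` equal to `revenue - total_cost` at cent precision, at the price of losing sub-cent cost detail. Rounding only the final profit is an equally defensible choice.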

#### I. 2 dim_users table

In [8]:
# Dim User
dim_users = """
SELECT
  id AS user_id,
  first_name,
  last_name,
  email,
  age,
  gender,
  city,
  state,
  country,
  postal_code,
  traffic_source,
  created_at
FROM `bigquery-public-data.thelook_ecommerce.users`
"""

df_users_pd = client.query(dim_users).to_dataframe()
df_users_pd.head()



Unnamed: 0,user_id,first_name,last_name,email,age,gender,city,state,country,postal_code,traffic_source,created_at
0,14146,Theresa,White,theresawhite@example.com,18,F,,Acre,Brasil,69980-000,Search,2019-11-10 12:24:00+00:00
1,38373,Laurie,Johnson,lauriejohnson@example.com,44,F,,Acre,Brasil,69980-000,Email,2023-01-31 13:54:00+00:00
2,53337,Jacqueline,Duarte,jacquelineduarte@example.net,43,F,,Acre,Brasil,69980-000,Search,2025-01-24 02:43:00+00:00
3,2816,Vickie,Jones,vickiejones@example.org,50,F,,Acre,Brasil,69980-000,Search,2019-09-29 14:42:00+00:00
4,44001,Hannah,Stanton,hannahstanton@example.net,12,F,,Acre,Brasil,69980-000,Search,2025-04-27 11:25:00+00:00


In [9]:
# Pandas → Spark
dim_users_raw = spark.createDataFrame(df_users_pd)
dim_users_raw.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- created_at: timestamp (nullable = true)



#### I. 3 dim_products table

In [10]:
# Dim Products
dim_products = """
SELECT
  id AS product_id,
  name AS product_name,
  category,
  brand,
  department,
  retail_price AS sale_price,
  cost
FROM `bigquery-public-data.thelook_ecommerce.products`
"""

df_products_pd = client.query(dim_products).to_dataframe()
df_products_pd.head()



Unnamed: 0,product_id,product_name,category,brand,department,sale_price,cost
0,13842,Low Profile Dyed Cotton Twill Cap - Navy W39S55D,Accessories,MG,Women,6.25,2.51875
1,13928,Low Profile Dyed Cotton Twill Cap - Putty W39S55D,Accessories,MG,Women,5.95,2.33835
2,14115,Enzyme Regular Solid Army Caps-Black W35S45D,Accessories,MG,Women,10.99,4.87956
3,14157,Enzyme Regular Solid Army Caps-Olive W35S45D (...,Accessories,MG,Women,10.99,4.64877
4,14273,Washed Canvas Ivy Cap - Black W11S64C,Accessories,MG,Women,15.99,6.50793


In [11]:
# Pandas → Spark
dim_products_raw = spark.createDataFrame(df_products_pd)
dim_products_raw.printSchema()

root
 |-- product_id: long (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- department: string (nullable = true)
 |-- sale_price: double (nullable = true)
 |-- cost: double (nullable = true)



#### I. 4 dim_orders table

In [12]:
# Dim Orders
dim_orders = """
SELECT
  order_id AS order_id,
  user_id,
  DATE(created_at) AS order_date,
  num_of_item AS quantity,
  status,
  shipped_at,
  delivered_at,
  returned_at,
FROM `bigquery-public-data.thelook_ecommerce.orders`
"""

df_orders_pd = client.query(dim_orders).to_dataframe()
df_orders_pd.head()



Unnamed: 0,order_id,user_id,order_date,quantity,status,shipped_at,delivered_at,returned_at
0,5,7,2025-12-28,1,Cancelled,NaT,NaT,NaT
1,22,23,2025-02-19,1,Cancelled,NaT,NaT,NaT
2,31,32,2023-07-03,2,Cancelled,NaT,NaT,NaT
3,67,59,2022-03-18,1,Cancelled,NaT,NaT,NaT
4,80,70,2025-12-16,1,Cancelled,NaT,NaT,NaT


In [13]:
# Pandas → Spark
dim_orders_raw = spark.createDataFrame(df_orders_pd)
dim_orders_raw.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- order_date: date (nullable = true)
 |-- quantity: long (nullable = true)
 |-- status: string (nullable = true)
 |-- shipped_at: timestamp (nullable = true)
 |-- delivered_at: timestamp (nullable = true)
 |-- returned_at: timestamp (nullable = true)



#### I. 5 dim_distribution_centers table

In [14]:
# Dim Distribution Center
dim_distribution_center = """
SELECT
  id AS distribution_center_id,
  name,
  latitude,
  longitude
FROM `bigquery-public-data.thelook_ecommerce.distribution_centers`
ORDER BY distribution_center_id
"""

df_distribution_center_pd = client.query(dim_distribution_center).to_dataframe()
df_distribution_center_pd.head()



Unnamed: 0,distribution_center_id,name,latitude,longitude
0,1,Memphis TN,35.1174,-89.9711
1,2,Chicago IL,41.8369,-87.6847
2,3,Houston TX,29.7604,-95.3698
3,4,Los Angeles CA,34.05,-118.25
4,5,New Orleans LA,29.95,-90.0667


In [15]:
# Pandas → Spark
dim_distribution_centers_raw = spark.createDataFrame(df_distribution_center_pd)
dim_distribution_centers_raw.printSchema()

root
 |-- distribution_center_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



#### I. 6 dim_date table

In [16]:
dim_date = """
SELECT DISTINCT
  DATE(created_at) AS date,
  EXTRACT(YEAR FROM created_at) AS year,
  EXTRACT(QUARTER FROM created_at) AS quarter,
  EXTRACT(MONTH FROM created_at) AS month,
  EXTRACT(WEEK FROM created_at) AS week,
  EXTRACT(DAY FROM created_at) AS day,
  EXTRACT(DAYOFWEEK FROM created_at) AS day_of_week,
  FORMAT_DATE('%A', DATE(created_at)) AS day_name,
  FORMAT_DATE('%B', DATE(created_at)) AS month_name,
  CASE
    WHEN EXTRACT(DAYOFWEEK FROM created_at) IN (1, 7) THEN TRUE
    ELSE FALSE
  END AS is_weekend
FROM `bigquery-public-data.thelook_ecommerce.orders`
"""
            
df_date_pd = client.query(dim_date).to_dataframe()
df_date_pd.head()



Unnamed: 0,date,year,quarter,month,week,day,day_of_week,day_name,month_name,is_weekend
0,2025-12-28,2025,4,12,52,28,1,Sunday,December,True
1,2025-02-19,2025,1,2,7,19,4,Wednesday,February,False
2,2023-07-03,2023,3,7,27,3,2,Monday,July,False
3,2022-03-18,2022,1,3,11,18,6,Friday,March,False
4,2025-12-16,2025,4,12,50,16,3,Tuesday,December,False


In [17]:
# Pandas → Spark
dim_date_raw = spark.createDataFrame(df_date_pd)
dim_date_raw.printSchema()

root
 |-- date: date (nullable = true)
 |-- year: long (nullable = true)
 |-- quarter: long (nullable = true)
 |-- month: long (nullable = true)
 |-- week: long (nullable = true)
 |-- day: long (nullable = true)
 |-- day_of_week: long (nullable = true)
 |-- day_name: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- is_weekend: boolean (nullable = true)
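
Because the query above derives `dim_date` from the distinct order dates, any calendar day without an order gets no row. As an alternative, here is a minimal sketch that generates a continuous calendar in plain Python. The function names `date_row` and `build_calendar` are hypothetical. The `week` and `day_of_week` formulas were chosen to reproduce the sample rows shown earlier (Sunday as day 1, Sunday-based week numbers); treat their agreement with BigQuery for every date as an assumption to verify.

```python
from datetime import date, timedelta

DAY_NAMES = ("Monday", "Tuesday", "Wednesday", "Thursday",
             "Friday", "Saturday", "Sunday")
MONTH_NAMES = ("January", "February", "March", "April", "May", "June",
               "July", "August", "September", "October", "November", "December")


def date_row(d: date) -> dict:
    """Build one dim_date row for a calendar day."""
    day_of_week = d.isoweekday() % 7 + 1  # 1 = Sunday ... 7 = Saturday
    return {
        "date_id": d.year * 10000 + d.month * 100 + d.day,  # YYYYMMDD
        "date": d,
        "year": d.year,
        "quarter": (d.month - 1) // 3 + 1,
        "month": d.month,
        "week": int(d.strftime("%U")),  # Sunday-based week of year, 0-53
        "day": d.day,
        "day_of_week": day_of_week,
        "day_name": DAY_NAMES[d.weekday()],
        "month_name": MONTH_NAMES[d.month - 1],
        "is_weekend": day_of_week in (1, 7),
    }


def build_calendar(start: date, end: date) -> list:
    """Return one row per day from start to end, inclusive."""
    return [date_row(start + timedelta(days=i))
            for i in range((end - start).days + 1)]


if __name__ == "__main__":
    for row in build_calendar(date(2025, 12, 27), date(2025, 12, 29)):
        print(row["date_id"], row["day_name"], row["week"], row["is_weekend"])
```

A generated calendar also lets the date range extend past the last order date, which is useful when reports need empty future periods.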



### II. TRANSFORM

In [18]:
"""
# Load Dimension Tables first (to satisfy foreign key constraints)
dim_products_raw.write.jdbc(
    url=jdbc_url,
    table="dim_products",
    mode="append",
    properties=db_properties
)
print("✓ dim_products loaded")

dim_users_raw.write.jdbc(
    url=jdbc_url,
    table="dim_users",
    mode="append",
    properties=db_properties
)
print("✓ dim_users loaded")

dim_orders_raw.write.jdbc(
    url=jdbc_url,
    table="dim_orders",
    mode="append",
    properties=db_properties
)
print("✓ dim_orders loaded")

dim_distribution_centers_raw.write.jdbc(
    url=jdbc_url,
    table="dim_distribution_centers",
    mode="append",
    properties=db_properties
)
print("✓ dim_distribution_centers loaded")

dim_date_raw.write.jdbc(
    url=jdbc_url,
    table="dim_date",
    mode="append",
    properties=db_properties
)
print("✓ dim_date loaded")

# Load Fact Table last
fact_sales_raw.write.jdbc(
    url=jdbc_url,
    table="fact_sales",
    mode="append",
    properties=db_properties
)
print("✓ fact_sales loaded")

print("\n✓ All data loaded to PostgreSQL successfully!")
"""

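
The disabled load block above references `jdbc_url` and `db_properties`, neither of which is defined in this section. Here is a minimal sketch of how they might be assembled from environment variables rather than hard-coded. The variable names (`PGHOST`, `PGPORT`, `PGDATABASE`, `PGUSER`, `PGPASSWORD`), the function name `postgres_jdbc_settings`, and the example database name are assumptions for illustration. The driver class `org.postgresql.Driver` matches the PostgreSQL JDBC package configured in the Spark session.

```python
import os


def postgres_jdbc_settings(env=None):
    """Return (jdbc_url, db_properties) built from environment-style settings."""
    env = os.environ if env is None else env
    host = env.get("PGHOST", "localhost")
    port = env.get("PGPORT", "5432")
    database = env["PGDATABASE"]  # required; raises KeyError if missing

    jdbc_url = f"jdbc:postgresql://{host}:{port}/{database}"
    db_properties = {
        "user": env["PGUSER"],
        "password": env["PGPASSWORD"],
        "driver": "org.postgresql.Driver",
    }
    return jdbc_url, db_properties


if __name__ == "__main__":
    # Hypothetical settings, passed as a dict so no real environment is needed.
    url, props = postgres_jdbc_settings(
        {"PGDATABASE": "thelook_dw", "PGUSER": "etl", "PGPASSWORD": "change-me"}
    )
    print(url)
```

Keeping credentials out of the notebook source means the exported notebook can be shared without leaking the database password.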

#### II. 1 TRANSFORM: DIM_PRODUCTS

In [19]:
print("\n=== Processing dim_products ===")

# Cache dataframe to avoid multiple scans
dim_products_raw.cache()

raw_count = dim_products_raw.count()
print(f"Raw records: {raw_count}")
print("Original Schema:")
dim_products_raw.printSchema()

# -------------------------------
# Transform: Data Cleaning & Typing
# -------------------------------
dim_products = (
    dim_products_raw
    .select(
        col("product_id").cast(IntegerType()).alias("product_id"),
        trim(col("product_name")).cast(StringType()).alias("product_name"),
        col("sale_price").cast(DecimalType(10, 2)).alias("product_price"),
        trim(col("category")).cast(StringType()).alias("category"),
        trim(col("brand")).cast(StringType()).alias("brand"),
        trim(col("department")).cast(StringType()).alias("department"),
        col("cost").cast(DecimalType(10, 2)).alias("cost"),
    )
    .dropna(subset=["product_id", "product_name"])
    .dropDuplicates(["product_id"])
    .fillna({
        "category": "Unknown",
        "brand": "Unknown",
        "department": "Unknown"
    })
)

clean_count = dim_products.count()

print(f"Cleaned records: {clean_count}")
print(f"Duplicates removed: {raw_count - clean_count}")

print("\nTransformed Schema:")
dim_products.printSchema()

print("\nSample Data:")
dim_products.show(5, truncate=False)


=== Processing dim_products ===
Raw records: 29120
Original Schema:
root
 |-- product_id: long (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- department: string (nullable = true)
 |-- sale_price: double (nullable = true)
 |-- cost: double (nullable = true)

Cleaned records: 29118
Duplicates removed: 2

Transformed Schema:
root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_price: decimal(10,2) (nullable = true)
 |-- category: string (nullable = false)
 |-- brand: string (nullable = false)
 |-- department: string (nullable = false)
 |-- cost: decimal(10,2) (nullable = true)


Sample Data:
+----------+------------------------------------------------+-------------+-----------+------------------+----------+-----+
|product_id|product_name                                    |product_price|category   |brand             |department|cost |
+-----
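
The "Duplicates removed" figure printed above is simply raw count minus cleaned count, so it also includes rows dropped by the null filter on `product_id` and `product_name`. The plain-Python sketch below mirrors the same cleaning rules on a list of dicts and reports the two drop reasons separately. The function name `clean_products` and the counter labels are hypothetical. Unlike Spark's `dropDuplicates`, which does not guarantee which duplicate survives, this sketch keeps the first occurrence.

```python
DEFAULTS = {"category": "Unknown", "brand": "Unknown", "department": "Unknown"}


def clean_products(rows):
    """Trim text, drop rows missing required fields, dedupe on product_id."""
    cleaned = []
    seen_ids = set()
    dropped = {"missing_required": 0, "duplicate_id": 0}

    for row in rows:
        # Required fields: a null id or name disqualifies the row.
        if row.get("product_id") is None or row.get("product_name") is None:
            dropped["missing_required"] += 1
            continue
        # Keep only the first row seen for each product_id.
        if row["product_id"] in seen_ids:
            dropped["duplicate_id"] += 1
            continue
        seen_ids.add(row["product_id"])

        out = dict(row)
        out["product_name"] = row["product_name"].strip()
        # Optional text columns: trim if present, otherwise fill a default.
        for column, default in DEFAULTS.items():
            value = row.get(column)
            out[column] = value.strip() if value is not None else default
        cleaned.append(out)

    return cleaned, dropped


if __name__ == "__main__":
    sample = [
        {"product_id": 1, "product_name": " Cap ", "category": "Accessories",
         "brand": None, "department": "Women"},
        {"product_id": 2, "product_name": None, "category": "Accessories",
         "brand": "MG", "department": "Women"},
        {"product_id": 1, "product_name": "Cap", "category": "Accessories",
         "brand": "MG", "department": "Women"},
    ]
    rows, stats = clean_products(sample)
    print(len(rows), stats)
```

Tracking the reasons separately makes it possible to tell whether a shortfall comes from genuinely duplicated keys or from incomplete source rows.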

#### II. 2 TRANSFORM: DIM_USERS

In [20]:
print("\n=== Processing dim_users ===")

# Cache to avoid repeated scans
dim_users_raw.cache()

raw_count = dim_users_raw.count()
print(f"Raw records: {raw_count}")
print("Original Schema:")
dim_users_raw.printSchema()

# -------------------------------
# Transform: Cleaning & Typing
# -------------------------------
dim_users = (
    dim_users_raw
    .select(
        col("user_id").cast(IntegerType()).alias("user_id"),
        trim(col("first_name")).cast(StringType()).alias("first_name"),
        trim(col("last_name")).cast(StringType()).alias("last_name"),
        lower(trim(col("email"))).cast(StringType()).alias("email"),
        col("age").cast(IntegerType()).alias("age"),
        trim(col("gender")).cast(StringType()).alias("gender"),
        trim(col("city")).cast(StringType()).alias("city"),
        trim(col("state")).cast(StringType()).alias("state"),
        trim(col("country")).cast(StringType()).alias("country"),
        trim(col("postal_code")).cast(StringType()).alias("postal_code"),
        trim(col("traffic_source")).cast(StringType()).alias("traffic_source"),
        to_timestamp(col("created_at")).alias("created_at")
    )
    .dropna(subset=["user_id"])
    .dropDuplicates(["user_id"])
    .fillna({
        "city": "Unknown",
        "state": "Unknown",
        "country": "Unknown",
        "traffic_source": "Unknown"
    })
)

# -------------------------------
# Data Quality Rule: Age validation
# -------------------------------
dim_users = dim_users.filter(
    col("age").isNull() | ((col("age") >= 0) & (col("age") <= 120))
)

clean_count = dim_users.count()

print(f"Cleaned records: {clean_count}")
print(f"Duplicates removed: {raw_count - clean_count}")

print("\nTransformed Schema:")
dim_users.printSchema()

print("\nSample Data:")
dim_users.show(5, truncate=False)

print("\nAge Statistics:")
dim_users.select("age").describe().show()


=== Processing dim_users ===
Raw records: 100000
Original Schema:
root
 |-- user_id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- created_at: timestamp (nullable = true)

Cleaned records: 100000
Duplicates removed: 0

Transformed Schema:
root
 |-- user_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- city: string (nullable = false)
 |-- state: string (nullable = false)
 |-- country: string (nullable = false)
 |-- postal_code: string (nullab

#### II. 3 TRANSFORM: DIM_ORDERS

In [21]:
# Cache to avoid multiple scans
dim_orders_raw.cache()

raw_count = dim_orders_raw.count()
print(f"Raw records: {raw_count}")
print("Original Schema:")
dim_orders_raw.printSchema()

dim_orders = (
    dim_orders_raw
    .select(
        col("order_id").cast(IntegerType()).alias("order_id"),
        col("user_id").cast(IntegerType()).alias("user_id"),
        to_date(col("order_date")).alias("order_date"),
        col("status").cast(StringType()).alias("status"),
        to_timestamp(col("shipped_at")).alias("shipped_at"),
        to_timestamp(col("delivered_at")).alias("delivered_at"),
        to_timestamp(col("returned_at")).alias("returned_at")
    )
    .dropna(subset=["order_id"])
    .dropDuplicates(["order_id"])
)

clean_count = dim_orders.count()

print(f"Cleaned records: {clean_count}")
print(f"Dropped records: {raw_count - clean_count}")

print("\nTransformed Schema:")
dim_orders.printSchema()

print("\nSample Data:")
dim_orders.show(5, truncate=False)


Raw records: 125040
Original Schema:
root
 |-- order_id: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- order_date: date (nullable = true)
 |-- quantity: long (nullable = true)
 |-- status: string (nullable = true)
 |-- shipped_at: timestamp (nullable = true)
 |-- delivered_at: timestamp (nullable = true)
 |-- returned_at: timestamp (nullable = true)

Cleaned records: 125040
Dropped records: 0

Transformed Schema:
root
 |-- order_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- status: string (nullable = true)
 |-- shipped_at: timestamp (nullable = true)
 |-- delivered_at: timestamp (nullable = true)
 |-- returned_at: timestamp (nullable = true)


Sample Data:
+--------+-------+----------+----------+-------------------+-------------------+-----------+
|order_id|user_id|order_date|status    |shipped_at         |delivered_at       |returned_at|
+--------+-------+----------+----------+-------------------+-

#### II. 3 TRANSFORM: DIM_DISTRIBUTION_CENTERS

In [22]:
# Cache to avoid multiple scans
dim_distribution_centers_raw.cache()

raw_count = dim_distribution_centers_raw.count()
print(f"Raw records: {raw_count}")
print("Original Schema:")
dim_distribution_centers_raw.printSchema()

# -------------------------------
# Transform: Cleaning & Typing
# -------------------------------
dim_distribution_centers = (
    dim_distribution_centers_raw
    .select(
        col("distribution_center_id").cast(IntegerType()).alias("distribution_center_id"),
        trim(col("name")).cast(StringType()).alias("name"),
        col("latitude").cast(DecimalType(9, 6)).alias("latitude"),
        col("longitude").cast(DecimalType(9, 6)).alias("longitude")
    )
    .dropna(subset=["distribution_center_id"])
    .dropDuplicates(["distribution_center_id"])
)

clean_count = dim_distribution_centers.count()

print(f"Cleaned records: {clean_count}")
print(f"Duplicates removed: {raw_count - clean_count}")

print("\nTransformed Schema:")
dim_distribution_centers.printSchema()

print("\nSample Data:")
dim_distribution_centers.show(5, truncate=False)

Raw records: 10
Original Schema:
root
 |-- distribution_center_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

Cleaned records: 10
Duplicates removed: 0

Transformed Schema:
root
 |-- distribution_center_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude: decimal(9,6) (nullable = true)
 |-- longitude: decimal(9,6) (nullable = true)


Sample Data:
+----------------------+--------------+---------+-----------+
|distribution_center_id|name          |latitude |longitude  |
+----------------------+--------------+---------+-----------+
|1                     |Memphis TN    |35.117400|-89.971100 |
|2                     |Chicago IL    |41.836900|-87.684700 |
|3                     |Houston TX    |29.760400|-95.369800 |
|4                     |Los Angeles CA|34.050000|-118.250000|
|5                     |New Orleans LA|29.950000|-90.066700 |
+----------------------+-----

#### II. 4 TRANSFORM: DIM_DATE

In [23]:
print("\n=== Processing dim_date ===")
# Cache to avoid repeated scans
dim_date_raw.cache()
raw_count = dim_date_raw.count()
print(f"Raw records: {raw_count}")
print("Original Schema:")
dim_date_raw.printSchema()

# -------------------------------
# Transform: Date Dimension
# -------------------------------
dim_date = (
    dim_date_raw
    .select(
        # Date key (YYYYMMDD)
        date_format(to_date(col("date")), "yyyyMMdd")
            .cast(IntegerType())
            .alias("date_id"),

        to_date(col("date")).alias("date"),

        col("year").cast(IntegerType()).alias("year"),
        col("quarter").cast(IntegerType()).alias("quarter"),
        col("month").cast(IntegerType()).alias("month"),
        col("week").cast(IntegerType()).alias("week"),
        col("day").cast(IntegerType()).alias("day"),
        col("day_of_week").cast(IntegerType()).alias("day_of_week"),
        col("day_name").cast(StringType()).alias("day_name"),
        col("month_name").cast(StringType()).alias("month_name"),
        col("is_weekend").cast(BooleanType()).alias("is_weekend")
    )
    .dropna(subset=["date"])
    .dropDuplicates(["date"])
)

print("=== TRANSFORMED SCHEMA ===")
dim_date.printSchema()

print("=== SAMPLE DATA ===")
dim_date.show(5, truncate=False)


=== Processing dim_date ===
Raw records: 2522
Original Schema:
root
 |-- date: date (nullable = true)
 |-- year: long (nullable = true)
 |-- quarter: long (nullable = true)
 |-- month: long (nullable = true)
 |-- week: long (nullable = true)
 |-- day: long (nullable = true)
 |-- day_of_week: long (nullable = true)
 |-- day_name: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- is_weekend: boolean (nullable = true)

=== TRANSFORMED SCHEMA ===
root
 |-- date_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- day_name: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- is_weekend: boolean (nullable = true)

=== SAMPLE DATA ===
+--------+----------+----+-------+-----+----+---+-----------+--------+---

#### II. 5 TRANSFORM: FACT_SALES

In [26]:
print("\n=== Processing fact_sales ===")

# --------------------------------
# Cache raw data
# --------------------------------
fact_sales_raw.cache()

raw_count = fact_sales_raw.count()
print(f"Raw records: {raw_count}")

print("\nOriginal Schema:")
fact_sales_raw.printSchema()


fact_sales = (
    fact_sales_raw
    .dropna(subset=[
        "order_id",
        "user_id",
        "product_id",
        "date_id",
        "sale_price",
        "cost"
    ])
    .select(
        # Foreign Keys
        col("order_id").cast(IntegerType()).alias("order_id"),
        col("user_id").cast(IntegerType()).alias("user_id"),
        col("product_id").cast(IntegerType()).alias("product_id"),
        col("distribution_center_id").cast(IntegerType()).alias("distribution_center_id"),
        col("date_id").cast(IntegerType()).alias("date_id"),

        # Measures
        col("quantity").cast(IntegerType()).alias("quantity"),
        col("sale_price").cast(DoubleType()).alias("sale_price"),
        col("revenue").cast(DoubleType()).alias("revenue"),
        col("total_cost").cast(DoubleType()).alias("total_cost"),
        col("profit").cast(DoubleType()).alias("profit")
    )
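    # Note: order_item_id is not selected above, so this deduplicates on the
    # remaining columns only and can merge distinct items of the same order.
    .dropDuplicates()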
)

# --------------------------------
# Data Quality Check
# --------------------------------
clean_count = fact_sales.count()

print(f"\nCleaned records: {clean_count}")
print(f"Dropped records: {raw_count - clean_count}")

# --------------------------------
# Output Validation
# --------------------------------
print("\nTransformed Schema:")
fact_sales.printSchema()

print("\nSample Data:")
fact_sales.show(5, truncate=False)


=== Processing fact_sales ===
Raw records: 181367

Original Schema:
root
 |-- order_item_id: long (nullable = true)
 |-- order_id: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- product_id: long (nullable = true)
 |-- distribution_center_id: long (nullable = true)
 |-- date_id: long (nullable = true)
 |-- sale_price: double (nullable = true)
 |-- cost: double (nullable = true)
 |-- quantity: long (nullable = true)
 |-- revenue: double (nullable = true)
 |-- total_cost: double (nullable = true)
 |-- profit: double (nullable = true)


Cleaned records: 181365
Dropped records: 2

Transformed Schema:
root
 |-- order_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- distribution_center_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- sale_price: double (nullable = true)
 |-- revenue: double (nullable = true)
 |-- total_cost: double (nullabl
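
The declared grain of `fact_sales` is one row per order item, yet the transformation drops `order_item_id` before deduplicating on all remaining columns. The plain-Python sketch below shows why the order of those two steps matters: two distinct items of the same order with identical product and price collapse into one row once the item id is gone. The helper names `project` and `dedupe` and the sample rows are invented for illustration. Whether the 2 rows dropped in the run above are such cases cannot be determined from the output alone.

```python
def project(rows, columns):
    """Keep only the named columns of each row."""
    return [{c: row[c] for c in columns} for row in rows]


def dedupe(rows, key_columns=None):
    """Drop repeated rows, comparing key_columns (or every column if None)."""
    seen, kept = set(), []
    for row in rows:
        columns = key_columns if key_columns is not None else sorted(row)
        key = tuple(row[c] for c in columns)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


# Two distinct order items: same order, same product, same price.
ITEMS = [
    {"order_item_id": 1, "order_id": 10, "product_id": 7, "sale_price": 5.0},
    {"order_item_id": 2, "order_id": 10, "product_id": 7, "sale_price": 5.0},
]
FACT_COLUMNS = ["order_id", "product_id", "sale_price"]

if __name__ == "__main__":
    # Project first, then dedupe on everything: the two items merge.
    print(len(dedupe(project(ITEMS, FACT_COLUMNS))))
    # Dedupe on the item id first, then project: both items survive.
    print(len(project(dedupe(ITEMS, ["order_item_id"]), FACT_COLUMNS)))
```

Deduplicating on the grain key before projecting preserves the one-row-per-order-item guarantee.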

#### II. 6 DATA QUALITY SUMMARY

In [27]:
print("\n" + "="*50)
print("DATA QUALITY SUMMARY")
print("="*50)

quality_summary = {
    "dim_products": dim_products.count(),
    "dim_users": dim_users.count(),
    "dim_distribution_centers": dim_distribution_centers.count(),
    "dim_date": dim_date.count(),
    "fact_sales": fact_sales.count()
}

for table, count in quality_summary.items():
    print(f"{table:30s}: {count:,} records")

# Check for missing foreign keys in fact table
print("\n=== Foreign Key Validation ===")

user_ids_in_fact = fact_sales.select("user_id").distinct()
user_ids_in_dim = dim_users.select("user_id").distinct()
missing_users = user_ids_in_fact.subtract(user_ids_in_dim).count()
print(f"Missing user_ids in dim_users: {missing_users}")

product_ids_in_fact = fact_sales.select("product_id").distinct()
product_ids_in_dim = dim_products.select("product_id").distinct()
missing_products = product_ids_in_fact.subtract(product_ids_in_dim).count()
print(f"Missing product_ids in dim_products: {missing_products}")


DATA QUALITY SUMMARY
dim_products                  : 29,118 records
dim_users                     : 100,000 records
dim_distribution_centers      : 10 records
dim_date                      : 2,522 records
fact_sales                    : 181,365 records

=== Foreign Key Validation ===
Missing user_ids in dim_users: 0
Missing product_ids in dim_products: 2


- ✅ Sizes all make sense
- ✅ No explosion / shrinkage
- ✅ fact > dimensions (correct)

Issue: 2 distinct product_id values referenced by fact_sales are missing from dim_products, which affects 11 fact rows.
Given the negligible volume (<0.01% of fact rows), these records were removed to enforce referential integrity in the star schema.

#### II. 7 Data Validation

In [28]:
# Missing product_ids Handling
fact_sales_clean = fact_sales.join(
    dim_products.select("product_id"),
    on="product_id",
    how="left_semi"
)

In [29]:
print("Before:", fact_sales.count())
print("After :", fact_sales_clean.count())
print("Dropped:", fact_sales.count() - fact_sales_clean.count())

Before: 181365
After : 181354
Dropped: 11


In [31]:
fact_sales = fact_sales_clean

In [32]:
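# FK validation: count fact rows with no match in each dimension (left_anti join).
# Only the last expression of a notebook cell is displayed, so the single 0 below
# is the dim_date result; wrap each line in print() to see all four counts.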
fact_sales.join(dim_products, "product_id", "left_anti").count()
fact_sales.join(dim_users, "user_id", "left_anti").count()
fact_sales.join(dim_distribution_centers, "distribution_center_id", "left_anti").count()
fact_sales.join(
    dim_date,
    fact_sales.date_id == dim_date.date_id,
    "left_anti"
).count()

0
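
The cell above shows a single 0 for four checks. A check that loops over every foreign key and prints each result makes all outcomes visible. The sketch below shows the idea on plain Python rows; `fk_report` and the sample rows are hypothetical, and in the notebook each count would come from a `left_anti` join instead.

```python
def fk_report(fact_rows, dim_keys):
    """Count fact rows whose foreign key has no match, per key column.

    fact_rows: list of dicts, one per fact row.
    dim_keys:  dict mapping FK column name -> set of valid dimension keys.
    """
    return {
        column: sum(1 for row in fact_rows if row[column] not in valid)
        for column, valid in dim_keys.items()
    }

# Illustrative rows only, not taken from the warehouse.
fact_rows = [
    {"product_id": 1, "user_id": 10, "date_id": 20190108},
    {"product_id": 2, "user_id": 11, "date_id": 20190115},
    {"product_id": 9, "user_id": 10, "date_id": 20190117},  # product 9 is unknown
]
dim_keys = {
    "product_id": {1, 2},
    "user_id": {10, 11},
    "date_id": {20190108, 20190115, 20190117},
}

report = fk_report(fact_rows, dim_keys)
for column, orphans in report.items():
    print(f"{column:12s}: {orphans} orphan rows")
```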

In [33]:
from pyspark.sql.functions import col, sum as _sum, when

def check_nulls(df, table_name):
    print(f"\n=== Null Check: {table_name} ===")
    df.select([
        _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
        for c in df.columns
    ]).show(vertical=True)

check_nulls(dim_products, "dim_products")
check_nulls(dim_users, "dim_users")
check_nulls(dim_distribution_centers, "dim_distribution_centers")
check_nulls(dim_date, "dim_date")
check_nulls(dim_orders, "dim_orders")
check_nulls(fact_sales, "fact_sales")


=== Null Check: dim_products ===
-RECORD 0------------
 product_id    | 0   
 product_name  | 0   
 product_price | 0   
 category      | 0   
 brand         | 0   
 department    | 0   
 cost          | 0   


=== Null Check: dim_users ===
-RECORD 0-------------
 user_id        | 0   
 first_name     | 0   
 last_name      | 0   
 email          | 0   
 age            | 0   
 gender         | 0   
 city           | 0   
 state          | 0   
 country        | 0   
 postal_code    | 0   
 traffic_source | 0   
 created_at     | 0   


=== Null Check: dim_distribution_centers ===
-RECORD 0---------------------
 distribution_center_id | 0   
 name                   | 0   
 latitude               | 0   
 longitude              | 0   


=== Null Check: dim_date ===
-RECORD 0----------
 date_id     | 0   
 date        | 0   
 year        | 0   
 quarter     | 0   
 month       | 0   
 week        | 0   
 day         | 0   
 day_of_week | 0   
 day_name    | 0   
 month_name  | 0   
 is_we

**Data Summary:**

The timestamp fields shipped_at, delivered_at, and returned_at are nullable by design: they record lifecycle events that do not occur for every order, so NULL means the event has not occurred.

- shipped_at   : 43,870 nulls (~35%) -> many orders not shipped yet
- delivered_at : 81,400 nulls (~65%) -> even more not delivered yet
- returned_at  : 112,465 nulls (~90%) -> most orders never returned

The percentages are consistent with the 125,040 rows of dim_orders, not the 181,354 rows of fact_sales.
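
The percentages in the list can be reproduced with a short calculation. This is a sketch under one assumption: the three columns are taken to belong to dim_orders, whose 125,040 rows (reported in the load log later in the notebook) are used as the denominator. The helper name `null_rate` is hypothetical.

```python
def null_rate(null_count, total_rows):
    """Percentage of rows in which the column is NULL."""
    return 100.0 * null_count / total_rows

# Null counts as reported in the summary.
null_counts = {"shipped_at": 43870, "delivered_at": 81400, "returned_at": 112465}

# Assumption: the columns live in dim_orders, which has 125,040 rows
# according to the load log later in the notebook.
total_rows = 125040

for column, nulls in null_counts.items():
    print(f"{column:13s}: {nulls:>7,} nulls ({null_rate(nulls, total_rows):.1f}%)")
```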

### III. LOAD: PREPARE FOR DATABASE INSERTION 

In [34]:
# JDBC Connection Configuration
jdbc_url = "jdbc:postgresql://host.docker.internal:5432/dbt"

jdbc_props = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

# Source
dfs = {
    "dim_date": dim_date,
    "dim_users": dim_users,
    "dim_orders": dim_orders,
    "dim_products": dim_products,
    "dim_distribution_centers": dim_distribution_centers,
    "fact_sales": fact_sales
}


print("=== VALIDATE TRANSFORMED DATAFRAMES ===")

for table, df in dfs.items():
    print(f"\n>>> {table}")
    df.printSchema()
    print(f"Row count: {df.count()}")
    df.show(3, truncate=False)

print("\nVALIDATION DONE: ALL TABLES USE THE TRANSFORMED DATAFRAMES")

# =================================================
# 🔍 VALIDATION dfs[table] 
# =================================================
print("=== PRE-WRITE CHECK ===")

for table, df in dfs.items():
    print(f"{table}: {df.count()} rows")


=== VALIDATE TRANSFORMED DATAFRAMES ===

>>> dim_date
root
 |-- date_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- day_name: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- is_weekend: boolean (nullable = true)

Row count: 2522
+--------+----------+----+-------+-----+----+---+-----------+--------+----------+----------+
|date_id |date      |year|quarter|month|week|day|day_of_week|day_name|month_name|is_weekend|
+--------+----------+----+-------+-----+----+---+-----------+--------+----------+----------+
|20190108|2019-01-08|2019|1      |1    |1   |8  |3          |Tuesday |January   |false     |
|20190115|2019-01-15|2019|1      |1    |2   |15 |3          |Tuesday |January   |false     |
|20190117|2019-01-17|2019|1 

In [36]:
# ===============================
# JDBC CONFIG
# ===============================
jdbc_url = "jdbc:postgresql://host.docker.internal:5432/dbt"

jdbc_props = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

# ===============================
# SOURCE DATAFRAMES
# ===============================
dfs = {
    "dim_date": dim_date,
    "dim_users": dim_users,
    "dim_orders": dim_orders,
    "dim_products": dim_products,
    "dim_distribution_centers": dim_distribution_centers,
    "fact_sales": fact_sales
}

# ===============================
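# LOAD - MODE: APPEND
# Note: append adds rows on every run, so re-running this cell
# against already-populated tables duplicates the data.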
# ===============================
print("=== WRITE TRANSFORMED DATA TO POSTGRES ===")

for table, df in dfs.items():
    print(f"\nWriting public.{table}")

    df.cache()
    row_count = df.count()
    print(f"Rows: {row_count}")

    df.write \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", f"public.{table}") \
        .option("user", jdbc_props["user"]) \
        .option("password", jdbc_props["password"]) \
        .option("driver", jdbc_props["driver"]) \
        .mode("append") \
        .save()

    print(f"✅ {table} DONE")

print("\n ALL TRANSFORMED TABLES WRITTEN TO POSTGRES")

=== WRITE TRANSFORMED DATA TO POSTGRES ===

Writing public.dim_date
Rows: 2522
✅ dim_date DONE

Writing public.dim_users
Rows: 100000
✅ dim_users DONE

Writing public.dim_orders
Rows: 125040
✅ dim_orders DONE

Writing public.dim_products
Rows: 29118
✅ dim_products DONE

Writing public.dim_distribution_centers
Rows: 10
✅ dim_distribution_centers DONE

Writing public.fact_sales
Rows: 181354
✅ fact_sales DONE

 ALL TRANSFORMED TABLES WRITTEN TO POSTGRES
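
The load cell hardcodes the database host and credentials. One common alternative is to read them from environment variables and fall back to the notebook's values. The sketch below is an assumption about how that could look, not the author's setup: the variable names borrow libpq's PGHOST, PGPORT, PGDATABASE, PGUSER, and PGPASSWORD convention, and `jdbc_settings` is a hypothetical helper.

```python
import os

def jdbc_settings(env=None):
    """Build the JDBC URL and properties from environment variables.

    Defaults reproduce the values hardcoded in the notebook.
    """
    env = os.environ if env is None else env
    host = env.get("PGHOST", "host.docker.internal")
    port = env.get("PGPORT", "5432")
    database = env.get("PGDATABASE", "dbt")
    url = f"jdbc:postgresql://{host}:{port}/{database}"
    props = {
        "user": env.get("PGUSER", "postgres"),
        "password": env.get("PGPASSWORD", "postgres"),
        "driver": "org.postgresql.Driver",
    }
    return url, props

jdbc_url, jdbc_props = jdbc_settings()
print(jdbc_url)
```

The returned pair has the same shape as the `jdbc_url` and `jdbc_props` used by the write loop, so the loop itself would not need to change.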


In [37]:
# Post-load check: re-display the in-memory DataFrames (this does not query Postgres)
for table, df in dfs.items():
    print(table, df.count())
    df.show(3, truncate=False)

dim_date 2522
+--------+----------+----+-------+-----+----+---+-----------+---------+----------+----------+
|date_id |date      |year|quarter|month|week|day|day_of_week|day_name |month_name|is_weekend|
+--------+----------+----+-------+-----+----+---+-----------+---------+----------+----------+
|20190508|2019-05-08|2019|2      |5    |18  |8  |4          |Wednesday|May       |false     |
|20190604|2019-06-04|2019|2      |6    |22  |4  |3          |Tuesday  |June      |false     |
|20200824|2020-08-24|2020|3      |8    |34  |24 |2          |Monday   |August    |false     |
+--------+----------+----+-------+-----+----+---+-----------+---------+----------+----------+
only showing top 3 rows
dim_users 100000
+-------+----------+---------+-----------------------------+---+------+----------+---------+-------+-----------+--------------+--------------------------+
|user_id|first_name|last_name|email                        |age|gender|city      |state    |country|postal_code|traffic_source|creat
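
Re-displaying the in-memory DataFrames confirms what was sent, not what arrived. A row-count reconciliation is one way to close that gap. In the sketch below, `reconcile_counts` is a hypothetical helper, the expected counts are the ones printed in the write log, and the database counts are invented for illustration; in practice they would be read back from Postgres, for example with `spark.read.jdbc(...).count()`.

```python
def reconcile_counts(expected, loaded):
    """Return {table: (expected, loaded)} for every table whose counts differ."""
    return {
        table: (count, loaded.get(table))
        for table, count in expected.items()
        if loaded.get(table) != count
    }

# Row counts printed by the write step.
expected = {
    "dim_date": 2522,
    "dim_users": 100000,
    "dim_orders": 125040,
    "dim_products": 29118,
    "dim_distribution_centers": 10,
    "fact_sales": 181354,
}

# Hypothetical database counts: pretend dim_date was appended twice.
loaded = dict(expected, dim_date=2 * expected["dim_date"])

mismatches = reconcile_counts(expected, loaded)
print(mismatches or "all tables match")
```

An empty result means every table in the database holds exactly the number of rows that was written.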

**All set ✅**
All six tables of The Look E-Commerce Data Warehouse have been written to Postgres.