### Import Libraries and Initialize PySpark

In [9]:
import findspark
# Run before the pyspark imports below; findspark makes the pyspark package importable.
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from datetime import datetime, timedelta
import os

print("✓ Libraries imported")

✓ Libraries imported


### Create Spark Session

In [10]:
# Build the local Spark session used by the rest of the notebook.
# getOrCreate() hands back an already-running session when one exists.
warehouse_dir = os.path.abspath("../lakehouse")

spark = (
    SparkSession.builder
    .appName("Project2 - Fashion Retail Streaming Data Lakehouse")
    .config("spark.sql.warehouse.dir", warehouse_dir)
    .config("spark.sql.streaming.schemaInference", "true")
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
    .master("local[*]")
    .getOrCreate()
)

# Limit Spark's own logging to warnings and errors to keep cell output short.
spark.sparkContext.setLogLevel("WARN")

print("✓ Spark Session created")
print(f"  Version: {spark.version}")
print(f"  App Name: {spark.sparkContext.appName}")

✓ Spark Session created
  Version: 4.1.0
  App Name: Project2 - Fashion Retail Streaming Data Lakehouse


25/12/17 01:23:43 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### Load Customer Dimension

In [12]:
# Build the customer dimension from the raw customers CSV.
# `Window` comes from the notebook's import cell.
customers_raw = spark.read.csv("../data/customers.csv", header=True, inferSchema=True)

# Print the source columns so a change in the CSV layout is visible before the
# select below fails on a missing name.
print("Actual columns in CSV:")
print(customers_raw.columns)
print()

# Surrogate key: 1..N assigned in customer_reference_id order.
# The window has no partitioning, so Spark moves every row to one partition
# (it logs a WindowExec warning). NOTE(review): fine while this table stays small.
window_spec = Window.orderBy("customer_reference_id")

df_dim_customers = (
    customers_raw
    .withColumn("customer_key", row_number().over(window_spec))
    # Keep only the attributes the dimension exposes (lowercase names as in the CSV).
    .select(
        "customer_key",
        "customer_reference_id",
        "first_name",
        "last_name",
        "email",
        "city",
        "age",
        "loyalty_tier",
    )
    # loyalty_member is "Yes" for Gold/Platinum and "No" for every other value.
    # NOTE(review): confirm that lower tiers are meant to count as non-members.
    .withColumn(
        "loyalty_member",
        when(col("loyalty_tier").isin("Gold", "Platinum"), "Yes").otherwise("No"),
    )
)

print(f"✓ Customer dimension loaded: {df_dim_customers.count()} records")
df_dim_customers.show(5)

Actual columns in CSV:
['customer_reference_id', 'first_name', 'last_name', 'email', 'phone', 'city', 'state', 'zip_code', 'registration_date', 'customer_segment', 'loyalty_tier', 'age']

✓ Customer dimension loaded: 166 records
+------------+---------------------+----------+---------+--------------------+------------+---+------------+--------------+
|customer_key|customer_reference_id|first_name|last_name|               email|        city|age|loyalty_tier|loyalty_member|
+------------+---------------------+----------+---------+--------------------+------------+---+------------+--------------+
|           1|                 3957|     Aiden|    Davis|aiden.davis26@ema...|   San Diego| 20|    Platinum|           Yes|
|           2|                 3958|    Olivia|   Garcia|olivia.garcia224@...|     Seattle| 46|      Silver|            No|
|           3|                 3959|     Grace|    Moore|grace.moore829@em...|   Las Vegas| 66|      Silver|            No|
|           4|             

25/12/17 01:29:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/17 01:29:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/17 01:29:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


### Load Product Dimension

In [14]:
# Build the product dimension from the raw products CSV.
# `Window` comes from the notebook's import cell.
products_raw = spark.read.csv("../data/products.csv", header=True, inferSchema=True)

# Print the source columns so a change in the CSV layout is visible before the
# select below fails on a missing name.
print("Actual columns in CSV:")
print(products_raw.columns)
print()

# Surrogate key: 1..N assigned in item_name order, with product_id as a tie-breaker.
# Ordering by item_name alone leaves row_number() free to number rows that share
# a name differently on each evaluation; the second sort column makes the
# assignment repeatable and changes nothing when item names are unique.
# The window has no partitioning, so Spark moves every row to one partition
# (it logs a WindowExec warning). NOTE(review): fine while this table stays small.
window_spec = Window.orderBy("item_name", "product_id")

df_dim_products = (
    products_raw
    .withColumn("product_key", row_number().over(window_spec))
    # Keep only the attributes the dimension exposes (lowercase names as in the CSV).
    # NOTE(review): product_id is not carried into the dimension, unlike
    # customer_reference_id on the customer side - confirm that is intended.
    .select(
        "product_key",
        "item_name",
        "category",
        "brand",
        "material",
        "season",
        "gender_target",
        "base_price",
        "stock_quantity",
    )
)

print(f"✓ Product dimension loaded: {df_dim_products.count()} records")
df_dim_products.show(5)

Actual columns in CSV:
['product_id', 'item_name', 'category', 'brand', 'material', 'season', 'gender_target', 'base_price', 'stock_quantity', 'supplier_name', 'product_introduction_date']

✓ Product dimension loaded: 61 records
+-----------+---------+-----------+---------------+---------+-------------+-------------+----------+--------------+
|product_key|item_name|   category|          brand| material|       season|gender_target|base_price|stock_quantity|
+-----------+---------+-----------+---------------+---------+-------------+-------------+----------+--------------+
|          1| Backpack|Accessories|Premium Fashion|Synthetic|Spring/Summer|          Men|      4657|           120|
|          2|     Belt|Accessories|Premium Fashion|    Suede|  Fall/Winter|        Women|      4841|            58|
|          3|   Blazer|     Formal|  Fashion House|     Wool|  Fall/Winter|       Unisex|      3848|           396|
|          4|   Blouse|       Tops| Boutique Brand|Polyester|Spring/Summer|

25/12/17 01:31:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/17 01:31:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/17 01:31:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


### Generate Date Dimension 

In [15]:
# Generate the date dimension covering the sales period (calendar year 2023).
start_date = datetime(2023, 1, 1)
end_date = datetime(2023, 12, 31)

# Derive the number of rows from the bounds instead of hard-coding 365, so a
# different period (including a leap year) is fully covered.
num_days = (end_date - start_date).days + 1

dates = []
for offset in range(num_days):
    calendar_date = start_date + timedelta(days=offset)
    dates.append({
        # Sequential surrogate key: 1 for start_date, incrementing by one per day.
        'date_key': offset + 1,
        # ISO-formatted string form of the date.
        # NOTE(review): presumably the join key against the sales data - confirm.
        'purchase_date': calendar_date.strftime('%Y-%m-%d'),
        'full_date': calendar_date,
        'year': calendar_date.year,
        'month': calendar_date.month,
        'day': calendar_date.day,
        # Months 1-3 -> 1, 4-6 -> 2, 7-9 -> 3, 10-12 -> 4.
        'quarter': (calendar_date.month - 1) // 3 + 1,
        'month_name': calendar_date.strftime('%B'),
        'day_name': calendar_date.strftime('%A')
    })

# The schema is inferred from the dict records.
df_dim_date = spark.createDataFrame(dates)

print(f"✓ Date dimension generated: {df_dim_date.count()} records")
df_dim_date.show(5)

✓ Date dimension generated: 365 records
+--------+---+---------+-------------------+-----+----------+-------------+-------+----+
|date_key|day| day_name|          full_date|month|month_name|purchase_date|quarter|year|
+--------+---+---------+-------------------+-----+----------+-------------+-------+----+
|       1|  1|   Sunday|2023-01-01 00:00:00|    1|   January|   2023-01-01|      1|2023|
|       2|  2|   Monday|2023-01-02 00:00:00|    1|   January|   2023-01-02|      1|2023|
|       3|  3|  Tuesday|2023-01-03 00:00:00|    1|   January|   2023-01-03|      1|2023|
|       4|  4|Wednesday|2023-01-04 00:00:00|    1|   January|   2023-01-04|      1|2023|
|       5|  5| Thursday|2023-01-05 00:00:00|    1|   January|   2023-01-05|      1|2023|
+--------+---+---------+-------------------+-----+----------+-------------+-------+----+
only showing top 5 rows


### Create Payment Dimension

In [16]:
# Payment dimension: a small static lookup table.
# Keys are assigned 1..N following the order of the methods listed here.
payment_methods = ['Credit Card', 'PayPal', 'Cash', 'Debit Card']

payment_data = [
    {'payment_key': key, 'payment_method': method}
    for key, method in enumerate(payment_methods, start=1)
]

# The schema is inferred from the dict records.
df_dim_payment = spark.createDataFrame(payment_data)

print(f"✓ Payment dimension created: {df_dim_payment.count()} records")
df_dim_payment.show()

✓ Payment dimension created: 4 records
+-----------+--------------+
|payment_key|payment_method|
+-----------+--------------+
|          1|   Credit Card|
|          2|        PayPal|
|          3|          Cash|
|          4|    Debit Card|
+-----------+--------------+



### Summary of All Dimensions

In [17]:
# Recap the row count of every dimension built above.
# Each count() call triggers a Spark action on the corresponding DataFrame.
rule = "=" * 60

print(rule, "DIMENSION TABLES SUMMARY", rule, sep="\n")
print(
    f"✓ Customers: {df_dim_customers.count()} records",
    f"✓ Products: {df_dim_products.count()} records",
    f"✓ Dates: {df_dim_date.count()} records",
    f"✓ Payment Methods: {df_dim_payment.count()} records",
    sep="\n",
)
print(rule, "✅ All dimensions ready for streaming ETL!", sep="\n")

DIMENSION TABLES SUMMARY
✓ Customers: 166 records
✓ Products: 61 records
✓ Dates: 365 records
✓ Payment Methods: 4 records
✅ All dimensions ready for streaming ETL!


### Verify Column Names for Joins

In [18]:
# List the column names of each dimension so join keys can be checked by eye.
for label, frame in (
    ("Customer columns:", df_dim_customers),
    ("Product columns:", df_dim_products),
    ("Date columns:", df_dim_date),
    ("Payment columns:", df_dim_payment),
):
    print(label, frame.columns)

Customer columns: ['customer_key', 'customer_reference_id', 'first_name', 'last_name', 'email', 'city', 'age', 'loyalty_tier', 'loyalty_member']
Product columns: ['product_key', 'item_name', 'category', 'brand', 'material', 'season', 'gender_target', 'base_price', 'stock_quantity']
Date columns: ['date_key', 'day', 'day_name', 'full_date', 'month', 'month_name', 'purchase_date', 'quarter', 'year']
Payment columns: ['payment_key', 'payment_method']
