# DS-2002 Data Project 2: E-Commerce Dimensional Data Lakehouse
Jensen Harvey


## Section I: Prerequisites
### 1.0. Import Required Libraries

In [87]:
import os
import json
import pymongo
import requests
from datetime import datetime, timedelta


from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp, to_date, year, month, dayofmonth, quarter
from pyspark.sql.functions import sum as _sum, count as _count, avg as _avg, max as _max, min as _min
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType, TimestampType

print(" Libraries imported successfully")

 Libraries imported successfully


### 2.0. Initialize Spark Session

Create a local Spark session configured with the connector packages needed for MySQL and MongoDB connectivity.

In [89]:

# Maven coordinates pulled in at session start-up: the MongoDB Spark connector
# and the MySQL JDBC driver.
# NOTE(review): confirm this connector release is compatible with the Spark
# version actually in use.
_spark_packages = "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,mysql:mysql-connector-java:8.0.33"

# Build (or reuse) a local session with adaptive query execution and
# adaptive partition coalescing switched on.
spark = (
    SparkSession.builder
    .appName("DS2002-Project2-ECommerce-Lakehouse")
    .config("spark.jars.packages", _spark_packages)
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .master("local[*]")
    .getOrCreate()
)

# Keep Spark's own logging down to errors so cell output stays readable.
spark.sparkContext.setLogLevel("ERROR")

print(" Spark Session initialized")
print(f"  Spark Version: {spark.version}")
print(f"  Application ID: {spark.sparkContext.applicationId}")

 Spark Session initialized
  Spark Version: 3.5.5
  Application ID: local-1766171313547


### 3.0. Configuration Variables

**IMPORTANT:** Supply your own MySQL and MongoDB connection details below before running the notebook, and never commit real credentials to version control.

In [91]:

# --- MySQL source -----------------------------------------------------------
# Credentials are read from the environment so that no secret is stored in the
# notebook or its history. Export MYSQL_USER / MYSQL_PASSWORD before starting
# the kernel. The password that used to be hard-coded here should be treated
# as compromised and rotated.
MYSQL_HOST = "localhost"
MYSQL_PORT = 3306
MYSQL_USER = os.environ.get("MYSQL_USER", "root")
MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD", "")
MYSQL_DATABASE = "ecommerce_source"


# --- MongoDB Atlas source ---------------------------------------------------
# The defaults are the original placeholders; set MONGODB_USER and
# MONGODB_PASSWORD in the environment to supply real values.
MONGODB_USER = os.environ.get("MONGODB_USER", "user_name")
MONGODB_PASSWORD = os.environ.get("MONGODB_PASSWORD", "password")
MONGODB_CLUSTER = "cluster_name.xxxxx"
MONGODB_DATABASE = "ecommerce"


# --- Lakehouse directory layout (relative to the working directory) ---------
BASE_DIR = "./ecommerce_lakehouse"
BRONZE_DIR = f"{BASE_DIR}/bronze"
SILVER_DIR = f"{BASE_DIR}/silver"
GOLD_DIR = f"{BASE_DIR}/gold"
STREAMING_DIR = f"{BASE_DIR}/streaming"
CHECKPOINT_DIR = f"{BASE_DIR}/checkpoints"


# Create every layer directory up front; exist_ok makes the cell re-runnable.
for directory in [BASE_DIR, BRONZE_DIR, SILVER_DIR, GOLD_DIR, STREAMING_DIR, CHECKPOINT_DIR]:
    os.makedirs(directory, exist_ok=True)

print(" Configuration complete")
print(f"  Base directory: {BASE_DIR}")

 Configuration complete
  Base directory: ./ecommerce_lakehouse


### 4.0. Helper Functions

These helpers follow the Lab 6 pattern for MongoDB connectivity and add a matching JDBC reader for MySQL.

In [93]:
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection_name):
    """
    Fetch a MongoDB Atlas collection and return it as a PySpark DataFrame.
    Based on DS-2002 Lab 6 example.

    Args:
        user_id: Atlas user name (plain text; it is percent-encoded here).
        pwd: Atlas password (plain text; it is percent-encoded here).
        cluster_name: Host prefix placed in front of ".mongodb.net".
        db_name: Database to read from.
        collection_name: Collection to load.

    Returns:
        The collection as a DataFrame with any "_id" column removed, or None
        if the read raised (the first 150 characters of the error are printed).
    """
    # Function-scope import keeps this helper self-contained.
    from urllib.parse import quote_plus

    try:
        # Reserved characters such as "@", ":" or "/" in the credentials would
        # otherwise corrupt the connection string, so percent-encode them.
        safe_user = quote_plus(str(user_id))
        safe_pwd = quote_plus(str(pwd))
        mongo_uri = f"mongodb+srv://{safe_user}:{safe_pwd}@{cluster_name}.mongodb.net/{db_name}"

        df = spark.read \
            .format("mongo") \
            .option("uri", mongo_uri) \
            .option("database", db_name) \
            .option("collection", collection_name) \
            .load()

        # DataFrame.drop is a no-op when the column is absent, so no
        # membership check is needed.
        return df.drop('_id')

    except Exception as e:
        # Best-effort helper: report a truncated error and let the caller
        # decide what to do with None.
        print(f"MongoDB connection failed: {str(e)[:150]}")
        return None

def fetch_mysql_table(table_name):
    """
    Load one MySQL table into a PySpark DataFrame over JDBC.

    Connection details come from the MYSQL_* configuration variables.
    Returns the DataFrame, or None if the read raised (the first 150
    characters of the error are printed).
    """
    try:
        reader_options = {
            "url": f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}",
            "dbtable": table_name,
            "user": MYSQL_USER,
            "password": MYSQL_PASSWORD,
            "driver": "com.mysql.cj.jdbc.Driver",
        }
        return spark.read.format("jdbc").options(**reader_options).load()
    except Exception as err:
        print(f"MySQL connection failed: {str(err)[:150]}")
        return None

print(" Helper functions defined")

 Helper functions defined


## Section II: Generate Sample Source Data
### 5.0. Create Sample Customer Data (MySQL Source Simulation)

In [95]:

# Sample customers. Column order per row:
# (customer_id, first_name, last_name, email, country, city,
#  state_province, customer_type, registration_date)
customer_data = [
    (1, "John", "Smith", "john.smith@email.com", "USA", "New York", "NY", "Customer", "2023-01-15"),
    (2, "Emma", "Johnson", "emma.j@email.com", "UK", "London", "ENG", "Customer", "2023-02-20"),
    (3, "Michael", "Brown", "m.brown@email.com", "CANADA", "Toronto", "ON", "Customer", "2023-03-10"),
    (4, "Sophia", "Davis", "sophia.d@email.com", "USA", "Los Angeles", "CA", "Premium", "2023-01-05"),
    (5, "William", "Garcia", "w.garcia@email.com", "SPAIN", "Madrid", "MAD", "Customer", "2023-04-12"),
    (6, "Olivia", "Martinez", "olivia.m@email.com", "USA", "Chicago", "IL", "Customer", "2023-05-18"),
    (7, "James", "Wilson", "james.w@email.com", "AUSTRALIA", "Sydney", "NSW", "Premium", "2023-02-25"),
    (8, "Isabella", "Anderson", "i.anderson@email.com", "USA", "Houston", "TX", "Customer", "2023-06-30"),
    (9, "Benjamin", "Taylor", "ben.t@email.com", "GERMANY", "Berlin", "BER", "Customer", "2023-03-15"),
    (10, "Mia", "Thomas", "mia.thomas@email.com", "FRANCE", "Paris", "IDF", "Premium", "2023-04-20"),
    (11, "Lucas", "Moore", "lucas.m@email.com", "USA", "Phoenix", "AZ", "Customer", "2023-07-05"),
    (12, "Charlotte", "Lee", "charlotte.l@email.com", "SINGAPORE", "Singapore", "SG", "Customer", "2023-08-10"),
    (13, "Henry", "White", "h.white@email.com", "USA", "Philadelphia", "PA", "Customer", "2023-05-22"),
    (14, "Amelia", "Martin", "amelia.m@email.com", "NETHERLANDS", "Amsterdam", "NH", "Premium", "2023-06-15"),
    (15, "Alexander", "Schmidt", "alex.s@email.com", "SWITZERLAND", "Zurich", "ZH", "Customer", "2023-09-01"),
]

# customer_id is a non-nullable integer key; every other attribute is a
# nullable string (registration_date is kept as text at this stage).
customer_schema = StructType(
    [StructField("customer_id", IntegerType(), False)]
    + [
        StructField(field_name, StringType(), True)
        for field_name in (
            "first_name",
            "last_name",
            "email",
            "country",
            "city",
            "state_province",
            "customer_type",
            "registration_date",
        )
    ]
)

df_customers_source = spark.createDataFrame(customer_data, customer_schema)

# Land the raw customers in the bronze layer, replacing any previous run.
df_customers_source.write.mode("overwrite").parquet(f"{BRONZE_DIR}/customers")

print(f" Created {df_customers_source.count()} customer records")
df_customers_source.show(5, truncate=False)

                                                                                

 Created 15 customer records
+-----------+----------+---------+--------------------+-------+-----------+--------------+-------------+-----------------+
|customer_id|first_name|last_name|email               |country|city       |state_province|customer_type|registration_date|
+-----------+----------+---------+--------------------+-------+-----------+--------------+-------------+-----------------+
|1          |John      |Smith    |john.smith@email.com|USA    |New York   |NY            |Customer     |2023-01-15       |
|2          |Emma      |Johnson  |emma.j@email.com    |UK     |London     |ENG           |Customer     |2023-02-20       |
|3          |Michael   |Brown    |m.brown@email.com   |CANADA |Toronto    |ON            |Customer     |2023-03-10       |
|4          |Sophia    |Davis    |sophia.d@email.com  |USA    |Los Angeles|CA            |Premium      |2023-01-05       |
|5          |William   |Garcia   |w.garcia@email.com  |SPAIN  |Madrid     |MAD           |Customer     |2023-0

### 6.0. Create Sample Product Data (MongoDB Source Simulation)

In [97]:

# Sample product catalogue. Column order per row:
# (product_id, product_name, category, subcategory, unit_price)
product_data = [
    (1, "Laptop Pro 15", "Electronics", "Computers", 1299.99),
    (2, "Wireless Mouse", "Electronics", "Accessories", 29.99),
    (3, "Office Chair Deluxe", "Furniture", "Office", 349.99),
    (4, "Standing Desk", "Furniture", "Office", 599.99),
    (5, "Noise-Cancelling Headphones", "Electronics", "Audio", 279.99),
    (6, "4K Monitor 27\"", "Electronics", "Displays", 449.99),
    (7, "Mechanical Keyboard", "Electronics", "Accessories", 129.99),
    (8, "Ergonomic Mouse Pad", "Electronics", "Accessories", 19.99),
    (9, "Desk Lamp LED", "Furniture", "Lighting", 59.99),
    (10, "Bookshelf Oak", "Furniture", "Storage", 199.99),
]

# product_id is the non-nullable key; the remaining columns are nullable.
product_schema = (
    StructType()
    .add("product_id", IntegerType(), False)
    .add("product_name", StringType(), True)
    .add("category", StringType(), True)
    .add("subcategory", StringType(), True)
    .add("unit_price", FloatType(), True)
)

df_products_source = spark.createDataFrame(product_data, product_schema)

# Land the raw products in the bronze layer, replacing any previous run.
df_products_source.write.mode("overwrite").parquet(f"{BRONZE_DIR}/products")

print(f" Created {df_products_source.count()} product records")
df_products_source.show(5, truncate=False)

 Created 10 product records
+----------+---------------------------+-----------+-----------+----------+
|product_id|product_name               |category   |subcategory|unit_price|
+----------+---------------------------+-----------+-----------+----------+
|1         |Laptop Pro 15              |Electronics|Computers  |1299.99   |
|2         |Wireless Mouse             |Electronics|Accessories|29.99     |
|3         |Office Chair Deluxe        |Furniture  |Office     |349.99    |
|4         |Standing Desk              |Furniture  |Office     |599.99    |
|5         |Noise-Cancelling Headphones|Electronics|Audio      |279.99    |
+----------+---------------------------+-----------+-----------+----------+
only showing top 5 rows



### 7.0. Create Sample Transaction Data for Streaming (3 Batches)

This simulates the streaming requirement by creating 3 separate JSON files representing different time intervals.

In [99]:

# Key order shared by every transaction record; dicts are built in this order,
# so serialising them yields the keys in the same sequence.
_TRANSACTION_FIELDS = (
    "transaction_id",
    "customer_id",
    "product_id",
    "quantity",
    "transaction_date",
    "currency",
    "shipping_cost",
)


def _as_transactions(rows):
    """Turn positional rows into dicts keyed by _TRANSACTION_FIELDS."""
    return [dict(zip(_TRANSACTION_FIELDS, row)) for row in rows]


# Batch 1: transactions dated January 2024.
transaction_batch1 = _as_transactions([
    (1, 1, 1, 1, "2024-01-15", "USD", 15.00),
    (2, 2, 3, 1, "2024-01-16", "GBP", 25.00),
    (3, 3, 5, 2, "2024-01-17", "CAD", 20.00),
    (4, 4, 6, 1, "2024-01-18", "USD", 12.00),
    (5, 5, 2, 3, "2024-01-19", "EUR", 18.00),
    (6, 1, 7, 1, "2024-01-20", "USD", 10.00),
    (7, 8, 8, 2, "2024-01-21", "USD", 8.00),
])

# Batch 2: transactions dated February 2024.
transaction_batch2 = _as_transactions([
    (8, 9, 4, 1, "2024-02-01", "EUR", 30.00),
    (9, 10, 5, 1, "2024-02-02", "EUR", 22.00),
    (10, 2, 9, 2, "2024-02-03", "GBP", 15.00),
    (11, 11, 1, 1, "2024-02-04", "USD", 15.00),
    (12, 3, 10, 1, "2024-02-05", "CAD", 25.00),
    (13, 12, 6, 1, "2024-02-06", "SGD", 20.00),
])

# Batch 3: transactions dated March 2024.
transaction_batch3 = _as_transactions([
    (14, 4, 2, 4, "2024-03-01", "USD", 12.00),
    (15, 13, 7, 1, "2024-03-02", "USD", 10.00),
    (16, 14, 3, 1, "2024-03-03", "EUR", 28.00),
    (17, 15, 8, 5, "2024-03-04", "CHF", 18.00),
    (18, 1, 4, 1, "2024-03-05", "USD", 30.00),
    (19, 7, 5, 1, "2024-03-06", "AUD", 25.00),
    (20, 6, 9, 3, "2024-03-07", "USD", 15.00),
])


def _write_json_lines(path, records):
    """Write `records` to `path` as JSON Lines (one JSON object per line)."""
    # Explicit encoding so the files do not depend on the platform default.
    with open(path, "w", encoding="utf-8") as out_file:
        out_file.writelines(json.dumps(record) + "\n" for record in records)


# Directory the streaming reader watches for transaction files.
transactions_dir = f"{STREAMING_DIR}/transactions"
os.makedirs(transactions_dir, exist_ok=True)

# One file per batch (batch1.json, batch2.json, batch3.json); a single loop
# replaces the three copy-pasted write blocks.
transaction_batches = [transaction_batch1, transaction_batch2, transaction_batch3]
for batch_number, batch in enumerate(transaction_batches, start=1):
    _write_json_lines(f"{transactions_dir}/batch{batch_number}.json", batch)

print(f" Created {len(transaction_batches)} transaction batches for streaming")
for batch_number, batch in enumerate(transaction_batches, start=1):
    print(f"  Batch {batch_number}: {len(batch)} transactions")

 Created 3 transaction batches for streaming
  Batch 1: 7 transactions
  Batch 2: 6 transactions
  Batch 3: 7 transactions


### 8.0. Fetch Real-Time Exchange Rate Data (REST API)

This demonstrates integration with an external API for real-time reference data.

In [101]:

def get_exchange_rates():
    """
    Fetch current exchange rates from API.
    Falls back to static rates if API unavailable.

    Returns:
        dict mapping currency code -> rate. On success this is the 'rates'
        object of the API response for base USD; otherwise a small static
        table covering the currencies used by the sample transactions.
        NOTE(review): rates are presumably units of each currency per 1 USD,
        which is how the silver layer uses them — confirm against the API docs.
    """
    # Static fallback used whenever the live request cannot be completed.
    fallback_rates = {
        'USD': 1.0,
        'EUR': 0.92,
        'GBP': 0.79,
        'CAD': 1.36,
        'AUD': 1.52,
        'CHF': 0.88,
        'SGD': 1.34
    }

    try:
        response = requests.get("https://api.exchangerate-api.com/v4/latest/USD", timeout=5)
        if response.status_code != 200:
            raise requests.HTTPError(f"API request failed with status {response.status_code}")
        rates = response.json()['rates']
    except (requests.RequestException, ValueError, KeyError, TypeError):
        # Only network/HTTP failures, an undecodable body, or a payload of an
        # unexpected shape trigger the fallback. A bare `except:` would also
        # swallow KeyboardInterrupt/SystemExit and hide programming errors.
        print(" Using static exchange rates (API unavailable)")
        return fallback_rates

    print(" Fetched real-time exchange rates from API")
    return rates

exchange_rates = get_exchange_rates()

# Flatten the mapping into (currency, rate) rows, coercing each rate to float
# so the rows match the schema below.
rate_data = [(code, float(value)) for code, value in exchange_rates.items()]
rate_schema = (
    StructType()
    .add("currency", StringType(), False)
    .add("rate_to_usd", FloatType(), True)
)

df_exchange_rates = spark.createDataFrame(rate_data, rate_schema)

# Land the reference rates in the bronze layer, replacing any previous run.
df_exchange_rates.write.mode("overwrite").parquet(f"{BRONZE_DIR}/exchange_rates")

print(f" Created exchange rate reference with {df_exchange_rates.count()} currencies")
df_exchange_rates.show()

 Fetched real-time exchange rates from API
 Created exchange rate reference with 166 currencies
+--------+-----------+
|currency|rate_to_usd|
+--------+-----------+
|     USD|        1.0|
|     AED|       3.67|
|     AFN|      66.24|
|     ALL|      82.32|
|     AMD|     381.58|
|     ANG|       1.79|
|     AOA|     920.99|
|     ARS|    1453.17|
|     AUD|       1.51|
|     AWG|       1.79|
|     AZN|        1.7|
|     BAM|       1.67|
|     BBD|        2.0|
|     BDT|     122.26|
|     BGN|       1.67|
|     BHD|      0.376|
|     BIF|    2963.42|
|     BMD|        1.0|
|     BND|       1.29|
|     BOB|       6.91|
+--------+-----------+
only showing top 20 rows



## Section III: Bronze Layer - Raw Data Ingestion
### 9.0. Load All Bronze Data

Demonstrating batch load from multiple source types.

In [103]:

# Read each bronze dataset back from Parquet and expose it to Spark SQL
# under a "<name>_bronze" temporary view.
df_customers_bronze = spark.read.parquet(f"{BRONZE_DIR}/customers")
df_customers_bronze.createOrReplaceTempView("customers_bronze")

df_products_bronze = spark.read.parquet(f"{BRONZE_DIR}/products")
df_products_bronze.createOrReplaceTempView("products_bronze")

df_rates_bronze = spark.read.parquet(f"{BRONZE_DIR}/exchange_rates")
df_rates_bronze.createOrReplaceTempView("exchange_rates_bronze")

# Row counts double as a quick sanity check of the load.
print(" Bronze layer loaded")
print(f"  Customers: {df_customers_bronze.count()} records")
print(f"  Products: {df_products_bronze.count()} records")
print(f"  Exchange Rates: {df_rates_bronze.count()} currencies")

 Bronze layer loaded
  Customers: 15 records
  Products: 10 records
  Exchange Rates: 166 currencies


## Section IV: Structured Streaming - Bronze to Silver
### 10.0. Setup Streaming Schema for Transactions

In [105]:

# Explicit schema for the transaction JSON files read by the streaming source.
# Only transaction_id is declared non-nullable.
transaction_schema = (
    StructType()
    .add("transaction_id", IntegerType(), False)
    .add("customer_id", IntegerType(), True)
    .add("product_id", IntegerType(), True)
    .add("quantity", IntegerType(), True)
    .add("transaction_date", StringType(), True)
    .add("currency", StringType(), True)
    .add("shipping_cost", FloatType(), True)
)

print(" Transaction schema defined")

 Transaction schema defined


### 11.0. Read Streaming Data (Bronze Layer)

This simulates Databricks Auto Loader functionality using `readStream` with the JSON file source.

In [107]:

# File-based streaming source over the transactions directory. With
# maxFilesPerTrigger=1 each micro-batch picks up at most one file.
df_transactions_stream = (
    spark.readStream
    .format("json")
    .schema(transaction_schema)
    .option("maxFilesPerTrigger", 1)
    .load(f"{STREAMING_DIR}/transactions")
)

# Stamp every record with an ingestion timestamp.
df_transactions_bronze_stream = df_transactions_stream.withColumn(
    "ingestion_timestamp", current_timestamp()
)

print(" Streaming source configured")
print(f"  Reading from: {STREAMING_DIR}/transactions")

 Streaming source configured
  Reading from: ./ecommerce_lakehouse/streaming/transactions


### 12.0. Write Bronze Stream to Storage

In [109]:

import time

# Append the bronze stream to Parquet. The availableNow trigger processes
# everything currently in the source directory (still honouring the source's
# maxFilesPerTrigger setting) and then stops the query by itself. That replaces
# the previous fixed 20-second sleep, which could stop the query before all
# files were ingested on a slow machine and wasted time on a fast one.
query_bronze = (
    df_transactions_bronze_stream.writeStream
    .format("parquet")
    .option("checkpointLocation", f"{CHECKPOINT_DIR}/transactions_bronze")
    .option("path", f"{BRONZE_DIR}/transactions_stream")
    .outputMode("append")
    .trigger(availableNow=True)
    .start()
)

print(" Bronze streaming query started")
print(f"  Checkpoint: {CHECKPOINT_DIR}/transactions_bronze")
print(f"  Output: {BRONZE_DIR}/transactions_stream")

print("\n Processing streaming data until all available files are consumed...")
stream_started = time.monotonic()
try:
    # Blocks until every available file has been processed; re-raises if the
    # query failed, so errors are not silently ignored.
    query_bronze.awaitTermination()
finally:
    # Always stop the query so it is released even on failure.
    query_bronze.stop()

print(" Bronze streaming completed")
print(f"  Elapsed: {time.monotonic() - stream_started:.1f} seconds")

 Bronze streaming query started
  Checkpoint: ./ecommerce_lakehouse/checkpoints/transactions_bronze
  Output: ./ecommerce_lakehouse/bronze/transactions_stream

 Processing streaming data (this will take ~15-20 seconds)...
 Bronze streaming completed


## Section V: Silver Layer - Data Cleansing and Integration
### 13.0. Transform Customer Dimension

In [111]:

from pyspark.sql.functions import concat_ws, upper

# Derived columns: a single display name, and the country code upper-cased.
_full_name_col = concat_ws(" ", col("first_name"), col("last_name")).alias("full_name")
_country_col = upper(col("country")).alias("country")

# first_name and last_name are folded into full_name; the remaining columns
# are carried over unchanged.
df_customers_silver = df_customers_bronze.select(
    "customer_id",
    _full_name_col,
    "email",
    _country_col,
    "city",
    "state_province",
    "customer_type",
    "registration_date",
)

# Persist the silver table and expose it to Spark SQL.
df_customers_silver.write.mode("overwrite").parquet(f"{SILVER_DIR}/customers")
df_customers_silver.createOrReplaceTempView("customers_silver")

print(" Customer dimension transformed")
df_customers_silver.show(5, truncate=False)

 Customer dimension transformed
+-----------+-----------------+---------------------+-----------+------------+--------------+-------------+-----------------+
|customer_id|full_name        |email                |country    |city        |state_province|customer_type|registration_date|
+-----------+-----------------+---------------------+-----------+------------+--------------+-------------+-----------------+
|12         |Charlotte Lee    |charlotte.l@email.com|SINGAPORE  |Singapore   |SG            |Customer     |2023-08-10       |
|13         |Henry White      |h.white@email.com    |USA        |Philadelphia|PA            |Customer     |2023-05-22       |
|8          |Isabella Anderson|i.anderson@email.com |USA        |Houston     |TX            |Customer     |2023-06-30       |
|9          |Benjamin Taylor  |ben.t@email.com      |GERMANY    |Berlin      |BER           |Customer     |2023-03-15       |
|14         |Amelia Martin    |amelia.m@email.com   |NETHERLANDS|Amsterdam   |NH      

### 14.0. Transform Product Dimension

In [113]:

# Products need no cleansing; the silver table simply pins the column set
# and order.
_product_silver_columns = [
    "product_id",
    "product_name",
    "category",
    "subcategory",
    "unit_price",
]
df_products_silver = df_products_bronze.select(*_product_silver_columns)

# Persist the silver table and expose it to Spark SQL.
df_products_silver.write.mode("overwrite").parquet(f"{SILVER_DIR}/products")
df_products_silver.createOrReplaceTempView("products_silver")

print(" Product dimension transformed")
df_products_silver.show(5, truncate=False)

 Product dimension transformed
+----------+-------------------+-----------+-----------+----------+
|product_id|product_name       |category   |subcategory|unit_price|
+----------+-------------------+-----------+-----------+----------+
|8         |Ergonomic Mouse Pad|Electronics|Accessories|19.99     |
|7         |Mechanical Keyboard|Electronics|Accessories|129.99    |
|2         |Wireless Mouse     |Electronics|Accessories|29.99     |
|3         |Office Chair Deluxe|Furniture  |Office     |349.99    |
|6         |4K Monitor 27"     |Electronics|Displays   |449.99    |
+----------+-------------------+-----------+-----------+----------+
only showing top 5 rows



### 15.0. Transform Transaction Fact (Join with Reference Data)

This joins the transaction data landed by the bronze stream with static reference data (products and exchange rates), following the Silver layer pattern.

In [115]:

# Batch-read the Parquet files produced by the bronze streaming query.
df_transactions_bronze = spark.read.parquet(f"{BRONZE_DIR}/transactions_stream")

print(f" Read {df_transactions_bronze.count()} transactions from bronze stream")

# Attach product attributes (notably unit_price), then rename the currency
# column so it cannot clash with the rates table's own "currency" column.
df_transactions_with_price = (
    df_transactions_bronze
    .join(df_products_silver, "product_id", "left")
    .withColumnRenamed("currency", "transaction_currency")
)

# Join condition and amount expressions, named for readability.
_rate_match = df_transactions_with_price.transaction_currency == df_rates_bronze.currency
_amount_local = (col("quantity") * col("unit_price")) + col("shipping_cost")
_amount_usd = _amount_local / col("rate_to_usd")

# Left joins keep every transaction even if a product or rate is missing.
df_transactions_silver = (
    df_transactions_with_price
    .join(df_rates_bronze, _rate_match, "left")
    .select(
        "transaction_id",
        "transaction_date",
        "customer_id",
        "product_id",
        "quantity",
        col("transaction_currency").alias("currency"),
        "unit_price",
        "shipping_cost",
        _amount_local.alias("total_amount_local"),
        _amount_usd.alias("total_amount_usd"),
        "ingestion_timestamp",
    )
)

# Persist the silver table and expose it to Spark SQL.
df_transactions_silver.write.mode("overwrite").parquet(f"{SILVER_DIR}/transactions")
df_transactions_silver.createOrReplaceTempView("transactions_silver")

print(" Transaction fact transformed and joined with reference data")
print(f"  Total records: {df_transactions_silver.count()}")
df_transactions_silver.show(5, truncate=False)

 Read 20 transactions from bronze stream
 Transaction fact transformed and joined with reference data
  Total records: 20
+--------------+----------------+-----------+----------+--------+--------+----------+-------------+------------------+------------------+-----------------------+
|transaction_id|transaction_date|customer_id|product_id|quantity|currency|unit_price|shipping_cost|total_amount_local|total_amount_usd  |ingestion_timestamp    |
+--------------+----------------+-----------+----------+--------+--------+----------+-------------+------------------+------------------+-----------------------+
|14            |2024-03-01      |4          |2         |4       |USD     |29.99     |12.0         |131.95999         |131.95999145507812|2025-12-18 14:19:25.094|
|15            |2024-03-02      |13         |7         |1       |USD     |129.99    |10.0         |139.99            |139.99000549316406|2025-12-18 14:19:25.094|
|16            |2024-03-03      |14         |3         |1       |EUR

## Section VI: Gold Layer - Dimensional Model
### 16.0. Create Date Dimension

In [117]:

from datetime import date, timedelta
from pyspark.sql.functions import dayofweek, quarter, weekofyear

# Calendar range covered by the date dimension (both ends inclusive).
start_date = date(2023, 1, 1)
end_date = date(2024, 12, 31)

# One single-element tuple per calendar day, ready for createDataFrame.
_day_count = (end_date - start_date).days + 1
date_list = [(start_date + timedelta(days=offset),) for offset in range(_day_count)]

df_dates_temp = spark.createDataFrame(date_list, ["date"])

# date_id is the first ten characters of the date rendered as a string;
# the other attributes are derived from the date itself.
_date = col("date")
df_dim_date = (
    df_dates_temp
    .withColumn("date_id", _date.cast("string").substr(1, 10))
    .withColumn("year", year(_date))
    .withColumn("month", month(_date))
    .withColumn("day", dayofmonth(_date))
    .withColumn("quarter", quarter(_date))
    .withColumn("day_of_week", dayofweek(_date))
    .withColumn("week_of_year", weekofyear(_date))
)

# Persist the dimension and expose it to Spark SQL.
df_dim_date.write.mode("overwrite").parquet(f"{GOLD_DIR}/dim_date")
df_dim_date.createOrReplaceTempView("dim_date")

print(f" Date dimension created with {df_dim_date.count()} dates")
df_dim_date.show(5)

 Date dimension created with 731 dates
+----------+----------+----+-----+---+-------+-----------+------------+
|      date|   date_id|year|month|day|quarter|day_of_week|week_of_year|
+----------+----------+----+-----+---+-------+-----------+------------+
|2023-01-01|2023-01-01|2023|    1|  1|      1|          1|          52|
|2023-01-02|2023-01-02|2023|    1|  2|      1|          2|           1|
|2023-01-03|2023-01-03|2023|    1|  3|      1|          3|           1|
|2023-01-04|2023-01-04|2023|    1|  4|      1|          4|           1|
|2023-01-05|2023-01-05|2023|    1|  5|      1|          5|           1|
+----------+----------+----+-----+---+-------+-----------+------------+
only showing top 5 rows



### 17.0. Create Customer Dimension (Gold)

In [119]:

# Gold customer dimension: the subset of silver attributes used for reporting.
_dim_customer_columns = [
    "customer_id",
    "full_name",
    "email",
    "country",
    "city",
    "customer_type",
]
df_dim_customer = df_customers_silver.select(*_dim_customer_columns)

# Persist the dimension and expose it to Spark SQL.
df_dim_customer.write.mode("overwrite").parquet(f"{GOLD_DIR}/dim_customer")
df_dim_customer.createOrReplaceTempView("dim_customer")

print(f" Customer dimension created with {df_dim_customer.count()} customers")
df_dim_customer.show(5, truncate=False)

 Customer dimension created with 15 customers
+-----------+-----------------+---------------------+-----------+------------+-------------+
|customer_id|full_name        |email                |country    |city        |customer_type|
+-----------+-----------------+---------------------+-----------+------------+-------------+
|12         |Charlotte Lee    |charlotte.l@email.com|SINGAPORE  |Singapore   |Customer     |
|13         |Henry White      |h.white@email.com    |USA        |Philadelphia|Customer     |
|8          |Isabella Anderson|i.anderson@email.com |USA        |Houston     |Customer     |
|9          |Benjamin Taylor  |ben.t@email.com      |GERMANY    |Berlin      |Customer     |
|14         |Amelia Martin    |amelia.m@email.com   |NETHERLANDS|Amsterdam   |Premium      |
+-----------+-----------------+---------------------+-----------+------------+-------------+
only showing top 5 rows



### 18.0. Create Product Dimension (Gold)

In [121]:

# Gold product dimension: same columns as the silver product table.
_dim_product_columns = [
    "product_id",
    "product_name",
    "category",
    "subcategory",
    "unit_price",
]
df_dim_product = df_products_silver.select(*_dim_product_columns)

# Persist the dimension and expose it to Spark SQL.
df_dim_product.write.mode("overwrite").parquet(f"{GOLD_DIR}/dim_product")
df_dim_product.createOrReplaceTempView("dim_product")

print(f" Product dimension created with {df_dim_product.count()} products")
df_dim_product.show(5, truncate=False)

 Product dimension created with 10 products
+----------+-------------------+-----------+-----------+----------+
|product_id|product_name       |category   |subcategory|unit_price|
+----------+-------------------+-----------+-----------+----------+
|8         |Ergonomic Mouse Pad|Electronics|Accessories|19.99     |
|7         |Mechanical Keyboard|Electronics|Accessories|129.99    |
|2         |Wireless Mouse     |Electronics|Accessories|29.99     |
|3         |Office Chair Deluxe|Furniture  |Office     |349.99    |
|6         |4K Monitor 27"     |Electronics|Displays   |449.99    |
+----------+-------------------+-----------+-----------+----------+
only showing top 5 rows



### 19.0. Create Sales Fact Table (Gold)

In [123]:

# date_id is the transaction date parsed as a date and rendered back to a
# string; the monthly trend query joins it to dim_date.date_id.
_date_id_col = to_date(col("transaction_date")).cast("string").alias("date_id")

df_fact_sales = df_transactions_silver.select(
    "transaction_id",
    _date_id_col,
    "customer_id",
    "product_id",
    "quantity",
    "unit_price",
    "shipping_cost",
    "total_amount_usd",
)

# Persist the fact table and expose it to Spark SQL.
df_fact_sales.write.mode("overwrite").parquet(f"{GOLD_DIR}/fact_sales")
df_fact_sales.createOrReplaceTempView("fact_sales")

print(f" Sales fact table created with {df_fact_sales.count()} transactions")
df_fact_sales.show(5)

 Sales fact table created with 20 transactions
+--------------+----------+-----------+----------+--------+----------+-------------+------------------+
|transaction_id|   date_id|customer_id|product_id|quantity|unit_price|shipping_cost|  total_amount_usd|
+--------------+----------+-----------+----------+--------+----------+-------------+------------------+
|            14|2024-03-01|          4|         2|       4|     29.99|         12.0|131.95999145507812|
|            15|2024-03-02|         13|         7|       1|    129.99|         10.0|139.99000549316406|
|            16|2024-03-03|         14|         3|       1|    349.99|         28.0| 443.1301251872253|
|            17|2024-03-04|         15|         8|       5|     19.99|         18.0| 148.5516279048818|
|            18|2024-03-05|          1|         4|       1|    599.99|         30.0|  629.989990234375|
+--------------+----------+-----------+----------+--------+----------+-------------+------------------+
only showing top 

## Section VII: Business Intelligence Queries (Demonstrating Business Value)
### 20.0. Query 1: Sales by Customer

In [125]:

# Per-customer order count, item count and USD revenue, highest revenue first.
_customer_sales_sql = """
    SELECT 
        c.customer_id,
        c.full_name,
        c.country,
        c.customer_type,
        COUNT(f.transaction_id) as num_orders,
        SUM(f.quantity) as total_items,
        ROUND(SUM(f.total_amount_usd), 2) as total_revenue_usd
    FROM fact_sales f
    JOIN dim_customer c ON f.customer_id = c.customer_id
    GROUP BY c.customer_id, c.full_name, c.country, c.customer_type
    ORDER BY total_revenue_usd DESC
"""
query1 = spark.sql(_customer_sales_sql)

print("\n BUSINESS QUERY 1: Sales Performance by Customer")
print("=" * 80)
query1.show(10, truncate=False)


 BUSINESS QUERY 1: Sales Performance by Customer
+-----------+---------------+-----------+-------------+----------+-----------+-----------------+
|customer_id|full_name      |country    |customer_type|num_orders|total_items|total_revenue_usd|
+-----------+---------------+-----------+-------------+----------+-----------+-----------------+
|1          |John Smith     |USA        |Customer     |3         |3          |2084.97          |
|11         |Lucas Moore    |USA        |Customer     |1         |1          |1314.99          |
|9          |Benjamin Taylor|GERMANY    |Customer     |1         |1          |738.56           |
|2          |Emma Johnson   |UK         |Customer     |2         |3          |682.69           |
|4          |Sophia Davis   |USA        |Premium      |2         |5          |593.95           |
|3          |Michael Brown  |CANADA     |Customer     |2         |3          |583.31           |
|14         |Amelia Martin  |NETHERLANDS|Premium      |1         |1          

### 21.0. Query 2: Sales by Product Category

In [127]:

# Per-category transaction count, units sold, average and total USD revenue.
_category_sales_sql = """
    SELECT 
        p.category,
        COUNT(DISTINCT f.transaction_id) as num_transactions,
        SUM(f.quantity) as total_units_sold,
        ROUND(AVG(f.total_amount_usd), 2) as avg_transaction_value,
        ROUND(SUM(f.total_amount_usd), 2) as total_revenue_usd
    FROM fact_sales f
    JOIN dim_product p ON f.product_id = p.product_id
    GROUP BY p.category
    ORDER BY total_revenue_usd DESC
"""
query2 = spark.sql(_category_sales_sql)

print("\n BUSINESS QUERY 2: Sales Performance by Product Category")
print("=" * 80)
query2.show(truncate=False)


 BUSINESS QUERY 2: Sales Performance by Product Category
+-----------+----------------+----------------+---------------------+-----------------+
|category   |num_transactions|total_units_sold|avg_transaction_value|total_revenue_usd|
+-----------+----------------+----------------+---------------------+-----------------+
|Electronics|13              |24              |397.51               |5167.64          |
|Furniture  |7               |10              |407.48               |2852.38          |
+-----------+----------------+----------------+---------------------+-----------------+



### 22.0. Query 3: Monthly Sales Trend

In [129]:

# Transaction count, revenue and average order value per calendar month,
# obtained by joining fact_sales to dim_date; rows come back chronologically.
monthly_trend_sql = """
    SELECT 
        d.year,
        d.month,
        d.quarter,
        COUNT(f.transaction_id) as num_transactions,
        ROUND(SUM(f.total_amount_usd), 2) as total_revenue_usd,
        ROUND(AVG(f.total_amount_usd), 2) as avg_order_value
    FROM fact_sales f
    JOIN dim_date d ON f.date_id = d.date_id
    GROUP BY d.year, d.month, d.quarter
    ORDER BY d.year, d.month
"""
query3 = spark.sql(monthly_trend_sql)

# Banner line and rule, then the result table with untruncated cell values.
print("\n BUSINESS QUERY 3: Monthly Sales Trends", "=" * 80, sep="\n")
query3.show(truncate=False)


 BUSINESS QUERY 3: Monthly Sales Trends
+----+-----+-------+----------------+-----------------+---------------+
|year|month|quarter|num_transactions|total_revenue_usd|avg_order_value|
+----+-----+-------+----------------+-----------------+---------------+
|2024|1    |1      |7               |3013.8           |430.54         |
|2024|2    |1      |6               |3115.65          |519.27         |
|2024|3    |1      |7               |1890.57          |270.08         |
+----+-----+-------+----------------+-----------------+---------------+



### 23.0. Query 4: Top Products by Revenue

In [131]:

# Business question: which ten products bring in the most revenue?
# Aggregates fact_sales per product (order lines, units, USD revenue).
# Because LIMIT 10 cuts the ranking, product_id is added as a secondary sort
# key: without a tie-breaker, products with equal revenue at the cut-off
# could be included or excluded arbitrarily from one run to the next.
query4 = spark.sql("""
    SELECT 
        p.product_id,
        p.product_name,
        p.category,
        COUNT(f.transaction_id) as times_ordered,
        SUM(f.quantity) as total_units_sold,
        ROUND(SUM(f.total_amount_usd), 2) as total_revenue_usd
    FROM fact_sales f
    JOIN dim_product p ON f.product_id = p.product_id
    GROUP BY p.product_id, p.product_name, p.category
    ORDER BY total_revenue_usd DESC, p.product_id
    LIMIT 10
""")

print("\n BUSINESS QUERY 4: Top 10 Products by Revenue")
print("="*80)
query4.show(10, truncate=False)


 BUSINESS QUERY 4: Top 10 Products by Revenue
+----------+---------------------------+-----------+-------------+----------------+-----------------+
|product_id|product_name               |category   |times_ordered|total_units_sold|total_revenue_usd|
+----------+---------------------------+-----------+-------------+----------------+-----------------+
|1         |Laptop Pro 15              |Electronics|2            |2               |2629.98          |
|4         |Standing Desk              |Furniture  |2            |2               |1368.55          |
|5         |Noise-Cancelling Headphones|Electronics|3            |4               |976.29           |
|3         |Office Chair Deluxe        |Furniture  |2            |2               |945.12           |
|6         |4K Monitor 27"             |Electronics|2            |2               |826.32           |
|9         |Desk Lamp LED              |Furniture  |2            |5               |375.67           |
|7         |Mechanical Keyboard    

### 24.0. Query 5: Geographic Sales Analysis

In [133]:

# Per-country rollup: distinct customers, orders, units, revenue and average
# order value, ordered from the highest-revenue country down.
country_sales_sql = """
    SELECT 
        c.country,
        COUNT(DISTINCT c.customer_id) as num_customers,
        COUNT(f.transaction_id) as num_orders,
        SUM(f.quantity) as total_items,
        ROUND(SUM(f.total_amount_usd), 2) as total_revenue_usd,
        ROUND(AVG(f.total_amount_usd), 2) as avg_order_value
    FROM fact_sales f
    JOIN dim_customer c ON f.customer_id = c.customer_id
    GROUP BY c.country
    ORDER BY total_revenue_usd DESC
"""
query5 = spark.sql(country_sales_sql)

# Banner line and rule, then the result table with untruncated cell values.
print("\n BUSINESS QUERY 5: Sales Performance by Country", "=" * 80, sep="\n")
query5.show(truncate=False)


 BUSINESS QUERY 5: Sales Performance by Country
+-----------+-------------+----------+-----------+-----------------+---------------+
|country    |num_customers|num_orders|total_items|total_revenue_usd|avg_order_value|
+-----------+-------------+----------+-----------+-----------------+---------------+
|USA        |6            |9         |15         |4376.85          |486.32         |
|GERMANY    |1            |1         |1          |738.56           |738.56         |
|UK         |1            |2         |3          |682.69           |341.35         |
|CANADA     |1            |2         |3          |583.31           |291.66         |
|NETHERLANDS|1            |1         |1          |443.13           |443.13         |
|SINGAPORE  |1            |1         |1          |364.33           |364.33         |
|FRANCE     |1            |1         |1          |354.03           |354.03         |
|AUSTRALIA  |1            |1         |1          |201.98           |201.98         |
|SWITZERLAND|1  

## Section VIII: Demonstrate Incremental Batch Load
### 25.0. Simulate Additional Customer Data (Incremental Load)

In [135]:

# Simulated batch of customers that are not in the bronze data yet.
# Field order: customer_id, first_name, last_name, email, country, city,
# state_province, customer_type, registration_date.
new_customer_data = [
    (16, "Sophie", "Chen", "sophie.chen@email.com", "CHINA", "Shanghai", "SH", "Premium", "2024-10-01"),
    (17, "Marco", "Rossi", "m.rossi@email.com", "ITALY", "Rome", "RM", "Customer", "2024-10-05"),
    (18, "Ana", "Silva", "ana.silva@email.com", "BRAZIL", "Sao Paulo", "SP", "Customer", "2024-10-10")
]

df_new_customers = spark.createDataFrame(new_customer_data, customer_schema)

# Each count triggers a Spark job, so take it once and reuse the number.
# The "previous" count is captured BEFORE the silver/gold paths are
# overwritten below, so it cannot be affected by this cell's own writes.
new_customer_count = df_new_customers.count()
previous_customer_count = df_customers_silver.count()

print("\n INCREMENTAL BATCH LOAD: Adding New Customers")
print("="*80)
print(f" Created {new_customer_count} new customer records")
df_new_customers.show(truncate=False)

# Combine existing and new rows. unionByName matches columns by name rather
# than by position, so a difference in column order between the two frames
# cannot silently put values into the wrong column.
df_all_customers = df_customers_bronze.unionByName(df_new_customers)

# Silver transformation: build full_name, upper-case the country, and keep
# only the cleansed customer attributes.
df_customers_silver_updated = df_all_customers \
    .withColumn("full_name", concat_ws(" ", col("first_name"), col("last_name"))) \
    .withColumn("country", upper(col("country"))) \
    .select(
        "customer_id", "full_name", "email", "country", 
        "city", "state_province", "customer_type", "registration_date"
    )

total_customer_count = df_customers_silver_updated.count()

# Replace the silver customer table and the gold customer dimension with the
# combined data set.
# NOTE(review): both writes are full overwrites of the target path, not
# appends/merges; any temp view registered over the gold path before this
# cell may need to be re-created or refreshed to see the new rows - confirm.
df_customers_silver_updated.write.mode("overwrite").parquet(f"{SILVER_DIR}/customers")
df_customers_silver_updated.select(
    "customer_id", "full_name", "email", "country", "city", "customer_type"
).write.mode("overwrite").parquet(f"{GOLD_DIR}/dim_customer")

print(f"\n Incremental load complete")
print(f"  Total customers now: {total_customer_count}")
print(f"  Previous count: {previous_customer_count}")
print(f"  New customers added: {new_customer_count}")


 INCREMENTAL BATCH LOAD: Adding New Customers
 Created 3 new customer records
+-----------+----------+---------+---------------------+-------+---------+--------------+-------------+-----------------+
|customer_id|first_name|last_name|email                |country|city     |state_province|customer_type|registration_date|
+-----------+----------+---------+---------------------+-------+---------+--------------+-------------+-----------------+
|16         |Sophie    |Chen     |sophie.chen@email.com|CHINA  |Shanghai |SH            |Premium      |2024-10-01       |
|17         |Marco     |Rossi    |m.rossi@email.com    |ITALY  |Rome     |RM            |Customer     |2024-10-05       |
|18         |Ana       |Silva    |ana.silva@email.com  |BRAZIL |Sao Paulo|SP            |Customer     |2024-10-10       |
+-----------+----------+---------+---------------------+-------+---------+--------------+-------------+-----------------+


 Incremental load complete
  Total customers now: 18
  Previous c

## Section IX: Summary and Documentation

### 27.0. Verify Data Lakehouse Structure

In [138]:
import os

def show_directory_structure(path, prefix="", max_depth=3, current_depth=0):
    """Print the contents of ``path`` as an indented tree.

    Entries are printed one per line in sorted order, each preceded by a
    box-drawing connector. Sub-directories are expanded recursively until
    ``max_depth`` levels have been printed. Entries whose name starts with
    "." are listed but never expanded.

    Args:
        path: Directory whose contents should be listed.
        prefix: Text printed in front of every connector; during recursion
            it carries the indentation contributed by the parent levels.
        max_depth: Maximum number of nested levels to print.
        current_depth: Depth of ``path`` relative to the starting directory.
            Callers normally leave this at 0; recursion increments it.

    Returns:
        None. The tree is written to standard output. A path that cannot be
        listed (missing, not a directory, or permission denied) produces a
        single "[unreadable: ...]" line instead of raising or being skipped
        silently.
    """
    if current_depth >= max_depth:
        return

    try:
        entries = sorted(os.listdir(path))
    except OSError as exc:
        # Covers PermissionError, FileNotFoundError and NotADirectoryError.
        # Report the problem in place so the reader can see that part of the
        # tree is missing, rather than hiding it or aborting the whole walk.
        print(f"{prefix}[unreadable: {path} ({exc.strerror})]")
        return

    last_index = len(entries) - 1
    for index, name in enumerate(entries):
        is_last = index == last_index

        connector = "└── " if is_last else "├── "
        print(f"{prefix}{connector}{name}")

        entry_path = os.path.join(path, name)
        if os.path.isdir(entry_path) and not name.startswith('.'):
            # Below the last entry nothing continues, so pad with spaces;
            # otherwise keep the vertical rule running for later siblings.
            child_prefix = prefix + ("    " if is_last else "│   ")
            show_directory_structure(entry_path, child_prefix, max_depth, current_depth + 1)

# Heading, rule and root path first, then the tree of everything below it.
print("\n DATA LAKEHOUSE DIRECTORY STRUCTURE:", "=" * 80, BASE_DIR, sep="\n")
show_directory_structure(BASE_DIR)


 DATA LAKEHOUSE DIRECTORY STRUCTURE:
./ecommerce_lakehouse
├── bronze
│   ├── currency_rates.csv
│   ├── customers
│   │   ├── ._SUCCESS.crc
│   │   ├── .part-00000-1ae67625-811e-45c1-9f38-037b7153d539-c000.snappy.parquet.crc
│   │   ├── .part-00001-1ae67625-811e-45c1-9f38-037b7153d539-c000.snappy.parquet.crc
│   │   ├── .part-00002-1ae67625-811e-45c1-9f38-037b7153d539-c000.snappy.parquet.crc
│   │   ├── .part-00003-1ae67625-811e-45c1-9f38-037b7153d539-c000.snappy.parquet.crc
│   │   ├── .part-00004-1ae67625-811e-45c1-9f38-037b7153d539-c000.snappy.parquet.crc
│   │   ├── .part-00005-1ae67625-811e-45c1-9f38-037b7153d539-c000.snappy.parquet.crc
│   │   ├── .part-00006-1ae67625-811e-45c1-9f38-037b7153d539-c000.snappy.parquet.crc
│   │   ├── .part-00007-1ae67625-811e-45c1-9f38-037b7153d539-c000.snappy.parquet.crc
│   │   ├── _SUCCESS
│   │   ├── part-00000-1ae67625-811e-45c1-9f38-037b7153d539-c000.snappy.parquet
│   │   ├── part-00001-1ae67625-811e-45c1-9f38-037b7153d539-c000.snappy.parqu

### 28.0. Cleanup (Optional)

In [140]:

# Opt-in switch for deleting everything under BASE_DIR once the run is done.
# Off by default so the generated lakehouse files remain available for
# inspection; flip to True and re-run this cell to remove them.
REMOVE_LAKEHOUSE_FILES = False

# Shut down the Spark session; cells that use `spark` cannot be run after
# this point without re-creating the session.
spark.stop()
print(" Spark session stopped")

if REMOVE_LAKEHOUSE_FILES:
    import shutil  # only needed for the optional cleanup path

    shutil.rmtree(BASE_DIR)
    print(f" Cleaned up {BASE_DIR}")

 Spark session stopped


---

## Project Documentation

### Architecture Overview

This project implements a dimensional data lakehouse following the medallion architecture:

**Bronze Layer (Raw):**
- Ingests data from multiple heterogeneous sources
- Preserves original data format and structure
- Serves as the single source of truth

**Silver Layer (Cleansed):**
- Data cleansing and standardization
- Joins streaming fact data with static reference dimensions
- Business key matching and data quality validation
- Currency conversion using real-time exchange rates

**Gold Layer (Business-Level Aggregates):**
- Star schema dimensional model
- Optimized for business intelligence queries
- Contains: dim_date, dim_customer, dim_product, fact_sales

### Data Integration Pattern

This solution implements an **ELT (Extract-Load-Transform)** pattern:
1. **Extract**: Data pulled from MySQL, MongoDB, CSV files, and REST API
2. **Load**: Raw data loaded into Bronze layer without transformation
3. **Transform**: PySpark transformations applied progressively through Silver to Gold

### Streaming Architecture

The streaming component follows a **Lambda Architecture** approach:
- **Batch Layer**: Static reference data (customers, products, exchange rates)
- **Speed Layer**: Real-time transaction data via Spark Structured Streaming
- **Serving Layer**: Joined data in Silver/Gold for immediate query access

### Business Value

The dimensional model enables key business analyses:
- **Customer Analytics**: Identify high-value customers and purchasing patterns
- **Product Performance**: Track best-selling items and category trends
- **Time-Series Analysis**: Monitor sales trends over time (daily, monthly, quarterly)
- **Geographic Insights**: Understand regional market performance
- **Revenue Metrics**: Calculate KPIs like average order value and customer lifetime value

---

