In [1]:
!pip install delta

Collecting delta
  Downloading delta-0.4.2.tar.gz (4.1 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: delta
  Building wheel for delta (setup.py) ... [?25l[?25hdone
  Created wheel for delta: filename=delta-0.4.2-py3-none-any.whl size=2915 sha256=a179be9d2aeca2affc7916cf2926bbad4c6d3b8e8bb05a414df9fefc826e67a7
  Stored in directory: /root/.cache/pip/wheels/a8/86/24/a486f14769cf86a2a9ce6b589a82b7414b14657c6fd515dc75
Successfully built delta
Installing collected packages: delta
Successfully installed delta-0.4.2


In [2]:
!mkdir -p delta_tables/bronze
!mkdir -p delta_tables/silver
!mkdir -p delta_tables/gold

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from delta import *

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("MongoDB to Delta Lake") \
    .config("spark.mongodb.input.uri", "mongodb+srv://llmate-assignment-orders-read-only:cl89aTfXGb4J@order-data.sco2q.mongodb.net/order_data") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()

# Read from MongoDB collections
bronze_orders_df = spark.read.format("mongo").option("uri", "mongodb+srv://llmate-assignment-orders-read-only:cl89aTfXGb4J@order-data.sco2q.mongodb.net/order_data.orders").load()
bronze_customers_df = spark.read.format("mongo").option("uri", "mongodb+srv://llmate-assignment-orders-read-only:cl89aTfXGb4J@order-data.sco2q.mongodb.net/order_data.customers").load()



In [4]:
# Count the number of rows in bronze_orders_df
row_count = bronze_orders_df.count()
print("Number of rows in bronze_orders_df:", row_count)


Number of rows in bronze_orders_df: 10


In [5]:
# Write to Parquet
bronze_orders_df.write.mode("overwrite").parquet("delta_tables/bronze/bronze_orders_parquet")


In [6]:
# Load the Parquet file into a DataFrame
bronze_orders_parquet_df = spark.read.parquet("delta_tables/bronze/bronze_orders_parquet")


In [7]:
# Register as a temporary view
bronze_orders_parquet_df.createOrReplaceTempView("bronze_orders_parquet_view")


In [8]:
# Example SQL query
spark.sql("SELECT * FROM bronze_orders_parquet_view WHERE total_amount > 100").show()


+--------------------+-----------+--------------------+----------+--------+-------------+----------+------------+
|                 _id|customer_id|               items|order_date|order_id|shipping_date|    status|total_amount|
+--------------------+-----------+--------------------+----------+--------+-------------+----------+------------+
|{67212c65e20556e9...|       C123|[{P001, 2, 50.0},...|2023-08-15|   O1001|   2023-08-20|   Shipped|      250.75|
|{67212c7de20556e9...|       C124|[{P003, 1, 100.0}...|2023-09-10|   O1002|   2023-09-15|Processing|       150.5|
|{67212c89e20556e9...|       C125|   [{P005, 4, 75.0}]|2024-10-01|   O1003|   2024-10-05| Delivered|       300.0|
|{67212c98e20556e9...|       C126| [{P006, 5, 100.05}]|2024-11-15|   O1004|   2024-11-20|   Shipped|      500.25|
|{67212ca7e20556e9...|       C127|   [{P007, 2, 60.0}]|2024-12-20|   O1005|   2024-12-25| Delivered|       120.0|
|{67212cbae20556e9...|       C128|  [{P008, 4, 100.0}]|2025-01-10|   O1006|   2025-01-15

In [9]:
# Query directly from the Parquet path
spark.sql("SELECT * FROM parquet.`delta_tables/bronze/bronze_orders_parquet` WHERE total_amount > 100").show()


+--------------------+-----------+--------------------+----------+--------+-------------+----------+------------+
|                 _id|customer_id|               items|order_date|order_id|shipping_date|    status|total_amount|
+--------------------+-----------+--------------------+----------+--------+-------------+----------+------------+
|{67212c65e20556e9...|       C123|[{P001, 2, 50.0},...|2023-08-15|   O1001|   2023-08-20|   Shipped|      250.75|
|{67212c7de20556e9...|       C124|[{P003, 1, 100.0}...|2023-09-10|   O1002|   2023-09-15|Processing|       150.5|
|{67212c89e20556e9...|       C125|   [{P005, 4, 75.0}]|2024-10-01|   O1003|   2024-10-05| Delivered|       300.0|
|{67212c98e20556e9...|       C126| [{P006, 5, 100.05}]|2024-11-15|   O1004|   2024-11-20|   Shipped|      500.25|
|{67212ca7e20556e9...|       C127|   [{P007, 2, 60.0}]|2024-12-20|   O1005|   2024-12-25| Delivered|       120.0|
|{67212cbae20556e9...|       C128|  [{P008, 4, 100.0}]|2025-01-10|   O1006|   2025-01-15

In [10]:
bronze_customers_df.write.mode("overwrite").parquet("delta_tables/bronze/bronze_customers_parquet")

In [11]:
#DataCleaning
from pyspark.sql.functions import col, to_date

# Load Bronze tables
bronze_orders_df = spark.read.parquet("delta_tables/bronze/bronze_orders_parquet")
bronze_customers_df = spark.read.parquet("delta_tables/bronze/bronze_customers_parquet")

# Convert date strings to Date type
silver_orders_df = bronze_orders_df \
    .withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd")) \
    .withColumn("shipping_date", to_date(col("shipping_date"), "yyyy-MM-dd"))

silver_customers_df = bronze_customers_df \
    .withColumn("registration_date", to_date(col("registration_date"), "yyyy-MM-dd"))

# Handle nulls (for demonstration, we’ll fill them with defaults)
silver_orders_df = silver_orders_df.fillna({"status": "Unknown", "total_amount": 0.0})
silver_customers_df = silver_customers_df.fillna({"email": "unknown@example.com", "phone": "000-0000"})

# Remove duplicate records
silver_orders_df = silver_orders_df.dropDuplicates(["order_id"])
silver_customers_df = silver_customers_df.dropDuplicates(["customer_id"])


In [12]:
silver_customers_df.show()

+--------------------+--------------------+-----------+--------------------+----------+---------+--------+-----------------+
|                 _id|             address|customer_id|               email|first_name|last_name|   phone|registration_date|
+--------------------+--------------------+-----------+--------------------+----------+---------+--------+-----------------+
|{67212948e20556e9...|{123 Elm Street, ...|       C123|jane.smith@exampl...|      Jane|    Smith|555-1234|       2020-05-10|
|{67212b1ee20556e9...|{456 Oak Avenue, ...|       C124|john.doe@example.com|      John|      Doe|555-5678|       2021-03-22|
|{67212b2be20556e9...|{789 Pine Road, M...|       C125|emily.johnson@exa...|     Emily|  Johnson|555-8765|       2022-11-15|
|{67212b37e20556e9...|{321 Maple Street...|       C126|michael.brown@exa...|   Michael|    Brown|555-4321|       2023-07-08|
|{67212b47e20556e9...|{654 Cedar Avenue...|       C127|sarah.davis@examp...|     Sarah|    Davis|555-6789|       2024-01-20|


In [13]:
silver_orders_df.write.mode("overwrite").parquet("delta_tables/silver/silver_orders_parquet")

In [14]:
silver_customers_df.write.mode("overwrite").parquet("delta_tables/silver/silver_customers_parquet")

In [15]:
spark.sql("SELECT * FROM parquet.`delta_tables/silver/silver_orders_parquet`").show()


+--------------------+-----------+--------------------+----------+--------+-------------+----------+------------+
|                 _id|customer_id|               items|order_date|order_id|shipping_date|    status|total_amount|
+--------------------+-----------+--------------------+----------+--------+-------------+----------+------------+
|{67212c65e20556e9...|       C123|[{P001, 2, 50.0},...|2023-08-15|   O1001|   2023-08-20|   Shipped|      250.75|
|{67212c7de20556e9...|       C124|[{P003, 1, 100.0}...|2023-09-10|   O1002|   2023-09-15|Processing|       150.5|
|{67212c89e20556e9...|       C125|   [{P005, 4, 75.0}]|2024-10-01|   O1003|   2024-10-05| Delivered|       300.0|
|{67212c98e20556e9...|       C126| [{P006, 5, 100.05}]|2024-11-15|   O1004|   2024-11-20|   Shipped|      500.25|
|{67212ca7e20556e9...|       C127|   [{P007, 2, 60.0}]|2024-12-20|   O1005|   2024-12-25| Delivered|       120.0|
|{67212cbae20556e9...|       C128|  [{P008, 4, 100.0}]|2025-01-10|   O1006|   2025-01-15

In [16]:
# Count the number of rows in bronze_orders_df
row_count = silver_orders_df.count()
print("Number of rows in bronze_orders_df:", row_count)


Number of rows in bronze_orders_df: 10


In [17]:
#DataEnrichment
from pyspark.sql.functions import explode, size

# Compute item_count based on the number of items in the array
bronze_orders_df = bronze_orders_df.withColumn("item_count", size("items"))

# Flatten the items array to create one row per item
flattened_orders_df = bronze_orders_df.withColumn("item", explode("items"))

# Select necessary columns including expanded item fields
flattened_orders_df = flattened_orders_df.select(
    "order_id",
    "customer_id",
    "order_date",
    "shipping_date",
    "status",
    "total_amount",
    "item_count",
    "item.product_id",
    "item.quantity",
    "item.unit_price"
)

# Show the result
flattened_orders_df.show()


+--------+-----------+----------+-------------+----------+------------+----------+----------+--------+----------+
|order_id|customer_id|order_date|shipping_date|    status|total_amount|item_count|product_id|quantity|unit_price|
+--------+-----------+----------+-------------+----------+------------+----------+----------+--------+----------+
|   O1001|       C123|2023-08-15|   2023-08-20|   Shipped|      250.75|         2|      P001|       2|      50.0|
|   O1001|       C123|2023-08-15|   2023-08-20|   Shipped|      250.75|         2|      P002|       3|     50.25|
|   O1002|       C124|2023-09-10|   2023-09-15|Processing|       150.5|         2|      P003|       1|     100.0|
|   O1002|       C124|2023-09-10|   2023-09-15|Processing|       150.5|         2|      P004|       1|      50.5|
|   O1003|       C125|2024-10-01|   2024-10-05| Delivered|       300.0|         1|      P005|       4|      75.0|
|   O1004|       C126|2024-11-15|   2024-11-20|   Shipped|      500.25|         1|      

In [18]:
#Data Integration
# Load Bronze tables if not already loaded
bronze_orders_df = spark.read.parquet("delta_tables/bronze/bronze_orders_parquet")
bronze_customers_df = spark.read.parquet("delta_tables/bronze/bronze_customers_parquet")

# Join orders and customers on customer_id to enrich order data with customer information
enriched_orders_df = bronze_orders_df.join(bronze_customers_df, on="customer_id", how="left")

# Select relevant columns (optional, to keep only needed fields)
enriched_orders_df = enriched_orders_df.select(
    "order_id", "customer_id", "order_date", "shipping_date", "status", "total_amount",
    "first_name", "last_name", "email", "phone", "address")
# Show the enriched data
enriched_orders_df.show()


+--------+-----------+----------+-------------+----------+------------+----------+---------+--------------------+--------+--------------------+
|order_id|customer_id|order_date|shipping_date|    status|total_amount|first_name|last_name|               email|   phone|             address|
+--------+-----------+----------+-------------+----------+------------+----------+---------+--------------------+--------+--------------------+
|   O1001|       C123|2023-08-15|   2023-08-20|   Shipped|      250.75|      Jane|    Smith|jane.smith@exampl...|555-1234|{123 Elm Street, ...|
|   O1001|       C123|2023-08-15|   2023-08-20|   Shipped|      250.75|      Jane|    Smith|jane.smith@exampl...|555-1234|{123 Elm Street, ...|
|   O1002|       C124|2023-09-10|   2023-09-15|Processing|       150.5|      John|      Doe|john.doe@example.com|555-5678|{456 Oak Avenue, ...|
|   O1003|       C125|2024-10-01|   2024-10-05| Delivered|       300.0|     Emily|  Johnson|emily.johnson@exa...|555-8765|{789 Pine Road

In [19]:
from pyspark.sql.functions import concat_ws, col

# Derive customer_name by concatenating first_name and last_name
enriched_orders_df = enriched_orders_df.withColumn("customer_name", concat_ws(" ", col("first_name"), col("last_name")))

# Extract region from address (assuming region is stored in address.state or another specific field in the nested structure)
# Adjust "address.state" based on actual structure
enriched_orders_df = enriched_orders_df.withColumn("region", col("address.state"))

# Select only necessary columns, including the new derived fields
enriched_orders_df = enriched_orders_df.select(
    "order_id", "customer_id", "order_date", "shipping_date", "status",
    "customer_name", "email", "phone", "region"
)

# Show the enriched and transformed data
enriched_orders_df.show()


+--------+-----------+----------+-------------+----------+--------------+--------------------+--------+------+
|order_id|customer_id|order_date|shipping_date|    status| customer_name|               email|   phone|region|
+--------+-----------+----------+-------------+----------+--------------+--------------------+--------+------+
|   O1001|       C123|2023-08-15|   2023-08-20|   Shipped|    Jane Smith|jane.smith@exampl...|555-1234|    IL|
|   O1001|       C123|2023-08-15|   2023-08-20|   Shipped|    Jane Smith|jane.smith@exampl...|555-1234|    IL|
|   O1002|       C124|2023-09-10|   2023-09-15|Processing|      John Doe|john.doe@example.com|555-5678|    NE|
|   O1003|       C125|2024-10-01|   2024-10-05| Delivered| Emily Johnson|emily.johnson@exa...|555-8765|    WI|
|   O1004|       C126|2024-11-15|   2024-11-20|   Shipped| Michael Brown|michael.brown@exa...|555-4321|    TX|
|   O1005|       C127|2024-12-20|   2024-12-25| Delivered|   Sarah Davis|sarah.davis@examp...|555-6789|    CO|
|

In [20]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from delta import *
bronze_orders_df = spark.read.parquet("delta_tables/bronze/bronze_orders_parquet")
bronze_customers_df = spark.read.parquet("delta_tables/bronze/bronze_customers_parquet")

### Step 1: Data Cleaning
# Convert date strings to Date type
silver_orders_df = bronze_orders_df \
    .withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd")) \
    .withColumn("shipping_date", to_date(col("shipping_date"), "yyyy-MM-dd"))

silver_customers_df = bronze_customers_df \
    .withColumn("registration_date", to_date(col("registration_date"), "yyyy-MM-dd"))

# Handle nulls by filling in default values
silver_orders_df = silver_orders_df.fillna({"status": "Unknown", "total_amount": 0.0})
silver_customers_df = silver_customers_df.fillna({"email": "unknown@example.com", "phone": "000-0000"})

# Remove duplicate records
silver_orders_df = silver_orders_df.dropDuplicates(["order_id"])
silver_customers_df = silver_customers_df.dropDuplicates(["customer_id"])

### Step 2: Data Enrichment
# Compute item_count based on the number of items in the array
silver_orders_df = silver_orders_df.withColumn("item_count", size("items"))

# Flatten items array to create one row per item in the order
silver_orders_flat_df = silver_orders_df.withColumn("item", explode("items"))

# Select necessary columns, including expanded item fields
silver_orders_flat_df = silver_orders_flat_df.select(
    "order_id", "customer_id", "order_date", "shipping_date", "status", "total_amount", "item_count",
    "item.product_id", "item.quantity", "item.unit_price"
)

### Step 3: Data Integration
# Join orders and customers on customer_id to enrich order data with customer information
enriched_orders_df = silver_orders_flat_df.join(silver_customers_df, on="customer_id", how="left")

### Step 4: Derive Additional Fields
# Derive customer_name by concatenating first_name and last_name
enriched_orders_df = enriched_orders_df.withColumn("customer_name", concat_ws(" ", col("first_name"), col("last_name")))

# Extract region from address (assuming region is stored in address.state)
enriched_orders_df = enriched_orders_df.withColumn("region", col("address.state"))

# Select only necessary columns, including new derived fields
enriched_orders_df = enriched_orders_df.select(
    "order_id", "customer_id", "order_date", "shipping_date", "status", "total_amount", "item_count",
    "customer_name", "email", "phone", "region", "product_id", "quantity", "unit_price"
)

### Step 5: Store Processed Data in Silver Delta Tables
# Write enriched orders data to Silver Delta Lake table
enriched_orders_df.write.mode("overwrite").parquet("delta_tables/silver/silver_orders_parquet")
# Write cleansed customers data to Silver Delta Lake table
silver_customers_df.write.mode("overwrite").parquet("delta_tables/silver/silver_customers_parquet")


In [21]:
enriched_orders_df.show()

+--------+-----------+----------+-------------+----------+------------+----------+--------------+--------------------+--------+------+----------+--------+----------+
|order_id|customer_id|order_date|shipping_date|    status|total_amount|item_count| customer_name|               email|   phone|region|product_id|quantity|unit_price|
+--------+-----------+----------+-------------+----------+------------+----------+--------------+--------------------+--------+------+----------+--------+----------+
|   O1001|       C123|2023-08-15|   2023-08-20|   Shipped|      250.75|         2|    Jane Smith|jane.smith@exampl...|555-1234|    IL|      P001|       2|      50.0|
|   O1001|       C123|2023-08-15|   2023-08-20|   Shipped|      250.75|         2|    Jane Smith|jane.smith@exampl...|555-1234|    IL|      P002|       3|     50.25|
|   O1002|       C124|2023-09-10|   2023-09-15|Processing|       150.5|         2|      John Doe|john.doe@example.com|555-5678|    NE|      P003|       1|     100.0|
|   

In [22]:
# Load Silver orders table
silver_orders_df = spark.read.parquet("delta_tables/silver/silver_orders_parquet")

# Select relevant columns and ensure correct format
orders_analytics_df = silver_orders_df.select(
    "order_id",
    "customer_id",
    "order_date",
    "shipping_date",
    "status",
    "total_amount",
    "item_count",
    "customer_name",
    "email",
    "region"
)

# Write to Gold Delta Lake table for orders analytics
orders_analytics_df.write.mode("overwrite").parquet("delta_tables/gold/gold_orders_analytics")


In [23]:
orders_analytics_df.show()

+--------+-----------+----------+-------------+----------+------------+----------+--------------+--------------------+------+
|order_id|customer_id|order_date|shipping_date|    status|total_amount|item_count| customer_name|               email|region|
+--------+-----------+----------+-------------+----------+------------+----------+--------------+--------------------+------+
|   O1001|       C123|2023-08-15|   2023-08-20|   Shipped|      250.75|         2|    Jane Smith|jane.smith@exampl...|    IL|
|   O1001|       C123|2023-08-15|   2023-08-20|   Shipped|      250.75|         2|    Jane Smith|jane.smith@exampl...|    IL|
|   O1002|       C124|2023-09-10|   2023-09-15|Processing|       150.5|         2|      John Doe|john.doe@example.com|    NE|
|   O1002|       C124|2023-09-10|   2023-09-15|Processing|       150.5|         2|      John Doe|john.doe@example.com|    NE|
|   O1003|       C125|2024-10-01|   2024-10-05| Delivered|       300.0|         1| Emily Johnson|emily.johnson@exa...|

In [24]:
from pyspark.sql.functions import sum, count, avg, min, max

# Aggregate data to compute sales metrics by region
sales_metrics_df = silver_orders_df.groupBy("region").agg(
    sum("total_amount").alias("total_sales"),
    count("order_id").alias("total_orders"),
    avg("total_amount").alias("average_order_val"),
    min("order_date").alias("first_order_date"),
    max("order_date").alias("last_order_date")
)

# Write to Gold Delta Lake table for sales metrics
sales_metrics_df.write.mode("overwrite").parquet("delta_tables/gold/gold_sales_metrics")

In [25]:
sales_metrics_df.show()

+------+-----------+------------+-----------------+----------------+---------------+
|region|total_sales|total_orders|average_order_val|first_order_date|last_order_date|
+------+-----------+------------+-----------------+----------------+---------------+
|    OR|      275.5|           1|            275.5|      2025-03-05|     2025-03-05|
|    WI|     480.75|           2|          240.375|      2024-10-01|     2024-12-01|
|    NE|      501.0|           3|            167.0|      2023-09-10|     2024-11-05|
|    IL|     1151.5|           4|          287.875|      2023-08-15|     2024-10-28|
|    WA|      400.0|           1|            400.0|      2025-01-10|     2025-01-10|
|    TX|     500.25|           1|           500.25|      2024-11-15|     2024-11-15|
|    CO|      120.0|           1|            120.0|      2024-12-20|     2024-12-20|
+------+-----------+------------+-----------------+----------------+---------------+



In [26]:
# Cache DataFrames in memory for faster repeated access
orders_analytics_df.cache()
sales_metrics_df.cache()

DataFrame[region: string, total_sales: double, total_orders: bigint, average_order_val: double, first_order_date: date, last_order_date: date]

In [27]:
# Ensure each order has a unique identifier based on order_id
orders_analytics_df = orders_analytics_df.dropDuplicates(["order_id"])


In [28]:
# Ensure each order has a unique identifier based on order_id
orders_analytics_df.show()


+--------+-----------+----------+-------------+----------+------------+----------+--------------+--------------------+------+
|order_id|customer_id|order_date|shipping_date|    status|total_amount|item_count| customer_name|               email|region|
+--------+-----------+----------+-------------+----------+------------+----------+--------------+--------------------+------+
|   O1001|       C123|2023-08-15|   2023-08-20|   Shipped|      250.75|         2|    Jane Smith|jane.smith@exampl...|    IL|
|   O1002|       C124|2023-09-10|   2023-09-15|Processing|       150.5|         2|      John Doe|john.doe@example.com|    NE|
|   O1003|       C125|2024-10-01|   2024-10-05| Delivered|       300.0|         1| Emily Johnson|emily.johnson@exa...|    WI|
|   O1004|       C126|2024-11-15|   2024-11-20|   Shipped|      500.25|         1| Michael Brown|michael.brown@exa...|    TX|
|   O1005|       C127|2024-12-20|   2024-12-25| Delivered|       120.0|         1|   Sarah Davis|sarah.davis@examp...|

In [29]:
orders_analytics_df.write.mode("overwrite").parquet("delta_tables/gold/gold_orders_analytics")

In [30]:
sales_metrics_df

DataFrame[region: string, total_sales: double, total_orders: bigint, average_order_val: double, first_order_date: date, last_order_date: date]

In [31]:
# Read the Parquet file
df = spark.read.parquet("delta_tables/gold/gold_sales_metrics")

# Write DataFrame to CSV format and save to local directory (e.g., "output_data")
df.write.option("header", "true").mode("overwrite").csv("/dbfs/FileStore/output_data/sales_metrics_df")


In [32]:
df = spark.read.parquet("delta_tables/gold/gold_orders_analytics")

# Write DataFrame to CSV format and save to local directory (e.g., "output_data")
df.write.option("header", "true").mode("overwrite").csv("/dbfs/FileStore/output_data/gold_orders_analytics_csv")
