In [19]:
# 1) Imports & Setup
import os
import pandas as pd
from faker import Faker
import random
from pyspark.sql.functions import round as spark_round,col,when,to_date,lit,avg
from datetime import datetime,timedelta
from sqlalchemy import create_engine
from pyspark.sql import SparkSession,Window
import numpy as np

In [20]:
# 2) Output directory & Faker setup
out_dir = "../data"# Directory to store CSV files
os.makedirs(out_dir, exist_ok=True)# Create the folder if it doesn't exist
fake = Faker()#Initialize faker to generate test data

In [21]:
#Customers.csv with invalid data
customers = []
for i in range(1,71):
    name=fake.name()
    email_name=name.lower().replace(" ","")
    email=f"{email_name}@gmail.com"
    country=fake.country()
    if i%10==0:
      email=None
    if i%15==0:
      email=None
    if i%20==0:
        name="##INVALID##"

    customers.append({
        "customer_id": i,
        "name": name,
        "email": email,
        "country":country
    })
pd.DataFrame(customers).to_csv(f"{out_dir }/customers.csv", index=False)
customers_df=pd.DataFrame(customers)
print(customers_df.head(30))

    customer_id               name                       email  \
0             1       Aaron Garner       aarongarner@gmail.com   
1             2    Shannon Merritt    shannonmerritt@gmail.com   
2             3      Sherry Murray      sherrymurray@gmail.com   
3             4     Thomas Jimenez     thomasjimenez@gmail.com   
4             5  Christopher Reyes  christopherreyes@gmail.com   
5             6   Katherine Henson   katherinehenson@gmail.com   
6             7   Vanessa Anderson   vanessaanderson@gmail.com   
7             8     Christy Garcia     christygarcia@gmail.com   
8             9        Mary Harvey        maryharvey@gmail.com   
9            10     Jennifer Payne                        None   
10           11    Denise Williams    denisewilliams@gmail.com   
11           12        Grace Brown        gracebrown@gmail.com   
12           13       Jason Arnold       jasonarnold@gmail.com   
13           14     Bradley Obrien     bradleyobrien@gmail.com   
14        

In [22]:
#Orders.csv with invalid data
statuses = ["completed", "pending", "cancelled", "returned"]
orders = []

for i in range(1, 100):
    order_date = fake.date_between(start_date='-6m', end_date='today').strftime("%Y-%m-%d")
    if i % 17 == 0:
        order_date = None
    amount = __builtins__.round(random.uniform(-50, 2000), 2)
    if i % 12 == 0:
        amount = -abs(amount)
    status = random.choice(statuses)
    orders.append({
        "order_id": i,
        "customer_id": random.randint(1, 70),
        "product_id": random.randint(1, 50),
        "order_date": order_date,
        "amount": amount,
        "status": status
    })
orders_df = pd.DataFrame(orders)
orders_df.to_csv(f"{out_dir}/orders.csv", index=False)
print(orders_df.head())

   order_id  customer_id  product_id  order_date  amount     status
0         1           52          44  2025-09-10  697.64   returned
1         2           60           2  2025-09-10  914.00    pending
2         3           15          12  2025-09-10  215.91  cancelled
3         4            2          30  2025-09-10  526.23   returned
4         5           49          48  2025-09-10  826.69    pending


In [23]:
#Products.csv with invalid data
products_catalog = [
    ("iPhone 14", "Electronics"), ("Samsung Galaxy S23", "Electronics"),
    ("MacBook Pro 14", "Electronics"), ("Sony WH-1000XM5 Headphones", "Electronics"),
    ("Apple Watch Series 9", "Wearables"), ("Fitbit Charge 5", "Wearables"),
    ("Dell XPS 13 Laptop", "Electronics"), ("iPad Air", "Tablets"),
    ("Bose QuietComfort Earbuds", "Audio"), ("Samsung Galaxy Buds", "Audio"),
    ("Canon EOS Rebel Camera", "Photography"), ("GoPro HERO10", "Photography"),
    ("Nike Air Max Shoes", "Footwear"), ("Puma Running Shoes", "Footwear"),
    ("Levi's 501 Jeans", "Apparel"), ("Adidas Hoodie", "Apparel"),
    ("Zara T-Shirt", "Apparel"), ("H&M Casual Dress", "Apparel"),
    ("Gucci Leather Belt", "Accessories"), ("Under Armour Shorts", "Sportswear"),
    ("Harry Potter Book Set", "Books"), ("The Lord of the Rings Trilogy", "Books"),
    ("Atomic Habits", "Books"), ("Rich Dad Poor Dad", "Books"),
    ("Python Crash Course", "Books"), ("Clean Code", "Books"),
    ("Game of Thrones Box Set", "Books"), ("The Lean Startup", "Books"),
    ("To Kill a Mockingbird", "Books"), ("1984", "Books"),
    ("Ikea Dining Table", "Furniture"), ("Sealy Memory Foam Mattress", "Furniture"),
    ("Ikea Chair Set", "Furniture"), ("Philips Air Fryer", "Kitchen Appliances"),
    ("NutriBullet Blender", "Kitchen Appliances"), ("Instant Pot Cooker", "Kitchen Appliances"),
    ("Hamilton Beach Toaster", "Kitchen Appliances"), ("Keurig Coffee Maker", "Kitchen Appliances"),
    ("Philips LED Desk Lamp", "Lighting"), ("Dyson V15 Vacuum Cleaner", "Home Appliances"),
    ("Wilson Tennis Racket", "Sports Equipment"), ("Adidas Football", "Sports Equipment"),
    ("Spalding Basketball", "Sports Equipment"), ("Yonex Badminton Racket", "Sports Equipment"),
    ("Nike Yoga Mat", "Fitness"), ("Reebok Jump Rope", "Fitness"),
    ("Speedo Swimming Goggles", "Sports Accessories"), ("Titleist Golf Balls", "Sports Equipment"),
    ("LEGO Star Wars Set", "Toys & Games"), ("Barbie Dreamhouse", "Dolls"),
    ("Hot Wheels Track Set", "Toys & Games"), ("Monopoly Board Game", "Board Games"),
    ("Rubik's Cube", "Puzzles"), ("Nerf Elite Blaster", "Outdoor Toys"),
    ("Play-Doh Fun Pack", "Arts & Crafts"), ("Fisher-Price Baby Gym", "Baby Toys"),
    ("Disney Princess Doll", "Dolls"), ("LEGO Technic Car", "Toys & Games")
]
products = []
for i, (name, category) in enumerate(products_catalog, start=1):
    products.append({
        "product_id": i,
        "name": name,
        "category": category,
        "price": round(random.uniform(10, 1200), 2)
    })
pd.DataFrame(products).to_csv(f"{out_dir}/products.csv", index=False)
products_df=pd.DataFrame(products)
print(products_df.head())

   product_id                        name     category   price
0           1                   iPhone 14  Electronics   88.85
1           2          Samsung Galaxy S23  Electronics  604.05
2           3              MacBook Pro 14  Electronics  317.84
3           4  Sony WH-1000XM5 Headphones  Electronics  920.36
4           5        Apple Watch Series 9    Wearables  349.81


In [24]:
# 6) Setup SparkSession with JDBC
jdbc_jar = r"C:\spark\spark-3.5.6-bin-hadoop3\jars\postgresql-42.7.7.jar"

spark = SparkSession.builder \
    .appName("DataEngineeringPipeline") \
    .config("spark.driver.extraClassPath", jdbc_jar) \
    .getOrCreate()#Initialize spark

In [25]:
# 7) Convert pandas DataFrames to Spark DataFrames & lowercase columns
customers_df = spark.read.csv(f"{out_dir}/customers.csv", header=True, inferSchema=True)
products_df= spark.read.csv(f"{out_dir}/products.csv", header=True, inferSchema=True)
orders_df = spark.read.csv(f"{out_dir}/orders.csv", header=True, inferSchema=True)

In [26]:
customers_df = customers_df.toDF(*[c.lower().strip() for c in customers_df.columns])
products_df = products_df.toDF(*[c.lower().strip() for c in products_df.columns])
orders_df   = orders_df.toDF(*[c.lower().strip() for c in orders_df.columns])

In [27]:
# Replace invalid names with NULL
customers_df=customers_df.withColumn("name",when((col("name")=="##INVALID##")| (col("name").isNull()),None).otherwise(col("name")))
# Replace invalid email strings with NULL
customers_df = customers_df.withColumn("email",when((col("email")=="None") | (col("email").isNull()), None).otherwise(col("email")))
# Remove duplicates based on customer_id
customers_df = customers_df.dropDuplicates(["customer_id"])

In [28]:
# Convert order_date to Spark DateType
orders_df = orders_df.withColumn("order_date",to_date("order_date", "yyyy-MM-dd"))
# Set any negative or zero amounts to 0
orders_df = orders_df.withColumn("amount",when(col("amount") <= 0, 0).otherwise(col("amount")))

In [29]:
category_avg = products_df.groupBy("category").agg(avg("price").alias("avg_price"))# Calculate category average price
products_df = products_df.join(category_avg, on="category", how="left")# Join average back to products
products_df = products_df.withColumn("price",when(col("price") <= 0, col("avg_price")).otherwise(col("price")))# Replace invalid or non-positive price with category average
products_df = products_df.drop("avg_price")# Drop helper column

In [30]:
# Rename columns to avoid collisions after join
customers_df = customers_df.withColumnRenamed("name", "customer_name")
products_df  = products_df.withColumnRenamed("name", "product_name")

In [31]:
#JDBC connection properties
jdbc_url = "jdbc:postgresql://localhost:5432/de_pipeline"

connection_props = {
    "user": "postgres",
    "password": "root",
    "driver": "org.postgresql.Driver"
}

In [32]:
# Create order_details by joining
order_details = (
    orders_df
    .join(customers_df, "customer_id", "inner")
    .join(products_df, "product_id", "inner")
    .select(
        "order_id",
        customers_df["customer_name"],
        "country",
        products_df["product_name"],
        "category",
        "amount",
        "status",
        "order_date"
    )
)

In [33]:
order_details.show(20) # Preview first 20 rows

+--------+-----------------+----------------+--------------------+---------------+-------+---------+----------+
|order_id|    customer_name|         country|        product_name|       category| amount|   status|order_date|
+--------+-----------------+----------------+--------------------+---------------+-------+---------+----------+
|      28|     Aaron Garner|          Greece|    Reebok Jump Rope|        Fitness| 967.82|completed|2025-09-10|
|       4|  Shannon Merritt|Saint Barthelemy|                1984|          Books| 526.23| returned|2025-09-10|
|      50|    Sherry Murray|     Timor-Leste|The Lord of the R...|          Books| 155.71|  pending|2025-09-10|
|      13|    Sherry Murray|     Timor-Leste|          Clean Code|          Books| 995.55|completed|2025-09-10|
|      63|   Thomas Jimenez|     Switzerland|Dyson V15 Vacuum ...|Home Appliances| 464.64|  pending|2025-09-10|
|      30|   Thomas Jimenez|     Switzerland|       Atomic Habits|          Books|1902.23| returned|2025

In [34]:
# Persist order_details & create temp view
order_details.write.jdbc(url=jdbc_url, table="order_details", mode="append", properties=connection_props)

In [35]:
order_details.createOrReplaceTempView("order_details")

In [36]:
# Top Customers
top_customers_df = spark.sql("""
    SELECT customer_name, ROUND(SUM(amount),2)AS total_spend
    FROM order_details
    where status ="completed"
    GROUP BY customer_name
    ORDER BY total_spend DESC
    LIMIT 5
""")
top_customers_df.show()

+-----------------+-----------+
|    customer_name|total_spend|
+-----------------+-----------+
|      John Kramer|    1939.62|
|   Linda Jennings|    1928.84|
|    Mary Mitchell|    1850.47|
| William Harrison|    1797.07|
|Jacqueline Burton|    1614.39|
+-----------------+-----------+



In [37]:
# Monthly Revenue last 6 months
monthly_revenue_df = spark.sql("""
    SELECT DATE_FORMAT(order_date, 'yyyy-MM') AS month,
           ROUND(SUM(amount),2) AS revenue
    FROM order_details
    WHERE order_date >= add_months(current_date(), -6)
    GROUP BY DATE_FORMAT(order_date, 'yyyy-MM')
    ORDER BY month
""")
monthly_revenue_df.show()

+-------+--------+
|  month| revenue|
+-------+--------+
|2025-09|92133.91|
+-------+--------+



In [38]:
# Most popular category
popular_category_df = spark.sql("""
    SELECT category, COUNT(order_id) AS total_orders
    FROM order_details
    GROUP BY category
    ORDER BY total_orders DESC
    LIMIT 1
""")
popular_category_df.show()

+--------+------------+
|category|total_orders|
+--------+------------+
|   Books|          26|
+--------+------------+



In [39]:
# Count of completed vs cancelled orders
order_status_df = spark.sql("""
    SELECT 
        LOWER(TRIM(status)) AS order_status,
        COUNT(*) AS count
    FROM order_details
    WHERE LOWER(TRIM(status)) IN ('completed', 'cancelled')
    GROUP BY LOWER(TRIM(status))
""")
order_status_df.show()

+------------+-----+
|order_status|count|
+------------+-----+
|   completed|   14|
|   cancelled|   25|
+------------+-----+



In [40]:
# Save top_customers back to PostgreSQL
top_customers_df.write.jdbc(url=jdbc_url, table="top_customers", mode="overwrite", properties=connection_props)

In [41]:
# Save monthly_revenue back to PostgreSQL
monthly_revenue_df.write.jdbc(url=jdbc_url, table="monthly_revenue", mode="overwrite", properties=connection_props)

In [42]:
# Save popular_category back to PostgreSQL
popular_category_df.write.jdbc(url=jdbc_url, table="popular_category", mode="overwrite", properties=connection_props)

In [43]:
# Save order_status back to PostgreSQL
order_status_df.write.jdbc(url=jdbc_url, table="order_status", mode="overwrite", properties=connection_props)