In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("ECommerce-Capstone").getOrCreate()
# Customers
customers_data = [
(1, "Rahul Sharma", "Bangalore", 28),
(2, "Priya Singh", "Delhi", 32),
(3, "Aman Kumar", "Hyderabad", 25),
(4, "Sneha Reddy", "Chennai", 35),
(5, "Arjun Mehta", "Mumbai", 30),
(6, "Divya Nair", "Delhi", 29)
]
customers_cols = ["customer_id", "name", "city", "age"]
customers_df = spark.createDataFrame(customers_data, customers_cols)
# Products
products_data = [
(101, "Laptop", "Electronics", 55000),
(102, "Mobile", "Electronics", 25000),
(103, "Headphones", "Electronics", 3000),
(104, "Chair", "Furniture", 5000),
(105, "Book", "Stationery", 700),
(106, "Shoes", "Fashion", 2500)
]
products_cols = ["product_id", "product_name", "category", "price"]
products_df = spark.createDataFrame(products_data, products_cols)
# Orders
orders_data = [
(1001, 1, 101, 1),
(1002, 2, 102, 2),
(1003, 1, 103, 3),
(1004, 3, 104, 1),
(1005, 5, 105, 5),
(1006, 6, 106, 2),
(1007, 7, 101, 1) # Order with non-existent customer
]
orders_cols = ["order_id", "customer_id", "product_id", "quantity"]
orders_df = spark.createDataFrame(orders_data, orders_cols)
customers_df.show()
products_df.show()
orders_df.show()

+-----------+------------+---------+---+
|customer_id|        name|     city|age|
+-----------+------------+---------+---+
|          1|Rahul Sharma|Bangalore| 28|
|          2| Priya Singh|    Delhi| 32|
|          3|  Aman Kumar|Hyderabad| 25|
|          4| Sneha Reddy|  Chennai| 35|
|          5| Arjun Mehta|   Mumbai| 30|
|          6|  Divya Nair|    Delhi| 29|
+-----------+------------+---------+---+

+----------+------------+-----------+-----+
|product_id|product_name|   category|price|
+----------+------------+-----------+-----+
|       101|      Laptop|Electronics|55000|
|       102|      Mobile|Electronics|25000|
|       103|  Headphones|Electronics| 3000|
|       104|       Chair|  Furniture| 5000|
|       105|        Book| Stationery|  700|
|       106|       Shoes|    Fashion| 2500|
+----------+------------+-----------+-----+

+--------+-----------+----------+--------+
|order_id|customer_id|product_id|quantity|
+--------+-----------+----------+--------+
|    1001|         

## 🔹 Basic Operations
1. Select all customer names and cities  
2. List all distinct product categories  
3. Filter customers older than 30  


In [11]:
# 1️⃣ Select all customer names and their cities
customers_df.select("name", "city").show()

# 2️⃣ List all distinct product categories
products_df.select("category").distinct().show()

# 3️⃣ Filter customers older than 30
customers_df.filter(F.col("age") > 30).show()


+------------+---------+
|        name|     city|
+------------+---------+
|Rahul Sharma|Bangalore|
| Priya Singh|    Delhi|
|  Aman Kumar|Hyderabad|
| Sneha Reddy|  Chennai|
| Arjun Mehta|   Mumbai|
|  Divya Nair|    Delhi|
+------------+---------+

+-----------+
|   category|
+-----------+
|Electronics|
| Stationery|
|    Fashion|
|  Furniture|
+-----------+

+-----------+-----------+-------+---+
|customer_id|       name|   city|age|
+-----------+-----------+-------+---+
|          2|Priya Singh|  Delhi| 32|
|          4|Sneha Reddy|Chennai| 35|
+-----------+-----------+-------+---+



## 🔹 Aggregations
4. Find the total number of orders placed per customer  
5. Find the average age of customers per city  
6. Calculate the total revenue generated from each product  


In [5]:
# 4️⃣ Find the total number of orders placed per customer
orders_df.groupBy("customer_id") \
    .agg(F.count("order_id").alias("orders_count")) \
    .show()

# 5️⃣ Find the average age of customers per city
customers_df.groupBy("city") \
    .agg(F.round(F.avg("age"),2).alias("avg_age")) \
    .show()

# 6️⃣ Calculate the total revenue generated from each product
orders_products = orders_df.join(products_df, "product_id", "left") \
    .withColumn("revenue", F.col("price") * F.col("quantity"))

orders_products.groupBy("product_id", "product_name") \
    .agg(F.sum("revenue").alias("total_revenue")) \
    .show()


+-----------+------------+
|customer_id|orders_count|
+-----------+------------+
|          1|           2|
|          2|           1|
|          7|           1|
|          6|           1|
|          5|           1|
|          3|           1|
+-----------+------------+

+---------+-------+
|     city|avg_age|
+---------+-------+
|Bangalore|   28.0|
|    Delhi|   30.5|
|Hyderabad|   25.0|
|  Chennai|   35.0|
|   Mumbai|   30.0|
+---------+-------+

+----------+------------+-------------+
|product_id|product_name|total_revenue|
+----------+------------+-------------+
|       103|  Headphones|         9000|
|       101|      Laptop|       110000|
|       102|      Mobile|        50000|
|       104|       Chair|         5000|
|       106|       Shoes|         5000|
|       105|        Book|         3500|
+----------+------------+-------------+



## 🔹 Joins
7. Join customers with orders to list which customer bought what  
8. Join orders with products to get order details with product name and price  
9. Find all customers who have never placed an order  
10. Find all products that have never been ordered  


In [6]:
# 7️⃣ Join customers with orders to list which customer bought what
customers_df.join(orders_df, "customer_id", "left") \
    .select("customer_id", "name", "order_id", "product_id", "quantity") \
    .show()

# 8️⃣ Join orders with products to get order details with product name and price
orders_df.join(products_df, "product_id", "left") \
    .select("order_id", "customer_id", "product_name", "price", "quantity") \
    .show()

# 9️⃣ Find all customers who have never placed an order
customers_df.join(orders_df, "customer_id", "left_anti").show()

# 🔟 Find all products that have never been ordered
products_df.join(orders_df, "product_id", "left_anti").show()


+-----------+------------+--------+----------+--------+
|customer_id|        name|order_id|product_id|quantity|
+-----------+------------+--------+----------+--------+
|          1|Rahul Sharma|    1003|       103|       3|
|          1|Rahul Sharma|    1001|       101|       1|
|          3|  Aman Kumar|    1004|       104|       1|
|          2| Priya Singh|    1002|       102|       2|
|          6|  Divya Nair|    1006|       106|       2|
|          5| Arjun Mehta|    1005|       105|       5|
|          4| Sneha Reddy|    NULL|      NULL|    NULL|
+-----------+------------+--------+----------+--------+

+--------+-----------+------------+-----+--------+
|order_id|customer_id|product_name|price|quantity|
+--------+-----------+------------+-----+--------+
|    1003|          1|  Headphones| 3000|       3|
|    1001|          1|      Laptop|55000|       1|
|    1002|          2|      Mobile|25000|       2|
|    1004|          3|       Chair| 5000|       1|
|    1006|          6|    

## 🔹 Sorting & Grouping
11. Show the top 3 most expensive products purchased  
12. Group orders by category and calculate total revenue per category  
13. List customers sorted by total money spent (highest first)  


In [7]:
# 1️⃣1️⃣ Show the top 3 most expensive products purchased
orders_df.join(products_df, "product_id", "inner") \
    .select("product_id", "product_name", "price") \
    .distinct() \
    .orderBy(F.col("price").desc()) \
    .limit(3) \
    .show()

# 1️⃣2️⃣ Group orders by category and calculate total revenue per category
orders_df.join(products_df, "product_id", "left") \
    .withColumn("revenue", F.col("price") * F.col("quantity")) \
    .groupBy("category") \
    .agg(F.sum("revenue").alias("total_revenue")) \
    .orderBy(F.col("total_revenue").desc()) \
    .show()

# 1️⃣3️⃣ List customers sorted by total money spent (highest first)
spending_per_customer = orders_df.join(products_df, "product_id", "left") \
    .withColumn("revenue", F.col("price") * F.col("quantity")) \
    .groupBy("customer_id") \
    .agg(F.sum("revenue").alias("total_spent"))

spending_per_customer.join(customers_df, "customer_id", "left") \
    .select("customer_id", "name", "total_spent") \
    .orderBy(F.col("total_spent").desc_nulls_last()) \
    .show()


+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|       101|      Laptop|55000|
|       102|      Mobile|25000|
|       104|       Chair| 5000|
+----------+------------+-----+

+-----------+-------------+
|   category|total_revenue|
+-----------+-------------+
|Electronics|       169000|
|    Fashion|         5000|
|  Furniture|         5000|
| Stationery|         3500|
+-----------+-------------+

+-----------+------------+-----------+
|customer_id|        name|total_spent|
+-----------+------------+-----------+
|          1|Rahul Sharma|      64000|
|          7|        NULL|      55000|
|          2| Priya Singh|      50000|
|          6|  Divya Nair|       5000|
|          3|  Aman Kumar|       5000|
|          5| Arjun Mehta|       3500|
+-----------+------------+-----------+



## 🔹 SQL Queries
14. Register all three DataFrames as temp views (customers, products, orders)  
15. Find the top 2 cities by total revenue  
16. Find customers who spent more than 50,000 in total  
17. Find which product category contributes the most revenue  


In [8]:
# 1️⃣4️⃣ Register DataFrames as temp views
customers_df.createOrReplaceTempView("customers")
products_df.createOrReplaceTempView("products")
orders_df.createOrReplaceTempView("orders")

# 1️⃣5️⃣ Find the top 2 cities by total revenue
spark.sql("""
SELECT c.city, SUM(p.price * o.quantity) AS total_revenue
FROM orders o
JOIN products p ON o.product_id = p.product_id
JOIN customers c ON o.customer_id = c.customer_id
GROUP BY c.city
ORDER BY total_revenue DESC
LIMIT 2
""").show()

# 1️⃣6️⃣ Find customers who spent more than 50,000 in total
spark.sql("""
SELECT c.customer_id, c.name, SUM(p.price * o.quantity) AS total_spent
FROM orders o
JOIN products p ON o.product_id = p.product_id
JOIN customers c ON o.customer_id = c.customer_id
GROUP BY c.customer_id, c.name
HAVING SUM(p.price * o.quantity) > 50000
""").show()

# 1️⃣7️⃣ Find which product category contributes the most revenue
spark.sql("""
SELECT p.category, SUM(p.price * o.quantity) AS category_revenue
FROM orders o
JOIN products p ON o.product_id = p.product_id
GROUP BY p.category
ORDER BY category_revenue DESC
LIMIT 1
""").show()


+---------+-------------+
|     city|total_revenue|
+---------+-------------+
|Bangalore|        64000|
|    Delhi|        55000|
+---------+-------------+

+-----------+------------+-----------+
|customer_id|        name|total_spent|
+-----------+------------+-----------+
|          1|Rahul Sharma|      64000|
+-----------+------------+-----------+

+-----------+----------------+
|   category|category_revenue|
+-----------+----------------+
|Electronics|          169000|
+-----------+----------------+

