# **The Most Frequently Ordered Products for Each Customer**

## **Problem Statement**
You are given three tables: **Customers, Orders, and Products**.

### **Table: Customers**
| Column Name   | Type    |
|--------------|--------|
| `customer_id` | int    |
| `name`        | varchar |

### **Table: Orders**
| Column Name   | Type    |
|--------------|--------|
| `order_id`    | int    |
| `order_date`  | date   |
| `customer_id` | int    |
| `product_id`  | int    |

### **Table: Products**
| Column Name   | Type    |
|--------------|--------|
| `product_id`  | int    |
| `product_name` | varchar |
| `price`       | int    |

### **Objective**
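Find the most frequently ordered product(s) for each customer who has made at least one order. If several products tie for a customer's highest order count, return all of them. The result should list `customer_id`, `product_id`, and `product_name`, sorted by `customer_id`.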
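
To make the objective concrete, here is a minimal plain-Python sketch of the intended semantics, including what happens when products tie. The function name `most_frequent_products` and the `sample_orders` pairs are hypothetical and used only for illustration; they are not part of the problem data.

```python
from collections import Counter, defaultdict

def most_frequent_products(orders):
    """Return sorted (customer_id, product_id) pairs for each customer's most-ordered product(s)."""
    # Count how many times each (customer_id, product_id) pair occurs.
    counts = Counter(orders)

    # Track the highest count seen for each customer.
    best = defaultdict(int)
    for (customer_id, _product_id), n in counts.items():
        best[customer_id] = max(best[customer_id], n)

    # Keep every pair whose count equals that customer's maximum (ties included).
    return sorted(pair for pair, n in counts.items() if n == best[pair[0]])

# Hypothetical (customer_id, product_id) pairs, one per order.
sample_orders = [(1, 10), (1, 10), (1, 20), (2, 10), (2, 30)]
print(most_frequent_products(sample_orders))
# [(1, 10), (2, 10), (2, 30)]
```

Customer 1 ordered product 10 twice, so only that product is returned. Customer 2 ordered two products once each, so both appear. A customer with no orders never shows up, because the result is built from the orders alone.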

---



## **Approach 1: PySpark DataFrame API**
### **Steps**
1. **Initialize Spark Session**
2. **Create DataFrames for `Customers`, `Orders`, and `Products`**
3. **Aggregate Order Counts for Each (`customer_id`, `product_id`)**
4. **Determine the Maximum Order Count per Customer**
5. **Keep the Products Whose Count Equals the Customer's Maximum**
6. **Join with `Products` Table to Get Product Names**
7. **Display the Result**

### **Code**

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # namespaced so F.max does not shadow Python's built-in max

# Step 1: Initialize Spark Session
spark = SparkSession.builder.appName("MostFrequentProduct").getOrCreate()

# Step 2: Create DataFrames
# customers_df is not used below: only customers that appear in Orders are reported.
customers_data = [(1, "Alice"), (2, "Bob"), (3, "Tom"), (4, "Jerry"), (5, "John")]
customers_df = spark.createDataFrame(customers_data, ["customer_id", "name"])

orders_data = [
    (1, "2020-07-31", 1, 1), (2, "2020-07-30", 2, 2), (3, "2020-08-29", 3, 3),
    (4, "2020-07-29", 4, 1), (5, "2020-06-10", 1, 2), (6, "2020-08-01", 2, 1),
    (7, "2020-08-01", 3, 3), (8, "2020-08-03", 1, 2), (9, "2020-08-07", 2, 3),
    (10, "2020-07-15", 1, 2)
]
orders_df = spark.createDataFrame(orders_data, ["order_id", "order_date", "customer_id", "product_id"])

products_data = [(1, "keyboard", 120), (2, "mouse", 80), (3, "screen", 600), (4, "hard disk", 450)]
products_df = spark.createDataFrame(products_data, ["product_id", "product_name", "price"])

# Step 3: Count orders per (customer_id, product_id)
order_counts = orders_df.groupBy("customer_id", "product_id").agg(F.count("*").alias("order_count"))

# Step 4: Get the maximum order count per customer
max_orders = order_counts.groupBy("customer_id").agg(F.max("order_count").alias("max_count"))

# Step 5: Keep the products whose count equals the customer's maximum (ties are all kept)
most_frequent = (
    order_counts.join(max_orders, on="customer_id")
    .filter(F.col("order_count") == F.col("max_count"))
)

# Step 6: Join with Products to get product names
result_df = (
    most_frequent.join(products_df, on="product_id")
    .select("customer_id", "product_id", "product_name")
    .orderBy("customer_id", "product_id")  # product_id only breaks ties so the row order is stable
)

# Step 7: Display the result
result_df.show()
```


```text
+-----------+----------+------------+
|customer_id|product_id|product_name|
+-----------+----------+------------+
|          1|         2|       mouse|
|          2|         1|    keyboard|
|          2|         2|       mouse|
|          2|         3|      screen|
|          3|         3|      screen|
|          4|         1|    keyboard|
+-----------+----------+------------+
```




---

## **Approach 2: SQL Query in PySpark**
### **Steps**
1. **Register the DataFrames from Approach 1 as SQL Views**
2. **Write a SQL Query to Count Orders and Keep Each Customer's Most-Ordered Product(s)**
3. **Execute the SQL Query and Display the Output**

### **Code**

```python
# Step 1: Register the DataFrames from Approach 1 as SQL views
customers_df.createOrReplaceTempView("Customers")
orders_df.createOrReplaceTempView("Orders")
products_df.createOrReplaceTempView("Products")

# Step 2: Define the SQL query
# OrderCounts: number of orders per (customer_id, product_id)
# MaxOrders:   highest order count per customer
sql_query = """
WITH OrderCounts AS (
    SELECT customer_id, product_id, COUNT(*) AS order_count
    FROM Orders
    GROUP BY customer_id, product_id
),
MaxOrders AS (
    SELECT customer_id, MAX(order_count) AS max_count
    FROM OrderCounts
    GROUP BY customer_id
)
SELECT o.customer_id, o.product_id, p.product_name
FROM OrderCounts o
JOIN MaxOrders m ON o.customer_id = m.customer_id AND o.order_count = m.max_count
JOIN Products p ON o.product_id = p.product_id
ORDER BY o.customer_id, o.product_id
"""

# Step 3: Execute the query and display the output
result_sql = spark.sql(sql_query)
result_sql.show()
```


```text
+-----------+----------+------------+
|customer_id|product_id|product_name|
+-----------+----------+------------+
|          1|         2|       mouse|
|          2|         1|    keyboard|
|          2|         2|       mouse|
|          2|         3|      screen|
|          3|         3|      screen|
|          4|         1|    keyboard|
+-----------+----------+------------+
```
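
An alternative way to express the same query is a window function: rank each customer's products by order count and keep rank 1. The sketch below is not the approach used in this notebook. It runs the `RANK()` variant through Python's built-in `sqlite3` module purely so that it is self-contained, and it assumes a SQLite build with window-function support (3.25 or later). The sample rows are copied from the `Orders` and `Products` data above; the `Ranked` CTE and the `rnk` alias are names introduced here for illustration.

```python
import sqlite3

# In-memory database standing in for the Spark temp views (an assumption for this sketch).
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE Orders (order_id INTEGER, order_date TEXT, customer_id INTEGER, product_id INTEGER);
CREATE TABLE Products (product_id INTEGER, product_name TEXT, price INTEGER);
""")

orders = [
    (1, "2020-07-31", 1, 1), (2, "2020-07-30", 2, 2), (3, "2020-08-29", 3, 3),
    (4, "2020-07-29", 4, 1), (5, "2020-06-10", 1, 2), (6, "2020-08-01", 2, 1),
    (7, "2020-08-01", 3, 3), (8, "2020-08-03", 1, 2), (9, "2020-08-07", 2, 3),
    (10, "2020-07-15", 1, 2),
]
products = [(1, "keyboard", 120), (2, "mouse", 80), (3, "screen", 600), (4, "hard disk", 450)]
conn.executemany("INSERT INTO Orders VALUES (?, ?, ?, ?)", orders)
conn.executemany("INSERT INTO Products VALUES (?, ?, ?)", products)

ranked_query = """
WITH OrderCounts AS (
    SELECT customer_id, product_id, COUNT(*) AS order_count
    FROM Orders
    GROUP BY customer_id, product_id
),
Ranked AS (
    -- RANK() gives tied products the same rank, so all ties survive the filter below.
    SELECT customer_id, product_id,
           RANK() OVER (PARTITION BY customer_id ORDER BY order_count DESC) AS rnk
    FROM OrderCounts
)
SELECT r.customer_id, r.product_id, p.product_name
FROM Ranked r
JOIN Products p ON r.product_id = p.product_id
WHERE r.rnk = 1
ORDER BY r.customer_id, r.product_id
"""

rows = conn.execute(ranked_query).fetchall()
for row in rows:
    print(row)
# (1, 2, 'mouse')
# (2, 1, 'keyboard')
# (2, 2, 'mouse')
# (2, 3, 'screen')
# (3, 3, 'screen')
# (4, 1, 'keyboard')
```

`RANK()` is used here rather than `ROW_NUMBER()` because `ROW_NUMBER()` would assign distinct numbers to tied products and keep only one of them per customer. Spark SQL also supports `RANK() OVER (PARTITION BY ... ORDER BY ...)`, so the same query text should be usable with `spark.sql()` against the registered views.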



---

## **Summary**
| Approach       | Method                | Key Operations                                      |
|----------------|-----------------------|-----------------------------------------------------|
| **Approach 1** | PySpark DataFrame API | Uses `groupBy()`, `agg()`, `join()`, and `filter()` |
| **Approach 2** | SQL Query in PySpark  | Uses `WITH` CTEs and `JOIN`                         |
