In [7]:
# Step 1: Setup SparkSession
import os

from pyspark.sql import SparkSession
# NOTE: importing `sum` here shadows Python's builtin sum() for the rest of the notebook.
from pyspark.sql.functions import (
    col, trim, sum, avg, countDistinct, rank, lag,
    date_format, expr, broadcast, count, upper, lit, udf
)
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

# Reuse an already-running Spark session if there is one, otherwise start a new one
# under this application name.
spark = (
    SparkSession.builder
    .appName("OnlineRetailAssignment")
    .getOrCreate()
)

In [9]:
# Directory holding the input CSVs. Override it with the RETAIL_DATA_DIR environment
# variable; the default keeps the original location so existing runs are unchanged.
DATA_DIR = os.environ.get("RETAIL_DATA_DIR", "/home/hduser/Downloads")

# header=True takes column names from the first row; inferSchema=True asks Spark to
# infer column types (see the printed schemas in the next cell).
customers_sdf = spark.read.csv(os.path.join(DATA_DIR, "Customers_Dataset.csv"), header=True, inferSchema=True)
orders_sdf = spark.read.csv(os.path.join(DATA_DIR, "Orders_Dataset.csv"), header=True, inferSchema=True)



In [10]:
# Print schemas and sample data: both schemas first, then the first 10 rows of each frame.
raw_frames = {"customers": customers_sdf, "orders": orders_sdf}
for frame in raw_frames.values():
    frame.printSchema()
for frame in raw_frames.values():
    frame.show(10)

# Step 4: Create Temp Views
# NOTE: these views are registered on the raw (not yet cleaned) DataFrames.
for view_name, frame in raw_frames.items():
    frame.createOrReplaceTempView(view_name)


root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)

root
 |-- order_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- order_date: timestamp (nullable = true)

+-----------+-----------+-------+---+------+
|customer_id|       name|country|age|gender|
+-----------+-----------+-------+---+------+
|          1| Customer_1|    USA| 50|Female|
|          2| Customer_2|     UK| 55|Female|
|          3| Customer_3|  India| 45|Female|
|          4| Customer_4|  India| 38|  Male|
|          5| Customer_5|     UK| 23|  Male|
|          6| Customer_6|    USA| 49|Female|
|          7| Customer_7|Germany| 53|Female|
|          8| Customer_8|  India| 68|  Male|
|          9|

In [11]:
# Step 5: Data Cleaning
# Normalise values first (trim text, enforce types), then drop nulls, then de-duplicate.
# In this order, rows that differ only by surrounding whitespace are recognised as
# duplicates, and any value that a cast turns into null is removed by na.drop().
CUSTOMER_TEXT_COLS = ["name", "country", "gender"]
ORDER_TEXT_COLS = ["product", "category"]

customers_clean = customers_sdf
for col_name in CUSTOMER_TEXT_COLS:
    customers_clean = customers_clean.withColumn(col_name, trim(col(col_name)))
customers_clean = (
    customers_clean
    .withColumn("age", col("age").cast("int"))
    .na.drop()          # drop rows with a null in any column
    .dropDuplicates()   # exact full-row duplicates
)

orders_clean = orders_sdf
for col_name in ORDER_TEXT_COLS:
    orders_clean = orders_clean.withColumn(col_name, trim(col(col_name)))
orders_clean = (
    orders_clean
    .withColumn("price", col("price").cast("double"))
    # timestamp -> date: the time-of-day component is discarded
    .withColumn("order_date", col("order_date").cast("date"))
    .na.drop()
    .dropDuplicates()
)

# Step 6: EDA
# Customer rows per country (no explicit ordering), then the age distribution by ascending age.
country_counts = customers_clean.groupBy("country").count()
country_counts.show()
customers_clean.groupBy("age").count().orderBy("age").show()

# Top 5 countries (customers) and top 5 categories (orders) by row count.
country_counts.orderBy(col("count").desc()).show(5)
category_counts = orders_clean.groupBy("category").count()
category_counts.orderBy(col("count").desc()).show(5)

+-------+-----+
|country|count|
+-------+-----+
|Germany|    2|
| France|    1|
|  India|    3|
|    USA|    2|
|     UK|    2|
+-------+-----+

+---+-----+
|age|count|
+---+-----+
| 18|    1|
| 23|    1|
| 38|    1|
| 45|    1|
| 49|    1|
| 50|    1|
| 53|    1|
| 55|    1|
| 64|    1|
| 68|    1|
+---+-----+

+-------+-----+
|country|count|
+-------+-----+
|  India|    3|
|Germany|    2|
|    USA|    2|
|     UK|    2|
| France|    1|
+-------+-----+

+-----------+-----+
|   category|count|
+-----------+-----+
|Electronics|   11|
|Accessories|    9|
+-----------+-----+



In [12]:
# Step 7: DataFrame API Insights
# Line revenue = price * quantity. It is stored on orders_clean so later cells can reuse it;
# withColumn replaces an existing "revenue" column, so re-running this cell is harmless.
orders_clean = orders_clean.withColumn("revenue", col("price") * col("quantity"))

# Total revenue per category.
orders_clean.groupBy("category").agg(sum("revenue").alias("total_revenue")).show()

# Average order value per customer country.
# NOTE(review): assumes one orders row per order (order_id unique) — TODO confirm;
# otherwise this is an average per order line.
orders_with_country = orders_clean.join(customers_clean, "customer_id")
orders_with_country.groupBy("country").agg(avg("revenue").alias("avg_order_value")).show()

# Top 10 customers by total revenue.
(
    orders_clean
    .groupBy("customer_id")
    .agg(sum("revenue").alias("total_revenue"))
    .orderBy(col("total_revenue").desc())
    .show(10)
)

# Number of distinct products. Printed explicitly: a bare expression that is not the
# last line of a cell is never displayed.
distinct_products = orders_clean.select("product").distinct().count()
print(f"Distinct products: {distinct_products}")

# Best-selling product(s) by units sold. Filtering on the maximum (rather than show(1))
# keeps every product tied for first place.
product_quantity = orders_clean.groupBy("product").agg(sum("quantity").alias("total_quantity"))
max_quantity = product_quantity.agg(expr("max(total_quantity)")).first()[0]
product_quantity.filter(col("total_quantity") == max_quantity).show()

+-----------+------------------+
|   category|     total_revenue|
+-----------+------------------+
|Electronics|19597.899999999998|
|Accessories|          20183.04|
+-----------+------------------+

+-------+------------------+
|country|   avg_order_value|
+-------+------------------+
|Germany|2800.4449999999997|
| France|1194.3566666666666|
|  India|2457.6433333333334|
|    USA|1542.3033333333333|
|     UK|          2092.335|
+-------+------------------+

+-----------+------------------+
|customer_id|     total_revenue|
+-----------+------------------+
|          5|7726.5599999999995|
|          7| 6925.030000000001|
|          1|           4829.83|
|          6|           4423.99|
|          9|           4276.75|
|          3|           3764.48|
|          4|3608.4500000000003|
|         10|3583.0699999999997|
|          2|            642.78|
+-----------+------------------+

+-------+--------------+
|product|total_quantity|
+-------+--------------+
| Tablet|            25|
+-------+

In [13]:
# Step 8: SQL Insights
# Register the SQL views on the *cleaned* DataFrames so these queries see the same data
# as the DataFrame API cells; the Step 4 views were created from the raw CSV DataFrames.
# createOrReplaceTempView overwrites the earlier views of the same name.
customers_clean.createOrReplaceTempView("customers")
orders_clean.createOrReplaceTempView("orders")

# Units sold and revenue per category.
spark.sql("""
    SELECT category, SUM(quantity) AS total_quantity, SUM(price * quantity) AS total_revenue
    FROM orders
    GROUP BY category
""").show()

# Single highest-spending customer (LIMIT 1: ties are broken arbitrarily).
spark.sql("""
    SELECT customer_id, SUM(price * quantity) AS total_spent
    FROM orders
    GROUP BY customer_id
    ORDER BY total_spent DESC
    LIMIT 1
""").show()

# Revenue per calendar month (only months that have orders appear).
spark.sql("""
    SELECT DATE_FORMAT(order_date, 'yyyy-MM') AS month, SUM(price * quantity) AS monthly_revenue
    FROM orders
    GROUP BY month
    ORDER BY month
""").show()

# Customers who bought more than 5 distinct products.
spark.sql("""
    SELECT customer_id, COUNT(DISTINCT product) AS distinct_products
    FROM orders
    GROUP BY customer_id
    HAVING COUNT(DISTINCT product) > 5
""").show()

# Step 9: Joins and Group Aggregates
# Orders with their customer's attributes (inner join) plus line revenue.
enriched_orders = (
    orders_clean
    .join(customers_clean, "customer_id")
    .withColumn("revenue", col("price") * col("quantity"))
)

# Customers without any order: after a left join their order_id is null.
customers_with_orders = customers_clean.join(orders_clean, "customer_id", "left")
orderless_customers = customers_with_orders.filter(col("order_id").isNull())
orderless_customers.select("customer_id", "name").show()

# Total revenue per country.
enriched_orders.groupBy("country").agg(sum("revenue").alias("total_revenue")).show()

# Grouping by (country, customer) shared by the two aggregates below.
per_customer = enriched_orders.groupBy("country", "customer_id")

# Mean per-customer revenue within each country.
(
    per_customer.agg(sum("revenue").alias("cust_rev"))
    .groupBy("country")
    .agg(avg("cust_rev").alias("avg_rev_per_cust"))
    .show()
)

# Highest-revenue customer(s) in each country; rank() gives tied customers the same rank.
windowSpec = Window.partitionBy("country").orderBy(col("revenue").desc())
(
    per_customer.agg(sum("revenue").alias("revenue"))
    .withColumn("rank", rank().over(windowSpec))
    .filter(col("rank") == 1)
    .show()
)

# Step 10: Window Functions
# Top-3 customers by revenue within each country (rank() keeps ties, so >3 rows are possible).
rev_cust = enriched_orders.groupBy("country", "customer_id").agg(sum("revenue").alias("total_revenue"))
win = Window.partitionBy("country").orderBy(col("total_revenue").desc())
rev_cust.withColumn("rank", rank().over(win)).filter(col("rank") <= 3).show()

# Month-over-month revenue change.
# lag() looks at the previous *row*, so calendar months without orders must be present
# (with 0 revenue); otherwise a month following a gap would be compared against the last
# month that happened to have orders, not against the preceding calendar month.
monthly_totals = (
    orders_clean
    .withColumn("month", date_format("order_date", "yyyy-MM"))
    .groupBy("month")
    .agg(sum("revenue").alias("monthly_revenue"))
)

# Every calendar month from the first to the last order month, inclusive
# (empty if orders_clean has no rows).
all_months = (
    orders_clean
    .agg(
        expr("trunc(min(order_date), 'MM')").alias("first_month"),
        expr("trunc(max(order_date), 'MM')").alias("last_month"),
    )
    .select(expr("explode(sequence(first_month, last_month, interval 1 month))").alias("month_start"))
    .select(date_format("month_start", "yyyy-MM").alias("month"))
)

monthly_rev = (
    all_months
    .join(monthly_totals, "month", "left")
    .na.fill(0.0, subset=["monthly_revenue"])
    .orderBy("month")
)

# Un-partitioned window: all rows go to one partition, acceptable with one row per month.
month_window = Window.orderBy("month")
(
    monthly_rev
    .withColumn("prev_month_revenue", lag("monthly_revenue", 1).over(month_window))
    .withColumn("change", col("monthly_revenue") - col("prev_month_revenue"))
    .show()
)

+-----------+--------------+-------------+
|   category|total_quantity|total_revenue|
+-----------+--------------+-------------+
|Electronics|            43|      19597.9|
|Accessories|            30|     20183.04|
+-----------+--------------+-------------+

+-----------+------------------+
|customer_id|       total_spent|
+-----------+------------------+
|          5|7726.5599999999995|
+-----------+------------------+

+-------+-----------------+
|  month|  monthly_revenue|
+-------+-----------------+
|2023-01|         10235.25|
|2023-03|           889.62|
|2023-04|          1017.95|
|2023-05|          2330.55|
|2023-06|            639.4|
|2023-08|7151.849999999999|
|2023-09|          3925.36|
|2023-10|          5611.08|
|2023-11|7979.879999999999|
+-------+-----------------+

+-----------+-----------------+
|customer_id|distinct_products|
+-----------+-----------------+
+-----------+-----------------+

+-----------+----------+
|customer_id|      name|
+-----------+----------+
|     