In [1]:
import pyspark

In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.window import *

In [3]:
spark = SparkSession.builder.appName("ecomm_sales_analysis").getOrCreate()

In [4]:
df = spark.read.csv("/content/ecommerce_transactions.csv",header = True,inferSchema = True)

In [5]:
df.show(4)

+-------+----------+-------------------+---------------+---------+--------+------+--------+---------+-----------+
|OrderID|CustomerID|          OrderDate|ProductCategory|ProductID|Quantity| Price|Discount|     City|TotalAmount|
+-------+----------+-------------------+---------------+---------+--------+------+--------+---------+-----------+
|      1|      1102|2023-01-01 00:00:00|      Groceries|      267|       2|141.25|     0.1|Bangalore|     254.25|
|      2|      1435|2023-01-01 01:00:00|           Home|      136|       1|352.76|    0.11|Bangalore|   313.9564|
|      3|      1860|2023-01-01 02:00:00|       Clothing|      166|       3|127.93|    0.18|Bangalore|   314.7078|
|      4|      1270|2023-01-01 03:00:00|       Clothing|      210|       2|201.78|    0.04|Hyderabad|   387.4176|
+-------+----------+-------------------+---------------+---------+--------+------+--------+---------+-----------+
only showing top 4 rows



In [6]:
df.printSchema()

root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- OrderDate: timestamp (nullable = true)
 |-- ProductCategory: string (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- Discount: double (nullable = true)
 |-- City: string (nullable = true)
 |-- TotalAmount: double (nullable = true)



In [7]:
df.columns

['OrderID',
 'CustomerID',
 'OrderDate',
 'ProductCategory',
 'ProductID',
 'Quantity',
 'Price',
 'Discount',
 'City',
 'TotalAmount']

In [8]:
df_clean = df.dropna

In [9]:
# Drop rows with null values
df_clean = df.dropna()

# Make sure data types are correct (just in case)
df_clean = df_clean.withColumn("Quantity", col("Quantity").cast("int")) \
                   .withColumn("Price", col("Price").cast("double")) \
                   .withColumn("Discount", col("Discount").cast("double")) \
                   .withColumn("TotalAmount", col("TotalAmount").cast("double"))


In [10]:
df_clean.describe().show()

+-------+------------------+------------------+---------------+------------------+-----------------+------------------+-------------------+---------+-----------------+
|summary|           OrderID|        CustomerID|ProductCategory|         ProductID|         Quantity|             Price|           Discount|     City|      TotalAmount|
+-------+------------------+------------------+---------------+------------------+-----------------+------------------+-------------------+---------+-----------------+
|  count|             12000|             12000|          12000|             12000|            12000|             12000|              12000|    12000|            12000|
|   mean|            6000.5|        1503.07775|           NULL|299.84433333333334|5.062333333333333|252.97765750000124| 0.1514066666666642|     NULL|1085.698542550001|
| stddev|3464.2459496981446|289.84651447809534|           NULL|117.12000338323183| 2.56547520500092|143.14218494727658|0.08652337919208777|     NULL|893.2985234

**Top 5 Products by Revenue**

In [58]:
top_products = df_clean.groupBy("ProductID").agg(round(sum("TotalAmount"),2).alias("Revenue")).orderBy(desc("Revenue")).limit(5)



top_products.show()

+---------+--------+
|ProductID| Revenue|
+---------+--------+
|      399|56983.24|
|      116|55290.78|
|      308|55001.79|
|      479|49975.63|
|      450| 48652.6|
+---------+--------+



**Monthly Revenue Trends**

In [63]:
monthly_revenue = df_clean.withColumn("Month",month("OrderDate")).groupBy("Month").agg(round(sum("TotalAmount"),2).alias("Revenue")).orderBy(asc("Month"))

monthly_revenue.show()

+-----+----------+
|Month|   Revenue|
+-----+----------+
|    1|1577473.75|
|    2|1503039.01|
|    3|1647222.09|
|    4|1547644.45|
|    5| 1141600.4|
|    6| 779457.49|
|    7| 819879.88|
|    8| 779707.55|
|    9| 797134.02|
|   10| 850604.79|
|   11| 776114.68|
|   12| 808504.41|
+-----+----------+



**City Level Revenue**

In [11]:
city_kpi =  df_clean.groupBy("City").agg(round(sum("TotalAmount"),2).alias("Total_Revenue"))

city_kpi.show()

+---------+-------------+
|     City|Total_Revenue|
+---------+-------------+
|Bangalore|   2654310.96|
|  Chennai|   2522120.99|
|   Mumbai|   2660998.99|
|    Delhi|   2631832.35|
|Hyderabad|   2559119.22|
+---------+-------------+



In [12]:
city_level_revenue_kpi = df_clean.groupBy("City").agg(
    round(sum("TotalAmount"),2).alias("Total_Revenue"),
    round(max("TotalAmount"),2).alias("Max_Revenue"),
    round(min("TotalAmount"),2).alias("Min_Revenue"),
    round(avg("TotalAmount"),2).alias("Avg_Revenue")
)


city_level_revenue_kpi.show()

+---------+-------------+-----------+-----------+-----------+
|     City|Total_Revenue|Max_Revenue|Min_Revenue|Avg_Revenue|
+---------+-------------+-----------+-----------+-----------+
|Bangalore|   2654310.96|    4352.52|       6.97|    1081.63|
|  Chennai|   2522120.99|    4445.91|       5.64|    1069.15|
|   Mumbai|   2660998.99|    4197.12|       5.73|    1099.13|
|    Delhi|   2631832.35|    4328.39|       5.32|    1105.35|
|Hyderabad|   2559119.22|    4177.48|      10.25|    1073.01|
+---------+-------------+-----------+-----------+-----------+



**Top 10 Customer Lifetime Value**

In [64]:
cust_ltv = df_clean.groupBy("CustomerID").agg(round(sum("TotalAmount"),2).alias("CustomerLifetimeValue")).orderBy("CustomerID").limit(10)

cust_ltv.show()

+----------+---------------------+
|CustomerID|CustomerLifetimeValue|
+----------+---------------------+
|      1000|             22000.49|
|      1001|             15094.88|
|      1002|              15745.5|
|      1003|              9509.35|
|      1004|             12562.45|
|      1005|              8167.86|
|      1006|             16417.14|
|      1007|             13136.36|
|      1008|              7952.39|
|      1009|             14770.54|
+----------+---------------------+



**top_categories**

In [14]:
top_categories = df_clean.groupBy("ProductCategory").agg(sum("Quantity").alias("Total_sold")).orderBy(desc("Total_sold"))

top_categories.show()

+---------------+----------+
|ProductCategory|Total_sold|
+---------------+----------+
|    Electronics|     12534|
|       Clothing|     12213|
|           Home|     12092|
|          Books|     12006|
|      Groceries|     11903|
+---------------+----------+



**Top Discounted Cities**

In [66]:
top_discount_cities = df_clean.groupBy("City") \
                              .agg(round(sum("Discount"),2).alias("TotalDiscount")) \
                              .orderBy(desc("TotalDiscount")) \
                              .limit(5)
top_discount_cities.show()

+---------+-------------+
|     City|TotalDiscount|
+---------+-------------+
|Bangalore|       370.07|
|   Mumbai|       363.83|
|Hyderabad|       362.62|
|  Chennai|       360.64|
|    Delhi|       359.72|
+---------+-------------+



**Top-Selling Product Category by City**

In [15]:
#window_spec = Window.partitionBy("City").orderBy(desc("TotalItemsSold"))

category_by_city = df_clean.groupBy("city","ProductCategory").agg(sum("Quantity").alias("TotalItemsSold"))

category_ranked = category_by_city.withColumn("Rank", rank().over(Window.partitionBy("City").orderBy(desc("TotalItemsSold"))))

category_ranked.filter(col("Rank") == 1 ).show()


+---------+---------------+--------------+----+
|     city|ProductCategory|TotalItemsSold|Rank|
+---------+---------------+--------------+----+
|Bangalore|       Clothing|          2517|   1|
|  Chennai|    Electronics|          2536|   1|
|    Delhi|       Clothing|          2508|   1|
|Hyderabad|    Electronics|          2596|   1|
|   Mumbai|    Electronics|          2624|   1|
+---------+---------------+--------------+----+



**Top discounted product category per city with their total and average discount.**


In [28]:
product_cate = df_clean.groupBy("City", "ProductCategory").agg(round(sum("Discount"),2).alias("Total_Discount"),round(avg("Discount"),2).alias("avg_discount"))

cate_rank  = product_cate.withColumn("Rank",rank().over(Window.partitionBy("City").orderBy(desc("Total_Discount"))))

cate_rank.filter(col("Rank") == 1).show()

+---------+---------------+--------------+------------+----+
|     City|ProductCategory|Total_Discount|avg_discount|Rank|
+---------+---------------+--------------+------------+----+
|Bangalore|           Home|          76.8|        0.15|   1|
|  Chennai|    Electronics|         77.83|        0.15|   1|
|    Delhi|      Groceries|         74.51|        0.15|   1|
|Hyderabad|    Electronics|         76.09|        0.15|   1|
|   Mumbai|    Electronics|         76.89|        0.15|   1|
+---------+---------------+--------------+------------+----+



**Maximum and Minimum Product Price by City**

In [68]:
df_max_min = df_clean.withColumn("MaxPriceInCity", max("Price").over(Window.partitionBy("City"))) \
                     .withColumn("MinPriceInCity", min("Price").over(Window.partitionBy("City")))

df_max_min.select("City", "Price","MinPriceInCity","MaxPriceInCity").show(10)


+---------+------+--------------+--------------+
|     City| Price|MinPriceInCity|MaxPriceInCity|
+---------+------+--------------+--------------+
|Bangalore|141.25|          5.48|         499.9|
|Bangalore|352.76|          5.48|         499.9|
|Bangalore|127.93|          5.48|         499.9|
|Bangalore|131.46|          5.48|         499.9|
|Bangalore|334.57|          5.48|         499.9|
|Bangalore|455.33|          5.48|         499.9|
|Bangalore|333.16|          5.48|         499.9|
|Bangalore|144.47|          5.48|         499.9|
|Bangalore|153.89|          5.48|         499.9|
|Bangalore|324.72|          5.48|         499.9|
+---------+------+--------------+--------------+
only showing top 10 rows



**Average Spend per Order by Customer**

In [70]:
avg_byorder = df_clean.withColumn("Avg_per_Spend_By_Order", avg("TotalAmount").over(Window.partitionBy("CustomerID")))


avg_byorder.select("CustomerID","TotalAmount",(round("Avg_per_Spend_By_Order",2).alias("Avg_per_Spend_By_Order"))).show()

+----------+------------------+----------------------+
|CustomerID|       TotalAmount|Avg_per_Spend_By_Order|
+----------+------------------+----------------------+
|      1000|           393.666|               1222.25|
|      1000|1503.1535999999999|               1222.25|
|      1000|          335.1744|               1222.25|
|      1000|         2967.2838|               1222.25|
|      1000|           441.518|               1222.25|
|      1000|           868.912|               1222.25|
|      1000|176.23839999999998|               1222.25|
|      1000|            45.675|               1222.25|
|      1000|           229.035|               1222.25|
|      1000|          1547.344|               1222.25|
|      1000|         1993.5948|               1222.25|
|      1000|          401.8608|               1222.25|
|      1000|          557.6756|               1222.25|
|      1000|2365.4862000000003|               1222.25|
|      1000|          873.0612|               1222.25|
|      100

**Transaction Ranking by Revenue within Each City**

In [47]:
df_ranks = df_clean.withColumn("Row_Number", row_number().over(Window.partitionBy("City").orderBy(desc("TotalAmount")))) \
                   .withColumn("Rank", rank().over(Window.partitionBy("City").orderBy(desc("TotalAmount")))) \
                   .withColumn("DenseRank", dense_rank().over(Window.partitionBy("City").orderBy(desc("TotalAmount"))))

df_ranks.select("City", "TotalAmount", "Row_Number", "Rank", "DenseRank").show(5)

+---------+------------------+----------+----+---------+
|     City|       TotalAmount|Row_Number|Rank|DenseRank|
+---------+------------------+----------+----+---------+
|Bangalore|         4352.5161|         1|   1|        1|
|Bangalore|         4293.3996|         2|   2|        2|
|Bangalore|4290.4800000000005|         3|   3|        3|
|Bangalore|         4195.4832|         4|   4|        4|
|Bangalore|           4172.13|         5|   5|        5|
+---------+------------------+----------+----+---------+
only showing top 5 rows



**Order Amount Change Over Time (Per Customer)**

In [51]:

df_lead_lag = df_clean.withColumn("PrevOrderAmt", lag("TotalAmount").over(Window.partitionBy("CustomerID").orderBy("OrderDate")
)) \
                      .withColumn("NextOrderAmt", lead("TotalAmount").over(Window.partitionBy("CustomerID").orderBy("OrderDate")
))

df_lead_lag.select("CustomerID", "OrderDate", "TotalAmount", "PrevOrderAmt", "NextOrderAmt").show(5)

+----------+-------------------+------------------+------------------+------------------+
|CustomerID|          OrderDate|       TotalAmount|      PrevOrderAmt|      NextOrderAmt|
+----------+-------------------+------------------+------------------+------------------+
|      1000|2023-02-07 09:00:00|           393.666|              NULL|1503.1535999999999|
|      1000|2023-02-13 17:00:00|1503.1535999999999|           393.666|          335.1744|
|      1000|2023-02-26 13:00:00|          335.1744|1503.1535999999999|         2967.2838|
|      1000|2023-03-08 01:00:00|         2967.2838|          335.1744|           441.518|
|      1000|2023-04-26 00:00:00|           441.518|         2967.2838|           868.912|
+----------+-------------------+------------------+------------------+------------------+
only showing top 5 rows

