
In [None]:
pip install pyspark pandas matplotlib notebook


Collecting jedi>=0.16 (from ipython>=7.23.1->ipykernel->notebook)
  Downloading jedi-0.19.2-py2.py3-none-any.whl.metadata (22 kB)
Downloading jedi-0.19.2-py2.py3-none-any.whl (1.6 MB)
Installing collected packages: jedi
Successfully installed jedi-0.19.2


In [1]:
!pip install -q pyspark==3.5.1


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("EcommerceAnalysis") \
    .master("local[*]") \
    .getOrCreate()

spark


In [3]:
spark.range(5).show()


+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



Step 1: Load the dataset

In [11]:
DATA_PATH = "/content/ecommerce_sales_200k.csv"


In [12]:
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)
df.show(5)


+--------+-----------+----------+----------------+----------------+--------+-------+--------+--------------------+---------+-----------+------------+-------------+-------------+
|order_id|customer_id|product_id|product_category|  payment_method|quantity|  price|discount|          order_date|ship_city| ship_state|order_status|delivery_time|review_rating|
+--------+-----------+----------+----------------+----------------+--------+-------+--------+--------------------+---------+-----------+------------+-------------+-------------+
|       1|      16795|      1779|         Grocery|     Credit Card|       5|4849.35|    0.02|2023-07-05 22:25:...|   Mumbai|West Bengal|   Cancelled|           13|            4|
|       2|       1860|      7973|         Grocery|     Net Banking|       4|4154.32|    0.26|2024-01-09 09:19:...|     Pune|  Karnataka|     Shipped|            4|            1|
|       3|      39158|       865|     Electronics|Cash on Delivery|       2|3846.68|    0.37|2024-05-15 17:44:

Step 2: Data Cleaning

In [13]:
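from pyspark.sql.functions import col, to_date, expr

# Keep only the calendar date of each order (the time of day is dropped),
# then derive the discounted line total for every row.
df_clean = df.withColumn("order_date", to_date(col("order_date"))) \
             .withColumn("total_amount", expr("quantity * price * (1 - discount)"))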

df_clean.show(5)


+--------+-----------+----------+----------------+----------------+--------+-------+--------+----------+---------+-----------+------------+-------------+-------------+------------------+
|order_id|customer_id|product_id|product_category|  payment_method|quantity|  price|discount|order_date|ship_city| ship_state|order_status|delivery_time|review_rating|      total_amount|
+--------+-----------+----------+----------------+----------------+--------+-------+--------+----------+---------+-----------+------------+-------------+-------------+------------------+
|       1|      16795|      1779|         Grocery|     Credit Card|       5|4849.35|    0.02|2023-07-05|   Mumbai|West Bengal|   Cancelled|           13|            4|         23761.815|
|       2|       1860|      7973|         Grocery|     Net Banking|       4|4154.32|    0.26|2024-01-09|     Pune|  Karnataka|     Shipped|            4|            1|12296.787199999999|
|       3|      39158|       865|     Electronics|Cash on Deliver
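
The total_amount formula can be checked by hand against the first two rows of the output above. The sketch below is plain Python rather than Spark; `line_total` is a hypothetical helper name, and `Decimal` is used only to show the exact result. The trailing digits in a value such as 12296.787199999999 come from binary floating-point arithmetic, not from the data.

```python
from decimal import Decimal


def line_total(quantity, price, discount):
    """Return quantity * price * (1 - discount) using exact decimal arithmetic."""
    return Decimal(quantity) * Decimal(price) * (Decimal(1) - Decimal(discount))


# Inputs copied from the first two rows of the table above.
row1 = line_total(5, "4849.35", "0.02")
row2 = line_total(4, "4154.32", "0.26")

# Both match the Spark output once floating-point noise is ignored.
print(row1 == Decimal("23761.815"))   # True
print(row2 == Decimal("12296.7872"))  # True
```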

Step 3: Top Product Categories by Sales

In [14]:
category_sales = df_clean.groupBy("product_category") \
                         .sum("total_amount") \
                         .withColumnRenamed("sum(total_amount)", "category_sales") \
                         .orderBy(col("category_sales").desc())

category_sales.show()


+----------------+--------------------+
|product_category|      category_sales|
+----------------+--------------------+
|           Books| 1.747453271623997E8|
|          Beauty|1.7392012003299987E8|
|            Home|1.7362833863220072E8|
|     Electronics|1.7336581485539967E8|
|        Clothing|1.7254397213320005E8|
|         Grocery|1.7226047000560015E8|
|          Sports|1.7193672426100028E8|
+----------------+--------------------+
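
Spark prints these totals in scientific notation, which makes the ranking hard to read. As a rough aid, the plain-Python sketch below reformats the figures copied from the output above and computes each category's share of the total. The variable names are illustrative and not part of the notebook.

```python
# Category totals copied from the Spark output above.
category_sales = {
    "Books": 1.747453271623997e8,
    "Beauty": 1.7392012003299987e8,
    "Home": 1.7362833863220072e8,
    "Electronics": 1.7336581485539967e8,
    "Clothing": 1.7254397213320005e8,
    "Grocery": 1.7226047000560015e8,
    "Sports": 1.7193672426100028e8,
}

total = sum(category_sales.values())

# Share of overall sales per category.
shares = {name: amount / total for name, amount in category_sales.items()}

for name, amount in category_sales.items():
    print(f"{name:<12} {amount:>16,.2f} {shares[name]:7.2%}")

# Relative gap between the best and worst category.
spread = max(category_sales.values()) / min(category_sales.values()) - 1
print(f"top vs bottom category: {spread:.2%}")
```

The gap between the top and bottom category is under 2%, so the ranking reflects small differences rather than a clearly dominant category.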



Step 4: Monthly Sales Trend

In [16]:
from pyspark.sql.functions import date_format

monthly_sales = df_clean.withColumn("month", date_format("order_date", "yyyy-MM")) \
                        .groupBy("month") \
                        .sum("total_amount") \
                        .withColumnRenamed("sum(total_amount)", "monthly_sales") \
                        .orderBy("month")

monthly_sales.show()


+-------+--------------------+
|  month|       monthly_sales|
+-------+--------------------+
|2022-01|3.3686210686300024E7|
|2022-02|3.1198883365699973E7|
|2022-03|     3.36251423029E7|
|2022-04| 3.339740752500005E7|
|2022-05| 3.493266453460004E7|
|2022-06|     3.37065016959E7|
|2022-07|3.4627636997700036E7|
|2022-08| 3.380037265000003E7|
|2022-09| 3.371583597100003E7|
|2022-10| 3.483119760309993E7|
|2022-11| 3.332044841839996E7|
|2022-12| 3.369126508989995E7|
|2023-01| 3.415532619069994E7|
|2023-02| 3.053511698460006E7|
|2023-03| 3.460566346179994E7|
|2023-04|3.2683315928399965E7|
|2023-05|3.4656260053099945E7|
|2023-06| 3.232798408550001E7|
|2023-07| 3.414868674899995E7|
|2023-08| 3.496900981119996E7|
+-------+--------------------+
only showing top 20 rows



Step 5: Top 50 Customers by Revenue

In [17]:
top_customers = df_clean.groupBy("customer_id") \
                        .sum("total_amount") \
                        .withColumnRenamed("sum(total_amount)", "customer_spend") \
                        .orderBy(col("customer_spend").desc()) \
                        .limit(50)

top_customers.show()


+-----------+------------------+
|customer_id|    customer_spend|
+-----------+------------------+
|      49110|       114719.6257|
|      33995|109179.03330000001|
|      22972|        106153.597|
|      10862|105885.03510000001|
|      39458|104283.40539999999|
|      10613|       103704.5784|
|      45334|       103570.8507|
|      11559|101426.77789999999|
|      30920|101267.49919999999|
|       9116|101012.05539999998|
|      12164|        99430.0276|
|       2823| 98059.80949999999|
|      24267| 98004.04879999999|
|      47730|        96677.1286|
|      22831| 96556.04620000001|
|      40624| 96428.56619999999|
|       2418| 95684.90479999999|
|      14288|        94946.6975|
|       5944|        94177.1593|
|       5971|        93876.2471|
+-----------+------------------+
only showing top 20 rows



In [18]:
top_customers.show(50, truncate=False)


+-----------+------------------+
|customer_id|customer_spend    |
+-----------+------------------+
|49110      |114719.6257       |
|33995      |109179.03330000001|
|22972      |106153.597        |
|10862      |105885.03510000001|
|39458      |104283.40539999999|
|10613      |103704.5784       |
|45334      |103570.8507       |
|11559      |101426.77789999999|
|30920      |101267.49919999999|
|9116       |101012.05539999998|
|12164      |99430.0276        |
|2823       |98059.80949999999 |
|24267      |98004.04879999999 |
|47730      |96677.1286        |
|22831      |96556.04620000001 |
|40624      |96428.56619999999 |
|2418       |95684.90479999999 |
|14288      |94946.6975        |
|5944       |94177.1593        |
|5971       |93876.2471        |
|35173      |93655.1329        |
|48914      |93597.27750000001 |
|1645       |93324.83559999999 |
|33879      |93130.57149999999 |
|44381      |92963.8256        |
|43918      |92239.6352        |
|37880      |91795.9662        |
|33891    

**1. Compare Pandas vs PySpark Load Time (Scalability Proof)**

In [19]:
#Load with Pandas

In [20]:
import pandas as pd
import time

start = time.time()
pdf = pd.read_csv("/content/ecommerce_sales_200k.csv")
end = time.time()

print("Pandas Load Time:", end - start)


Pandas Load Time: 0.8021409511566162


In [21]:
#Load with PySpark

In [22]:
start = time.time()
df = spark.read.csv("/content/ecommerce_sales_200k.csv", header=True, inferSchema=True)
end = time.time()

print("PySpark Load Time:", end - start)


PySpark Load Time: 2.6401350498199463


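Pandas loaded the dataset in 0.80 seconds, while PySpark needed 2.64 seconds. At 200K rows the file fits comfortably in memory, so Pandas is faster: Spark pays a fixed cost for job scheduling and for the extra pass over the file that inferSchema=True requires. Note also that spark.read.csv is lazy, so the PySpark timing covers schema inference rather than a full materialization of the rows. PySpark's advantage is scalability, not speed at this size: it divides data into partitions and processes them in parallel, across a cluster if one is available, whereas Pandas must hold the whole dataset in the memory of a single machine.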
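
A single time.time() measurement is noisy, so the load-time comparison is easier to trust when each call is repeated. The helper below is a minimal sketch of that idea; `best_of` is a hypothetical name, and the workload passed to it here is a stand-in rather than the notebook's CSV load.

```python
import time


def best_of(fn, repeats=5):
    """Run fn() `repeats` times and return the shortest wall-clock time in seconds."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()  # monotonic, high-resolution clock
        fn()
        timings.append(time.perf_counter() - start)
    return min(timings)


# Stand-in workload so the sketch runs anywhere. In the notebook the callable
# could instead be, for example:
#   lambda: pd.read_csv(DATA_PATH)
#   lambda: spark.read.csv(DATA_PATH, header=True, inferSchema=True).count()
elapsed = best_of(lambda: sum(range(100_000)))
print(f"best of 5 runs: {elapsed:.6f} s")
```

For Spark, ending the timed call with an action such as count() forces the rows to be read, which makes the measurement closer to what Pandas does when it loads the file.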

In [24]:
#Pandas GroupBy Time
start = time.time()
pdf.groupby("product_category")["price"].sum()
end = time.time()

print("Pandas GroupBy Time:", end - start)


Pandas GroupBy Time: 0.05921053886413574


In [25]:
# PySpark GroupBy time
from pyspark.sql import functions as F  # avoids shadowing Python's built-in sum

start = time.time()
df.groupBy("product_category").agg(F.sum("price")).collect()
end = time.time()

print("PySpark GroupBy Time:", end - start)


PySpark GroupBy Time: 2.2891788482666016


### Scalability analysis

In [26]:
# Number of Spark partitions
df_clean.rdd.getNumPartitions()  # only 2 partitions, so at most 2 tasks run at once


2

In [27]:
##Increase partitions to 16
df_re = df_clean.repartition(16)
df_re.rdd.getNumPartitions()


16

In [28]:
# GroupBy on the repartitioned DataFrame
from pyspark.sql import functions as F  # avoids shadowing Python's built-in sum
import time

start = time.time()
df_re.groupBy("product_category").agg(F.sum("total_amount")).collect()
end = time.time()

print("Distributed GroupBy with Repartition Time:", end - start)


Distributed GroupBy with Repartition Time: 3.5543136596679688


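Repartitioning to 16 partitions made the aggregation slower here (3.55 seconds versus 2.29 seconds). repartition() adds a full shuffle, and a single machine with few cores gains little from the extra partitions. The two Spark timings are also not strictly comparable, because they aggregate different columns (price and total_amount). What this step does show is the mechanism: Spark splits a dataset into partitions that are processed independently, and on a cluster those partitions can be spread across many machines, which Pandas cannot do.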
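
The idea behind a partitioned GroupBy can be illustrated without Spark. The sketch below is a conceptual model only, not Spark's implementation: it runs sequentially in one process, the function names are hypothetical, and the rows are made-up toy data. Each partition computes its own partial sums, and the partial results are then merged.

```python
from collections import defaultdict


def split_into_partitions(rows, num_partitions):
    """Distribute rows round-robin across num_partitions lists."""
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions


def partial_sums(partition):
    """Sum amounts per category within a single partition."""
    totals = defaultdict(float)
    for category, amount in partition:
        totals[category] += amount
    return totals


def merge(partials):
    """Combine the per-partition sums into one result."""
    merged = defaultdict(float)
    for partial in partials:
        for category, subtotal in partial.items():
            merged[category] += subtotal
    return dict(merged)


# Toy (category, amount) rows, invented for illustration.
rows = [("Books", 10.0), ("Grocery", 5.0), ("Books", 2.5),
        ("Sports", 4.0), ("Grocery", 1.0)]

partitions = split_into_partitions(rows, 2)
result = merge(partial_sums(p) for p in partitions)
print(result)  # {'Books': 12.5, 'Grocery': 6.0, 'Sports': 4.0}
```

Because each call to partial_sums depends only on its own partition, those calls could run on different cores or machines. That independence is what lets this style of aggregation scale out.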

This project shows how PySpark processes a dataset by distributing it across multiple partitions, unlike Pandas, which is limited to the memory of a single machine. Using GroupBy, repartitioning, and aggregations on over 200K records, PySpark produced insights on top categories, customer spending, and monthly sales patterns. At this size PySpark was slower than Pandas (about 3x on load and roughly 40x on the GroupBy), but its architecture is designed for horizontal scaling, which makes it suited to datasets too large for one machine.