<a href="https://colab.research.google.com/github/tejpalla/SparkProjects/blob/main/SupplyChainAnalysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Supply Chain Data Analysis**

How to open this setup in Google colab



*   Go to Google Colab (https://colab.research.google.com/).Sign up if you haven't.
*   In Colab, go to "File" -> "Open notebook".
*   Select the "GitHub" tab.
*   Search for the repository tejpalla/SparkProjects
*   Navigate to the .ipynb file and select it to open.
*   You can either make a copy of this notebook to your drive or store it in git



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, current_timestamp, when, lit

#loading spark engine
spark = SparkSession.builder \
    .appName("Simple_SupplyChain_Cleaner") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

print("Spark engine is ready.")

In [None]:
import kagglehub

input_file_path = kagglehub.dataset_download('shashwatwork/dataco-smart-supply-chain-for-big-data-analysis')
output_folder = "cleaned_supply_chain_data"

raw_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(input_file_path)

print(f"Loaded {raw_df.count()} rows.")
raw_df.printSchema()

Downloading from https://www.kaggle.com/api/v1/datasets/download/shashwatwork/dataco-smart-supply-chain-for-big-data-analysis?dataset_version_number=1...


100%|██████████| 25.7M/25.7M [00:00<00:00, 159MB/s]

Extracting files...





Loaded 650548 rows.
root
 |-- Type: string (nullable = true)
 |-- Days for shipping (real): string (nullable = true)
 |-- Days for shipment (scheduled): string (nullable = true)
 |-- Benefit per order: string (nullable = true)
 |-- Sales per customer: string (nullable = true)
 |-- Delivery Status: string (nullable = true)
 |-- Late_delivery_risk: string (nullable = true)
 |-- Category Id: string (nullable = true)
 |-- Category Name: string (nullable = true)
 |-- Customer City: string (nullable = true)
 |-- Customer Country: string (nullable = true)
 |-- Customer Email: string (nullable = true)
 |-- Customer Fname: string (nullable = true)
 |-- Customer Id: integer (nullable = true)
 |-- Customer Lname: string (nullable = true)
 |-- Customer Password: string (nullable = true)
 |-- Customer Segment: string (nullable = true)
 |-- Customer State: string (nullable = true)
 |-- Customer Street: string (nullable = true)
 |-- Customer Zipcode: integer (nullable = true)
 |-- Department Id: inte

In [29]:
#Transform
#renaming columns for better reading
chaindf = raw_df.toDF(*[c.lower().replace(' ', '_').replace('(', '').replace(')', '') for c in raw_df.columns])

print("Columns are now clean:")
print(chaindf.columns)
chaindf.printSchema()

Columns are now clean:
['type', 'days_for_shipping_real', 'days_for_shipment_scheduled', 'benefit_per_order', 'sales_per_customer', 'delivery_status', 'late_delivery_risk', 'category_id', 'category_name', 'customer_city', 'customer_country', 'customer_email', 'customer_fname', 'customer_id', 'customer_lname', 'customer_password', 'customer_segment', 'customer_state', 'customer_street', 'customer_zipcode', 'department_id', 'department_name', 'latitude', 'longitude', 'market', 'order_city', 'order_country', 'order_customer_id', 'order_date_dateorders', 'order_id', 'order_item_cardprod_id', 'order_item_discount', 'order_item_discount_rate', 'order_item_id', 'order_item_product_price', 'order_item_profit_ratio', 'order_item_quantity', 'sales', 'order_item_total', 'order_profit_per_order', 'order_region', 'order_state', 'order_status', 'order_zipcode', 'product_card_id', 'product_category_id', 'product_description', 'product_image', 'product_name', 'product_price', 'product_status', 'shippi

In [35]:
chaindf.dropna(subset=["delivery_status"])

DataFrame[type: string, days_for_shipping_real: string, days_for_shipment_scheduled: string, benefit_per_order: string, sales_per_customer: string, delivery_status: string, late_delivery_risk: string, category_id: string, category_name: string, customer_city: string, customer_country: string, customer_email: string, customer_fname: string, customer_id: int, customer_lname: string, customer_password: string, customer_segment: string, customer_state: string, customer_street: string, customer_zipcode: int, department_id: int, department_name: string, latitude: double, longitude: double, market: string, order_city: string, order_country: string, order_customer_id: int, order_date_dateorders: string, order_id: int, order_item_cardprod_id: int, order_item_discount: double, order_item_discount_rate: double, order_item_id: int, order_item_product_price: double, order_item_profit_ratio: double, order_item_quantity: int, sales: double, order_item_total: double, order_profit_per_order: double, or

In [36]:
chaindf.show()

+--------+----------------------+---------------------------+-----------------+------------------+-----------------+------------------+-----------+--------------+--------------+----------------+--------------+--------------+-----------+--------------+-----------------+----------------+--------------+--------------------+----------------+-------------+---------------+-----------+------------+------------+----------+-------------+-----------------+---------------------+--------+----------------------+-------------------+------------------------+-------------+------------------------+-----------------------+-------------------+------+----------------+----------------------+--------------+--------------------+---------------+-------------+---------------+-------------------+-------------------+--------------------+------------+-------------+--------------+------------------------+--------------+
|    type|days_for_shipping_real|days_for_shipment_scheduled|benefit_per_order|sales_per_custom

In [84]:
processed_df = chaindf.withColumn(
    "clean_order_date",
    to_date(col("order_date_dateorders"), "yyyy-MM-dd")
).withColumn(
    # Add a column showing the exact time we ran this cleaning job
    "cleaning_timestamp",
    current_timestamp()
).withColumn(
    # Add a simple 'True/False' flag: is the item total over $1000?
    "is_high_value_item",
    when(col("order_item_total") > 1000, lit(True)).otherwise(lit(False))
)
# Fix the date column so Spark knows it's a calendar date, not just text

processed_df.show()

+--------+----------------------+---------------------------+-----------------+------------------+-----------------+------------------+-----------+--------------+--------------+----------------+--------------+--------------+-----------+--------------+-----------------+----------------+--------------+--------------------+----------------+-------------+---------------+-----------+------------+------------+----------+-------------+-----------------+---------------------+--------+----------------------+-------------------+------------------------+-------------+------------------------+-----------------------+-------------------+------+----------------+----------------------+--------------+--------------------+---------------+-------------+---------------+-------------------+-------------------+--------------------+------------+-------------+--------------+------------------------+--------------+----------------+--------------------+------------------+
|    type|days_for_shipping_real|days_

In [48]:
final_df = processed_df.filter(
    (col("order_status") != "Canceled") &
    (col("order_status") != "Suspected Fraud")
)

print(f"Rows removed. Final count: {final_df.count()}")
final_df.printSchema()

Rows removed. Final count: 180519
root
 |-- type: string (nullable = true)
 |-- days_for_shipping_real: string (nullable = true)
 |-- days_for_shipment_scheduled: string (nullable = true)
 |-- benefit_per_order: string (nullable = true)
 |-- sales_per_customer: string (nullable = true)
 |-- delivery_status: string (nullable = true)
 |-- late_delivery_risk: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_name: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_country: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_fname: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- customer_lname: string (nullable = true)
 |-- customer_password: string (nullable = true)
 |-- customer_segment: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_street: string (nullable = true)
 |-- customer_zipcode: integer (nullable = true)
 |-- departmen

In [63]:
from pyspark.sql.functions import month

final_df.withColumn("order_month", month("clean_order_date")) \
    .groupBy("order_month").agg({"order_item_total" : "sum"}) \
    .orderBy("order_month").show()

+-----------+---------------------+
|order_month|sum(order_item_total)|
+-----------+---------------------+
|       NULL| 3.3054402380243767E7|
+-----------+---------------------+



In [61]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

#ranking regions by total revenue

rankedo_regions_df = final_df.groupBy("order_region").agg({"order_item_total": "sum"})
ranked_regions_df = ranked_regions_df.withColumnRenamed("sum(order_item_total)", "order_item_total")
window_spec = Window.orderBy(col("order_item_total").desc())
ranked_regions_df = ranked_regions_df.withColumn("rank", rank().over(window_spec))
ranked_regions_df.orderBy("rank").show(truncate=False)

+---------------+------------------+----+
|order_region   |order_item_total  |rank|
+---------------+------------------+----+
|Western Europe |5296002.889842125 |1   |
|Central America|5093849.716162516 |2   |
|South America  |2660243.6894642645|3   |
|Northern Europe|1939362.4270527153|4   |
|Southern Europe|1837525.8170129675|5   |
|Oceania        |1809996.5475716766|6   |
|Southeast Asia |1738553.4361501161|7   |
|Caribbean      |1481668.6862140293|8   |
|West of USA    |1412253.596013557 |9   |
|South Asia     |1397364.5733737191|10  |
|Eastern Asia   |1334313.4126545307|11  |
|East of USA    |1231955.1338964938|12  |
|West Asia      |1056080.9921746957|13  |
|US Center      |1034129.4315050505|14  |
|South of  USA  |706904.3379142467 |15  |
|Eastern Europe |696307.1877618035 |16  |
|West Africa    |654168.0272417009 |17  |
|North Africa   |572241.9663370887 |18  |
|East Africa    |338054.31389209436|19  |
|Central Africa |292912.56316194165|20  |
+---------------+-----------------

In [56]:
#average item price
final_df.agg({"product_price" : "avg"}).show()

+------------------+
|avg(product_price)|
+------------------+
|141.23254992874294|
+------------------+



In [None]:
#Total Revenue by region


In [50]:
#dlivery status count
final_df.groupBy("delivery_status").count().show()

+-----------------+-----+
|  delivery_status|count|
+-----------------+-----+
| Shipping on time|32196|
| Advance shipping|41592|
|Shipping canceled| 7754|
|    Late delivery|98977|
+-----------------+-----+



In [None]:
final_df.groupBy("order_region")

In [54]:
#DeliveryStatus count per region
final_df.groupBy("order_region").pivot("delivery_status").agg({"customer_id":"count"}).show()

+---------------+----------------+-------------+-----------------+----------------+
|   order_region|Advance shipping|Late delivery|Shipping canceled|Shipping on time|
+---------------+----------------+-------------+-----------------+----------------+
|     South Asia|            1765|         4350|              276|            1340|
| Eastern Europe|             879|         2182|              135|             724|
|Southern Europe|            2246|         5129|              401|            1655|
|   West of USA |            1835|         4313|              398|            1447|
|   Central Asia|             112|          306|               11|             124|
|Central America|            6566|        15518|             1167|            5090|
|    East of USA|            1560|         3849|              298|            1208|
|   North Africa|             760|         1762|              146|             564|
|Northern Europe|            2341|         5292|              384|          

In [76]:
from pyspark.sql.window import Window as W
final_df.groupBy("delivery_status").agg(F.count("*").alias("status_count")).withColumn("rate", F.col("status_count") * 100 / F.sum("status_count").over(W.partitionBy())).filter(F.col("delivery_status") == "Late delivery").show()

+---------------+------------+------------------+
|delivery_status|status_count|              rate|
+---------------+------------+------------------+
|  Late delivery|       98977|54.829131559558824|
+---------------+------------+------------------+



In [83]:
#Average product price per category
final_df.groupBy("category_name").agg(F.avg("order_item_product_price").alias("avg_category_price")).show()

+--------------------+------------------+
|       category_name|avg_category_price|
+--------------------+------------------+
|    Men's Golf Clubs|117.79918860671388|
| Children's Clothing|  357.100006099996|
|      Sporting Goods|            327.75|
|    Camping & Hiking| 299.9800109999285|
| Fitness Accessories| 53.27479095249206|
|            Cameras |452.04000849999267|
|          Golf Shoes| 78.00190839694656|
|           Computers|            1500.0|
|Consumer Electronics| 252.8800048999986|
|          Basketball| 404.4676021402983|
|   Health and Beauty|293.04000850000006|
|        Pet Supplies|  84.4000015299998|
|                DVDs| 164.3800048999987|
|      Men's Footwear|129.99000550001438|
|              Crafts|461.48001100000477|
|    Women's Clothing|215.82000730000178|
|         Electronics| 39.43360013087904|
|         Video Games|             39.75|
|     Women's Apparel|              50.0|
|      Girls' Apparel| 41.75020857951714|
+--------------------+------------