In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *

In [0]:
df=spark.read.format("csv").option("header",True).option("delimiter",",").option("inferschema",True).load("dbfs:/FileStore/tables/Sales/SalesStore")
df.show(5,False)

+----------+----------+---------+-----------+-------+-----------------+---------------+------------+-----+--------+------+-----+
|Order_Date|Ship_Date |Ship_Mode|Postal_Code|Region |Product_Reference|Category       |Sub_Category|Sales|Quantity|Profit|State|
+----------+----------+---------+-----------+-------+-----------------+---------------+------------+-----+--------+------+-----+
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10022901 |Office Supplies|Tape        |5    |1       |0.15  |OH   |
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10023012 |Office Supplies|Tape        |5    |1       |0.15  |OH   |
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10002828 |Office Supplies|Tape        |5    |1       |0.15  |OH   |
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10022901 |Office Supplies|Tape        |5    |1       |0.15  |OH   |
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10000482 |Office Supplies|Tape      

#Data Manuliplation Task

In [0]:
# 1. Order_Month:
#     Derivation: Extract the month from the Order_Date column.
#     Example: If Order_Date is '2021-02-15', the derived column would be 'February'.
df=df.withColumn("Order_Month",date_format(df.Order_Date,"MMMM"))
df.show(5,False)

+----------+----------+---------+-----------+-------+-----------------+---------------+------------+-----+--------+------+-----+-----------+
|Order_Date|Ship_Date |Ship_Mode|Postal_Code|Region |Product_Reference|Category       |Sub_Category|Sales|Quantity|Profit|State|Order_Month|
+----------+----------+---------+-----------+-------+-----------------+---------------+------------+-----+--------+------+-----+-----------+
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10022901 |Office Supplies|Tape        |5    |1       |0.15  |OH   |February   |
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10023012 |Office Supplies|Tape        |5    |1       |0.15  |OH   |February   |
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10002828 |Office Supplies|Tape        |5    |1       |0.15  |OH   |February   |
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10022901 |Office Supplies|Tape        |5    |1       |0.15  |OH   |February   |
|2021-02-15|2

In [0]:
# 2. Shipping Duration:
#     Derivation: Calculate the duration between Order_Date and Ship_Date.
#     Example: If Order_Date is '2021-02-15' and Ship_Date is '2021-02-20', the derived column could be '5 days'.
# df=df.withColumn("Shipping_Duration",concat_ws(datediff(df.Ship_Date,df.Order_Date),lit("Days")))
df = df.withColumn("Shipping_Duration", concat(datediff(df.Ship_Date, df.Order_Date),lit(" Days")))
df.show(5,False)

+----------+----------+---------+-----------+-------+-----------------+---------------+------------+-----+--------+------+-----+-----------+-----------------+
|Order_Date|Ship_Date |Ship_Mode|Postal_Code|Region |Product_Reference|Category       |Sub_Category|Sales|Quantity|Profit|State|Order_Month|Shipping_Duration|
+----------+----------+---------+-----------+-------+-----------------+---------------+------------+-----+--------+------+-----+-----------+-----------------+
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10022901 |Office Supplies|Tape        |5    |1       |0.15  |OH   |February   |5 Days           |
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10023012 |Office Supplies|Tape        |5    |1       |0.15  |OH   |February   |5 Days           |
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10002828 |Office Supplies|Tape        |5    |1       |0.15  |OH   |February   |5 Days           |
|2021-02-15|2021-02-20|Ground   |44104      |M

In [0]:
# 3. Sales per Quantity:
#     Derivation: Calculate the ratio of Sales to Quantity.
#     Example: If Sales is 500 and Quantity is 2, the derived column would be 250 (Sales per Quantity)
df=df.withColumn("Sales_Per_Quantity",round((df.Sales/df.Quantity),2))
df.show(5,False)

+----------+----------+---------+-----------+-------+-----------------+---------------+------------+-----+--------+------+-----+-----------+-----------------+------------------+
|Order_Date|Ship_Date |Ship_Mode|Postal_Code|Region |Product_Reference|Category       |Sub_Category|Sales|Quantity|Profit|State|Order_Month|Shipping_Duration|Sales_Per_Quantity|
+----------+----------+---------+-----------+-------+-----------------+---------------+------------+-----+--------+------+-----+-----------+-----------------+------------------+
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10022901 |Office Supplies|Tape        |5    |1       |0.15  |OH   |February   |5 Days           |5.0               |
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10023012 |Office Supplies|Tape        |5    |1       |0.15  |OH   |February   |5 Days           |5.0               |
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10002828 |Office Supplies|Tape        |5    |1   

In [0]:
# 4. Profit Margin:
#     Derivation: Calculate the percentage profit margin based on Profit and Sales.
#     Example: If Profit is 10 and Sales is 500, the derived column could be 2% (Profit Margin).
df=df.withColumn("Profit_Margin",round((df.Profit/df.Sales)*100,2))
df.show(5,False)

+----------+----------+---------+-----------+-------+-----------------+---------------+------------+-----+--------+------+-----+-----------+-----------------+------------------+-------------+
|Order_Date|Ship_Date |Ship_Mode|Postal_Code|Region |Product_Reference|Category       |Sub_Category|Sales|Quantity|Profit|State|Order_Month|Shipping_Duration|Sales_Per_Quantity|Profit_Margin|
+----------+----------+---------+-----------+-------+-----------------+---------------+------------+-----+--------+------+-----+-----------+-----------------+------------------+-------------+
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10022901 |Office Supplies|Tape        |5    |1       |0.15  |OH   |February   |5 Days           |5.0               |3.0          |
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10023012 |Office Supplies|Tape        |5    |1       |0.15  |OH   |February   |5 Days           |5.0               |3.0          |
|2021-02-15|2021-02-20|Ground   |44104  

In [0]:
# 5. Year from Order Date:
#     Derivation: Extract the year from the Order_Date column.
#     Example: If Order_Date is '2021-02-15', the derived column would be '2021'.

df=df.withColumn("Year_from_Order_Date",date_format(df.Order_Date,"yyyy"))
df.show(5,False)

+----------+----------+---------+-----------+-------+-----------------+---------------+------------+-----+--------+------+-----+-----------+-----------------+------------------+-------------+--------------------+
|Order_Date|Ship_Date |Ship_Mode|Postal_Code|Region |Product_Reference|Category       |Sub_Category|Sales|Quantity|Profit|State|Order_Month|Shipping_Duration|Sales_Per_Quantity|Profit_Margin|Year_from_Order_Date|
+----------+----------+---------+-----------+-------+-----------------+---------------+------------+-----+--------+------+-----+-----------+-----------------+------------------+-------------+--------------------+
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10022901 |Office Supplies|Tape        |5    |1       |0.15  |OH   |February   |5 Days           |5.0               |3.0          |2021                |
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10023012 |Office Supplies|Tape        |5    |1       |0.15  |OH   |February   |5 Days  

In [0]:
# 8. Order Priority:
#     Derivation: Assign priority levels based on the Ship_Mode column (e.g., 'High', 'Medium', 'Low').
#     Example: If Ship_Mode is 'Air', the derived column could be 'High'.
df=df.withColumn("Order_Priority",when(col("Ship_Mode")=="Air","High").when(df.Ship_Mode=="Ground","Low").otherwise("Medium"))
df.show(5,False)

+----------+----------+---------+-----------+-------+-----------------+---------------+------------+-----+--------+------+-----+-----------+-----------------+------------------+-------------+--------------------+--------------+
|Order_Date|Ship_Date |Ship_Mode|Postal_Code|Region |Product_Reference|Category       |Sub_Category|Sales|Quantity|Profit|State|Order_Month|Shipping_Duration|Sales_Per_Quantity|Profit_Margin|Year_from_Order_Date|Order_Priority|
+----------+----------+---------+-----------+-------+-----------------+---------------+------------+-----+--------+------+-----+-----------+-----------------+------------------+-------------+--------------------+--------------+
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10022901 |Office Supplies|Tape        |5    |1       |0.15  |OH   |February   |5 Days           |5.0               |3.0          |2021                |Low           |
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10023012 |Office Supplies|T

In [0]:
# 10. Order Value:
#     Derivation: Calculate the total value of each order by multiplying Sales and Quantity.
#     Example: If Sales is 200 and Quantity is 3, the derived column would be 600 (Order Value).
df=df.withColumn("Order_Value",df.Sales*df.Quantity)
df.show(5,False)

+----------+----------+---------+-----------+-------+-----------------+---------------+------------+-----+--------+------+-----+-----------+-----------------+------------------+-------------+--------------------+--------------+-----------+
|Order_Date|Ship_Date |Ship_Mode|Postal_Code|Region |Product_Reference|Category       |Sub_Category|Sales|Quantity|Profit|State|Order_Month|Shipping_Duration|Sales_Per_Quantity|Profit_Margin|Year_from_Order_Date|Order_Priority|Order_Value|
+----------+----------+---------+-----------+-------+-----------------+---------------+------------+-----+--------+------+-----+-----------+-----------------+------------------+-------------+--------------------+--------------+-----------+
|2021-02-15|2021-02-20|Ground   |44104      |Midwest|OFF-TAP-10022901 |Office Supplies|Tape        |5    |1       |0.15  |OH   |February   |5 Days           |5.0               |3.0          |2021                |Low           |5          |
|2021-02-15|2021-02-20|Ground   |44104  

In [0]:
# Change the region value of all the Midwest to west
df=df.withColumn("Region",when(col("Region")=="Midwest","West").otherwise(col("Region")))
df.show(5,False)
df.createOrReplaceTempView("a_df")

+----------+----------+---------+-----------+------+-----------------+---------------+------------+-----+--------+------+-----+-----------+-----------------+------------------+-------------+--------------------+--------------+-----------+
|Order_Date|Ship_Date |Ship_Mode|Postal_Code|Region|Product_Reference|Category       |Sub_Category|Sales|Quantity|Profit|State|Order_Month|Shipping_Duration|Sales_Per_Quantity|Profit_Margin|Year_from_Order_Date|Order_Priority|Order_Value|
+----------+----------+---------+-----------+------+-----------------+---------------+------------+-----+--------+------+-----+-----------+-----------------+------------------+-------------+--------------------+--------------+-----------+
|2021-02-15|2021-02-20|Ground   |44104      |West  |OFF-TAP-10022901 |Office Supplies|Tape        |5    |1       |0.15  |OH   |February   |5 Days           |5.0               |3.0          |2021                |Low           |5          |
|2021-02-15|2021-02-20|Ground   |44104      

# Data Analysis 

In [0]:
# Question: Rank orders based on profit margin in descending order
a_df=df.select("Sub_Category","Profit_Margin").withColumn("Rank_Of_Orders",dense_rank().over(Window.orderBy(col("Profit_Margin").desc()))).distinct()
a_df.show(5,False)

+------------+-------------+--------------+
|Sub_Category|Profit_Margin|Rank_Of_Orders|
+------------+-------------+--------------+
|Tape        |3.0          |1             |
|Erasers     |3.0          |1             |
|Staplers    |3.0          |1             |
|Rulers      |3.0          |1             |
|Scissors    |3.0          |1             |
+------------+-------------+--------------+
only showing top 5 rows



In [0]:
%sql select Sub_Category,Profit_Margin,dense_rank() over( Order by Profit_Margin desc) as Rank_Of_Orders  from a_df

Sub_Category,Profit_Margin,Rank_Of_Orders
Tape,3.0,1
Tape,3.0,1
Tape,3.0,1
Tape,3.0,1
Tape,3.0,1
Erasers,3.0,1
Erasers,3.0,1
Erasers,3.0,1
Erasers,3.0,1
Erasers,3.0,1


In [0]:
# Question: Find orders with the same Order_Date, Ship_Date, Product_Reference, and State.
a_df=df.select("Order_Date","Ship_Date","Product_Reference","State")
a_df=a_df.withColumn("Same_Orders",row_number().over(Window.partitionBy(df.Order_Date,df.Ship_Date,df.Product_Reference,df.State).orderBy(df.Order_Date,df.Ship_Date,df.Product_Reference,df.State)))
a_df=a_df.filter(a_df.Same_Orders>=2).drop("Same_Orders").distinct()
a_df.show(5,False)

+----------+----------+-----------------+-----+
|Order_Date|Ship_Date |Product_Reference|State|
+----------+----------+-----------------+-----+
|2021-02-15|2021-02-20|OFF-TAP-10022901 |OH   |
|2021-03-10|2021-03-15|FUR-CH-10000454  |TX   |
|2021-04-06|2021-04-11|FUR-SOF-10009567 |FL   |
|2021-05-03|2021-05-08|FUR-CHA-10023012 |TX   |
|2021-05-22|2021-05-28|OFF-PAP-10001234 |GA   |
+----------+----------+-----------------+-----+
only showing top 5 rows



In [0]:
%sql select Order_Date, Ship_Date, Product_Reference,State from (select Order_Date, Ship_Date, Product_Reference,State,count(*) from a_df
group by 1,2,3,4
having count(*)>1)

Order_Date,Ship_Date,Product_Reference,State
2022-07-29,2022-08-03,FUR-STO-10014123,IL
2023-01-02,2023-01-08,OFF-GLU-10016345,OH
2021-05-03,2021-05-08,FUR-CHA-10023012,TX
2023-04-21,2023-04-28,OFF-STP-10009123,GA
2022-04-03,2022-04-10,FUR-TAB-10005678,FL
2022-01-24,2022-01-30,FUR-DES-10026345,IL
2022-11-27,2022-12-03,FUR-BOO-10008901,OH
2022-09-09,2022-09-15,OFF-NOT-10007890,UT
2021-02-15,2021-02-20,OFF-TAP-10022901,OH
2021-03-10,2021-03-15,FUR-CH-10000454,TX


In [0]:
# Question: Retrieve the latest order for each unique Product_Reference.
a_df=df.select("Product_Reference","Order_Date").groupBy("Product_Reference").agg(max(col("Order_Date")).alias("Latest_orderDate"))
a_df.show(5,False)

+-----------------+----------------+
|Product_Reference|Latest_orderDate|
+-----------------+----------------+
|OFF-NOT-10000491 |2023-02-13      |
|FUR-TAB-10000489 |2022-09-15      |
|FUR-CH-10002345  |2021-08-17      |
|ELE-ACC-10018567 |2023-05-29      |
|ELE-PRI-10009345 |2023-09-30      |
+-----------------+----------------+
only showing top 5 rows



In [0]:
%sql select Product_Reference,max(Order_Date) as Latest_orderDate from a_df
group by 1

Product_Reference,Latest_orderDate
OFF-NOT-10000491,2023-02-13
FUR-TAB-10000489,2022-09-15
FUR-CH-10002345,2021-08-17
ELE-ACC-10018567,2023-05-29
ELE-PRI-10009345,2023-09-30
FUR-BED-10012890,2021-11-12
FUR-DES-10027456,2022-01-24
FUR-BOO-10000462,2022-11-27
ELE-LAP-10000490,2022-11-30
FUR-LIG-10020789,2023-10-28


In [0]:
# Question: Calculate the cumulative sales for each month, ordered by Order_Date.
a_df=df.select("Order_Date","Order_Month","Sales").withColumn("Rolling_Sum_By_Month",sum(col("Sales")).over(Window.partitionBy(df.Order_Date,df.Order_Month).orderBy(df.Order_Date.asc()))).drop("Sales").distinct()
a_df.show(5,False)

+----------+-----------+--------------------+
|Order_Date|Order_Month|Rolling_Sum_By_Month|
+----------+-----------+--------------------+
|2021-02-15|February   |25                  |
|2021-03-10|March      |3000                |
|2021-04-06|April      |4200                |
|2021-05-03|May        |1000                |
|2021-05-22|May        |600                 |
+----------+-----------+--------------------+
only showing top 5 rows



In [0]:
%sql select distinct Order_Date,Order_Month,sum(Sales) over(partition by order_date,Order_Month order by order_Date Asc) Rolling_Sum_By_Month from a_df

Order_Date,Order_Month,Rolling_Sum_By_Month
2021-02-15,February,25
2021-03-10,March,3000
2021-04-06,April,4200
2021-05-03,May,1000
2021-05-22,May,600
2021-06-25,June,2700
2021-07-18,July,600
2021-08-17,August,1800
2021-09-20,September,120
2021-10-05,October,4800


In [0]:
# Question: Determine the average sales for each month and identify any seasonal trends.
a_df=df.select("Order_Date","Order_Month","Sales").groupBy(df.Order_Date,df.Order_Month).agg(avg(col("Sales")).alias("Avg_Sales_Per_Month"))
a_df.show(5,False)

+----------+-----------+-------------------+
|Order_Date|Order_Month|Avg_Sales_Per_Month|
+----------+-----------+-------------------+
|2021-04-06|April      |700.0              |
|2023-07-18|July       |180.0              |
|2023-04-21|April      |30.0               |
|2022-06-14|June       |150.0              |
|2021-05-03|May        |200.0              |
+----------+-----------+-------------------+
only showing top 5 rows



In [0]:
%sql select Order_Date,Order_Month,avg(Sales) Avg_Sales_Per_Month from a_df 
group by 1,2

Order_Date,Order_Month,Avg_Sales_Per_Month
2021-04-06,April,700.0
2023-07-18,July,180.0
2023-04-21,April,30.0
2022-06-14,June,150.0
2021-05-03,May,200.0
2021-10-05,October,800.0
2023-07-15,July,350.0
2022-11-27,November,600.0
2021-11-12,November,900.0
2022-10-18,October,600.0


In [0]:
# Question: Find products (based on State) with the highest cumulative order values.
a_df=df.select("Sub_Category","State","Order_Value").groupBy(col("State")).agg(max(col("Order_Value")).alias("Highest_Order_Value"))
a_df.show(5,False)

+-----+-------------------+
|State|Highest_Order_Value|
+-----+-------------------+
|DC   |2100               |
|CA   |900                |
|IL   |1650               |
|WA   |450                |
|OH   |2400               |
+-----+-------------------+
only showing top 5 rows



In [0]:
%sql select State,max(Order_Value) from a_df
group by 1

State,max(Order_Value)
DC,2100
CA,900
IL,1650
WA,450
OH,2400
TX,3600
GA,3400
FL,2100
UT,900


In [0]:
# Question: Analyze and identify what mode is the most prefered way to transport.
a_df=df.select("Ship_Mode","Order_Priority").groupBy(col("Ship_Mode")).agg(count(col("Order_Priority")).alias("Preffered_mode"))
a_df.show(5,False)

+---------+--------------+
|Ship_Mode|Preffered_mode|
+---------+--------------+
|Air      |110           |
|Ground   |106           |
+---------+--------------+



In [0]:
%sql select Ship_Mode,count(Order_Priority) Preffered_mode from a_df
group by 1

Ship_Mode,Preffered_mode
Air,110
Ground,106


#only Pyspark

In [0]:
# Question: Identify products where the profit margin has been decreasing over time.
a_df=df.select("Order_Date","Sub_Category","Profit_Margin").withColumn("Lag",lag("Profit_Margin").over(Window.partitionBy(col("Sub_Category")).orderBy(col("Order_Date").asc())))
a_df=a_df.select("Sub_Category").filter((col("Profit_Margin")>col("Lag")))
a_df.show(5,False)

+------------+
|Sub_Category|
+------------+
+------------+



In [0]:
# Question: Calculate the average profit margin for each region.
a_df=df.select("Region","Profit_Margin").groupBy(col("Region")).agg(round(avg(col("Profit_Margin")),2).alias("Profit_By_Region"))
a_df.show(5,False)

+------+----------------+
|Region|Profit_By_Region|
+------+----------------+
|South |2.08            |
|East  |2.2             |
|West  |2.17            |
+------+----------------+

