In [1]:
from pyspark.sql.session import SparkSession
import pyspark.sql.types as tp
spark=SparkSession.builder.appName("pyspark-project").master("local[*]").getOrCreate()


# Q1. Set the schema for all the data sets and load them from different locations using file structured streaming.

In [2]:
#defining the schema for all the csv files

orders_schema = tp.StructType([
    tp.StructField(name= "Order_ID",dataType= tp.IntegerType()),
    tp.StructField(name= "Order_Date",dataType= tp.StringType()),
    tp.StructField(name= "Order_Priority",dataType= tp.StringType()),
    tp.StructField(name= "Ord_id",dataType= tp.StringType()),
])
market_schema = tp.StructType([
    tp.StructField(name= "ord_id",dataType= tp.StringType()),
    tp.StructField(name= "Prod_id",dataType= tp.StringType()),
    tp.StructField(name= "Ship_id",dataType= tp.StringType()),
    tp.StructField(name= "Cust_id",dataType= tp.StringType()),
    tp.StructField(name= "Sales",dataType= tp.DoubleType()),
    tp.StructField(name= "Discount",dataType= tp.DoubleType()),
    tp.StructField(name= "Order_Quantity",dataType= tp.IntegerType()),
    
    tp.StructField(name= "Profit",dataType= tp.DoubleType()),
    tp.StructField(name= "Shipping_cost",dataType= tp.DoubleType()),
    tp.StructField(name= "Product_Base_Margin",dataType= tp.StringType()),
    
])
shipping_schema = tp.StructType([
    tp.StructField(name= "Order_id",dataType= tp.IntegerType()),
    tp.StructField(name= "Ship_Mode",dataType= tp.StringType()),
    tp.StructField(name= "Ship_Date",dataType= tp.StringType()),
    tp.StructField(name= "Ship_id",dataType= tp.StringType()),
    
])
customer_schema = tp.StructType([
    tp.StructField(name= "Customer_Name",dataType= tp.StringType()),
    tp.StructField(name= "Province",dataType= tp.StringType()),
    tp.StructField(name= "Region",dataType= tp.StringType()),
    tp.StructField(name= "Customer_Segment",dataType= tp.StringType()),
    tp.StructField(name= "Cust_id",dataType= tp.StringType()),

])

product_schema = tp.StructType([
    tp.StructField(name= "Product_Category",dataType= tp.StringType()),
    tp.StructField(name= "Product_Sub_Category",dataType= tp.StringType()),
    tp.StructField(name= "Prod_id",dataType= tp.StringType()),
])

In [3]:
df1 = spark.readStream.schema(orders_schema).option("header", True).csv("/user/narendrag85qgre/pyspark/orders") 
df2 = df1.writeStream.format("memory").queryName("orders_query").outputMode("append").start() 

# To convert order to DataFrame
order_df = spark.sql("SELECT * FROM orders_query")  

In [4]:
df3 = spark.readStream.schema(product_schema).option("header", True).csv("/user/narendrag85qgre/pyspark/product") 
df4 = df3.writeStream.format("memory").queryName("product_query").outputMode("append").start() 

# To convert product to DataFrame
product_df = spark.sql("SELECT * FROM product_query") 

In [5]:
df5 = spark.readStream.schema(customer_schema).option("header", True).csv("/user/narendrag85qgre/pyspark/customer") 
df6 = df5.writeStream.format("memory").queryName("customer_query").outputMode("append").start() 

# To convert customer to DataFrame
customer_df = spark.sql("SELECT * FROM customer_query") 

In [6]:
df7 = spark.readStream.schema(market_schema).option("header", True).csv("/user/narendrag85qgre/pyspark/market") 
df8 = df7.writeStream.format("memory").queryName("market_query").outputMode("append").start() 

# To convert market to DataFrame
market_df = spark.sql("SELECT * FROM market_query") 

In [7]:
df9 = spark.readStream.schema(shipping_schema).option("header", True).csv("/user/narendrag85qgre/pyspark/shipping") 
df10= df9.writeStream.format("memory").queryName("shipping_query").outputMode("append").start() 

# To convert shipping to DataFrame
shipping_df = spark.sql("SELECT * FROM shipping_query")

In [8]:
#checking schema and dataframes
dataframes={"product":product_df,"order":order_df,"customer":customer_df,"shipping":shipping_df,"market":market_df}
for i in dataframes.keys():
    print(dataframes[i].printSchema(),dataframes[i].show(2))

root
 |-- Product_Category: string (nullable = true)
 |-- Product_Sub_Category: string (nullable = true)
 |-- Prod_id: string (nullable = true)

+----------------+--------------------+-------+
|Product_Category|Product_Sub_Category|Prod_id|
+----------------+--------------------+-------+
| OFFICE SUPPLIES|STORAGE & ORGANIZ...| Prod_1|
| OFFICE SUPPLIES|          APPLIANCES| Prod_2|
+----------------+--------------------+-------+
only showing top 2 rows

None None
root
 |-- Order_ID: integer (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Ord_id: string (nullable = true)

+--------+----------+--------------+------+
|Order_ID|Order_Date|Order_Priority|Ord_id|
+--------+----------+--------------+------+
|       3|13-10-2010|           LOW| Ord_1|
|     293|01-10-2012|          HIGH| Ord_2|
+--------+----------+--------------+------+
only showing top 2 rows

None None
root
 |-- Customer_Name: string (nullable = true)
 |-- Provi

## Q2. Join all the Data frames and create a new Data frame called Full_DataFrame in such a way that the new data frame does not contain duplicate columns. (cust_dimen, market_fact, orders_dimen, prod_dimen, shipping_dimen)


In [9]:
# joining market and order
join_type="inner"
Full_DataFrame=order_df.join(market_df,on="ord_id",how=join_type)
# joining product and market df
Full_DataFrame=product_df.join(Full_DataFrame,on="Prod_id",how=join_type)
#joining customer and market
Full_DataFrame=customer_df.join(Full_DataFrame,on="Cust_id",how=join_type)
# joining shipping and market 
Full_DataFrame=shipping_df.join(Full_DataFrame,on=["Order_id","Ship_id"],how=join_type)


Full_DataFrame.show(2,False,True)


Full_DataFrame.printSchema()


print("no of columns",len(Full_DataFrame.columns)) #21

-RECORD 0---------------------------------------------
 Order_id             | 36262                         
 Ship_id              | SHP_7609                      
 Ship_Mode            | REGULAR AIR                   
 Ship_Date            | 28-07-2010                    
 Cust_id              | Cust_1818                     
 Customer_Name        | AARON BERGMAN                 
 Province             | ALBERTA                       
 Region               | WEST                          
 Customer_Segment     | CORPORATE                     
 Prod_id              | Prod_16                       
 Product_Category     | OFFICE SUPPLIES               
 Product_Sub_Category | SCISSORS, RULERS AND TRIMMERS 
 Ord_id               | Ord_5446                      
 Order_Date           | 27-07-2010                    
 Order_Priority       | NOT SPECIFIED                 
 Sales                | 136.81                        
 Discount             | 0.01                          
 Order_Qua

# Q3. Convert the Order_Date and Ship_Date columns type into Date type. And print the schema and show the top 5 records for Order_Date and Ship_Date columns.


In [19]:
from pyspark.sql.functions import *
Full_DataFrame = Full_DataFrame.withColumn("Order_Date", to_date(col("Order_Date"), "dd-MM-yyyy"))
Full_DataFrame = Full_DataFrame.withColumn("Ship_Date", to_date(col("Ship_Date"), "dd-MM-yyyy"))
Full_DataFrame.select("order_Date","ship_date").show(5)
Full_DataFrame.printSchema()


+----------+----------+
|order_Date| ship_date|
+----------+----------+
|2010-07-27|2010-07-28|
|2009-07-07|2009-07-08|
|2010-07-27|2010-07-27|
|2010-11-09|2010-11-11|
|2009-07-01|2009-07-08|
+----------+----------+
only showing top 5 rows

root
 |-- Order_id: integer (nullable = true)
 |-- Ship_id: string (nullable = true)
 |-- Ship_Mode: string (nullable = true)
 |-- Ship_Date: date (nullable = true)
 |-- Cust_id: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Customer_Segment: string (nullable = true)
 |-- Prod_id: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Sub_Category: string (nullable = true)
 |-- Ord_id: string (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Order_Quantity: integer (nullable = tru

## Q4. Find the top 3 customers who have the maximum number of orders.

In [11]:
Full_DataFrame.createOrReplaceTempView("fulltable")
spark.sql("""SELECT Cust_id,Customer_Name, COUNT(DISTINCT Ord_id) AS num_orders
FROM fulltable
GROUP BY Cust_id,Customer_name
ORDER BY num_orders DESC
LIMIT 3""").show()

+---------+-----------------+----------+
|  Cust_id|    Customer_Name|num_orders|
+---------+-----------------+----------+
|Cust_1140|    PATRICK JONES|        17|
| Cust_576|MICHAEL DOMINGUEZ|        16|
| Cust_999|    SALLY HUGHSBY|        15|
+---------+-----------------+----------+



## Q5. Create a new column DaysTakenForDelivery that contains the date difference between Order_Date and Ship_Date.


In [12]:
Full_DataFrame=Full_DataFrame.withColumn("DaysTakenForDelivery",datediff(col("ship_Date"),col("order_Date")))

Full_DataFrame.select("ord_id",'order_id',"ship_date","order_date","DaysTakenForDelivery").show(10)

+--------+--------+----------+----------+--------------------+
|  ord_id|order_id| ship_date|order_date|DaysTakenForDelivery|
+--------+--------+----------+----------+--------------------+
|Ord_5446|   36262|2010-07-28|2010-07-27|                   1|
|Ord_5406|   20513|2009-07-08|2009-07-07|                   1|
|Ord_5446|   36262|2010-07-27|2010-07-27|                   0|
|Ord_5456|   39682|2010-11-11|2010-11-09|                   2|
|Ord_5485|   54019|2009-07-08|2009-07-01|                   7|
|Ord_5446|   36262|2010-07-28|2010-07-27|                   1|
|  Ord_31|    4132|2011-05-30|2011-05-28|                   2|
|Ord_4725|   46662|2011-12-31|2011-12-29|                   2|
|Ord_4725|   46662|2011-12-31|2011-12-29|                   2|
|Ord_4725|   46662|2011-12-31|2011-12-29|                   2|
+--------+--------+----------+----------+--------------------+
only showing top 10 rows



# Q6. Find the customer whose order took the maximum time to get delivered.

In [13]:
Full_DataFrame.orderBy("DaysTakenForDelivery",ascending=False).select("Customer_Name","Ord_id","DaysTakenForDelivery").show(1)

+-------------+--------+--------------------+
|Customer_Name|  Ord_id|DaysTakenForDelivery|
+-------------+--------+--------------------+
|  DEAN PERCER|Ord_4335|                  92|
+-------------+--------+--------------------+
only showing top 1 row



## Q7. Using the windows function, retrieve total sales made by each product from the data.


In [14]:
from pyspark.sql.window import Window
window_specification= Window.partitionBy("Prod_id")
Full_DataFrame=Full_DataFrame.withColumn("total_sales",sum(col('Sales')).over(window_specification))
Full_DataFrame.select("prod_id","total_Sales").distinct().show()

+-------+------------------+
|prod_id|       total_Sales|
+-------+------------------+
| Prod_4| 1889313.801999998|
| Prod_1|1070182.6000000006|
| Prod_7|15006.630000000001|
| Prod_8| 795875.9399999998|
|Prod_13|167107.21999999997|
|Prod_14|1130361.2999999996|
|Prod_16| 80996.30999999997|
| Prod_2| 736991.5399999997|
|Prod_11|1896008.1420000014|
|Prod_10| 822652.0400000003|
|Prod_17|2168697.1400000006|
| Prod_5| 698093.8100000003|
|Prod_15|        1761836.55|
| Prod_3|1022957.5900000007|
|Prod_12| 38981.55000000002|
| Prod_6| 446452.8599999995|
| Prod_9|174085.80000000008|
+-------+------------------+



## Q8. Using the windows function retrieve the total profit made from each product from the data and also do without the windows function using pyspark data frame.


In [15]:

Full_DataFrame=Full_DataFrame.withColumn('total_profit',sum(col("profit")).over(window_specification))
Full_DataFrame.select("prod_id","total_profit").distinct().show(5)

+-------+------------------+
|prod_id|      total_profit|
+-------+------------------+
| Prod_4| 316951.6200000003|
| Prod_1| 6664.149999999999|
| Prod_7|-102.6700000000001|
| Prod_8| 94287.48000000001|
|Prod_13| 7564.780000000003|
+-------+------------------+
only showing top 5 rows



In [16]:
# using normal pyspark
Full_DataFrame.groupBy("prod_id").agg(sum(col("profit")).alias("total_profit")).show()

+-------+-------------------+
|prod_id|       total_profit|
+-------+-------------------+
| Prod_4| 316951.62000000005|
| Prod_1| 6664.1500000000015|
| Prod_7|-102.66999999999996|
| Prod_8|  94287.47999999998|
|Prod_13|            7564.78|
|Prod_14| 167361.48999999996|
|Prod_16|           -7799.25|
| Prod_2|           97158.06|
|Prod_11| -99062.49999999999|
|Prod_10|-33582.130000000005|
|Prod_17|          307712.93|
| Prod_5|          100427.93|
|Prod_15| 149649.72999999998|
| Prod_3|          307413.39|
|Prod_12| 13677.170000000002|
| Prod_6|  45263.19999999999|
| Prod_9|  48182.59999999999|
+-------+-------------------+



## Q9. Count the total number of unique customers in January and how many of them came back every month over the entire year in 2011.


In [17]:

# filter out 2011 and january month from full dataframe
df_jan_2011=Full_DataFrame.filter((year('Order_Date') == 2011) & (month('Order_Date') == 1))

# Total number of unique customer in january
unique_cust_jan=df_jan_2011.select('Cust_id').distinct().count()



every_month_cust=Full_DataFrame.filter((year('Order_Date') == 2011)).groupBy("cust_id")\
.agg(count(month("order_date")).alias("monthly_parchased"))
customer_come_backs=every_month_cust.filter
every_month_cust.orderBy("monthly_parchased",ascending=False).show()


print("unique customer in jan is",unique_cust_jan)

#no unique customers



+---------+-----------------+
|  cust_id|monthly_parchased|
+---------+-----------------+
|Cust_1337|                9|
| Cust_572|                9|
| Cust_478|                8|
|Cust_1799|                8|
| Cust_501|                8|
|Cust_1682|                8|
| Cust_525|                8|
| Cust_458|                8|
| Cust_487|                7|
| Cust_595|                7|
|  Cust_65|                7|
|Cust_1675|                7|
|Cust_1742|                7|
|Cust_1334|                7|
|  Cust_68|                6|
| Cust_214|                6|
| Cust_138|                6|
|Cust_1338|                6|
| Cust_939|                6|
|Cust_1063|                6|
+---------+-----------------+
only showing top 20 rows

unique customer in jan is 99


## Q10. Calculate the total quantity purchased, discount received by the customer, and calculate the total sales sold and profit earned from each customer. Order the data frame on Total_profit in descending order.


In [18]:

Full_DataFrame.groupBy('cust_id','customer_name')\
.agg(sum(col('order_quantity')).alias('total_quantity'),\
sum(col('discount')).alias('total_discount'),\
sum(col('sales')).alias('total_sales'),\
sum(col('profit')).alias('total_profit'))\
.orderBy('Total_Profit',ascending=False).show()

+---------+------------------+--------------+-------------------+------------------+------------------+
|  cust_id|     customer_name|total_quantity|     total_discount|       total_sales|      total_profit|
+---------+------------------+--------------+-------------------+------------------+------------------+
|Cust_1151|        EMILY PHAN|           129|                0.4|         97011.194|          28663.71|
|  Cust_63|     GRANT CARROLL|           242|0.29000000000000004|        54368.9085|20877.440000000002|
|Cust_1571|    KAREN CARLISLE|           112|0.22000000000000003|         47445.573|          19439.52|
|Cust_1421|   LIZ MACKENDRICK|           434|               0.71|         67285.132|          18960.63|
| Cust_937|    JOHN STEVENSON|           130|               0.33| 48660.87300000001|          18849.93|
|Cust_1799|      RAYMOND BOOK|           567|               0.97| 70426.59000000001|18760.589999999997|
| Cust_934|  LOGAN HAUSHALTER|           316| 0.6700000000000002