In [1]:
import findspark

findspark.init("/opt/cloudera/parcels/CDH-6.2.1-1.cdh6.2.1.p0.1425774/lib/spark")

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

from datetime import datetime



## 1. Set the schema for all the data sets and load them from different locations using file. 

In [2]:
df_cust_dimen = spark.read.format('csv').option('header','True').option('inferSchema','True').load("/.../cust_dimen.csv")
df_market_fact = spark.read.format('csv').option('header','True').option('inferSchema','True').load("/.../market_fact.csv")
df_orders_dimen = spark.read.format('csv').option('header','True').option('inferSchema','True').load("/.../orders_dimen.csv")
df_prod_dimen = spark.read.format('csv').option('header','True').option('inferSchema','True').load("/.../prod_dimen.csv")
df_shipping_dimen = spark.read.format('csv').option('header','True').option('inferSchema','True').load("/.../shipping_dimen.csv")



## 2. Join all the Data frames and create a new Data frame called Full_DataFrame in such a way that the new data frame does not contain duplicate columns.
(cust_dimen, market_fact, orders_dimen, prod_dimen, shipping_dimen)


In [3]:
Full_DataFrame = df_cust_dimen.join(df_market_fact,["Cust_id"]) \
    .join(df_orders_dimen,["Ord_id"]) \
    .join(df_prod_dimen,["Prod_id"]) \
    .join(df_shipping_dimen,["Order_ID"])

In [4]:
Full_DataFrame.show(5)

+--------+-------+--------+---------+-------------+--------+------+----------------+--------+-------+--------+--------------+------+-------------+-------------------+----------+--------------+----------------+--------------------+-----------+----------+--------+
|Order_ID|Prod_id|  Ord_id|  Cust_id|Customer_Name|Province|Region|Customer_Segment| Ship_id|  Sales|Discount|Order_Quantity|Profit|Shipping_Cost|Product_Base_Margin|Order_Date|Order_Priority|Product_Category|Product_Sub_Category|  Ship_Mode| Ship_Date| Ship_id|
+--------+-------+--------+---------+-------------+--------+------+----------------+--------+-------+--------+--------------+------+-------------+-------------------+----------+--------------+----------------+--------------------+-----------+----------+--------+
|   36262|Prod_16|Ord_5446|Cust_1818|AARON BERGMAN| ALBERTA|  WEST|       CORPORATE|SHP_7609| 136.81|    0.01|            23|-30.51|          3.6|               0.56|27-07-2010| NOT SPECIFIED| OFFICE SUPPLIES|SC

## 3. Convert the Order_Date and Ship_Date columns type into Date type. And print the schema and show the top 5 records for Order_Date and Ship_Date  columns.

In [5]:
#import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import DateType

to_datetype = udf(lambda x: datetime.strptime(x, '%d-%m-%Y'), DateType())

Full_DataFrame = Full_DataFrame.withColumn('Ship_Date', to_datetype(col('Ship_Date'))) \
                .withColumn('Order_Date', to_datetype(col('Order_Date')))

In [6]:
Full_DataFrame.printSchema()

root
 |-- Order_ID: integer (nullable = true)
 |-- Prod_id: string (nullable = true)
 |-- Ord_id: string (nullable = true)
 |-- Cust_id: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Customer_Segment: string (nullable = true)
 |-- Ship_id: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Order_Quantity: integer (nullable = true)
 |-- Profit: double (nullable = true)
 |-- Shipping_Cost: double (nullable = true)
 |-- Product_Base_Margin: string (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Sub_Category: string (nullable = true)
 |-- Ship_Mode: string (nullable = true)
 |-- Ship_Date: date (nullable = true)
 |-- Ship_id: string (nullable = true)



In [7]:
Full_DataFrame["Ship_Date","Order_Date"].show(5)

+----------+----------+
| Ship_Date|Order_Date|
+----------+----------+
|2010-07-27|2010-07-27|
|2010-07-28|2010-07-27|
|2010-07-28|2010-07-27|
|2009-07-08|2009-07-07|
|2010-07-27|2010-07-27|
+----------+----------+
only showing top 5 rows



## 4. Find the top 3 customers who have the maximum number of orders

In [8]:
Full_DataFrame.createOrReplaceTempView("order_data")


In [9]:
spark.sql("select * from order_data").show(10)

+--------+-------+--------+---------+-------------+--------+------+----------------+--------+-------+--------+--------------+-------+-------------+-------------------+----------+--------------+----------------+--------------------+--------------+----------+--------+
|Order_ID|Prod_id|  Ord_id|  Cust_id|Customer_Name|Province|Region|Customer_Segment| Ship_id|  Sales|Discount|Order_Quantity| Profit|Shipping_Cost|Product_Base_Margin|Order_Date|Order_Priority|Product_Category|Product_Sub_Category|     Ship_Mode| Ship_Date| Ship_id|
+--------+-------+--------+---------+-------------+--------+------+----------------+--------+-------+--------+--------------+-------+-------------+-------------------+----------+--------------+----------------+--------------------+--------------+----------+--------+
|   36262|Prod_16|Ord_5446|Cust_1818|AARON BERGMAN| ALBERTA|  WEST|       CORPORATE|SHP_7609| 136.81|    0.01|            23| -30.51|          3.6|               0.56|2010-07-27| NOT SPECIFIED| OFFIC

In [10]:
spark.sql("select Cust_id,Customer_Name, count(Order_ID) as `Order_Count` from order_data group by Cust_id,Customer_Name order by Order_Count desc").show(3)


+---------+--------------+-----------+
|  Cust_id| Customer_Name|Order_Count|
+---------+--------------+-----------+
|Cust_1140| PATRICK JONES|         62|
| Cust_572|LENA CREIGHTON|         52|
| Cust_188| JUSTIN KNIGHT|         47|
+---------+--------------+-----------+
only showing top 3 rows



## 5. Create a new column DaysTakenForDelivery that contains the date difference between Order_Date and Ship_Date. 

In [11]:
Full_DataFrame = Full_DataFrame.withColumn("DaysTakenForDelivery",datediff('Ship_Date','Order_Date'))

In [12]:
Full_DataFrame["DaysTakenForDelivery","Cust_id"].show(5)

+--------------------+---------+
|DaysTakenForDelivery|  Cust_id|
+--------------------+---------+
|                   0|Cust_1818|
|                   1|Cust_1818|
|                   1|Cust_1818|
|                   1|Cust_1818|
|                   0|Cust_1818|
+--------------------+---------+
only showing top 5 rows



## 6. Find the customer whose order took the maximum time to get delivered

In [13]:
Full_DataFrame.createOrReplaceTempView("ship_data")


In [14]:
spark.sql("select Cust_id,Customer_Name, DaysTakenForDelivery from ship_data order by DaysTakenForDelivery desc limit 1").show()


+---------+-------------+--------------------+
|  Cust_id|Customer_Name|DaysTakenForDelivery|
+---------+-------------+--------------------+
|Cust_1460|  DEAN PERCER|                  92|
+---------+-------------+--------------------+



## 7. Using the windows function, retrieve total sales made by each product from the data. 

In [15]:
from pyspark.sql.window import Window

windowPartition  = Window.partitionBy("Product_Sub_Category")


Full_DataFrame.withColumn("Total_Sales",sum(col("Sales")).over(windowPartition)).show()


+--------+-------+--------+---------+----------------+--------------------+--------------------+----------------+--------+------+--------+--------------+-------+-------------+-------------------+----------+--------------+----------------+--------------------+--------------+----------+--------+--------------------+------------------+
|Order_ID|Prod_id|  Ord_id|  Cust_id|   Customer_Name|            Province|              Region|Customer_Segment| Ship_id| Sales|Discount|Order_Quantity| Profit|Shipping_Cost|Product_Base_Margin|Order_Date|Order_Priority|Product_Category|Product_Sub_Category|     Ship_Mode| Ship_Date| Ship_id|DaysTakenForDelivery|       Total_Sales|
+--------+-------+--------+---------+----------------+--------------------+--------------------+----------------+--------+------+--------+--------------+-------+-------------+-------------------+----------+--------------+----------------+--------------------+--------------+----------+--------+--------------------+---------------

## 8.  Using the windows function retrieve the total profit made from each product from the data and also do without the windows function using pyspark data frame. 

In [16]:
Full_DataFrame.withColumn("Total_Profit",sum(col("Profit")).over(windowPartition)).show()

+--------+-------+--------+---------+----------------+--------------------+--------------------+----------------+--------+------+--------+--------------+-------+-------------+-------------------+----------+--------------+----------------+--------------------+--------------+----------+--------+--------------------+------------------+
|Order_ID|Prod_id|  Ord_id|  Cust_id|   Customer_Name|            Province|              Region|Customer_Segment| Ship_id| Sales|Discount|Order_Quantity| Profit|Shipping_Cost|Product_Base_Margin|Order_Date|Order_Priority|Product_Category|Product_Sub_Category|     Ship_Mode| Ship_Date| Ship_id|DaysTakenForDelivery|      Total_Profit|
+--------+-------+--------+---------+----------------+--------------------+--------------------+----------------+--------+------+--------+--------------+-------+-------------+-------------------+----------+--------------+----------------+--------------------+--------------+----------+--------+--------------------+---------------

In [17]:
Full_DataFrame.groupBy("Product_Sub_Category").agg(round(sum("Profit"),2).alias("Total_Profit")).show()


+--------------------+------------+
|Product_Sub_Category|Total_Profit|
+--------------------+------------+
|              LABELS|    21502.61|
|        RUBBER BANDS|       59.35|
|     COPIERS AND FAX|    374265.5|
| PENS & ART SUPPLIES|    14484.48|
|          APPLIANCES|   171173.79|
|     OFFICE MACHINES|   489496.99|
|  CHAIRS & CHAIRMATS|   295724.99|
|STORAGE & ORGANIZ...|   -26756.57|
|SCISSORS, RULERS ...|   -10862.49|
|           BOOKCASES|   -76670.38|
|               PAPER|    80857.66|
|              TABLES|  -159324.18|
|  OFFICE FURNISHINGS|   188356.49|
|BINDERS AND BINDE...|   492766.44|
|COMPUTER PERIPHERALS|   153821.73|
|TELEPHONES AND CO...|   527413.12|
|           ENVELOPES|     75764.4|
+--------------------+------------+



## 9. Count the total number of unique customers in January and how many of them came back every month over the entire year in 2011. 

In [18]:
Jan_Cust_Unq = Full_DataFrame.select(['Cust_id']).where((month('Order_Date')=='01')).distinct()

Jan_Cust_Unq.count()

412

In [19]:
Repeat_Cust = Full_DataFrame.select(['Cust_id']).where((year('Order_Date')=='2011') & (Full_DataFrame['Cust_id'].isin(Jan_Cust_Unq['Cust_id']) == True)).distinct()

In [20]:
Repeat_Cust.count()

961

## 10. Calculate the total quantity purchased, discount received by the customer, and calculate the total sales sold and profit earned from each customer. Order the data frame on Total_profit in descending order. 

In [21]:
spark.sql("select Cust_id, Customer_Name, sum(Order_Quantity) as `Total Quantity Purchased`,round(sum(Discount),2) as `Total Discount`, round(sum(Sales),2) as `Total Sales`, round(sum(Profit),2) as `Total_Profit` from ship_data group by Cust_id,Customer_Name order by Total_Profit desc ").show()

+---------+-----------------+------------------------+--------------+-----------+------------+
|  Cust_id|    Customer_Name|Total Quantity Purchased|Total Discount|Total Sales|Total_Profit|
+---------+-----------------+------------------------+--------------+-----------+------------+
|Cust_1151|       EMILY PHAN|                     182|          0.48|  192708.91|     57508.1|
|Cust_1307|      ANDY REITER|                     317|          0.96|  107288.48|    45292.26|
| Cust_934| LOGAN HAUSHALTER|                     592|           1.3|  105246.32|    41093.72|
| Cust_725|DEBORAH BRUMFIELD|                     454|          0.12|  116280.59|    39982.44|
| Cust_937|   JOHN STEVENSON|                     242|          0.52|   96926.38|    37766.56|
|Cust_1763|      RICK WILSON|                     300|          0.54|   80437.41|    37658.25|
|Cust_1799|     RAYMOND BOOK|                    1062|          1.64|  136939.63|    37267.98|
| Cust_424| LYCORIS SAUNDERS|                     