In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
# initializing spark session
spark = SparkSession(sc) 

# Q1: Set the schema for all the data sets and load them from different locations using file structured streaming. 

In [3]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, FloatType, LongType, IntegerType, DateType


In [46]:
# Define schemas for each data set
cust_dimen_schema = StructType([
    StructField("Customer_Name", StringType()),
    StructField("Province", StringType()),
    StructField("Region", StringType()),
    StructField("Customer_Segment", StringType()),
    StructField("Cust_id", StringType())
])


market_fact_schema = StructType([
    StructField("Ord_id", StringType()),
    StructField("Prod_id", StringType()),
    StructField("Ship_id", StringType()),
    StructField("Cust_id", StringType()),
    StructField("Sales", DoubleType()),
    StructField("Discount", DoubleType()),
    StructField("Order_Quantity",IntegerType()),
    StructField("Profit", DoubleType()),
    StructField("Shipping_Cost", DoubleType()),
    StructField("Product_Base_Margin", DoubleType())
    
])


orders_dimen_schema = StructType([
    StructField("Order_ID", StringType()),
    StructField("Order_Date", DateType()),
    StructField("Order_Priority", StringType()),
    StructField("Ord_id", StringType())
])


prod_dimen_schema = StructType([
    StructField("Product_Category", StringType()),
    StructField("Product_Sub_Category", StringType()),
    StructField("Prod_id", StringType())
])

shipping_dimen_schema = StructType([
    StructField("Order_ID", StringType()),
    StructField("Ship_Mode", StringType()),
    StructField("Ship_Date", DateType()),
    StructField("Ship_id", StringType())
])


In [79]:
# Load data using file structured streaming
#= spark.readStream \
#    .schema() \
 #   .csv("/ user/ samarthpu42wgre/ ")
cust_dimen_df = spark.read.format("csv").option("header", "true").schema(cust_dimen_schema).load("cust_dimen.csv")
market_fact_df = spark.read.format("csv").option("header", "true").schema(market_fact_schema).load("market_fact.csv")
orders_dimen_df = spark.read.format("csv").option("header", "true").option("dateFormat","dd-mm-yyyy").schema(orders_dimen_schema).load("orders_dimen.csv")
prod_dimen_df = spark.read.format("csv").option("header", "true").schema(prod_dimen_schema).load("prod_dimen.csv")
shipping_dimen_df = spark.read.format("csv").option("header", "true").option("dateFormat","dd-mm-yyyy").schema(shipping_dimen_schema).load("shipping_dimen.csv")


In [80]:
cust_dimen_df.show(2)

+------------------+--------+-------+----------------+-------+
|     Customer_Name|Province| Region|Customer_Segment|Cust_id|
+------------------+--------+-------+----------------+-------+
|MUHAMMED MACINTYRE| NUNAVUT|NUNAVUT|  SMALL BUSINESS| Cust_1|
|      BARRY FRENCH| NUNAVUT|NUNAVUT|        CONSUMER| Cust_2|
+------------------+--------+-------+----------------+-------+
only showing top 2 rows



In [81]:
cust_dimen_df.select("*").show(4, False)

+------------------+--------+-------+----------------+-------+
|Customer_Name     |Province|Region |Customer_Segment|Cust_id|
+------------------+--------+-------+----------------+-------+
|MUHAMMED MACINTYRE|NUNAVUT |NUNAVUT|SMALL BUSINESS  |Cust_1 |
|BARRY FRENCH      |NUNAVUT |NUNAVUT|CONSUMER        |Cust_2 |
|CLAY ROZENDAL     |NUNAVUT |NUNAVUT|CORPORATE       |Cust_3 |
|CARLOS SOLTERO    |NUNAVUT |NUNAVUT|CONSUMER        |Cust_4 |
+------------------+--------+-------+----------------+-------+
only showing top 4 rows



In [82]:
market_fact_df.show(2)


+--------+-------+--------+---------+------+--------+--------------+------+-------------+-------------------+
|  Ord_id|Prod_id| Ship_id|  Cust_id| Sales|Discount|Order_Quantity|Profit|Shipping_Cost|Product_Base_Margin|
+--------+-------+--------+---------+------+--------+--------------+------+-------------+-------------------+
|Ord_5446|Prod_16|SHP_7609|Cust_1818|136.81|    0.01|            23|-30.51|          3.6|               0.56|
|Ord_5406|Prod_13|SHP_7549|Cust_1818| 42.27|    0.01|            13|  4.56|         0.93|               0.54|
+--------+-------+--------+---------+------+--------+--------------+------+-------------+-------------------+
only showing top 2 rows



In [83]:
orders_dimen_df.show(2)


+--------+----------+--------------+------+
|Order_ID|Order_Date|Order_Priority|Ord_id|
+--------+----------+--------------+------+
|       3|2010-01-13|           LOW| Ord_1|
|     293|2012-01-01|          HIGH| Ord_2|
+--------+----------+--------------+------+
only showing top 2 rows



In [84]:
prod_dimen_df.show(2)


+----------------+--------------------+-------+
|Product_Category|Product_Sub_Category|Prod_id|
+----------------+--------------------+-------+
| OFFICE SUPPLIES|STORAGE & ORGANIZ...| Prod_1|
| OFFICE SUPPLIES|          APPLIANCES| Prod_2|
+----------------+--------------------+-------+
only showing top 2 rows



In [85]:
shipping_dimen_df.show(2)


+--------+--------------+----------+-------+
|Order_ID|     Ship_Mode| Ship_Date|Ship_id|
+--------+--------------+----------+-------+
|       3|   REGULAR AIR|2010-01-20|  SHP_1|
|     293|DELIVERY TRUCK|2012-01-02|  SHP_2|
+--------+--------------+----------+-------+
only showing top 2 rows



# Q2. Join all the Data frames and create a new Data frame called Full_DataFrame in such a way that the new data frame does not contain duplicate columns.
(cust_dimen, market_fact, orders_dimen, prod_dimen, shipping_dimen)

In [86]:
# Rename duplicate columns
market_fact_df = market_fact_df.withColumnRenamed("Cust_id", "market_customer_id")
market_fact_df = market_fact_df.withColumnRenamed("Prod_id", "market_product_id")
market_fact_df = market_fact_df.withColumnRenamed("Ship_id", "market_ship_id")
market_fact_df = market_fact_df.withColumnRenamed("Ord_id", "market_ord_id")
orders_dimen_df = orders_dimen_df.withColumnRenamed("Order_id", "orders_order_id")


In [87]:
Full_DataFrame = market_fact_df.join(cust_dimen_df, cust_dimen_df.Cust_id == market_fact_df.market_customer_id, "inner") \
    .join(orders_dimen_df, market_fact_df.market_ord_id == orders_dimen_df.Ord_id, "inner") \
    .join(prod_dimen_df, market_fact_df.market_product_id == prod_dimen_df.Prod_id, "inner") \
    .join(shipping_dimen_df, market_fact_df.market_ship_id == shipping_dimen_df.Ship_id, "inner")

In [88]:
# Drop duplicate columns
Full_DataFrame = Full_DataFrame.drop("market_customer_id", "market_product_id", "market_ship_id", "market_ord_id","orders_order_id")


In [89]:
Full_DataFrame.printSchema()



root
 |-- Sales: double (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Order_Quantity: integer (nullable = true)
 |-- Profit: double (nullable = true)
 |-- Shipping_Cost: double (nullable = true)
 |-- Product_Base_Margin: double (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Customer_Segment: string (nullable = true)
 |-- Cust_id: string (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Ord_id: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Sub_Category: string (nullable = true)
 |-- Prod_id: string (nullable = true)
 |-- Order_ID: string (nullable = true)
 |-- Ship_Mode: string (nullable = true)
 |-- Ship_Date: date (nullable = true)
 |-- Ship_id: string (nullable = true)



In [90]:
Full_DataFrame.show(3, False)

+-------+--------+--------------+------+-------------+-------------------+-------------+--------+------+----------------+---------+----------+--------------+--------+----------------+-----------------------------+-------+--------+-----------+----------+--------+
|Sales  |Discount|Order_Quantity|Profit|Shipping_Cost|Product_Base_Margin|Customer_Name|Province|Region|Customer_Segment|Cust_id  |Order_Date|Order_Priority|Ord_id  |Product_Category|Product_Sub_Category         |Prod_id|Order_ID|Ship_Mode  |Ship_Date |Ship_id |
+-------+--------+--------------+------+-------------+-------------------+-------------+--------+------+----------------+---------+----------+--------------+--------+----------------+-----------------------------+-------+--------+-----------+----------+--------+
|136.81 |0.01    |23            |-30.51|3.6          |0.56               |AARON BERGMAN|ALBERTA |WEST  |CORPORATE       |Cust_1818|2010-01-27|NOT SPECIFIED |Ord_5446|OFFICE SUPPLIES |SCISSORS, RULERS AND TRIMMER

# Q3. Convert the Order_Date and Ship_Date columns type into Date type. And print the schema and show the top 5 records for Order_Date and Ship_Date columns.

In [91]:
# Convert Order_Date and Ship_Date columns to DateType
Full_DataFrame = Full_DataFrame.withColumn("Order_Date",Full_DataFrame["Order_Date"].cast(DateType()))
Full_DataFrame = Full_DataFrame.withColumn("Ship_Date",Full_DataFrame["Ship_Date"].cast(DateType()))


In [92]:
print("Schema after converting Order_Date and Ship_Date columns:")
Full_DataFrame.printSchema()

Schema after converting Order_Date and Ship_Date columns:
root
 |-- Sales: double (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Order_Quantity: integer (nullable = true)
 |-- Profit: double (nullable = true)
 |-- Shipping_Cost: double (nullable = true)
 |-- Product_Base_Margin: double (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Customer_Segment: string (nullable = true)
 |-- Cust_id: string (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Ord_id: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Sub_Category: string (nullable = true)
 |-- Prod_id: string (nullable = true)
 |-- Order_ID: string (nullable = true)
 |-- Ship_Mode: string (nullable = true)
 |-- Ship_Date: date (nullable = true)
 |-- Ship_id: string (nullable = true)



In [93]:
# Show top 5 records for Order_Date and Ship_Date columns
print("Top 5 records for Order_Date and Ship_Date columns:")
Full_DataFrame.select("Order_Date", "Ship_Date").show(5)

Top 5 records for Order_Date and Ship_Date columns:
+----------+----------+
|Order_Date| Ship_Date|
+----------+----------+
|2010-01-27|2010-01-28|
|2009-01-07|2009-01-08|
|2010-01-27|2010-01-27|
|2010-01-09|2010-01-11|
|2009-01-01|2009-01-08|
+----------+----------+
only showing top 5 rows



# 4. Find the top 3 customers who have the maximum number of orders. 

In [94]:
# selecting only a few columns
from pyspark.sql.functions import col, column
import pyspark.sql.functions as F

In [95]:
Full_DataFrame.groupBy(F.col("Cust_id")).agg(F.countDistinct("Ord_id").alias("Count")).orderBy(F.desc("Count")).show(3,False)

+---------+-----+
|Cust_id  |Count|
+---------+-----+
|Cust_1140|17   |
|Cust_576 |16   |
|Cust_999 |15   |
+---------+-----+
only showing top 3 rows



# 5. Create a new column DaysTakenForDelivery that contains the date difference between Order_Date and Ship_Date.

In [102]:
Full_DataFrame =  Full_DataFrame.withColumn("DaysTakenForDelivery",F.datediff(Full_DataFrame["Ship_Date"],Full_DataFrame["Order_Date"]))

In [103]:
Full_DataFrame.show(2)

+------+--------+--------------+------+-------------+-------------------+-------------+--------+------+----------------+---------+----------+--------------+--------+----------------+--------------------+-------+--------+-----------+----------+--------+--------------------+
| Sales|Discount|Order_Quantity|Profit|Shipping_Cost|Product_Base_Margin|Customer_Name|Province|Region|Customer_Segment|  Cust_id|Order_Date|Order_Priority|  Ord_id|Product_Category|Product_Sub_Category|Prod_id|Order_ID|  Ship_Mode| Ship_Date| Ship_id|DaysTakenForDelivery|
+------+--------+--------------+------+-------------+-------------------+-------------+--------+------+----------------+---------+----------+--------------+--------+----------------+--------------------+-------+--------+-----------+----------+--------+--------------------+
|136.81|    0.01|            23|-30.51|          3.6|               0.56|AARON BERGMAN| ALBERTA|  WEST|       CORPORATE|Cust_1818|2010-01-27| NOT SPECIFIED|Ord_5446| OFFICE SUPPL

# 6. Find the customer whose order took the maximum time to get delivered. 

In [109]:
Full_DataFrame.groupBy(F.col("Cust_id")).agg(F.max("DaysTakenForDelivery").alias("Max_daysTaken")).orderBy(F.desc("Max_daysTaken")).show(1)

+--------+-------------+
| Cust_id|Max_daysTaken|
+--------+-------------+
|Cust_814|          365|
+--------+-------------+
only showing top 1 row



# 7. Using the windows function, retrieve total sales made by each product from the data.

In [110]:
from pyspark.sql import Window

# Define a window specification over the product_id
window_spec = Window.partitionBy("Prod_id")

# Calculate the total sales made by each product
total_sales_by_product = Full_DataFrame.withColumn("total_sales",
                                                    F.sum("Sales").over(window_spec))

# Select only the required columns
total_sales_by_product = total_sales_by_product.select("Prod_id", "total_sales").distinct()

# Show the total sales made by each product
total_sales_by_product.show()

+-------+------------------+
|Prod_id|       total_sales|
+-------+------------------+
| Prod_4| 1889313.801999998|
| Prod_1|1070182.6000000006|
| Prod_7|15006.630000000001|
| Prod_8| 795875.9399999998|
|Prod_13|167107.21999999997|
|Prod_14|1130361.2999999996|
|Prod_16| 80996.30999999997|
| Prod_2| 736991.5399999997|
|Prod_11|1896008.1420000014|
|Prod_10| 822652.0400000003|
|Prod_17|2168697.1400000006|
| Prod_5| 698093.8100000003|
|Prod_15|        1761836.55|
| Prod_3|1022957.5900000007|
|Prod_12| 38981.55000000002|
| Prod_6| 446452.8599999995|
| Prod_9|174085.80000000008|
+-------+------------------+



# 8. Using the windows function retrieve the total profit made from each product from the data and also do without the windows function using pyspark data frame

In [111]:
from pyspark.sql import Window

# Define a window specification over the product_id
window_spec = Window.partitionBy("Prod_id")

# Calculate the total profit made by each product
total_profit_by_product = Full_DataFrame.withColumn("total_profit",
                                                    F.sum("Profit").over(window_spec))

# Select only the required columns
total_profit_by_product = total_profit_by_product.select("Prod_id", "total_profit").distinct()

# Show the total profit made from each product
total_profit_by_product.show()

+-------+------------------+
|Prod_id|      total_profit|
+-------+------------------+
| Prod_4| 316951.6200000003|
| Prod_1| 6664.149999999999|
| Prod_7|-102.6700000000001|
| Prod_8| 94287.48000000001|
|Prod_13| 7564.780000000003|
|Prod_14|167361.48999999996|
|Prod_16|-7799.250000000001|
| Prod_2| 97158.05999999988|
|Prod_11|-99062.50000000001|
|Prod_10|-33582.13000000002|
|Prod_17|307712.92999999993|
| Prod_5|         100427.93|
|Prod_15|149649.73000000004|
| Prod_3|307413.38999999996|
|Prod_12| 13677.16999999999|
| Prod_6| 45263.20000000003|
| Prod_9| 48182.60000000004|
+-------+------------------+




# 9. Count the total number of unique customers in January and how many of them came back every month over the entire year in 2011.

In [112]:
Full_DataFrame.show(1)

+------+--------+--------------+------+-------------+-------------------+-------------+--------+------+----------------+---------+----------+--------------+--------+----------------+--------------------+-------+--------+-----------+----------+--------+--------------------+
| Sales|Discount|Order_Quantity|Profit|Shipping_Cost|Product_Base_Margin|Customer_Name|Province|Region|Customer_Segment|  Cust_id|Order_Date|Order_Priority|  Ord_id|Product_Category|Product_Sub_Category|Prod_id|Order_ID|  Ship_Mode| Ship_Date| Ship_id|DaysTakenForDelivery|
+------+--------+--------------+------+-------------+-------------------+-------------+--------+------+----------------+---------+----------+--------------+--------+----------------+--------------------+-------+--------+-----------+----------+--------+--------------------+
|136.81|    0.01|            23|-30.51|          3.6|               0.56|AARON BERGMAN| ALBERTA|  WEST|       CORPORATE|Cust_1818|2010-01-27| NOT SPECIFIED|Ord_5446| OFFICE SUPPL

In [116]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import month, year, countDistinct

# Assuming the DataFrame containing the data is named 'sales_df'

# Filter data for January 2011
january_data = Full_DataFrame.filter((year("Order_Date") == 2011) & (month("Order_Date") == 1))

# Count the total number of unique customers in January
january_unique_customers_count = january_data.select(countDistinct("Cust_id")).collect()[0][0]

# Create an array to store the number of returning customers for each month
returning_customers_counts = []

# Iterate over each month in 2011
for month_num in range(1, 13):
    # Filter data for the current month
    month_data = Full_DataFrame.filter((year("Order_Date") == 2011) & (month("Order_Date") == month_num))
    
    # Count the total number of unique customers for the current month
    unique_customers_count = month_data.select(countDistinct("Cust_id")).collect()[0][0]
    
    # Filter data for customers who appeared in January
    january_customer_ids = january_data.select("Cust_id").collect()
    returning_customers_count = month_data.filter(month_data["Cust_id"].isin([row.Cust_id for row in january_customer_ids])).select(countDistinct("Cust_id")).collect()[0][0]
    
    returning_customers_counts.append((month_num, returning_customers_count))

# Show the total number of unique customers in January
print("Total number of unique customers in January 2011:", january_unique_customers_count)

# Show how many of them came back every month over the entire year in 2011
for month_num, returning_customers_count in returning_customers_counts:
    print("Month:", month_num, "Returning Customers Count:", returning_customers_count)


Total number of unique customers in January 2011: 961
Month: 1 Returning Customers Count: 961
Month: 2 Returning Customers Count: 0
Month: 3 Returning Customers Count: 0
Month: 4 Returning Customers Count: 0
Month: 5 Returning Customers Count: 0
Month: 6 Returning Customers Count: 0
Month: 7 Returning Customers Count: 0
Month: 8 Returning Customers Count: 0
Month: 9 Returning Customers Count: 0
Month: 10 Returning Customers Count: 0
Month: 11 Returning Customers Count: 0
Month: 12 Returning Customers Count: 0


# 10. Calculate the total quantity purchased, discount received by the customer, and calculate the total sales sold and profit earned from each customer. Order the datacframe on Total_profit in descending order.

In [117]:
customer_profit_df = Full_DataFrame.groupBy("Cust_id").agg(
    F.sum("Order_Quantity").alias("Total_Quantity_Purchased"),
    F.sum("Discount").alias("Total_Discount_Received"),
    F.sum("Sales").alias("Total_Sales_Sold"),
    F.sum("Profit").alias("Total_Profit_Earned")
)

# Order the DataFrame on Total_profit in descending order
customer_profit_df = customer_profit_df.orderBy(F.col("Total_Profit_Earned").desc())

# Show the DataFrame
customer_profit_df.show()


+---------+------------------------+-----------------------+------------------+-------------------+
|  Cust_id|Total_Quantity_Purchased|Total_Discount_Received|  Total_Sales_Sold|Total_Profit_Earned|
+---------+------------------------+-----------------------+------------------+-------------------+
|Cust_1151|                     129|                    0.4|         97011.194|           28663.71|
|  Cust_63|                     242|    0.29000000000000004|        54368.9085| 20877.440000000002|
|Cust_1571|                     112|    0.22000000000000003|         47445.573|           19439.52|
|Cust_1421|                     434|                   0.71|         67285.132|           18960.63|
| Cust_937|                     130|                   0.33| 48660.87300000001|           18849.93|
|Cust_1799|                     567|                   0.97| 70426.59000000001| 18760.589999999997|
| Cust_934|                     316|     0.6700000000000002|45000.488999999994| 17044.219999999998|


In [1]:
pip install nbconvert




In [3]:
jupyter nbconvert --to pdf MyNotebook.ipynb


SyntaxError: invalid syntax (1173105302.py, line 1)

In [4]:
pip install notebook-as-pdf


Collecting notebook-as-pdf
  Downloading notebook_as_pdf-0.5.0-py3-none-any.whl (6.5 kB)
Collecting PyPDF2
  Downloading pypdf2-3.0.1-py3-none-any.whl (232 kB)

Installing collected packages: PyPDF2, notebook-as-pdf
Successfully installed PyPDF2-3.0.1 notebook-as-pdf-0.5.0


In [5]:
jupyter-nbconvert-toPDFviaHTML

NameError: name 'jupyter' is not defined

In [6]:
--allow-chromium-download

NameError: name 'allow' is not defined