In [1]:
sc.stop()

In [2]:
spark.stop()

### a) Create a new Spark Session with new SparkConfig

In [3]:

from pyspark import SparkConf, SparkContext
config = SparkConf().setMaster("local[4]").setAppName("PySpark Assignment")
sc = SparkContext(conf=config)

### b) Create new instance of Spark SQL session and define new DataFrame using sales_data_sample.csv dataset.

In [4]:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AssignmentSession").getOrCreate()

In [5]:
spark

In [6]:
sales_data = spark.read.csv('file:///home/hadoop/Downloads/sales_data_sample.csv', 
                            header=True,
                            inferSchema=True)

### c) Find the shape of DataFrame.

In [7]:
print("Number of rows:", sales_data.count())
print("Number of cols:", len(sales_data.columns))

Number of rows: 2823
Number of cols: 25


### d) Find the Summary of DataFrame for all numerical data columns.

In [8]:
sales_data.schema.fields

[StructField(ORDERNUMBER,IntegerType,true),
 StructField(QUANTITYORDERED,IntegerType,true),
 StructField(PRICEEACH,DoubleType,true),
 StructField(ORDERLINENUMBER,IntegerType,true),
 StructField(SALES,DoubleType,true),
 StructField(ORDERDATE,StringType,true),
 StructField(STATUS,StringType,true),
 StructField(QTR_ID,IntegerType,true),
 StructField(MONTH_ID,IntegerType,true),
 StructField(YEAR_ID,IntegerType,true),
 StructField(PRODUCTLINE,StringType,true),
 StructField(MSRP,IntegerType,true),
 StructField(PRODUCTCODE,StringType,true),
 StructField(CUSTOMERNAME,StringType,true),
 StructField(PHONE,StringType,true),
 StructField(ADDRESSLINE1,StringType,true),
 StructField(ADDRESSLINE2,StringType,true),
 StructField(CITY,StringType,true),
 StructField(STATE,StringType,true),
 StructField(POSTALCODE,StringType,true),
 StructField(COUNTRY,StringType,true),
 StructField(TERRITORY,StringType,true),
 StructField(CONTACTLASTNAME,StringType,true),
 StructField(CONTACTFIRSTNAME,StringType,true),
 

In [9]:
from pyspark.sql.types import IntegerType, LongType, ShortType, ByteType, FloatType, DoubleType

numerical_types = (IntegerType, LongType, ShortType, ByteType, FloatType, DoubleType)

numerical_columns = [field.name for field in sales_data.schema.fields if isinstance(field.dataType, tuple(numerical_types))]

numerical_columns

['ORDERNUMBER',
 'QUANTITYORDERED',
 'PRICEEACH',
 'ORDERLINENUMBER',
 'SALES',
 'QTR_ID',
 'MONTH_ID',
 'YEAR_ID',
 'MSRP']

In [10]:
sales_data.select(numerical_columns).describe().show()

+-------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|summary|       ORDERNUMBER|  QUANTITYORDERED|         PRICEEACH|  ORDERLINENUMBER|             SALES|            QTR_ID|          MONTH_ID|           YEAR_ID|              MSRP|
+-------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|  count|              2823|             2823|              2823|             2823|              2823|              2823|              2823|              2823|              2823|
|   mean|10258.725115125753|35.09280906836698| 83.65854410201929|6.466170740347148|  3553.88907190932|2.7176762309599716|7.0924548352816155|2003.8150903294368|100.71555083244775|
| stddev|  92.0854775957196| 9.74144273706958|20.174276527840536| 4.22584096469094|1841.8651057401842| 1.

### e) Identify and handle missing or null values in the columns.

In [11]:
from pyspark.sql.functions import col, sum

null_counts = sales_data.select([sum(col(c).isNull().cast("int")).alias(c) for c in sales_data.columns])
null_counts.show()

+-----------+---------------+---------+---------------+-----+---------+------+------+--------+-------+-----------+----+-----------+------------+-----+------------+------------+----+-----+----------+-------+---------+---------------+----------------+--------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|SALES|ORDERDATE|STATUS|QTR_ID|MONTH_ID|YEAR_ID|PRODUCTLINE|MSRP|PRODUCTCODE|CUSTOMERNAME|PHONE|ADDRESSLINE1|ADDRESSLINE2|CITY|STATE|POSTALCODE|COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-----------+---------------+---------+---------------+-----+---------+------+------+--------+-------+-----------+----+-----------+------------+-----+------------+------------+----+-----+----------+-------+---------+---------------+----------------+--------+
|          0|              0|        0|              0|    0|        0|     0|     0|       0|      0|          0|   0|          0|           0|    0|           0|        2521|   0| 1486|        76|      0|        0|       

In [12]:
#ADDRESSLINE2
#STATE
#POSTALCODE
sales_data.select(['ADDRESSLINE2']).show()

+------------+
|ADDRESSLINE2|
+------------+
|        null|
|        null|
|        null|
|        null|
|        null|
|        null|
|        null|
|        null|
|        null|
|        null|
|     Level 3|
|   Suite 101|
|        null|
|        null|
|        null|
|        null|
|        null|
|        null|
|        null|
|        null|
+------------+
only showing top 20 rows



In [13]:
cleaned_sales_data = sales_data.fillna({'ADDRESSLINE2': 'NA', 'STATE': 'NA', 'POSTALCODE': 'NA'})

In [14]:
cleaned_sales_data.select([sum(col(c).isNull().cast("int")).alias(c) for c in sales_data.columns]).show()

+-----------+---------------+---------+---------------+-----+---------+------+------+--------+-------+-----------+----+-----------+------------+-----+------------+------------+----+-----+----------+-------+---------+---------------+----------------+--------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|SALES|ORDERDATE|STATUS|QTR_ID|MONTH_ID|YEAR_ID|PRODUCTLINE|MSRP|PRODUCTCODE|CUSTOMERNAME|PHONE|ADDRESSLINE1|ADDRESSLINE2|CITY|STATE|POSTALCODE|COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-----------+---------------+---------+---------------+-----+---------+------+------+--------+-------+-----------+----+-----------+------------+-----+------------+------------+----+-----+----------+-------+---------+---------------+----------------+--------+
|          0|              0|        0|              0|    0|        0|     0|     0|       0|      0|          0|   0|          0|           0|    0|           0|           0|   0|    0|         0|      0|        0|       

### f) Calculate the total revenue generated per country by combining the columns QUANTITYORDERED and PRICEEACH using Spark DataFrame operations?

In [15]:
cleaned_sales_data = cleaned_sales_data.withColumn("TOTAL_REVENUE", col("QUANTITYORDERED") * col("PRICEEACH"))
cleaned_sales_data.show()

+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+--------+----------+---------+---------+---------------+----------------+--------+------------------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES|      ORDERDATE| STATUS|QTR_ID|MONTH_ID|YEAR_ID|PRODUCTLINE|MSRP|PRODUCTCODE|        CUSTOMERNAME|           PHONE|        ADDRESSLINE1|ADDRESSLINE2|         CITY|   STATE|POSTALCODE|  COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|     TOTAL_REVENUE|
+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+--------+----------+---------+---------+---------------+----------------+--------+------------------+
|      10107|   

In [16]:
cleaned_sales_data.groupBy('COUNTRY').agg(sum('TOTAL_REVENUE').alias('TOTAL_REVENUE'))\
.orderBy('TOTAL_REVENUE', ascending=False).show()


+-----------+------------------+
|    COUNTRY|     TOTAL_REVENUE|
+-----------+------------------+
|        USA|2986425.2099999995|
|      Spain|1021705.9700000002|
|     France| 919257.8499999997|
|  Australia|521598.45999999985|
|         UK|413203.33999999997|
|      Italy| 309402.8699999999|
|    Finland|268714.70000000007|
|     Norway| 246115.8000000001|
|  Singapore| 227985.5000000001|
|     Canada|193504.34000000003|
|    Denmark|         192747.63|
|    Germany|         178689.08|
|     Sweden|174264.10000000006|
|    Austria|172793.05000000002|
|      Japan|153076.68999999994|
|    Belgium|          94528.88|
|Switzerland| 93344.90999999999|
|Philippines| 80291.16999999998|
|    Ireland|          43237.24|
+-----------+------------------+



### g) Determine the top 5 products with the highest total sales revenue using Spark DataFrame?

In [17]:
cleaned_sales_data.groupBy('PRODUCTCODE').agg(sum('TOTAL_REVENUE').alias('TOTAL_REVENUE'))\
.orderBy('TOTAL_REVENUE', ascending=False).show(5)


+-----------+-------------+
|PRODUCTCODE|TOTAL_REVENUE|
+-----------+-------------+
|   S18_3232|    176026.63|
|   S24_3856|    103489.89|
|   S18_4600|     101835.0|
|   S24_2300|      99600.0|
|   S18_2238|      96300.0|
+-----------+-------------+
only showing top 5 rows



### h) Find the average order quantity for each product using groupBy and agg operations?

In [18]:
from pyspark.sql.functions import avg
cleaned_sales_data.groupBy('PRODUCTCODE').agg(avg('QUANTITYORDERED').alias('AVG_QTY')).show()

+-----------+------------------+
|PRODUCTCODE|           AVG_QTY|
+-----------+------------------+
|   S18_4600| 38.18518518518518|
|   S18_1749| 36.45454545454545|
|   S12_3891| 35.42307692307692|
|   S18_2248| 33.77272727272727|
|  S700_1138| 34.69230769230769|
|   S32_1268|32.333333333333336|
|   S12_1099|             33.52|
|   S18_2795|30.346153846153847|
|   S24_1937|             33.76|
|   S32_3522| 35.44444444444444|
|   S18_1097| 35.67857142857143|
|   S18_1662| 36.15384615384615|
|   S12_1666|34.714285714285715|
|   S24_3969| 33.86363636363637|
|   S24_1578| 35.80769230769231|
|   S24_4048| 32.46153846153846|
|   S18_3320| 34.96153846153846|
|   S24_3816| 33.46153846153846|
|   S18_3136|32.333333333333336|
|   S32_2509|34.107142857142854|
+-----------+------------------+
only showing top 20 rows



### i) Using Spark DataFrame, filter orders where the SALES value exceeds 10,000 and sort the results by the ORDERDATE column?

In [19]:
from pyspark.sql.functions import col, to_timestamp

cleaned_sales_data.withColumn('ORDERDATE', to_timestamp(col('ORDERDATE'), 'M/d/yyyy H:mm'))\
.filter((col('SALES') > 10000))\
.orderBy('ORDERDATE')\
.select(['PRODUCTCODE', 'SALES', 'ORDERDATE']).show()

+-----------+-------+-------------------+
|PRODUCTCODE|  SALES|          ORDERDATE|
+-----------+-------+-------------------+
|   S12_1108|11279.2|2003-06-03 00:00:00|
|   S10_1949|10993.5|2003-09-19 00:00:00|
|   S12_1108|10606.2|2004-05-05 00:00:00|
|   S10_1949|10172.7|2004-10-11 00:00:00|
|   S10_1949|11623.7|2004-10-21 00:00:00|
|   S18_2325|12536.5|2004-11-04 00:00:00|
|   S18_3320|11336.7|2004-11-18 00:00:00|
|   S24_3151|10758.0|2004-11-23 00:00:00|
|   S24_4278|10039.6|2005-02-03 00:00:00|
|  S700_1691|10066.6|2005-03-03 00:00:00|
|   S10_4698|11886.6|2005-04-08 00:00:00|
|   S24_3856|11739.7|2005-04-14 00:00:00|
|   S18_3685|10468.9|2005-04-15 00:00:00|
|   S18_1749|14082.8|2005-04-22 00:00:00|
|   S18_3232|11887.8|2005-05-03 00:00:00|
|   S10_1949|12001.0|2005-05-31 00:00:00|
+-----------+-------+-------------------+



### j) Filter out rows where the STATUS is &#39;Cancelled&#39; and calculate the total sales from the remaining orders?

In [20]:
cleaned_sales_data\
.filter(col('STATUS') != 'Cancelled')\
.agg(sum('SALES').alias('TOTAL_SALES'))\
.show()

+-----------------+
|      TOTAL_SALES|
+-----------------+
|9838141.370000018|
+-----------------+



### k) Use Spark Data Frame transformations to derive the yearly sales for each customer (CUSTOMERNAME) based on the ORDERDATE column?

In [21]:
cleaned_sales_data.select(['CUSTOMERNAME']).distinct().orderBy('CUSTOMERNAME').show()

+--------------------+
|        CUSTOMERNAME|
+--------------------+
|      AV Stores, Co.|
|        Alpha Cognac|
|  Amica Models & Co.|
|Anna's Decoration...|
|   Atelier graphique|
|Australian Collec...|
|Australian Collec...|
|Australian Gift N...|
|  Auto Assoc. & Cie.|
|    Auto Canal Petit|
|Auto-Moto Classic...|
|  Baane Mini Imports|
|Bavarian Collecta...|
|Blauer See Auto, Co.|
|   Boards & Toys Co.|
|         CAF Imports|
|Cambridge Collect...|
|Canadian Gift Exc...|
|Classic Gift Idea...|
|Classic Legends Inc.|
+--------------------+
only showing top 20 rows



In [22]:
from pyspark.sql.functions import to_date, year, to_timestamp

yearly_data = cleaned_sales_data.withColumn("YEAR", year(to_timestamp(col("ORDERDATE"), 'M/d/yyyy H:mm')))

yearly_data.groupBy(['CUSTOMERNAME', 'YEAR'])\
.agg(sum('SALES').alias('TOTAL_SALES'))\
.orderBy(['CUSTOMERNAME','YEAR']).show()

+--------------------+----+------------------+
|        CUSTOMERNAME|YEAR|       TOTAL_SALES|
+--------------------+----+------------------+
|      AV Stores, Co.|2003| 51017.91999999999|
|      AV Stores, Co.|2004|         106789.89|
|        Alpha Cognac|2003| 55349.31999999999|
|        Alpha Cognac|2005|15139.119999999999|
|  Amica Models & Co.|2004| 94117.26000000002|
|Anna's Decoration...|2003| 88983.70999999999|
|Anna's Decoration...|2005|          65012.42|
|   Atelier graphique|2003|           16560.3|
|   Atelier graphique|2004|           7619.66|
|Australian Collec...|2003|          37878.55|
|Australian Collec...|2004|          12334.82|
|Australian Collec...|2005|          14378.09|
|Australian Collec...|2003|60135.840000000004|
|Australian Collec...|2004|140859.56999999998|
|Australian Gift N...|2003|37739.090000000004|
|Australian Gift N...|2005|          21730.03|
|  Auto Assoc. & Cie.|2004| 64834.32000000001|
|    Auto Canal Petit|2004| 79103.85999999999|
|    Auto Can

### l) Add a new column to the DataFrame that categorizes orders as High, Medium, or Low sales based on the SALES value?

In [23]:
from pyspark.sql.functions import col, when

percentile_33, percentile_67 = cleaned_sales_data.approxQuantile("SALES", [0.33, 0.67], 0.01)

sales_data_with_category = cleaned_sales_data.withColumn(
    "CATEGORY",
    when(col("SALES") > percentile_67, "High")\
    .when(col("SALES") > percentile_33, "Medium")
    .otherwise("Low")
)


sales_data_with_category.select(['SALES', 'CATEGORY']).show(100)

+-------+--------+
|  SALES|CATEGORY|
+-------+--------+
| 2871.0|  Medium|
| 2765.9|  Medium|
|3884.34|  Medium|
| 3746.7|  Medium|
|5205.27|    High|
|3479.76|  Medium|
|2497.77|  Medium|
|5512.32|    High|
|2168.54|     Low|
|4708.44|    High|
|3965.66|    High|
|2333.12|     Low|
|3188.64|  Medium|
|3676.76|  Medium|
|4177.35|    High|
|4099.68|    High|
|2597.39|  Medium|
|4394.38|    High|
|4358.04|    High|
|4396.14|    High|
|7737.93|    High|
| 1451.0|     Low|
| 733.11|     Low|
|3207.12|  Medium|
|2434.56|     Low|
|7516.08|    High|
|5404.62|    High|
|7209.11|    High|
|7329.06|    High|
| 7374.1|    High|
|10993.5|    High|
|4860.24|    High|
|8014.82|    High|
|5372.57|    High|
|7290.36|    High|
|9064.89|    High|
| 6075.3|    High|
|6463.23|    High|
|6120.34|    High|
|7680.64|    High|
|4905.39|    High|
|8014.82|    High|
|7136.19|    High|
|10172.7|    High|
|11623.7|    High|
| 6000.4|    High|
| 3003.0|  Medium|
| 3944.7|    High|
|5691.84|    High|
|4514.92|   

### m) Assume , If you have another DataFrame with customer demographic data, how would you perform a join to compute the total sales per demographic group?

In [24]:

demographic_data = [
    ("Toms Spezialitten", "Germany", 480000, "Europe"),
    ("Oulu Toy Supplies", "Finland", 55000, "Europe"),
    ("Petit Auto", "Belgium", 160000, "Europe"),
    ("Corporate Gift Ideas Co.", "USA", 331000, "North America"),
    ("Cambridge Collectables Co.", "USA", 310000, "North America"),
    ]


demographic_df = spark.createDataFrame(demographic_data, ["CUSTOMERNAME", "COUNTRY", "INCOME", "REGION"])

joined_df = cleaned_sales_data.join(demographic_df, on=["CUSTOMERNAME",'COUNTRY'], how="inner")

sales_by_country = joined_df.groupBy("COUNTRY","CUSTOMERNAME",'INCOME', 'REGION')\
.agg(sum("SALES").alias("TOTAL_SALES"))


sales_by_country.show()

+-------+--------------------+------+-------------+------------------+
|COUNTRY|        CUSTOMERNAME|INCOME|       REGION|       TOTAL_SALES|
+-------+--------------------+------+-------------+------------------+
|    USA|Cambridge Collect...|310000|North America|          36163.62|
|    USA|Corporate Gift Id...|331000|North America|149882.49999999997|
|Belgium|          Petit Auto|160000|       Europe| 74972.51999999999|
+-------+--------------------+------+-------------+------------------+



### n) Can you implement a cumulative distribution function (CDF) over the SALES value for each CUSTOMERNAME? What insights can you gather from analyzing the CDF distribution for each customer?

In [25]:
cleaned_sales_data.printSchema

<bound method DataFrame.printSchema of DataFrame[ORDERNUMBER: int, QUANTITYORDERED: int, PRICEEACH: double, ORDERLINENUMBER: int, SALES: double, ORDERDATE: string, STATUS: string, QTR_ID: int, MONTH_ID: int, YEAR_ID: int, PRODUCTLINE: string, MSRP: int, PRODUCTCODE: string, CUSTOMERNAME: string, PHONE: string, ADDRESSLINE1: string, ADDRESSLINE2: string, CITY: string, STATE: string, POSTALCODE: string, COUNTRY: string, TERRITORY: string, CONTACTLASTNAME: string, CONTACTFIRSTNAME: string, DEALSIZE: string, TOTAL_REVENUE: double]>

In [26]:
from pyspark.sql.functions import col, count, rank
from pyspark.sql import Window
from pyspark.sql.functions import cume_dist
import numpy as np

window_w = Window.partitionBy("CUSTOMERNAME").orderBy("SALES")

cdf_df = cleaned_sales_data.withColumn(
    "CDF",
    cume_dist().over(window_w)
)

cdf_df.select(["CUSTOMERNAME", "SALES", "CDF"]).show()

+-------------------+-------+-------------------+
|       CUSTOMERNAME|  SALES|                CDF|
+-------------------+-------+-------------------+
|Suominen Souveniers| 891.03|0.03333333333333333|
|Suominen Souveniers| 1086.6|0.06666666666666667|
|Suominen Souveniers|1103.76|                0.1|
|Suominen Souveniers|1629.04|0.13333333333333333|
|Suominen Souveniers| 1988.4|0.16666666666666666|
|Suominen Souveniers|2140.11|                0.2|
|Suominen Souveniers|2447.76|0.23333333333333334|
|Suominen Souveniers|2632.89|0.26666666666666666|
|Suominen Souveniers| 2773.8|                0.3|
|Suominen Souveniers|2775.08| 0.3333333333333333|
|Suominen Souveniers|2817.87|0.36666666666666664|
|Suominen Souveniers|2851.84|                0.4|
|Suominen Souveniers|2931.98|0.43333333333333335|
|Suominen Souveniers|3128.65| 0.4666666666666667|
|Suominen Souveniers|3288.82|                0.5|
|Suominen Souveniers|3595.62| 0.5333333333333333|
|Suominen Souveniers|3686.54| 0.5666666666666667|


50 percentage of Suominen Souveniers sales is below 3288.82

20% of customers have total upto 2000, indicating low spending customers\
median value is around 4000, half of the customers have total sales below 4000\
after 8000 there are fewer high spending customers


### o) Write spark dataframe code to rank products by total revenue within each country (COUNTRY)?

In [27]:
from pyspark.sql.functions import dense_rank, desc
from pyspark.sql.window import Window

cleaned_sales_data\
.groupBy(['PRODUCTCODE', 'COUNTRY']).agg(sum("TOTAL_REVENUE").alias("TOTAL_REVENUE"))\
.withColumn('RANK', dense_rank().over(Window.partitionBy('COUNTRY').orderBy(desc('TOTAL_REVENUE'))))\
.select(['COUNTRY', 'PRODUCTCODE','RANK', 'TOTAL_REVENUE']).show()

+-------+-----------+----+------------------+
|COUNTRY|PRODUCTCODE|RANK|     TOTAL_REVENUE|
+-------+-----------+----+------------------+
| Sweden|   S18_4600|   1|            9700.0|
| Sweden|   S24_2300|   2|            9000.0|
| Sweden|   S24_2011|   3|            7400.0|
| Sweden|   S18_2949|   4|            7000.0|
| Sweden|   S10_1949|   5|            6600.0|
| Sweden|   S12_1099|   6|           5675.04|
| Sweden|   S10_4962|   7|            5600.0|
| Sweden|  S700_1138|   8| 5579.620000000001|
| Sweden|   S12_3990|   9|           5319.32|
| Sweden|   S12_3380|  10|            5309.5|
| Sweden|   S24_3151|  11| 5113.049999999999|
| Sweden|   S12_4675|  12|            4700.0|
| Sweden|   S18_2319|  13|            4600.0|
| Sweden|   S24_1578|  14|            4500.0|
| Sweden|   S10_4757|  15|            4400.0|
| Sweden|   S18_4522|  16|            4300.5|
| Sweden|   S18_1662|  17|            4300.0|
| Sweden|   S24_3816|  18|4276.9400000000005|
| Sweden|   S18_1097|  19|        

### p) Calculate a running total of SALES for each customer and show the top 5 customers by this cumulative total?

In [28]:
w = Window.partitionBy('CUSTOMERNAME')\
  .orderBy('SALES')\
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

max_sales_df = cleaned_sales_data\
.withColumn('RUNNINGTOTAL', sum('SALES').over(w))\
.select(['CUSTOMERNAME', 'SALES', 'RUNNINGTOTAL']).show()


+-------------------+-------+------------------+
|       CUSTOMERNAME|  SALES|      RUNNINGTOTAL|
+-------------------+-------+------------------+
|Suominen Souveniers| 891.03|            891.03|
|Suominen Souveniers| 1086.6|1977.6299999999999|
|Suominen Souveniers|1103.76|           3081.39|
|Suominen Souveniers|1629.04|           4710.43|
|Suominen Souveniers| 1988.4|           6698.83|
|Suominen Souveniers|2140.11|           8838.94|
|Suominen Souveniers|2447.76|           11286.7|
|Suominen Souveniers|2632.89|          13919.59|
|Suominen Souveniers| 2773.8|          16693.39|
|Suominen Souveniers|2775.08|          19468.47|
|Suominen Souveniers|2817.87|          22286.34|
|Suominen Souveniers|2851.84|          25138.18|
|Suominen Souveniers|2931.98|          28070.16|
|Suominen Souveniers|3128.65|          31198.81|
|Suominen Souveniers|3288.82|34487.630000000005|
|Suominen Souveniers|3595.62| 38083.25000000001|
|Suominen Souveniers|3686.54| 41769.79000000001|
|Suominen Souveniers

### q) Find and handle Invalid and Outliers values in entire DataFrame. [Check for only continuous dataset].

In [29]:
cleaned_sales_data.printSchema

<bound method DataFrame.printSchema of DataFrame[ORDERNUMBER: int, QUANTITYORDERED: int, PRICEEACH: double, ORDERLINENUMBER: int, SALES: double, ORDERDATE: string, STATUS: string, QTR_ID: int, MONTH_ID: int, YEAR_ID: int, PRODUCTLINE: string, MSRP: int, PRODUCTCODE: string, CUSTOMERNAME: string, PHONE: string, ADDRESSLINE1: string, ADDRESSLINE2: string, CITY: string, STATE: string, POSTALCODE: string, COUNTRY: string, TERRITORY: string, CONTACTLASTNAME: string, CONTACTFIRSTNAME: string, DEALSIZE: string, TOTAL_REVENUE: double]>

In [30]:
def capp_outliers(df, column):
    quantiles = df.approxQuantile(column, [0.25, 0.75], 0.0001)
    Q1, Q3 = quantiles[0], quantiles[1]

    IQR = Q3 - Q1

    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    count = df.filter((col(column) < lower_bound) | (col(column) > upper_bound)).count()
    print(f"{column}: {count}")
    capped_sales_data = df.withColumn(
        column,
        when(col(column) < lower_bound, lower_bound)\
        .when(col(column) > upper_bound, upper_bound)\
        .otherwise(col(column))
    )
    
    return capped_sales_data

capped_data = capp_outliers(cleaned_sales_data, "SALES")
capped_data = capp_outliers(capped_data, "MSRP")
capped_data = capp_outliers(capped_data, "QUANTITYORDERED")
capped_data = capp_outliers(capped_data, "PRICEEACH")

capped_data.show()

SALES: 81
MSRP: 28
QUANTITYORDERED: 8
PRICEEACH: 0
+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+--------+----------+---------+---------+---------------+----------------+--------+------------------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES|      ORDERDATE| STATUS|QTR_ID|MONTH_ID|YEAR_ID|PRODUCTLINE|MSRP|PRODUCTCODE|        CUSTOMERNAME|           PHONE|        ADDRESSLINE1|ADDRESSLINE2|         CITY|   STATE|POSTALCODE|  COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|     TOTAL_REVENUE|
+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+--------+----------+---------+---------+---------------+-----------

### r) How would you cache a DataFrame containing sales data from the top 10 countries by sales to avoid recomputation in subsequent transformations? What persistence level (e.g. MEMORY_ONLY, MEMORY_AND_DISK) would you choose and why?

In [31]:
from pyspark.storagelevel import StorageLevel

top_10_countries_df = cleaned_sales_data \
    .groupBy("COUNTRY").agg(sum("SALES").alias("TOTAL_SALES")) \
    .orderBy(col("TOTAL_SALES").desc()) \
    .limit(10) \
    .select(["COUNTRY", "TOTAL_SALES"])

cached_df = cleaned_sales_data.join(top_10_countries_df, on="COUNTRY", how="inner")
cached_df.persist(StorageLevel.MEMORY_ONLY)

cached_df.count()


2421

- MEMORY ONLY Caches the data in memory(RAM) only
- MEMORY_AND_DISK caches data in memory and disk
- Since top 10 data is small memory only is sufficient

### s) How would you pivot the data to show PRODUCTLINE as columns and the total SALES for each ORDERDATE as the values? What are the implications of pivoting large datasets in Spark?

In [32]:
from pyspark.sql.functions import avg, round

pivoted_df = cleaned_sales_data.groupBy("ORDERDATE").pivot("PRODUCTLINE").agg(round(avg("SALES"), 2))
pivoted_df = pivoted_df.fillna(0)
pivoted_df.show()

+--------------+------------+-----------+-------+-------+-------+----------------+------------+
|     ORDERDATE|Classic Cars|Motorcycles| Planes|  Ships| Trains|Trucks and Buses|Vintage Cars|
+--------------+------------+-----------+-------+-------+-------+----------------+------------+
|3/29/2004 0:00|         0.0|        0.0|    0.0|2466.86|    0.0|             0.0|      3788.4|
|5/30/2005 0:00|         0.0|        0.0|    0.0|    0.0|    0.0|             0.0|     2082.68|
|3/19/2004 0:00|     7665.35|        0.0|    0.0|    0.0|    0.0|             0.0|         0.0|
| 9/7/2004 0:00|         0.0|        0.0|    0.0|    0.0|    0.0|             0.0|     3836.69|
| 5/4/2004 0:00|     5113.62|        0.0|    0.0|    0.0|    0.0|         3787.66|     2694.15|
|11/9/2004 0:00|         0.0|        0.0|    0.0|3336.65|3807.68|             0.0|     3221.78|
|11/4/2003 0:00|     3441.18|    3146.19|    0.0|    0.0|    0.0|             0.0|         0.0|
| 7/1/2003 0:00|         0.0|     3660.7

if the data is very large pivot data may take huge amount of memory and time due to shuffling of data. 
If the dataset is large and the data is not evenly distributed, it may cause incosistancies in data

### t) How would you calculate the percentage growth of total sales month over month for each PRODUCTLINE using Spark DataFrame?

In [33]:
from pyspark.sql.functions import sum, lag, col, coalesce, lit

monthly_sales = cleaned_sales_data.groupBy("PRODUCTLINE", "YEAR_ID", "MONTH_ID") \
                                   .agg(sum("SALES").alias("TOTAL_SALES"))

window= Window.partitionBy("PRODUCTLINE").orderBy("YEAR_ID", "MONTH_ID")

monthly_sales_with_prev = monthly_sales.withColumn("PREV_TOTAL_SALES", lag("TOTAL_SALES").over(window))

monthly_sales_with_growth = monthly_sales_with_prev.withColumn(
    "PERCENTAGE_GROWTH",
    (col("TOTAL_SALES") - coalesce(col("PREV_TOTAL_SALES"), lit(0))) /
    coalesce(col("PREV_TOTAL_SALES"), lit(0)) * 100
)

monthly_sales_with_growth.show()

+-----------+-------+--------+------------------+------------------+-------------------+
|PRODUCTLINE|YEAR_ID|MONTH_ID|       TOTAL_SALES|  PREV_TOTAL_SALES|  PERCENTAGE_GROWTH|
+-----------+-------+--------+------------------+------------------+-------------------+
|Motorcycles|   2003|       2|25783.760000000002|              null|               null|
|Motorcycles|   2003|       3|          12639.15|25783.760000000002| -50.98019063162239|
|Motorcycles|   2003|       4|23475.590000000004|          12639.15|   85.7370946622202|
|Motorcycles|   2003|       5|          22097.32|23475.590000000004|-5.8710771486467594|
|Motorcycles|   2003|       6|           2642.01|          22097.32| -88.04375372217082|
|Motorcycles|   2003|       7| 37924.23000000001|           2642.01|  1335.430978686682|
|Motorcycles|   2003|       8|44164.909999999996| 37924.23000000001| 16.455653812878953|
|Motorcycles|   2003|       9|           3155.58|44164.909999999996| -92.85500638402749|
|Motorcycles|   2003|

### u) How can you rebalance the data by portioning based on the COUNTRY column to ensure that large data partitions are avoided?

In [34]:
cleaned_sales_data.rdd.getNumPartitions()

1

In [35]:
cleaned_sales_data.select("COUNTRY").distinct().count()

19

In [36]:
# Partition by each country
cleaned_sales_data.repartition(19,"COUNTRY").rdd.getNumPartitions()


19

### v) Suppose you have a smaller lookup table with customer details. How would you perform a broadcast join with the large sales_data_sample dataset to improve join performance? What are the key considerations when using broadcast joins?

In [37]:
from pyspark.sql.functions import broadcast

customer_details = spark.createDataFrame([
    (1, "Land of Toys Inc.", "landoftoys@example.com"),
    (2, "Reims Collectables", "reims.co@example.com")
], ["customer_id", "customer_name", "customer_email"])

broadcast_df = broadcast(customer_details)

cleaned_sales_data = cleaned_sales_data.cache()

joined_df = cleaned_sales_data.join(
    broadcast_df,
    cleaned_sales_data["CUSTOMERNAME"] == customer_details["customer_name"],
    "inner"
)

joined_df.select(["ORDERNUMBER", "CUSTOMERNAME", "customer_email"]).show()

+-----------+------------------+--------------------+
|ORDERNUMBER|      CUSTOMERNAME|      customer_email|
+-----------+------------------+--------------------+
|      10107| Land of Toys Inc.|landoftoys@exampl...|
|      10121|Reims Collectables|reims.co@example.com|
|      10329| Land of Toys Inc.|landoftoys@exampl...|
|      10107| Land of Toys Inc.|landoftoys@exampl...|
|      10329| Land of Toys Inc.|landoftoys@exampl...|
|      10107| Land of Toys Inc.|landoftoys@exampl...|
|      10329| Land of Toys Inc.|landoftoys@exampl...|
|      10248| Land of Toys Inc.|landoftoys@exampl...|
|      10359|Reims Collectables|reims.co@example.com|
|      10329| Land of Toys Inc.|landoftoys@exampl...|
|      10359|Reims Collectables|reims.co@example.com|
|      10107| Land of Toys Inc.|landoftoys@exampl...|
|      10121|Reims Collectables|reims.co@example.com|
|      10329| Land of Toys Inc.|landoftoys@exampl...|
|      10329| Land of Toys Inc.|landoftoys@exampl...|
|      10359|Reims Collectab

- data is send to all worker nodes, reduced number of shuffling
- considerations:
    - when one data in significantly smaller than the other one, broadcast join will be ideal
    - small data will be distributed among all worker nodes

### w) Create a UDF that categorizes the sales values (SALES) into custom buckets like “Low”, “Medium”, “High”. Apply this UDF to the DataFrame and calculate the count of orders in each category per COUNTRY.

In [38]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

def categorize_sales(sales_amount):
    if sales_amount > percentile_67:
        return "High"
    elif sales_amount > percentile_33:
        return "Medium"
    else:
        return "Low"
    
categorize_sales_udf = udf(categorize_sales, StringType())

In [39]:
sales_data_with_category = cleaned_sales_data.withColumn(
    "SALES_CATEGORY", categorize_sales_udf(col("SALES"))
)

sales_data_with_category.select(['SALES','SALES_CATEGORY']).show()

+-------+--------------+
|  SALES|SALES_CATEGORY|
+-------+--------------+
| 2871.0|        Medium|
| 2765.9|        Medium|
|3884.34|        Medium|
| 3746.7|        Medium|
|5205.27|          High|
|3479.76|        Medium|
|2497.77|        Medium|
|5512.32|          High|
|2168.54|           Low|
|4708.44|          High|
|3965.66|          High|
|2333.12|           Low|
|3188.64|        Medium|
|3676.76|        Medium|
|4177.35|          High|
|4099.68|          High|
|2597.39|        Medium|
|4394.38|          High|
|4358.04|          High|
|4396.14|          High|
+-------+--------------+
only showing top 20 rows



In [40]:
sales_counts_per_country = sales_data_with_category.groupBy("COUNTRY", "SALES_CATEGORY") \
    .count() \
    .orderBy("COUNTRY", "SALES_CATEGORY")

sales_counts_per_country.show()

+---------+--------------+-----+
|  COUNTRY|SALES_CATEGORY|count|
+---------+--------------+-----+
|Australia|          High|   60|
|Australia|           Low|   66|
|Australia|        Medium|   59|
|  Austria|          High|   20|
|  Austria|           Low|   15|
|  Austria|        Medium|   20|
|  Belgium|          High|   11|
|  Belgium|           Low|   14|
|  Belgium|        Medium|    8|
|   Canada|          High|   18|
|   Canada|           Low|   28|
|   Canada|        Medium|   24|
|  Denmark|          High|   23|
|  Denmark|           Low|   18|
|  Denmark|        Medium|   22|
|  Finland|          High|   32|
|  Finland|           Low|   24|
|  Finland|        Medium|   36|
|   France|          High|  101|
|   France|           Low|  110|
+---------+--------------+-----+
only showing top 20 rows



### x) Create a Python UDF to calculate discounts for specific product lines. For example, give a 10% discount for Classic Cars and 5% for Motorcycles. Apply this UDF to derive new discounted sales values.

In [41]:
def apply_discount(product_line, sales_amount):
    if product_line == "Classic Cars":
        return sales_amount * 0.90  
    elif product_line == "Motorcycles":
        return sales_amount * 0.95 
    else:
        return sales_amount 

apply_discount_udf = udf(apply_discount, DoubleType())

sales_data_with_discount = cleaned_sales_data.withColumn(
    "DISCOUNTED_SALES", apply_discount_udf(col("PRODUCTLINE"), col("SALES"))
)

sales_data_with_discount.show()

+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+--------+----------+---------+---------+---------------+----------------+--------+------------------+------------------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES|      ORDERDATE| STATUS|QTR_ID|MONTH_ID|YEAR_ID|PRODUCTLINE|MSRP|PRODUCTCODE|        CUSTOMERNAME|           PHONE|        ADDRESSLINE1|ADDRESSLINE2|         CITY|   STATE|POSTALCODE|  COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|     TOTAL_REVENUE|  DISCOUNTED_SALES|
+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+--------+----------+---------+---------+---------------+----------------+-------

### y) How would you set up an incremental loading mechanism for orders placed daily based on the ORDERDATE column? How can Spark checkpointing can be used with incremental load to ensure no data loss occurs during failures?

### z) How do you implement a cumulative distribution function (CDF) over the SALES value for each CUSTOMERNAME? What insights can you gather from analyzing the CDF distribution for each customer?

same as n