In [1]:
from pyspark.sql import SparkSession
import pandas as pd

In [2]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Spark SQL Read From Excel") \
    .getOrCreate()


we can read from excel in two ways 
1. Using spark-excel library where we have to download spark-excel jar -> create a folder named spark_jars inside SPARK_HOME(env variable) path and paste the downloaded jar in this directory.
   then -> add the jar path into spark.executor.extraClassPath using below syntax :
   
   spark-submit --jars myjar.jar \ 
  --driver-class-path myjar.jar \      # adding to driver's class path
  
  
  --conf spark.executor.extraClassPath=myjar.jar \  # adding to executor's class path
  
  
  --class SampleApplication my-application.jar
  
  or with below syntax:
  
  conf = SparkConf().set("spark.jars", "/path-to-jar/jar_name.jar")
    
    
  sc = SparkContext( conf=conf)
  
  or
  
  conf.set("spark.executor.extraClassPath", "C:\\Users\\supri\\AppData\\Roaming\\Python\\Python39\\site-    packages\\pyspark\\jars\\")
  
(In short: Create directory spark_jars in the SPARK_HOME then store the spark-excel package in spark_jars directory -> Add the spark_jars to spark.executor.extraClassPath of Spark session)

2. Using Python pandas read the excel and convert the dataframe to spark dataframe

In [3]:
# Using spark-excel 
# Define the file path and options
file_path = "C:/Supriyaa-spark-notes/Superstore_Sales.xls"
options = {
    "sheetName": "Sheet1",
    "Header": "true",
    "treatEmptyValuesAsNulls": "true",
    "inferSchema": "true"
}

# Load the Excel file into a DataFrame
df = spark.read.format("com.crealytics.spark.excel") \
    .options(**options) \
    .load(file_path)

# Show the DataFrame
df.show()


+------+--------+----------+--------------+--------------+-----------------+--------+--------------+-------------------+----------+-------------+------------------+--------+-------+----------------+----------------+--------------------+--------------------+-----------------+-------------------+---------+
|Row ID|Order ID|Order Date|Order Priority|Order Quantity|            Sales|Discount|     Ship Mode|             Profit|Unit Price|Shipping Cost|     Customer Name|Province| Region|Customer Segment|Product Category|Product Sub-Category|        Product Name|Product Container|Product Base Margin|Ship Date|
+------+--------+----------+--------------+--------------+-----------------+--------+--------------+-------------------+----------+-------------+------------------+--------+-------+----------------+----------------+--------------------+--------------------+-----------------+-------------------+---------+
|   1.0|     3.0|   40464.0|           Low|           6.0|           261.54|    0.

In [20]:
# Using Python pandas
pandas_df = pd.read_excel("C:/Supriyaa-spark-notes/Superstore_Sales.xls")
spark_df = spark.createDataFrame(pandas_df)
spark_df.show()

+------+--------+-------------------+--------------+--------------+-----------------+--------+--------------+-------------------+----------+-------------+------------------+--------+-------+----------------+----------------+--------------------+--------------------+-----------------+-------------------+-------------------+
|Row ID|Order ID|         Order Date|Order Priority|Order Quantity|            Sales|Discount|     Ship Mode|             Profit|Unit Price|Shipping Cost|     Customer Name|Province| Region|Customer Segment|Product Category|Product Sub-Category|        Product Name|Product Container|Product Base Margin|          Ship Date|
+------+--------+-------------------+--------------+--------------+-----------------+--------+--------------+-------------------+----------+-------------+------------------+--------+-------+----------------+----------------+--------------------+--------------------+-----------------+-------------------+-------------------+
|     1|       3|2010-10-

In [24]:
df.printSchema()

root
 |-- Row ID: double (nullable = true)
 |-- Order ID: double (nullable = true)
 |-- Order Date: double (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Quantity: double (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Profit: double (nullable = true)
 |-- Unit Price: double (nullable = true)
 |-- Shipping Cost: double (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Customer Segment: string (nullable = true)
 |-- Product Category: string (nullable = true)
 |-- Product Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Product Container: string (nullable = true)
 |-- Product Base Margin: double (nullable = true)
 |-- Ship Date: double (nullable = true)



In [27]:
df.columns

['Row ID',
 'Order ID',
 'Order Date',
 'Order Priority',
 'Order Quantity',
 'Sales',
 'Discount',
 'Ship Mode',
 'Profit',
 'Unit Price',
 'Shipping Cost',
 'Customer Name',
 'Province',
 'Region',
 'Customer Segment',
 'Product Category',
 'Product Sub-Category',
 'Product Name',
 'Product Container',
 'Product Base Margin',
 'Ship Date']

In [64]:
#pandas_df.describe()
#spark_df.describe()
df.describe()

DataFrame[summary: string, Row ID: string, Order ID: string, Order Date: string, Order Priority: string, Order Quantity: string, Sales: string, Discount: string, Ship Mode: string, Profit: string, Unit Price: string, Shipping Cost: string, Customer Name: string, Province: string, Region: string, Customer Segment: string, Product Category: string, Product Sub-Category: string, Product Name: string, Product Container: string, Product Base Margin: string, Ship Date: string]

In [38]:
from pyspark.sql.types import StringType,IntegerType,StructField,StructType
data_schema = [StructField('row ID',IntegerType(),True),
              StructField('Order Priority',StringType(),True)]
final_struct = StructType(fields=data_schema)
options = {
    "sheetName": "Sheet1",
    "Header": "true",
    "treatEmptyValuesAsNulls": "true",
    "inferSchema": "true"
}
df1 = spark.read.format("com.crealytics.spark.excel").options(**options).load("C:/Supriyaa-spark-notes/Superstore_Sales.xls",schema=final_struct)

In [39]:
df1.printSchema()

root
 |-- row ID: integer (nullable = true)
 |-- Order Priority: string (nullable = true)



In [40]:
df1['Order Priority']

Column<'Order Priority'>

In [41]:
type(df1['Order Priority'])

pyspark.sql.column.Column

In [62]:
df.select('Order Priority').show()

+--------------+
|Order Priority|
+--------------+
|           Low|
|          High|
|          High|
|          High|
| Not Specified|
| Not Specified|
|          High|
|          High|
|          High|
|           Low|
|        Medium|
|        Medium|
| Not Specified|
| Not Specified|
| Not Specified|
|        Medium|
| Not Specified|
|      Critical|
|      Critical|
|           Low|
+--------------+
only showing top 20 rows



In [66]:
df.head(2)   # df.head(2) gives two rows 

[Row(Row ID=1.0, Order ID=3.0, Order Date=40464.0, Order Priority='Low', Order Quantity=6.0, Sales=261.54, Discount=0.04, Ship Mode='Regular Air', Profit=-213.25, Unit Price=38.94, Shipping Cost=35.0, Customer Name='Muhammed MacIntyre', Province='Nunavut', Region='Nunavut', Customer Segment='Small Business', Product Category='Office Supplies', Product Sub-Category='Storage & Organization', Product Name='Eldon Base for stackable storage shelf, platinum', Product Container='Large Box', Product Base Margin=0.8, Ship Date=40471.0),
 Row(Row ID=49.0, Order ID=293.0, Order Date=41183.0, Order Priority='High', Order Quantity=49.0, Sales=10123.02, Discount=0.07, Ship Mode='Delivery Truck', Profit=457.81, Unit Price=208.16, Shipping Cost=68.02, Customer Name='Barry French', Province='Nunavut', Region='Nunavut', Customer Segment='Consumer', Product Category='Office Supplies', Product Sub-Category='Appliances', Product Name='1.7 Cubic Foot Compact "Cube" Office Refrigerators', Product Container='

In [67]:
df.head(2)[0]  #Out of two rows gives 1st row

Row(Row ID=1.0, Order ID=3.0, Order Date=40464.0, Order Priority='Low', Order Quantity=6.0, Sales=261.54, Discount=0.04, Ship Mode='Regular Air', Profit=-213.25, Unit Price=38.94, Shipping Cost=35.0, Customer Name='Muhammed MacIntyre', Province='Nunavut', Region='Nunavut', Customer Segment='Small Business', Product Category='Office Supplies', Product Sub-Category='Storage & Organization', Product Name='Eldon Base for stackable storage shelf, platinum', Product Container='Large Box', Product Base Margin=0.8, Ship Date=40471.0)

In [68]:
type(df.select('Row ID'))

pyspark.sql.dataframe.DataFrame

In [69]:
type(df.head(2)[0])

pyspark.sql.types.Row

In [6]:
df.select(['Row ID','Order Priority']).show()

+------+--------------+
|Row ID|Order Priority|
+------+--------------+
|   1.0|           Low|
|  49.0|          High|
|  50.0|          High|
|  80.0|          High|
|  85.0| Not Specified|
|  86.0| Not Specified|
|  97.0|          High|
|  98.0|          High|
| 103.0|          High|
| 107.0|           Low|
| 127.0|        Medium|
| 128.0|        Medium|
| 134.0| Not Specified|
| 135.0| Not Specified|
| 149.0| Not Specified|
| 160.0|        Medium|
| 161.0| Not Specified|
| 175.0|      Critical|
| 176.0|      Critical|
| 203.0|           Low|
+------+--------------+
only showing top 20 rows



In [5]:
df.withColumn('double_rowid',df['Row ID']*2).show()

+------+--------+----------+--------------+--------------+-----------------+--------+--------------+-------------------+----------+-------------+------------------+--------+-------+----------------+----------------+--------------------+--------------------+-----------------+-------------------+---------+------------+
|Row ID|Order ID|Order Date|Order Priority|Order Quantity|            Sales|Discount|     Ship Mode|             Profit|Unit Price|Shipping Cost|     Customer Name|Province| Region|Customer Segment|Product Category|Product Sub-Category|        Product Name|Product Container|Product Base Margin|Ship Date|double_rowid|
+------+--------+----------+--------------+--------------+-----------------+--------+--------------+-------------------+----------+-------------+------------------+--------+-------+----------------+----------------+--------------------+--------------------+-----------------+-------------------+---------+------------+
|   1.0|     3.0|   40464.0|           Low|

In [7]:
df.select(['Row ID','double_rowid']).show()  # error as the above operation withColumn() was not an inplace operation so column double_rowid is not present in df

AnalysisException: cannot resolve 'double_rowid' given input columns: [Customer Name, Customer Segment, Discount, Order Date, Order ID, Order Priority, Order Quantity, Product Base Margin, Product Category, Product Container, Product Name, Product Sub-Category, Profit, Province, Region, Row ID, Sales, Ship Date, Ship Mode, Shipping Cost, Unit Price];
'Project [Row ID#0, 'double_rowid]
+- Relation [Row ID#0,Order ID#1,Order Date#2,Order Priority#3,Order Quantity#4,Sales#5,Discount#6,Ship Mode#7,Profit#8,Unit Price#9,Shipping Cost#10,Customer Name#11,Province#12,Region#13,Customer Segment#14,Product Category#15,Product Sub-Category#16,Product Name#17,Product Container#18,Product Base Margin#19,Ship Date#20] ExcelRelation(com.crealytics.spark.excel.CellRangeAddressDataLocator@660238c9,true,true,false,true,false,false,None,None,10,com.crealytics.spark.excel.DefaultWorkbookReader@63c44ce3)


In [8]:
df.withColumnRenamed('Row ID','new_rowid').show()

+---------+--------+----------+--------------+--------------+-----------------+--------+--------------+-------------------+----------+-------------+------------------+--------+-------+----------------+----------------+--------------------+--------------------+-----------------+-------------------+---------+
|new_rowid|Order ID|Order Date|Order Priority|Order Quantity|            Sales|Discount|     Ship Mode|             Profit|Unit Price|Shipping Cost|     Customer Name|Province| Region|Customer Segment|Product Category|Product Sub-Category|        Product Name|Product Container|Product Base Margin|Ship Date|
+---------+--------+----------+--------------+--------------+-----------------+--------+--------------+-------------------+----------+-------------+------------------+--------+-------+----------------+----------------+--------------------+--------------------+-----------------+-------------------+---------+
|      1.0|     3.0|   40464.0|           Low|           6.0|           2

In [4]:
df.createOrReplaceTempView('dataset_to_view') #create a view
output = spark.sql("select * from dataset_to_view")
output.show()

+------+--------+----------+--------------+--------------+-----------------+--------+--------------+-------------------+----------+-------------+------------------+--------+-------+----------------+----------------+--------------------+--------------------+-----------------+-------------------+---------+
|Row ID|Order ID|Order Date|Order Priority|Order Quantity|            Sales|Discount|     Ship Mode|             Profit|Unit Price|Shipping Cost|     Customer Name|Province| Region|Customer Segment|Product Category|Product Sub-Category|        Product Name|Product Container|Product Base Margin|Ship Date|
+------+--------+----------+--------------+--------------+-----------------+--------+--------------+-------------------+----------+-------------+------------------+--------+-------+----------------+----------------+--------------------+--------------------+-----------------+-------------------+---------+
|   1.0|     3.0|   40464.0|           Low|           6.0|           261.54|    0.

In [5]:
# logic to convert unix times cast as doubles to datetime format
import datetime
serial = 40464.0
seconds = (serial - 25569) * 86400.0
print(datetime.datetime.utcfromtimestamp(seconds))

2010-10-13 00:00:00


In [20]:
# in spark SQL
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")
from pyspark.sql.functions import * 
output = spark.sql("""SELECT from_unixtime
(
 (CAST(dataset_to_view.`Order Date` AS DOUBLE) - 25569) * 86400,'yyyy-MM-dd'
) 
AS my_timestamp,
`Order Date` as order_date 
FROM dataset_to_view
""")
output.show()

+------------+----------+
|my_timestamp|order_date|
+------------+----------+
|  2010-10-13|   40464.0|
|  2012-10-01|   41183.0|
|  2012-10-01|   41183.0|
|  2011-07-10|   40734.0|
|  2010-08-28|   40418.0|
|  2010-08-28|   40418.0|
|  2011-06-17|   40711.0|
|  2011-06-17|   40711.0|
|  2011-03-24|   40626.0|
|  2010-02-26|   40235.0|
|  2010-11-23|   40505.0|
|  2010-11-23|   40505.0|
|  2012-06-08|   41068.0|
|  2012-06-08|   41068.0|
|  2012-08-04|   41125.0|
|  2011-05-30|   40693.0|
|  2009-11-25|   40142.0|
|  2012-02-14|   40953.0|
|  2012-02-14|   40953.0|
|  2012-04-15|   41014.0|
+------------+----------+
only showing top 20 rows



In [28]:
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY") # used to resolve date format issue
from pyspark.sql.functions import * 
output = spark.sql("""
SELECT to_date(from_unixtime((CAST(dataset_to_view.`Order Date` AS DOUBLE) - 25569) * 86400)) AS my_timestamp,
       `Order Date` as order_date 
FROM dataset_to_view
""")
output.show()

+------------+----------+
|my_timestamp|order_date|
+------------+----------+
|  2010-10-13|   40464.0|
|  2012-10-01|   41183.0|
|  2012-10-01|   41183.0|
|  2011-07-10|   40734.0|
|  2010-08-28|   40418.0|
|  2010-08-28|   40418.0|
|  2011-06-17|   40711.0|
|  2011-06-17|   40711.0|
|  2011-03-24|   40626.0|
|  2010-02-26|   40235.0|
|  2010-11-23|   40505.0|
|  2010-11-23|   40505.0|
|  2012-06-08|   41068.0|
|  2012-06-08|   41068.0|
|  2012-08-04|   41125.0|
|  2011-05-30|   40693.0|
|  2009-11-25|   40142.0|
|  2012-02-14|   40953.0|
|  2012-02-14|   40953.0|
|  2012-04-15|   41014.0|
+------------+----------+
only showing top 20 rows



In [141]:
#Query to get count of  "High","low","Medium" and "Not Specified" values in Order Priority column.
output =spark.sql("""SELECT `Order Priority`, COUNT(*)  as count
FROM dataset_to_view
GROUP BY `Order Priority`
                    """)
output.show()

+--------------+-----+
|Order Priority|count|
+--------------+-----+
|          High| 1768|
|           Low| 1720|
|        Medium| 1631|
|      Critical| 1608|
| Not Specified| 1672|
+--------------+-----+



In [5]:
#Query to get count of  "High","low","Medium" and "Not Specified" values in Order Priority column.
output =spark.sql("""select `Order Priority`, count from
(SELECT `Order Priority`, COUNT(`Order Priority`) over (partition by `Order Priority`) as count,
row_number() over (partition by `Order Priority` order by 1) as rn
FROM dataset_to_view) where rn=1
                    """)
output.show()

+--------------+-----+
|Order Priority|count|
+--------------+-----+
|      Critical| 1608|
|          High| 1768|
|           Low| 1720|
|        Medium| 1631|
| Not Specified| 1672|
+--------------+-----+



In [10]:
#Query to display latest 5 records per row_id with respect to Order Date.
output =spark.sql("""select * from (select `Row ID`, order_date from
(SELECT `Row ID`, 
from_unixtime((CAST(dataset_to_view.`Order Date` AS DOUBLE) - 25569) * 86400,'yyyy-MM-dd HH:mm:ss') AS order_date,
rank() over 
(partition by `Row ID` 
order by from_unixtime((CAST(dataset_to_view.`Order Date` AS DOUBLE) - 25569) * 86400,'yyyy-MM-dd HH:mm:ss') desc) as rn
FROM dataset_to_view) where rn<6) where `Row ID`='14.0'
                    """)
output.show()

+------+-------------------+
|Row ID|         order_date|
+------+-------------------+
|  14.0|2010-12-17 05:30:00|
+------+-------------------+



In [20]:
output =spark.sql("""select `Order ID`,count(`Order ID`)
FROM dataset_to_view
group by `Order ID` having count(`Order ID`)>1
                    """)
output.show()
output =spark.sql("""select `Row ID`,count(`Row ID`)
FROM dataset_to_view
group by `Row ID` having count(`Row ID`)>1
                    """)
output.show()
# We can see that Row ID is having unique values

+--------+---------------+
|Order ID|count(Order ID)|
+--------+---------------+
| 18308.0|              3|
| 58626.0|              3|
| 39846.0|              2|
| 16193.0|              2|
| 24007.0|              3|
| 53511.0|              2|
|  9537.0|              2|
| 24387.0|              2|
| 30658.0|              2|
| 24128.0|              2|
| 24097.0|              3|
| 26370.0|              2|
| 29287.0|              2|
| 31781.0|              2|
| 10435.0|              2|
| 21889.0|              2|
| 12096.0|              2|
| 43109.0|              3|
| 32193.0|              2|
|  1539.0|              2|
+--------+---------------+
only showing top 20 rows

+------+-------------+
|Row ID|count(Row ID)|
+------+-------------+
+------+-------------+



In [24]:
#Query to display latest 5 records per order_id with respect to Order Date.
output =spark.sql("""select * from (select `Row ID`,`Order ID`, order_date from
(SELECT `Order ID`, `Row ID`,
from_unixtime((CAST(dataset_to_view.`Order Date` AS DOUBLE) - 25569) * 86400,'yyyy-MM-dd HH:mm:ss') AS order_date,
rank() over 
(partition by `Order ID` 
order by from_unixtime((CAST(dataset_to_view.`Order Date` AS DOUBLE) - 25569) * 86400,'yyyy-MM-dd HH:mm:ss') desc) as rn
FROM dataset_to_view) where rn<6) where `Order ID`='24007.0'
                    """)
output.show()

+------+--------+-------------------+
|Row ID|Order ID|         order_date|
+------+--------+-------------------+
|3355.0| 24007.0|2010-08-26 05:30:00|
|3356.0| 24007.0|2010-08-26 05:30:00|
|3357.0| 24007.0|2010-08-26 05:30:00|
+------+--------+-------------------+



In [36]:
#top customers who have made the most orders in the last 11 years.
output =spark.sql("""select `Customer Name`,
from_unixtime((CAST(dataset_to_view.`Order Date` AS DOUBLE) - 25569) * 86400,'yyyy-MM-dd HH:mm:ss') AS order_date,
`Order Quantity`
FROM dataset_to_view
where from_unixtime((CAST(dataset_to_view.`Order Date` AS DOUBLE) - 25569) * 86400,'yyyy-MM-dd HH:mm:ss') > current_date()-3960
order by `Order Quantity` desc
                    """)
output.show()

+-----------------+-------------------+--------------+
|    Customer Name|         order_date|Order Quantity|
+-----------------+-------------------+--------------+
|       Sam Craven|2012-05-05 05:30:00|          50.0|
|   David Philippe|2012-10-06 05:30:00|          50.0|
| Mitch Willingham|2012-07-13 05:30:00|          50.0|
|Jason Klamczynski|2012-08-10 05:30:00|          50.0|
|   Jeremy Ellison|2012-10-29 05:30:00|          50.0|
|    Harold Pawlan|2012-12-18 05:30:00|          50.0|
|      Meg Tillman|2012-07-15 05:30:00|          50.0|
|       Tony Sayre|2012-12-16 05:30:00|          50.0|
|    Dave Hallsten|2012-12-02 05:30:00|          50.0|
| Natalie Fritzler|2012-05-17 05:30:00|          50.0|
|   Thea Hendricks|2012-05-29 05:30:00|          50.0|
|     Dan Campbell|2012-08-13 05:30:00|          50.0|
|Christine Abelman|2012-09-29 05:30:00|          50.0|
|    Bobby Trafton|2012-07-19 05:30:00|          50.0|
|    Brendan Murry|2012-09-02 05:30:00|          50.0|
|  Laurel 

In [40]:
# As we can see most of them are having same order quantity lets consider Order Priority i.e., whose order priority is Critical-High
output =spark.sql("""select `Customer Name`,
from_unixtime((CAST(dataset_to_view.`Order Date` AS DOUBLE) - 25569) * 86400,'yyyy-MM-dd') AS order_date,
`Order Quantity`, `Order Priority`
FROM dataset_to_view
where from_unixtime((CAST(dataset_to_view.`Order Date` AS DOUBLE) - 25569) * 86400,'yyyy-MM-dd') > current_date()-3960
order by `Order Quantity` desc, `Order Priority`
                    """)
output.show()

+-----------------+----------+--------------+--------------+
|    Customer Name|order_date|Order Quantity|Order Priority|
+-----------------+----------+--------------+--------------+
|   Jeremy Ellison|2012-10-29|          50.0|      Critical|
|       Tony Sayre|2012-12-16|          50.0|      Critical|
|  Penelope Sewall|2012-09-18|          50.0|      Critical|
|   Thea Hendricks|2012-05-29|          50.0|          High|
|Matthew Grinstein|2012-10-28|          50.0|          High|
|  Laura Armstrong|2012-08-22|          50.0|          High|
| Natalie Fritzler|2012-05-17|          50.0|          High|
|      Meg Tillman|2012-07-15|          50.0|          High|
|    Harold Pawlan|2012-12-18|          50.0|          High|
|    Hilary Holden|2012-08-11|          50.0|          High|
|       Sam Craven|2012-05-05|          50.0|           Low|
|    Brendan Murry|2012-09-02|          50.0|           Low|
|       Tony Sayre|2012-09-02|          50.0|           Low|
|       Roy Skaria|2012-

In [45]:
# Which product has the maximum cost(First Highest cost) : Unit Price + Shipping Cost
output = spark.sql("""
    SELECT `Customer Name`,
           `Product category`, 
           `Product Sub-Category`,
           `Product Name`,
           `Unit Price` + `Shipping Cost`
    FROM dataset_to_view
    WHERE (`Unit Price` + `Shipping Cost`) = (
        SELECT MAX(`Unit Price` + `Shipping Cost`) 
        FROM dataset_to_view
    )
""")
output.show()

+---------------+----------------+--------------------+--------------------+----------------------------+
|  Customer Name|Product category|Product Sub-Category|        Product Name|(Unit Price + Shipping Cost)|
+---------------+----------------+--------------------+--------------------+----------------------------+
|Jasper Cacioppo|      Technology|     Office Machines|Polycom ViewStati...|                     6807.51|
|     Emily Phan|      Technology|     Office Machines|Polycom ViewStati...|                     6807.51|
| Craig Carreira|      Technology|     Office Machines|Polycom ViewStati...|                     6807.51|
|    Roger Demir|      Technology|     Office Machines|Polycom ViewStati...|                     6807.51|
| Laurel Workman|      Technology|     Office Machines|Polycom ViewStati...|                     6807.51|
|  Adrian Barton|      Technology|     Office Machines|Polycom ViewStati...|                     6807.51|
|       Roy Phan|      Technology|     Office 

In [47]:
# Which product has the 2nd maximum cost(Second Highest cost) : Unit Price + Shipping Cost
output = spark.sql("""
    SELECT `Customer Name`,
           `Product category`, 
           `Product Sub-Category`,
           `Product Name`,
           `Unit Price` + `Shipping Cost`
    FROM dataset_to_view
    WHERE (`Unit Price` + `Shipping Cost`) = (
        SELECT MAX(`Unit Price` + `Shipping Cost`) 
        FROM dataset_to_view
        where (`Unit Price` + `Shipping Cost`) < (select MAX(`Unit Price` + `Shipping Cost`) from dataset_to_view)
    )
""")
output.show()

+----------------+----------------+--------------------+--------------------+----------------------------+
|   Customer Name|Product category|Product Sub-Category|        Product Name|(Unit Price + Shipping Cost)|
+----------------+----------------+--------------------+--------------------+----------------------------+
|      Erica Bern|      Technology|     Copiers and Fax|Canon imageCLASS ...|          3524.4799999999996|
|    Tony Chapman|      Technology|     Copiers and Fax|Canon imageCLASS ...|          3524.4799999999996|
|  Parhena Norris|      Technology|     Copiers and Fax|Canon imageCLASS ...|          3524.4799999999996|
|     Dennis Kane|      Technology|     Copiers and Fax|Canon imageCLASS ...|          3524.4799999999996|
|    Nathan Mautz|      Technology|     Copiers and Fax|Canon imageCLASS ...|          3524.4799999999996|
|Maxwell Schwartz|      Technology|     Copiers and Fax|Canon imageCLASS ...|          3524.4799999999996|
|     Cyra Reiten|      Technology|  

In [8]:
# Which product has the 3rd maximum cost(Third Highest cost) : Unit Price + Shipping Cost
output = spark.sql("""
    SELECT `Customer Name`,
           `Product category`, 
           `Product Sub-Category`,
           `Product Name`,
           `Unit Price` + `Shipping Cost`
    FROM dataset_to_view
    WHERE (`Unit Price` + `Shipping Cost`) = 
    (
        SELECT MAX(`Unit Price` + `Shipping Cost`) FROM dataset_to_view 
        where (`Unit Price` + `Shipping Cost`) < 
        (select MAX(`Unit Price` + `Shipping Cost`) from dataset_to_view
        where (`Unit Price` + `Shipping Cost`) < (select MAX(`Unit Price` + `Shipping Cost`) from dataset_to_view)
    )
    )
""")
output.show()

+------------------+----------------+--------------------+--------------------+----------------------------+
|     Customer Name|Product category|Product Sub-Category|        Product Name|(Unit Price + Shipping Cost)|
+------------------+----------------+--------------------+--------------------+----------------------------+
| Tamara Willingham|      Technology|     Office Machines|Okidata Pacemark ...|                     3510.87|
|      Irene Maddox|      Technology|     Office Machines|Okidata Pacemark ...|                     3510.87|
|     Juliana Krohn|      Technology|     Office Machines|Okidata Pacemark ...|                     3510.87|
|Lauren Leatherbury|      Technology|     Office Machines|Okidata Pacemark ...|                     3510.87|
|     Clay Cheatham|      Technology|     Office Machines|Okidata Pacemark ...|                     3510.87|
|    Alan Dominguez|      Technology|     Office Machines|Okidata Pacemark ...|                     3510.87|
+------------------

In [28]:
# Which product has the 3rd maximum cost(Third Highest cost) : Unit Price + Shipping Cost
#Approach using Rank() vs Dense_rank()
output = spark.sql("""
    select `Customer Name`,
           `Product category`, 
           `Product Sub-Category`,
           `Product Name`,
           `Unit Price` + `Shipping Cost` as Total_Price,
           rn
    from (
    SELECT `Customer Name`,
           `Product category`, 
           `Product Sub-Category`,
           `Product Name`,
           `Unit Price`,
           `Shipping Cost`,
           rank() over (order by (`Unit Price` + `Shipping Cost`) desc) rn
    FROM dataset_to_view
    ) temp
    where rn<100
""")

output.show()

+------------------+----------------+--------------------+--------------------+------------------+---+
|     Customer Name|Product category|Product Sub-Category|        Product Name|       Total_Price| rn|
+------------------+----------------+--------------------+--------------------+------------------+---+
|   Jasper Cacioppo|      Technology|     Office Machines|Polycom ViewStati...|           6807.51|  1|
|        Emily Phan|      Technology|     Office Machines|Polycom ViewStati...|           6807.51|  1|
|    Craig Carreira|      Technology|     Office Machines|Polycom ViewStati...|           6807.51|  1|
|       Roger Demir|      Technology|     Office Machines|Polycom ViewStati...|           6807.51|  1|
|    Laurel Workman|      Technology|     Office Machines|Polycom ViewStati...|           6807.51|  1|
|     Adrian Barton|      Technology|     Office Machines|Polycom ViewStati...|           6807.51|  1|
|          Roy Phan|      Technology|     Office Machines|Polycom ViewSta

In [30]:
# Which product has the 3rd maximum cost(Third Highest cost) : Unit Price + Shipping Cost
#Approach using Rank() vs Dense_rank()
output = spark.sql("""
    select `Customer Name`,
           `Product category`, 
           `Product Sub-Category`,
           `Product Name`,
           `Unit Price` + `Shipping Cost` as Total_Price,
           rn
    from (
    SELECT `Customer Name`,
           `Product category`, 
           `Product Sub-Category`,
           `Product Name`,
           `Unit Price`,
           `Shipping Cost`,
           dense_rank() over (order by (`Unit Price` + `Shipping Cost`) desc) rn
    FROM dataset_to_view
    ) temp
    where rn<100
""")

output.show()

+------------------+----------------+--------------------+--------------------+------------------+---+
|     Customer Name|Product category|Product Sub-Category|        Product Name|       Total_Price| rn|
+------------------+----------------+--------------------+--------------------+------------------+---+
|   Jasper Cacioppo|      Technology|     Office Machines|Polycom ViewStati...|           6807.51|  1|
|        Emily Phan|      Technology|     Office Machines|Polycom ViewStati...|           6807.51|  1|
|    Craig Carreira|      Technology|     Office Machines|Polycom ViewStati...|           6807.51|  1|
|       Roger Demir|      Technology|     Office Machines|Polycom ViewStati...|           6807.51|  1|
|    Laurel Workman|      Technology|     Office Machines|Polycom ViewStati...|           6807.51|  1|
|     Adrian Barton|      Technology|     Office Machines|Polycom ViewStati...|           6807.51|  1|
|          Roy Phan|      Technology|     Office Machines|Polycom ViewSta

In [7]:
# Which product has the 3rd maximum cost(Third Highest cost) : Unit Price + Shipping Cost
#Approach using Rank() vs Dense_rank()
output = spark.sql("""
    select `Customer Name`,
           `Product category`, 
           `Product Sub-Category`,
           `Product Name`,
           `Unit Price` + `Shipping Cost` as Total_Price
           --rn
    from (
    SELECT `Customer Name`,
           `Product category`, 
           `Product Sub-Category`,
           `Product Name`,
           `Unit Price`,
           `Shipping Cost`,
           dense_rank() over (order by (`Unit Price` + `Shipping Cost`) desc) rn
    FROM dataset_to_view
    ) temp
    where rn=3
""")

output.show()

+------------------+----------------+--------------------+--------------------+-----------+
|     Customer Name|Product category|Product Sub-Category|        Product Name|Total_Price|
+------------------+----------------+--------------------+--------------------+-----------+
| Tamara Willingham|      Technology|     Office Machines|Okidata Pacemark ...|    3510.87|
|      Irene Maddox|      Technology|     Office Machines|Okidata Pacemark ...|    3510.87|
|     Juliana Krohn|      Technology|     Office Machines|Okidata Pacemark ...|    3510.87|
|Lauren Leatherbury|      Technology|     Office Machines|Okidata Pacemark ...|    3510.87|
|     Clay Cheatham|      Technology|     Office Machines|Okidata Pacemark ...|    3510.87|
|    Alan Dominguez|      Technology|     Office Machines|Okidata Pacemark ...|    3510.87|
+------------------+----------------+--------------------+--------------------+-----------+



In [27]:
#calculates the running total of sales within each category, ordered by month
output = spark.sql("""
    select `Sales`,
           `Product category`, 
           sum_sales,
           decode(substr(order_date,6,2),
           '01','Jan',
           '02','Feb',
           '03','Mar',
           '04','Apr',
           '05','May',
           '06','Jun',
           '07','Jul',
           '08','Aug',
           '09','Sep',
           '10','Oct',
           '11','Nov',
           '12','Dec',
           'NA') as month
    from (
    SELECT `Sales`,
           `Product category`, 
           `Product Sub-Category`,
           `Product Name`,
           from_unixtime((CAST(dataset_to_view.`Order Date` AS DOUBLE) - 25569) * 86400,'yyyy-MM-dd') as order_date,
           sum(`Sales`) over (partition by `Product category` 
           order by (substr(from_unixtime((CAST(dataset_to_view.`Order Date` AS DOUBLE) - 25569) * 86400,'yyyy-MM-dd'),6,2)) desc) sum_sales
    FROM dataset_to_view
    ) temp
""")

output.show()

+--------+----------------+-----------------+-----+
|   Sales|Product category|        sum_sales|month|
+--------+----------------+-----------------+-----+
|14223.82|       Furniture|566363.4880000002|  Dec|
|  291.66|       Furniture|566363.4880000002|  Dec|
| 6641.14|       Furniture|566363.4880000002|  Dec|
|10469.03|       Furniture|566363.4880000002|  Dec|
| 1280.34|       Furniture|566363.4880000002|  Dec|
|  145.68|       Furniture|566363.4880000002|  Dec|
| 1270.03|       Furniture|566363.4880000002|  Dec|
| 1773.86|       Furniture|566363.4880000002|  Dec|
|  543.22|       Furniture|566363.4880000002|  Dec|
|   813.9|       Furniture|566363.4880000002|  Dec|
| 5016.25|       Furniture|566363.4880000002|  Dec|
|  152.84|       Furniture|566363.4880000002|  Dec|
|  549.92|       Furniture|566363.4880000002|  Dec|
| 5572.92|       Furniture|566363.4880000002|  Dec|
|10338.93|       Furniture|566363.4880000002|  Dec|
|15897.01|       Furniture|566363.4880000002|  Dec|
|   919.0|  

In [33]:
#calculates the total sales for each category across all months
output = spark.sql("""select * from (SELECT `Sales`,
       `Product category`, 
       sum_sales,
       decode(substr(order_date,6,2),
              '01','Jan',
              '02','Feb',
              '03','Mar',
              '04','Apr',
              '05','May',
              '06','Jun',
              '07','Jul',
              '08','Aug',
              '09','Sep',
              '10','Oct',
              '11','Nov',
              '12','Dec',
              'NA') as month
FROM (
    SELECT `Sales`,
           `Product category`, 
           `Product Sub-Category`,
           `Product Name`,
           from_unixtime((CAST(dataset_to_view.`Order Date` AS DOUBLE) - 25569) * 86400,'yyyy-MM-dd') as order_date,
           sum(`Sales`) over (partition by `Product category`) as sum_sales
    FROM dataset_to_view
) )order by sum_sales
""")

output.show()


+--------+----------------+-----------------+-----+
|   Sales|Product category|        sum_sales|month|
+--------+----------------+-----------------+-----+
|  261.54| Office Supplies|3752762.099999994|  Oct|
|  845.32| Office Supplies|3752762.099999994|  Jan|
|10123.02| Office Supplies|3752762.099999994|  Oct|
|  244.57| Office Supplies|3752762.099999994|  Oct|
|  394.27| Office Supplies|3752762.099999994|  Aug|
|   93.54| Office Supplies|3752762.099999994|  Jun|
|  905.08| Office Supplies|3752762.099999994|  Jun|
| 2781.82| Office Supplies|3752762.099999994|  Mar|
|  228.41| Office Supplies|3752762.099999994|  Feb|
|  196.85| Office Supplies|3752762.099999994|  Nov|
|  124.56| Office Supplies|3752762.099999994|  Nov|
|  716.84| Office Supplies|3752762.099999994|  Jun|
|   80.61| Office Supplies|3752762.099999994|  Aug|
|  248.26| Office Supplies|3752762.099999994|  Nov|
|   59.03| Office Supplies|3752762.099999994|  Mar|
|   97.48| Office Supplies|3752762.099999994|  Mar|
|  511.83| O

In [66]:
#To display all duplicate records if any
output = spark.sql('''
select vo.* 
from dataset_to_view vo 
where `Row ID` <> (select max(vi.`Row ID`) from dataset_to_view vi where vo.`Row ID`=vi.`Row ID` )
''')
output.show()

+------+--------+----------+--------------+--------------+-----+--------+---------+------+----------+-------------+-------------+--------+------+----------------+----------------+--------------------+------------+-----------------+-------------------+---------+
|Row ID|Order ID|Order Date|Order Priority|Order Quantity|Sales|Discount|Ship Mode|Profit|Unit Price|Shipping Cost|Customer Name|Province|Region|Customer Segment|Product Category|Product Sub-Category|Product Name|Product Container|Product Base Margin|Ship Date|
+------+--------+----------+--------------+--------------+-----+--------+---------+------+----------+-------------+-------------+--------+------+----------------+----------------+--------------------+------------+-----------------+-------------------+---------+
+------+--------+----------+--------------+--------------+-----+--------+---------+------+----------+-------------+-------------+--------+------+----------------+----------------+--------------------+------------+-

In [16]:
# To get hihgest profit per month
#spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")
output = spark.sql('''
select `Product Name`,
order_month,
order_date,
Profit
from
(SELECT 
  `Product Name`,
  substr(from_unixtime(((CAST(`Order Date` AS DOUBLE)-25569)*86400)),6,2) AS order_month,
  from_unixtime(((CAST(`Order Date` AS DOUBLE)-25569)*86400)) as order_date,
  row_number() OVER (partition BY substr(from_unixtime(((CAST(`Order Date` AS DOUBLE)-25569)*86400)),6,2) order by Profit desc) AS max_profit,
  Profit
FROM dataset_to_view) 
where max_profit=1
''')
output.show()

+--------------------+-----------+-------------------+------------------+
|        Product Name|order_month|         order_date|            Profit|
+--------------------+-----------+-------------------+------------------+
|Polycom ViaVideo™...|         01|2009-01-03 05:30:00|          14440.39|
|Hewlett-Packard c...|         02|2009-02-05 05:30:00|           8734.88|
|Polycom ViewStati...|         03|2009-03-21 05:30:00|          27220.69|
|Hewlett Packard L...|         04|2011-04-30 05:30:00|          9097.645|
|Polycom ViewStati...|         05|2012-05-04 05:30:00|           6138.48|
|Fellowes PB500 El...|         06|2009-06-07 05:30:00|10951.306499999999|
|Hewlett-Packard B...|         07|2009-07-31 05:30:00|          12606.81|
|Epson LQ-870 Dot ...|         08|2012-08-07 05:30:00|           7080.99|
|Hewlett Packard L...|         09|2010-09-29 05:30:00| 9791.041000000001|
|Fellowes PB500 El...|         10|2009-10-04 05:30:00|         11535.282|
|Hewlett-Packard c...|         11|2012

In [6]:
# Using User defined function in spark SQL
#import datetime
#import pytz
#import threading
#import multiprocessing

spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

def process_data():
    # Define date parser function and register UDF
    def dateParser(date_input,specifier,date_format):
        if(date_format=='YYYY-MM-DD' or date_format=='YYYY/MM/DD'):
            if(specifier=='month'):
                return date_input[5:7]
            elif(specifier=='day'):
                return date_input[8:10]
            elif(specifier=='year'):
                return date_input[0:4]
            else:
                return "wrong specifier mentioned"
        elif(date_format=='DD-MM-YYYY' or date_format=='DD/MM/YYYY'):
            if(specifier=='month'):
                return date_input[3:5]
            elif(specifier=='day'):
                return date_input[0:2]
            elif(specifier=='year'):
                return date_input[6:10]
            else:
                return "wrong specifier mentioned"
        else:
            return "wrong date format specified"

    def dateParserWrapper(date_input,specifier,date_format):
        return dateParser(date_input,specifier,date_format)

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    
    # converting python function to spark UDF (user defined function)
    udf_dateParser = udf(dateParserWrapper, StringType())
    
    # Registering the function in spark to use in spark SQL
    spark.udf.register("udf_dateParser", udf_dateParser)

    # Use the UDF to process the data
    output = spark.sql('''
        SELECT `Product Name`,
           udf_dateParser(from_unixtime(((CAST(`Order Date` AS DOUBLE)-25569)*86400),'YYYY-MM-DD'),'month','YYYY-MM-DD') AS order_month,
           SUM(Profit) OVER(PARTITION BY `Product Name`, udf_dateParser(from_unixtime(((CAST(`Order Date` AS DOUBLE)-25569)*86400),'YYYY-MM-DD'),'month','YYYY-MM-DD')) AS total_profit
    FROM dataset_to_view
    order BY `Product Name`, udf_dateParser(from_unixtime(((CAST(`Order Date` AS DOUBLE)-25569)*86400),'YYYY-MM-DD'),'month','YYYY-MM-DD')
    ''')
    output.show()

process_data()


+--------------------+-----------+--------------------+
|        Product Name|order_month|        total_profit|
+--------------------+-----------+--------------------+
|"While you Were O...|         01|                8.33|
|"While you Were O...|         05|               14.13|
|"While you Were O...|         06|               -0.31|
|"While you Were O...|         12|-0.12000000000000055|
|"While you Were O...|         12|-0.12000000000000055|
|#10 Self-Seal Whi...|         05|               21.71|
|#10 Self-Seal Whi...|         05|               21.71|
|#10 Self-Seal Whi...|         08|               52.27|
|#10 Self-Seal Whi...|         11|               27.55|
|#10 Self-Seal Whi...|         12|                39.0|
|#10 White Busines...|         04|   561.8499999999999|
|#10 White Busines...|         04|   561.8499999999999|
|#10 White Busines...|         05|             1428.78|
|#10 White Busines...|         05|             1428.78|
|#10 White Busines...|         05|             1

In [39]:
'''
When we use sum() over partition by clause, it will compute the sum of profit for each product and month combination 
separately and return a separate row for each combination. So, we may get multiple rows for the same product and month.
If we want to get a single row per product per month, we can try using the group by clause along with sum() 
aggregate function instead of using sum() over partition by clause.
'''
output = spark.sql('''
    select `Product Name`,
    udf_dateParser(from_unixtime(((CAST(`Order Date` AS DOUBLE)-25569)*86400),'YYYY-MM-DD'),'month','YYYY-MM-DD') as order_month,
    sum(Profit) as total_profit
    from dataset_to_view
    group by `Product Name`, order_month
    order by `Product Name`, udf_dateParser(from_unixtime(((CAST(`Order Date` AS DOUBLE)-25569)*86400),'YYYY-MM-DD'),'month','YYYY-MM-DD') 
''')

output.show()

+--------------------+-----------+--------------------+
|        Product Name|order_month|        total_profit|
+--------------------+-----------+--------------------+
|"While you Were O...|         01|                8.33|
|"While you Were O...|         05|               14.13|
|"While you Were O...|         06|               -0.31|
|"While you Were O...|         12|-0.12000000000000055|
|#10 Self-Seal Whi...|         05|               21.71|
|#10 Self-Seal Whi...|         08|               52.27|
|#10 Self-Seal Whi...|         11|               27.55|
|#10 Self-Seal Whi...|         12|                39.0|
|#10 White Busines...|         04|   561.8499999999999|
|#10 White Busines...|         05|             1428.78|
|#10 White Busines...|         06|              838.32|
|#10 White Busines...|         07|                9.35|
|#10 White Busines...|         10|                12.5|
|#10- 4 1/8" x 9 1...|         01|               111.4|
|#10- 4 1/8" x 9 1...|         02|              

In [8]:
# To check which Product is having max total_profit for that month
output = spark.sql('''
  select * from (
  select t.*,
  row_number() over (partition by t.order_month order by t.total_profit desc) row_num
  from 
  (
   select `Product Name`,
    udf_dateParser(from_unixtime(((CAST(`Order Date` AS DOUBLE)-25569)*86400),'YYYY-MM-DD'),'month','YYYY-MM-DD') as order_month,
    sum(Profit) as total_profit
    from dataset_to_view
    group by `Product Name`, order_month
    ) t
    )
    where row_num=1
''')
output.show()

+--------------------+-----------+------------------+-------+
|        Product Name|order_month|      total_profit|row_num|
+--------------------+-----------+------------------+-------+
|Global Troy™ Exec...|         01|          23685.02|      1|
|Okidata ML390 Tur...|         02|          13008.59|      1|
|GBC DocuBind 200 ...|         03|         15381.991|      1|
|Hewlett-Packard D...|         04|          18439.68|      1|
|Polycom ViewStati...|         05|           6138.48|      1|
|Fellowes PB500 El...|         06|10951.306499999999|      1|
|Hewlett-Packard B...|         07|          24168.89|      1|
|Riverside Palais ...|         08|           8573.84|      1|
|Hewlett Packard L...|         09| 9791.041000000001|      1|
|Canon PC1080F Per...|         10|         15991.267|      1|
|Hewlett-Packard c...|         11|          18240.54|      1|
|Global Troy™ Exec...|         12|14924.029999999999|      1|
+--------------------+-----------+------------------+-------+



In [21]:
output = spark.sql('''
    select `Product Name`,
    udf_dateParser(from_unixtime(((CAST(`Order Date` AS DOUBLE)-25569)*86400),'YYYY-MM-DD'),'month','YYYY-MM-DD') as order_month,
    sum(Profit) as total_profit
    from dataset_to_view
    group by `Product Name`, order_month
    order by `Product Name`, udf_dateParser(from_unixtime(((CAST(`Order Date` AS DOUBLE)-25569)*86400),'YYYY-MM-DD'),'month','YYYY-MM-DD') 
''')

output.show()

+--------------------+-----------+--------------------+
|        Product Name|order_month|        total_profit|
+--------------------+-----------+--------------------+
|"While you Were O...|         01|                8.33|
|"While you Were O...|         05|               14.13|
|"While you Were O...|         06|               -0.31|
|"While you Were O...|         12|-0.12000000000000055|
|#10 Self-Seal Whi...|         05|               21.71|
|#10 Self-Seal Whi...|         08|               52.27|
|#10 Self-Seal Whi...|         11|               27.55|
|#10 Self-Seal Whi...|         12|                39.0|
|#10 White Busines...|         04|   561.8499999999999|
|#10 White Busines...|         05|             1428.78|
|#10 White Busines...|         06|              838.32|
|#10 White Busines...|         07|                9.35|
|#10 White Busines...|         10|                12.5|
|#10- 4 1/8" x 9 1...|         01|               111.4|
|#10- 4 1/8" x 9 1...|         02|              

In [22]:
# Which day of the month is having highest orders
from pyspark.sql.functions import *
output = spark.sql('''
    select * from (
    select sum(`Order Quantity`) as tot_orderquantity,
    udf_dateParser(from_unixtime(((CAST(`Order Date` AS DOUBLE)-25569)*86400)),'day','YYYY-MM-DD') as order_day,
    row_number() over (order by sum(`Order Quantity`) desc) rownum
    from dataset_to_view
    group by order_day
    ) where rownum=1
''')

output.show(40)

+-----------------+---------+------+
|tot_orderquantity|order_day|rownum|
+-----------------+---------+------+
|           8204.0|       20|     1|
+-----------------+---------+------+

