In [0]:
# Import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.types import DecimalType

# Create SparkSession 
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SalesDataPipeline") \
      .getOrCreate() 

In [0]:
# Create RDD from parallelize    
dataList = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]
rdd=spark.sparkContext.parallelize(dataList)

In [0]:
# List files in the /tmp directory in DBFS
display(dbutils.fs.ls("/FileStore/tables/"))
#display(dbutils.fs.ls("dbfs:/tmp/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/Customer.csv,Customer.csv,65561,1730025405000
dbfs:/FileStore/tables/Product.csv,Product.csv,142954,1730025405000
dbfs:/FileStore/tables/Sales.csv,Sales.csv,1007318,1729964837000
dbfs:/FileStore/tables/train.csv,train.csv,25887312,1724477699000


In [0]:
# Load CSV files into DataFrames
sales_df = spark.read.csv("/FileStore/tables/Sales.csv",header=True,inferSchema=True)
sales_df.printSchema()
sales_df.show()

root
 |-- Order Line: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: date (nullable = true)
 |-- Ship Date: date (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Profit: double (nullable = true)

+----------+--------------+----------+----------+--------------+-----------+---------------+--------+--------+--------+--------+
|Order Line|      Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|     Product ID|   Sales|Quantity|Discount|  Profit|
+----------+--------------+----------+----------+--------------+-----------+---------------+--------+--------+--------+--------+
|         1|CA-2016-152156|2016-11-08|2016-11-11|  Second Class|   CG-12520|FUR-BO-10001798|  261.96|       2|     0.0| 41.9136|
|         2|CA-2016-152156|2016-11

In [0]:
#dbutils.fs.mkdirs("/FileStore/tables")
#https://drive.google.com/file/d/1A0LNeOv83hjb3o5mIjQ0h2DDdpb70gz2/view?usp=drive_link
#import gdown
#download_url = f"https://drive.google.com/uc?id=1A0LNeOv83hjb3o5mIjQ0h2DDdpb70gz2"

# Destination path where the file will be saved
#output = "/tmp/Product.csv"

# Download the file from Google Drive
#gdown.download(download_url, output, quiet=False)

In [0]:
# Load CSV files into DataFrames
product_df = spark.read.csv("/FileStore/tables/Product.csv",header=True,inferSchema=True)
product_df.printSchema()
product_df.show()

root
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)

+---------------+---------------+------------+--------------------+
|     Product ID|       Category|Sub-Category|        Product Name|
+---------------+---------------+------------+--------------------+
|FUR-BO-10001798|      Furniture|   Bookcases|Bush Somerset Col...|
|FUR-CH-10000454|      Furniture|      Chairs|Hon Deluxe Fabric...|
|OFF-LA-10000240|Office Supplies|      Labels|Self-Adhesive Add...|
|FUR-TA-10000577|      Furniture|      Tables|Bretford CR4500 S...|
|OFF-ST-10000760|Office Supplies|     Storage|Eldon Fold N Roll...|
|FUR-FU-10001487|      Furniture| Furnishings|Eldon Expressions...|
|OFF-AR-10002833|Office Supplies|         Art|          Newell 322|
|TEC-PH-10002275|     Technology|      Phones|Mitel 5320 IP Pho...|
|OFF-BI-10003910|Office Supplies|     Binders|DXL Angle-View Bi...|
|OFF-AP-

In [0]:
# Load CSV files into DataFrames
customer_df = spark.read.csv("/FileStore/tables/Customer.csv",header=True,inferSchema=True)
customer_df.printSchema()
customer_df.show()

root
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Region: string (nullable = true)

+-----------+------------------+-----------+---+-------------+---------------+--------------+-----------+-------+
|Customer ID|     Customer Name|    Segment|Age|      Country|           City|         State|Postal Code| Region|
+-----------+------------------+-----------+---+-------------+---------------+--------------+-----------+-------+
|   CG-12520|       Claire Gute|   Consumer| 67|United States|      Henderson|      Kentucky|      42420|  South|
|   DV-13045|   Darrin Van Huff|  Corporate| 31|United States|    Los Angeles|    California|      90036|   West|
|   SO-20335|    Sean O'Donnell|   Consumer| 65|United States|Fort 

In [0]:
#to show list of columns
sales_df.columns

Out[7]: ['Order Line',
 'Order ID',
 'Order Date',
 'Ship Date',
 'Ship Mode',
 'Customer ID',
 'Product ID',
 'Sales',
 'Quantity',
 'Discount',
 'Profit']

In [0]:
#Rename Columns in the sales csv
sales_df = sales_df.withColumnRenamed("Ship Date","Ship_Date").withColumnRenamed("Ship Mode","Ship_Mode").withColumnRenamed("Customer ID","Customer_ID").withColumnRenamed("Product ID","Product_ID").withColumnRenamed("Order Line","Order_Line").withColumnRenamed("Order ID","Order_ID").withColumnRenamed("Order Date","Order_Date")
sales_df = sales_df.withColumn("Sales", sales_df["Sales"].cast(DecimalType(10, 2)))
sales_df.describe().show()
sales_df.show()

+-------+------------------+--------------+--------------+-----------+---------------+-----------------+-----------------+-------------------+------------------+
|summary|        Order_Line|      Order_ID|     Ship_Mode|Customer_ID|     Product_ID|            Sales|         Quantity|           Discount|            Profit|
+-------+------------------+--------------+--------------+-----------+---------------+-----------------+-----------------+-------------------+------------------+
|  count|              9994|          9994|          9994|       9994|           9994|             9994|             9994|               9994|              9994|
|   mean|            4997.5|          null|          null|       null|           null|       229.858022|3.789573744246548|0.15620272163298934|28.656896307784802|
| stddev|2885.1636290974325|          null|          null|       null|           null|623.2451308551178|2.225109691141402|0.20645196782571626| 234.2601076909573|
|    min|                 1|

In [0]:
#Find na null values
sales_df.select("Order_Line",isnan("Order_Line")).show()


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-1829244035121902>:2[0m
[1;32m      1[0m [38;5;66;03m#Find na null values[39;00m
[0;32m----> 2[0m sales_df[38;5;241m.[39mselect([38;5;124m"[39m[38;5;124mOrder Line[39m[38;5;124m"[39m,[43misnan[49m([38;5;124m"[39m[38;5;124mOrder Line[39m[38;5;124m"[39m))[38;5;241m.[39mshow()

[0;31mNameError[0m: name 'isnan' is not defined

In [0]:
#Rename Columns from the product csv
product_df = product_df.withColumnRenamed("Product ID","Product_ID").withColumnRenamed("Product Name","Product_Name").withColumnRenamed("Sub-Category","Sub_Category")
product_df.show()

+---------------+---------------+------------+--------------------+
|     Product_ID|       Category|Sub_Category|        Product_Name|
+---------------+---------------+------------+--------------------+
|FUR-BO-10001798|      Furniture|   Bookcases|Bush Somerset Col...|
|FUR-CH-10000454|      Furniture|      Chairs|Hon Deluxe Fabric...|
|OFF-LA-10000240|Office Supplies|      Labels|Self-Adhesive Add...|
|FUR-TA-10000577|      Furniture|      Tables|Bretford CR4500 S...|
|OFF-ST-10000760|Office Supplies|     Storage|Eldon Fold N Roll...|
|FUR-FU-10001487|      Furniture| Furnishings|Eldon Expressions...|
|OFF-AR-10002833|Office Supplies|         Art|          Newell 322|
|TEC-PH-10002275|     Technology|      Phones|Mitel 5320 IP Pho...|
|OFF-BI-10003910|Office Supplies|     Binders|DXL Angle-View Bi...|
|OFF-AP-10002892|Office Supplies|  Appliances|Belkin F5C206VTEL...|
|FUR-TA-10001539|      Furniture|      Tables|Chromcraft Rectan...|
|TEC-PH-10002033|     Technology|      Phones|Ko

In [0]:
#Rename Columns from the customer csv
customer_df = customer_df.withColumnRenamed("Customer ID","Customer_ID").withColumnRenamed("Customer Name","Customer_Name").withColumnRenamed("Postal Code","Postal_Code")
customer_df.show()

+-----------+------------------+-----------+---+-------------+---------------+--------------+-----------+-------+
|Customer_ID|     Customer_Name|    Segment|Age|      Country|           City|         State|Postal_Code| Region|
+-----------+------------------+-----------+---+-------------+---------------+--------------+-----------+-------+
|   CG-12520|       Claire Gute|   Consumer| 67|United States|      Henderson|      Kentucky|      42420|  South|
|   DV-13045|   Darrin Van Huff|  Corporate| 31|United States|    Los Angeles|    California|      90036|   West|
|   SO-20335|    Sean O'Donnell|   Consumer| 65|United States|Fort Lauderdale|       Florida|      33311|  South|
|   BH-11710|   Brosina Hoffman|   Consumer| 20|United States|    Los Angeles|    California|      90032|   West|
|   AA-10480|      Andrew Allen|   Consumer| 50|United States|        Concord|North Carolina|      28027|  South|
|   IM-15070|      Irene Maddox|   Consumer| 66|United States|        Seattle|    Washin

In [0]:
#Join the DataFrames:
joined_df = sales_df.join(product_df, "product_id").join(customer_df, "customer_id")
joined_df.show()

+-----------+---------------+----------+--------------+----------+----------+--------------+-------+--------+--------+--------+---------------+------------+--------------------+------------------+-----------+---+-------------+---------------+--------------+-----------+-------+
|Customer_ID|     Product_ID|Order_Line|      Order_ID|Order_Date| Ship_Date|     Ship_Mode|  Sales|Quantity|Discount|  Profit|       Category|Sub_Category|        Product_Name|     Customer_Name|    Segment|Age|      Country|           City|         State|Postal_Code| Region|
+-----------+---------------+----------+--------------+----------+----------+--------------+-------+--------+--------+--------+---------------+------------+--------------------+------------------+-----------+---+-------------+---------------+--------------+-----------+-------+
|   CG-12520|FUR-BO-10001798|         1|CA-2016-152156|2016-11-08|2016-11-11|  Second Class| 261.96|       2|     0.0| 41.9136|      Furniture|   Bookcases|Bush Somer

In [0]:
joined_df.createOrReplaceTempView("sales_view")
df2 = spark.sql("SELECT count(DISTINCT(product_id)) from sales_view")
df2.show()

+--------------------------+
|count(DISTINCT product_id)|
+--------------------------+
|                      1862|
+--------------------------+



In [0]:
#1.What is the total sales for each product category?
Q1 = spark.sql("SELECT category,sum(sales)as sales from sales_view GROUP BY ALL")
Q1.show()

+---------------+---------+
|       category|    sales|
+---------------+---------+
|Office Supplies|719046.99|
|      Furniture|741999.98|
|     Technology|836154.10|
+---------------+---------+



In [0]:
#2.Which customer has made the highest number of purchases?
Q2 = spark.sql("SELECT Customer_Name,approx_count_distinct(Order_ID)as No_of_Purchases from sales_view GROUP BY ALL ORDER BY approx_count_distinct(Order_ID) DESC limit 1")
Q2.show()

+-------------+---------------+
|Customer_Name|No_of_Purchases|
+-------------+---------------+
|   Emily Phan|             17|
+-------------+---------------+



In [0]:
#3.What is the average discount given on sales across all products?
Q3 = spark.sql("SELECT Product_Name,round(avg(discount),2)as Avg_Discount from sales_view GROUP BY ALL ORDER BY Product_Name ASC")
Q3.show()

+--------------------+------------+
|        Product_Name|Avg_Discount|
+--------------------+------------+
|#10 Gummed Flap W...|         0.1|
|#10 Self-Seal Whi...|        0.05|
|#10 White Busines...|        0.06|
|#10- 4 1/8 x 9 1/...|        0.08|
|#10- 4 1/8 x 9 1/...|        0.08|
|#10-4 1/8 x 9 1/2...|         0.1|
|#6 3/4 Gummed Fla...|         0.2|
|1.7 Cubic Foot Co...|        0.07|
|1/4 Fold Party De...|         0.1|
|12 Colored Short ...|        0.13|
|12-1/2 Diameter R...|        0.28|
|14-7/8 x 11 Blue ...|         0.1|
|2300 Heavy-Duty T...|         0.1|
|24 Capacity Maxi ...|         0.1|
|24-Hour Round Wal...|        0.07|
|  3-ring staple pack|         0.1|
|3.6 Cubic Foot Co...|        0.52|
|36X48 HARDFLOOR C...|        0.23|
|3D Systems Cube P...|         0.0|
|3D Systems Cube P...|        0.35|
+--------------------+------------+
only showing top 20 rows



In [0]:
#4.How many unique products were sold in each region?
Q4 = spark.sql("SELECT Region,approx_count_distinct(product_id) as Product_Sold from sales_view GROUP BY ALL")
Q4.show()

+-------+------------+
| Region|Product_Sold|
+-------+------------+
|  South|        1089|
|Central|        1373|
|   East|        1471|
|   West|        1629|
+-------+------------+



In [0]:
#5.What is the total profit generated in each state?
Q5 = spark.sql("SELECT state,round(sum(profit),2) as Profit from sales_view GROUP BY ALL ORDER BY state ASC")
Q5.show()

+--------------------+--------+
|               state|  Profit|
+--------------------+--------+
|             Alabama| 2845.06|
|             Arizona|  9563.2|
|            Arkansas|  -62.95|
|          California|59398.31|
|            Colorado|  970.46|
|         Connecticut|   533.5|
|            Delaware| 3336.38|
|District of Columbia|  490.96|
|             Florida|  750.74|
|             Georgia|12781.34|
|            Illinois| 9560.15|
|             Indiana| 2707.35|
|                Iowa| 2258.59|
|              Kansas|   139.2|
|            Kentucky| 4513.31|
|           Louisiana| 2659.24|
|            Maryland|  436.65|
|       Massachusetts| 5905.54|
|            Michigan|  7752.3|
|           Minnesota| 7202.52|
+--------------------+--------+
only showing top 20 rows



In [0]:
#6.Which product sub-category has the highest sales?
Q6 = spark.sql("SELECT Sub_Category,sum(sales)as Sales from sales_view GROUP BY ALL order by sales DESC ")
Q6.show()

+------------+---------+
|Sub_Category|    Sales|
+------------+---------+
|      Phones|330007.10|
|      Chairs|328449.13|
|     Storage|223843.59|
|      Tables|206965.68|
|     Binders|203412.77|
|    Machines|189238.68|
| Accessories|167380.31|
|     Copiers|149528.01|
|   Bookcases|114880.05|
|  Appliances|107532.14|
| Furnishings| 91705.12|
|       Paper| 78479.24|
|    Supplies| 46673.52|
|         Art| 27118.80|
|   Envelopes| 16476.38|
|      Labels| 12486.30|
|   Fasteners|  3024.25|
+------------+---------+



In [0]:
#7.What is the average age of customers in each segment?
Q7 = spark.sql("SELECT segment,round(avg(age),2) as Avg_Age from sales_view GROUP BY ALL order by avg(age) DESC ")
Q7.show()

+-----------+-------+
|    segment|Avg_Age|
+-----------+-------+
|  Corporate|  44.82|
|   Consumer|  44.61|
|Home Office|  43.28|
+-----------+-------+



In [0]:
#8.How many orders were shipped in each shipping mode?
Q8 = spark.sql("SELECT ship_mode,approx_count_distinct(Order_ID)as No_of_Orders_Shipped from sales_view GROUP BY ALL order by approx_count_distinct(Order_ID) DESC ")
Q8.show()

+--------------+--------------------+
|     ship_mode|No_of_Orders_Shipped|
+--------------+--------------------+
|Standard Class|                2746|
|  Second Class|                 997|
|   First Class|                 743|
|      Same Day|                 255|
+--------------+--------------------+



In [0]:
#9.What is the total quantity of products sold in each city?
Q9 = spark.sql("SELECT city,sum(quantity)as Quantity_Sold from sales_view GROUP BY ALL order by sum(quantity) DESC ")
Q9.show()

+-------------+-------------+
|         city|Quantity_Sold|
+-------------+-------------+
|New York City|         3217|
|  Los Angeles|         2756|
| Philadelphia|         2299|
|San Francisco|         1773|
|      Houston|         1425|
|      Seattle|         1371|
|      Chicago|         1153|
|     Columbus|          854|
|       Aurora|          611|
|    San Diego|          609|
|       Dallas|          602|
| Jacksonville|          362|
|      Detroit|          332|
|  Springfield|          282|
|    Rochester|          279|
|    Charlotte|          275|
|   Wilmington|          271|
|       Tucson|          257|
|      Phoenix|          256|
|        Dover|          256|
+-------------+-------------+
only showing top 20 rows



In [0]:
#10.Which customer segment has the highest profit margin?
Q10 = spark.sql("SELECT segment,round(sum(profit),2) as Profit from sales_view GROUP BY ALL order by Profit DESC limit 1")
Q10.show()

+--------+---------+
| segment|   Profit|
+--------+---------+
|Consumer|134119.21|
+--------+---------+



In [0]:
# 10 Analytical Questions based integrated dataset
#Q11.What are the top 10 best-selling products by profit?
Q11 = spark.sql("SELECT product_name as Product_Name,round(sum(profit),2) as Profit from sales_view GROUP BY ALL ORDER BY Profit DESC limit 10")
Q11.show()

+--------------------+--------+
|        Product_Name|  Profit|
+--------------------+--------+
|Canon imageCLASS ...|25199.93|
|Fellowes PB500 El...| 7753.04|
|Hewlett Packard L...| 6983.88|
|Canon PC1060 Pers...| 4570.93|
|Logitech G19 Prog...| 4425.34|
|HP Designjet T520...| 4094.98|
|Ativa V4110MDD Mi...| 3772.95|
|3D Systems Cube P...| 3717.97|
|Ibico EPK-21 Elec...| 3345.28|
|Zebra ZM400 Therm...| 3343.54|
+--------------------+--------+



In [0]:
#Q12.What is the average profit margin for different product categories?
Q12 = spark.sql("SELECT category,round(avg(profit),2) as Profit from sales_view GROUP BY ALL ORDER BY Profit DESC ")
Q12.show()

+---------------+------+
|       category|Profit|
+---------------+------+
|     Technology| 78.75|
|Office Supplies| 20.33|
|      Furniture|   8.7|
+---------------+------+



In [0]:
#Q13.How has the performance of a specific product category changed over time?
Q13 = spark.sql("SELECT category,year(order_date) as Year,sum(sales) as Sales from sales_view  GROUP BY ALL ORDER BY category,Year")
Q13.show()

+---------------+----+---------+
|       category|Year|    Sales|
+---------------+----+---------+
|      Furniture|2014|157192.89|
|      Furniture|2015|170518.26|
|      Furniture|2016|198901.55|
|      Furniture|2017|215387.28|
|Office Supplies|2014|151776.41|
|Office Supplies|2015|137233.42|
|Office Supplies|2016|183940.07|
|Office Supplies|2017|246097.09|
|     Technology|2014|175278.26|
|     Technology|2015|162780.78|
|     Technology|2016|226364.24|
|     Technology|2017|271730.82|
+---------------+----+---------+



In [0]:
#Q14.Identify customer segments based on sales
Q14 = spark.sql("SELECT customer_name,case when sum(sales)<='7000'then 'Low-Value Customers'  when sum(sales)<='11000' then 'Mid-Value Customers'else 'High-Value Customers' end as Customer_Segmentation,sum(sales) as Sales from sales_view GROUP BY ALL ORDER BY Sales DESC ")
Q14.show()

+------------------+---------------------+--------+
|     customer_name|Customer_Segmentation|   Sales|
+------------------+---------------------+--------+
|       Sean Miller| High-Value Customers|25043.07|
|      Tamara Chand| High-Value Customers|19052.22|
|      Raymond Buch| High-Value Customers|15117.35|
|      Tom Ashbrook| High-Value Customers|14595.62|
|     Adrian Barton| High-Value Customers|14473.57|
|      Ken Lonsdale| High-Value Customers|14175.23|
|      Sanjit Chand| High-Value Customers|14142.34|
|      Hunter Lopez| High-Value Customers|12873.30|
|      Sanjit Engle| High-Value Customers|12209.44|
|Christopher Conant| High-Value Customers|12129.08|
|      Todd Sumrall| High-Value Customers|11891.75|
|         Greg Tran| High-Value Customers|11820.12|
|      Becky Martin| High-Value Customers|11789.64|
|       Seth Vernon| High-Value Customers|11470.94|
|   Caroline Jumper| High-Value Customers|11164.97|
|       Clay Ludtke|  Mid-Value Customers|10880.56|
|     Maria 

In [0]:
#Q15.What are the overall sales trends over time (e.g., monthly, quarterly, yearly)?
Q15 = spark.sql("SELECT year(order_date) as Year,sum(sales)as sales from sales_view GROUP BY ALL order by Year")
Q15.show()

+----+---------+
|Year|    sales|
+----+---------+
|2014|484247.56|
|2015|470532.46|
|2016|609205.86|
|2017|733215.19|
+----+---------+



In [0]:
#Q16.What are the overall sales trends by qtr for 2017?
Q16 = spark.sql("SELECT quarter(order_date) as Qtr,sum(sales)as sales from sales_view where year(order_date)='2017' GROUP BY ALL order by Qtr")
Q16.show()

+---+---------+
|Qtr|    sales|
+---+---------+
|  1|123144.84|
|  2|133764.33|
|  3|196251.94|
|  4|280054.08|
+---+---------+



In [0]:
#Q17.How do sales vary across different geographic regions?
Q17 = spark.sql("SELECT Region,year(order_date) as Year,sum(sales)as sales from sales_view  GROUP BY ALL order by Region,Year")
Q17.show()

+-------+----+---------+
| Region|Year|    sales|
+-------+----+---------+
|Central|2014|106767.93|
|Central|2015|103610.90|
|Central|2016|134573.40|
|Central|2017|173847.83|
|   East|2014|103371.83|
|   East|2015|133268.86|
|   East|2016|172612.32|
|   East|2017|202481.51|
|  South|2014|106921.65|
|  South|2015| 80855.83|
|  South|2016| 97603.88|
|  South|2017|116650.70|
|   West|2014|167186.15|
|   West|2015|152796.87|
|   West|2016|204416.26|
|   West|2017|240235.15|
+-------+----+---------+



In [0]:
#Q18.Top 3 products sales by each sub_category
Q18 = spark.sql("SELECT product_name,sub_category,sum(sales)as sales,Rank from(SELECT product_name,sub_category,rank() OVER (PARTITION BY sub_category ORDER BY sum(sales) DESC) as Rank,sum(sales)as sales from sales_view GROUP BY ALL) where Rank<=3 GROUP BY ALL ")
Q18.show()

+--------------------+------------+--------+----+
|        product_name|sub_category|   sales|Rank|
+--------------------+------------+--------+----+
|Logitech G19 Prog...| Accessories|13756.54|   1|
|Logitech P710e Mo...| Accessories|11203.76|   2|
|Plantronics CS510...| Accessories|10822.36|   3|
|Honeywell Envirac...|  Appliances|11304.44|   1|
|Hoover Upright Va...|  Appliances| 6832.91|   2|
|Sanyo Counter Hei...|  Appliances| 5906.52|   3|
|Hunt PowerHouse E...|         Art| 1617.95|   1|
|Boston Heavy-Duty...|         Art| 1166.44|   2|
|Hunt BOSTON Model...|         Art| 1113.02|   3|
|Fellowes PB500 El...|     Binders|27453.38|   1|
|GBC DocuBind TL30...|     Binders|19823.48|   2|
|GBC Ibimaster 500...|     Binders|19024.50|   3|
|Riverside Palais ...|   Bookcases|15610.97|   1|
|DMI Eclipse Execu...|   Bookcases|12921.64|   2|
|Atlantic Metals M...|   Bookcases| 7539.71|   3|
|HON 5400 Series T...|      Chairs|21870.57|   1|
|Global Troy Execu...|      Chairs|12975.39|   2|


In [0]:
#Q19.Preferred ship mode by customer
Q19 = spark.sql("SELECT ship_mode,approx_count_distinct(customer_id)as Customer_Count from sales_view GROUP BY ALL order by Customer_Count DESC ")
Q19.show()


+--------------+--------------+
|     ship_mode|Customer_Count|
+--------------+--------------+
|Standard Class|           751|
|  Second Class|           541|
|   First Class|           497|
|      Same Day|           236|
+--------------+--------------+



In [0]:
#Q20.Top 10 customers for the year 2017
Q20 = spark.sql("SELECT customer_name,sum(sales) as Sales from sales_view where year(order_date)='2017' GROUP BY ALL order by Sales DESC limit 10")
Q20.show()

+---------------+--------+
|  customer_name|   Sales|
+---------------+--------+
|   Raymond Buch|14203.28|
|   Tom Ashbrook|13723.50|
|   Hunter Lopez|10522.55|
|    Seth Vernon| 8459.93|
| Grant Thornton| 8167.42|
|Helen Wasserman| 8166.35|
|   Todd Sumrall| 6702.29|
|    Rick Wilson| 6193.45|
|      Pete Kriz| 5979.13|
| Karen Ferguson| 5825.46|
+---------------+--------+

