In [1]:
# !pip install pyspark

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from pyspark.sql.types import IntegerType

In [3]:
spark1 = (SparkSession.builder.appName("SparkExampleApp").getOrCreate())

In [4]:
csv_file = "SalesData.csv"
# df = spark1.read.format("csv").option("inferSchema", "true").option("header","true").load(csv_file)

### option('escape','"') to avoid confusion with inches("), "" is used when there is , in a value
df = spark1.read.format("csv").options(header='true', inferschema='true').option('escape','"').load(csv_file)

In [5]:
df.createOrReplaceTempView("u")

In [6]:
df.printSchema()
df.show(2)

### change data type if necessary
# df = df.withColumn("Quantity", df["Quantity"].cast(IntegerType())) # or cast('float')
# df.groupBy().sum("Quantity").show()

# df2 = df.select("Order ID","Sub-Category","Product Name","Quantity")
# df2 = df[(df["Order ID"] == "CA-2017-132738") | (df["Order ID"] == "CA-2014-112851")]
# df2.select("Product Name").show(20,False)

root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Profit: double (nullable = true)

+------+--------------+----------+----------+------------+-----------+-------------+--------+-------------+---------+--------+-----------+------

### 1. Find the best-selling product sub-category

In [7]:
# df.groupBy().sum("Quantity").show()
Sales_by_SubCat = df.groupBy("Sub-Category").agg(sum("Quantity").alias("Total Quantity Sold"))
Sales_by_SubCat.orderBy("Total Quantity Sold", ascending = False).show()
# Sales_by_SubCat.groupBy().sum("Total Quantity Sold").show()

+------------+-------------------+
|Sub-Category|Total Quantity Sold|
+------------+-------------------+
|     Binders|               5974|
|       Paper|               5178|
| Furnishings|               3563|
|      Phones|               3289|
|     Storage|               3158|
|         Art|               3000|
| Accessories|               2976|
|      Chairs|               2356|
|  Appliances|               1729|
|      Labels|               1400|
|      Tables|               1241|
|   Fasteners|                914|
|   Envelopes|                906|
|   Bookcases|                868|
|    Supplies|                647|
|    Machines|                440|
|     Copiers|                234|
+------------+-------------------+



### 2. Find the product category generating the highest revenue

In [8]:
rev_by_Cat = df.groupBy("Category").agg(sum("Sales").alias("Total Sales"))
rev_by_Cat.orderBy("Total Sales", ascending = False).show()

+---------------+-----------------+
|       Category|      Total Sales|
+---------------+-----------------+
|     Technology|836154.0329999966|
|      Furniture|741999.7952999998|
|Office Supplies|719047.0320000029|
+---------------+-----------------+



### 3. Compile a top 10 list of the most valuable customers

In [9]:
Sales_by_Cust = df.groupBy("Customer ID").agg(sum("Sales").alias("Total Sales"))
Sales_by_Cust.orderBy("Total Sales", ascending = False).limit(10).show()

+-----------+------------------+
|Customer ID|       Total Sales|
+-----------+------------------+
|   SM-20320|          25043.05|
|   TC-20980|19052.217999999997|
|   RB-19360|         15117.339|
|   TA-21385|          14595.62|
|   AB-10105|14473.570999999998|
|   KL-16645|         14175.229|
|   SC-20095|14142.333999999999|
|   HL-15040|12873.297999999999|
|   SE-20110|12209.438000000002|
|   CC-12370|         12129.072|
+-----------+------------------+



### 4. Determine the state responsible for the highest number of orders

In [10]:
order_by_Sate = df.groupBy("State").agg(sum("Quantity").alias("Total orders"))
order_by_Sate.orderBy("Total orders", ascending = False).show(5)

+------------+------------+
|       State|Total orders|
+------------+------------+
|  California|        7667|
|    New York|        4224|
|       Texas|        3724|
|Pennsylvania|        2153|
|  Washington|        1883|
+------------+------------+
only showing top 5 rows



### 5. Find the year with the highest revenue generation

In [11]:
df = df.withColumn("Order Date",to_date("Order Date", "m/d/yyyy"))
df = df.withColumn("Year", year("Order Date"))

rev_by_year = df.groupBy("Year").agg(sum("Sales").alias("Total Revenue"))
rev_by_year = rev_by_year.orderBy("Total revenue", ascending = False)
rev_by_year.withColumn("Total revenue", round("Total revenue",2)).show()

+----+-------------+
|Year|Total revenue|
+----+-------------+
|2017|    733215.26|
|2016|     609205.6|
|2014|     484247.5|
|2015|    470532.51|
+----+-------------+

