First we import Spark and create a Spark Session. 

In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SuperstoreAssignment").getOrCreate()

Next we load the sample data set from a .csv file into a DataFrame and take a look at its schema and row count. 

In [120]:
file_path = "Downloads/Superstore.csv"

df = spark.read \
    .format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load(file_path)


    

In [121]:
df.printSchema()
df.count()

root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: double (nullable = true)



9994

After a little tinkering, a problem with the data set emerges. Several of the quantitative variables (Sales, Quantity, and Discount) are read as strings. This could simply be a schema-inference problem that a manual cast would fix, but a closer look shows that some entries in the Sales column really are text (e.g., Blue), and that the values in the columns after Sales in those rows don't match the expected format. Digging further reveals that the cause is the way the Product Name column is read. Several product descriptions contain a double quotation mark (") to denote inches, which throws off where the parser thinks the string starts and ends. A comma that is supposed to be part of the Product Name is then read as a delimiter, so the remaining values land in the wrong columns. Casting Sales to double turns these text entries into NULL, and we can count the number of affected rows. 

In [123]:
df = df.withColumn("Sales", df.Sales.cast("double"))
df.printSchema()

df.createOrReplaceTempView("table_init")

spark.sql("SELECT COUNT(`Product ID`) as Num_Null \
            FROM table_init \
            WHERE Sales IS NULL;").show()

root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: double (nullable = true)

+--------+
|Num_Null|
+--------+
|     300|
+--------+



There are 300 such rows. This is nontrivial, so we need a find-and-replace to remove every " occurrence within the strings. This could be done in Python, but it is easier in a spreadsheet. I've done this and saved the result as a new .csv file called Superstore1.csv. When we load this file, Spark correctly infers the types of the last four variables. 

In [125]:
file_path = "Downloads/Superstore1.csv"

df1 = spark.read \
    .format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load(file_path)


df1.printSchema()
df1.count()

root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Profit: double (nullable = true)



9994
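
The find-and-replace step described above could also be scripted. The following is a minimal sketch, not the procedure actually used to produce Superstore1.csv. It assumes the raw file follows the common CSV convention of doubling a quotation mark inside a quoted field; the function name `strip_inner_quotes` and the two sample rows are made up for illustration.

```python
import csv
import io


def strip_inner_quotes(src, dst):
    """Copy CSV rows from src to dst, removing every " inside a field.

    Returns the number of fields that were changed.
    """
    reader = csv.reader(src)                      # understands doubled quotes ("")
    writer = csv.writer(dst, lineterminator="\n")
    changed = 0
    for row in reader:
        cleaned = [field.replace('"', "") for field in row]
        changed += sum(a != b for a, b in zip(row, cleaned))
        writer.writerow(cleaned)
    return changed


# Hypothetical sample: one product name contains an inch mark and a comma.
sample = 'Row ID,Product Name,Sales\n1,"Ruler, 12"" Blue",5.0\n2,Stapler,7.5\n'
out = io.StringIO()
changed = strip_inner_quotes(io.StringIO(sample), out)
print(changed)
print(out.getvalue())
```

For a real file, the two in-memory buffers would be replaced by files opened with `newline=""`. Spark's CSV reader also has an `escape` option whose default is a backslash; setting it to the double-quote character might make editing the file unnecessary, though that was not tried here.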

For a consistent naming convention, and to avoid backticks around field names that contain spaces or hyphens, I'm going to rename the affected fields using underscores. 

Then, we register the result as a temporary view so that it can be queried with SQL. 

In [127]:
df2 = df1.withColumnsRenamed({"Row ID": "Row_ID", "Order ID": "Order_ID", "Order Date": "Order_Date", \
                       "Ship Date": "Ship_Date", "Ship Mode": "Ship_Mode", "Customer ID": "Customer_ID", \
                       "Customer Name": "Customer_Name", "Postal Code": "Postal_Code", \
                       "Product ID": "Product_ID", "Sub-Category": "Sub_Category","Product Name": "Product_Name"})

df2.printSchema()

root
 |-- Row_ID: integer (nullable = true)
 |-- Order_ID: string (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Ship_Date: string (nullable = true)
 |-- Ship_Mode: string (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal_Code: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub_Category: string (nullable = true)
 |-- Product_Name: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Profit: double (nullable = true)
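
The rename mapping can also be derived from the column names rather than typed by hand. This is a small sketch in plain Python, assuming the only characters to replace are spaces and hyphens; the variable names `columns` and `rename_map` are illustrative.

```python
import re

# Column names as printed by the schema of df1.
columns = [
    "Row ID", "Order ID", "Order Date", "Ship Date", "Ship Mode",
    "Customer ID", "Customer Name", "Segment", "Country", "City", "State",
    "Postal Code", "Region", "Product ID", "Category", "Sub-Category",
    "Product Name", "Sales", "Quantity", "Discount", "Profit",
]

# Keep only the names that actually change, mapping old name -> new name.
rename_map = {
    name: re.sub(r"[ -]", "_", name)
    for name in columns
    if re.search(r"[ -]", name)
}

for old, new in rename_map.items():
    print(f"{old!r} -> {new!r}")
```

In the notebook, `columns` would come from `df1.columns`, and the resulting dictionary could be passed to `df1.withColumnsRenamed(rename_map)` as in the cell above.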



In [128]:
df2.createOrReplaceTempView("table")

Since we have several tasks involving revenue and sales, and since we don't have the ability to ask about how to interpret these fields, we'll look at some of these values below. 

Looking at these values (an anecdotal check, but sufficient to understand the convention here), we see that 

\begin{equation}
\mbox{Sales} = \mbox{(Per Item Price)} \times \mbox{(1-Discount)} \times \mbox{(Quantity)}.
\end{equation}

The Per Item Price is not given to us in this data set, but can be reverse engineered using the above relationship. 

Most importantly, the Sales field is the total revenue for that row, i.e., for all units of the row's product within its order. 


In [52]:
spark.sql("SELECT Product_ID, Sales, Quantity, Discount, Profit \
            FROM table \
            ORDER BY Product_ID DESC \
            LIMIT 10;").show()

+---------------+--------+--------+--------+--------+
|     Product_ID|   Sales|Quantity|Discount|  Profit|
+---------------+--------+--------+--------+--------+
|TEC-PH-10004977|1097.544|       7|     0.2|123.4737|
|TEC-PH-10004977| 470.376|       3|     0.2| 52.9173|
|TEC-PH-10004977| 627.168|       4|     0.2| 70.5564|
|TEC-PH-10004977|  391.98|       2|       0|113.6742|
|TEC-PH-10004977| 627.168|       4|     0.2| 70.5564|
|TEC-PH-10004977|  391.98|       2|       0|113.6742|
|TEC-PH-10004977| 470.376|       3|     0.2| 52.9173|
|TEC-PH-10004977|  979.95|       5|       0|284.1855|
|TEC-PH-10004977| 235.188|       2|     0.4|-43.1178|
|TEC-PH-10004959|  100.49|       1|       0| 25.1225|
+---------------+--------+--------+--------+--------+
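
The relationship between Sales, Quantity, and Discount can be checked against the rows above. The sketch below takes the distinct (Sales, Quantity, Discount) triples shown for product TEC-PH-10004977 and solves for the per-item price; the variable names are illustrative, and rounding to two decimals is an assumption made to absorb floating-point noise.

```python
# Distinct (Sales, Quantity, Discount) triples for TEC-PH-10004977,
# copied from the query output above.
rows = [
    (1097.544, 7, 0.2),
    (470.376, 3, 0.2),
    (627.168, 4, 0.2),
    (391.98, 2, 0.0),
    (979.95, 5, 0.0),
    (235.188, 2, 0.4),
]

# Sales = price * (1 - Discount) * Quantity  =>  price = Sales / ((1 - Discount) * Quantity)
unit_prices = [
    round(sales / ((1 - discount) * quantity), 2)
    for sales, quantity, discount in rows
]

print(unit_prices)
```

Every row gives the same per-item price, which is consistent with the convention stated above.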



It is also important to note that each Order may consist of several rows, i.e., the same Order_ID may occur on many rows, one for each Product_ID of that order. 

In [130]:
spark.sql("SELECT Order_ID, Product_ID, Sales, Quantity, Discount, Profit  \
            FROM table \
            ORDER BY Order_ID;").show()

+--------------+---------------+-------+--------+--------+--------+
|      Order_ID|     Product_ID|  Sales|Quantity|Discount|  Profit|
+--------------+---------------+-------+--------+--------+--------+
|CA-2014-100006|TEC-PH-10002075| 377.97|       3|     0.0|109.6113|
|CA-2014-100090|FUR-TA-10003715|502.488|       3|     0.2|-87.9354|
|CA-2014-100090|OFF-BI-10001597|196.704|       6|     0.2| 68.8464|
|CA-2014-100293|OFF-PA-10000176| 91.056|       6|     0.2| 31.8696|
|CA-2014-100328|OFF-BI-10000343|  3.928|       1|     0.2|  1.3257|
|CA-2014-100363|OFF-FA-10000611|  2.368|       2|     0.2|  0.8288|
|CA-2014-100363|OFF-PA-10004733| 19.008|       3|     0.2|  6.8904|
|CA-2014-100391|OFF-PA-10001471|  14.62|       2|     0.0|  6.7252|
|CA-2014-100678|OFF-AR-10001868|  2.688|       2|     0.2|   1.008|
|CA-2014-100678|FUR-CH-10002602|317.058|       3|     0.3|-18.1176|
|CA-2014-100678|OFF-EN-10000056|149.352|       3|     0.2| 50.4063|
|CA-2014-100678|TEC-AC-10000474|227.976|       3

Now, we address the assignment questions. 

1. Determine the best-selling product sub-category.

In [136]:
spark.sql("SELECT Sub_Category, SUM(Quantity) as Number_Sold \
            FROM table \
            GROUP BY Sub_Category \
            ORDER BY Number_Sold DESC;").show()


+------------+-----------+
|Sub_Category|Number_Sold|
+------------+-----------+
|     Binders|       5974|
|       Paper|       5178|
| Furnishings|       3563|
|      Phones|       3289|
|     Storage|       3158|
|         Art|       3000|
| Accessories|       2976|
|      Chairs|       2356|
|  Appliances|       1729|
|      Labels|       1400|
|      Tables|       1241|
|   Fasteners|        914|
|   Envelopes|        906|
|   Bookcases|        868|
|    Supplies|        647|
|    Machines|        440|
|     Copiers|        234|
+------------+-----------+



2. Identify the product category generating the highest revenue.

In [133]:
spark.sql("SELECT Category, SUM(Sales) as Total_Revenue \
             FROM table \
             GROUP BY Category \
             ORDER BY Total_Revenue DESC;").show()


+---------------+-----------------+
|       Category|    Total_Revenue|
+---------------+-----------------+
|     Technology|836154.0329999966|
|      Furniture|741999.7952999998|
|Office Supplies|719047.0320000029|
+---------------+-----------------+



3. Compile a top 10 list of the most valuable customers.

In [134]:
spark.sql("SELECT Customer_Name, SUM(Profit) as Total_Profit \
            FROM table \
            GROUP BY Customer_Name \
            ORDER BY Total_Profit DESC \
            LIMIT 10;").show()


+--------------------+------------------+
|       Customer_Name|      Total_Profit|
+--------------------+------------------+
|        Tamara Chand| 8981.323900000001|
|        Raymond Buch|         6976.0959|
|        Sanjit Chand| 5757.411899999999|
|        Hunter Lopez|5622.4292000000005|
|       Adrian Barton|         5444.8055|
|        Tom Ashbrook| 4703.788299999999|
|Christopher Martinez|3899.8903999999998|
|       Keith Dawkins|         3038.6254|
|         Andy Reiter|2884.6207999999997|
|       Daniel Raglin|2869.0760000000005|
+--------------------+------------------+



4. Determine the state responsible for the highest number of orders.

In [174]:
spark.sql("SELECT State, COUNT(DISTINCT Order_ID) as Number_Orders \
            FROM table \
            GROUP BY State \
            ORDER BY Number_Orders DESC \
            LIMIT 10;").show()


+--------------+-------------+
|         State|Number_Orders|
+--------------+-------------+
|    California|         1021|
|      New York|          562|
|         Texas|          487|
|  Pennsylvania|          288|
|      Illinois|          276|
|    Washington|          256|
|          Ohio|          236|
|       Florida|          200|
|North Carolina|          136|
|      Michigan|          117|
+--------------+-------------+
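
The query counts distinct Order_ID values rather than rows because, as noted earlier, one order can span several rows. The sketch below illustrates the difference using the first eleven complete rows of the order listing shown earlier. SQLite (from the Python standard library) stands in for Spark SQL purely so that the example runs without a Spark session, and the table name `orders` is made up.

```python
import sqlite3

# (Order_ID, Product_ID) pairs copied from the order listing shown earlier.
rows = [
    ("CA-2014-100006", "TEC-PH-10002075"),
    ("CA-2014-100090", "FUR-TA-10003715"),
    ("CA-2014-100090", "OFF-BI-10001597"),
    ("CA-2014-100293", "OFF-PA-10000176"),
    ("CA-2014-100328", "OFF-BI-10000343"),
    ("CA-2014-100363", "OFF-FA-10000611"),
    ("CA-2014-100363", "OFF-PA-10004733"),
    ("CA-2014-100391", "OFF-PA-10001471"),
    ("CA-2014-100678", "OFF-AR-10001868"),
    ("CA-2014-100678", "FUR-CH-10002602"),
    ("CA-2014-100678", "OFF-EN-10000056"),
]

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE orders (Order_ID TEXT, Product_ID TEXT)")
con.executemany("INSERT INTO orders VALUES (?, ?)", rows)

# COUNT(*) counts line items; COUNT(DISTINCT Order_ID) counts orders.
n_rows, n_orders = con.execute(
    "SELECT COUNT(*), COUNT(DISTINCT Order_ID) FROM orders"
).fetchone()

print(n_rows, n_orders)
```

Counting rows would overstate the number of orders wherever an order contains more than one product.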



5. Find the year with the highest revenue generation.

In [184]:
spark.sql("SELECT years.Year AS Year, SUM(table.Sales) AS Annual_Revenue \
          FROM (SELECT Row_ID AS ID, RIGHT(Order_Date, 4) AS Year \
              FROM table) AS years \
          LEFT JOIN table \
          ON table.Row_ID = years.ID \
          GROUP BY years.Year \
          ORDER BY Annual_Revenue DESC;").show()

+----+------------------+
|Year|    Annual_Revenue|
+----+------------------+
|2017| 733215.2551999999|
|2016| 609205.5980000008|
|2014| 484247.4981000009|
|2015|470532.50899999985|
+----+------------------+
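
The query above joins the table to a derived copy of itself on Row_ID. If Row_ID is unique, each row matches only itself, so grouping directly on the extracted year should give the same totals. The sketch below compares the two forms on a few made-up rows. SQLite stands in for Spark SQL so the example runs on its own, `substr(Order_Date, -4)` plays the role of `RIGHT(Order_Date, 4)`, and the table name `orders`, the dates, and the amounts are all hypothetical.

```python
import sqlite3

# Hypothetical rows: (Row_ID, Order_Date, Sales). Not taken from the data set.
rows = [
    (1, "11/8/2016", 100.0),
    (2, "6/12/2016", 50.0),
    (3, "10/11/2015", 25.0),
    (4, "1/3/2017", 10.0),
]

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE orders (Row_ID INTEGER, Order_Date TEXT, Sales REAL)")
con.executemany("INSERT INTO orders VALUES (?, ?, ?)", rows)

# Same shape as the notebook query: derived table of years joined back on Row_ID.
with_join = con.execute(
    """
    SELECT years.Year AS Year, SUM(t.Sales) AS Annual_Revenue
    FROM (SELECT Row_ID AS ID, substr(Order_Date, -4) AS Year FROM orders) AS years
    LEFT JOIN orders AS t ON t.Row_ID = years.ID
    GROUP BY years.Year
    ORDER BY Annual_Revenue DESC
    """
).fetchall()

# Single pass: group on the extracted year directly.
direct = con.execute(
    """
    SELECT substr(Order_Date, -4) AS Year, SUM(Sales) AS Annual_Revenue
    FROM orders
    GROUP BY Year
    ORDER BY Annual_Revenue DESC
    """
).fetchall()

print(with_join)
print(direct)
```

Both forms return the same rows here; the direct form simply avoids the extra join.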

