# Sales Analysis using PySpark in AWS EMR

This notebook solves a set of PySpark exercises using RDDs, Spark DataFrames, and Spark SQL. The original exercises are taken from [Six Spark Exercises to Rule Them All](https://towardsdatascience.com/six-spark-exercises-to-rule-them-all-242445b24565).

The schema of the dataset is shown in the image below.

![dataset](https://miro.medium.com/max/700/1*wA4xJu3LMcm_vR5pFJkLpA.png)

The tables contain the following columns:

- Sales
    - `order_id`: The order ID
    - `product_id`: The single product sold in the order (every order contains exactly one product)
    - `seller_id`: The ID of the employee who sold the product
    - `num_pieces_sold`: The number of units of the product sold in the order
    - `bill_raw_text`: A string containing the raw text of the bill associated with the order
    - `date`: The date of the order
- Products
    - `product_id`: The product ID
    - `product_name`: The product name
    - `price`: The product price
- Sellers
    - `seller_id`: The seller ID
    - `seller_name`: The seller name
    - `daily_target`: The number of items (regardless of product type) that the seller needs to sell to hit their quota. For example, if the daily target is 100,000, the employee can hit the quota by selling 100,000 units of `product_0`, or equally by selling 30,000 units of `product_1` and 70,000 units of `product_2`.
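
The quota rule for `daily_target` can be illustrated with a minimal plain-Python sketch. The function `hits_daily_target` and its dictionary input are hypothetical illustrations, not part of the dataset or the original exercises:

```python
from typing import Mapping


def hits_daily_target(units_sold_by_product: Mapping[str, int], daily_target: int) -> bool:
    """Return True if the total units sold, across all products, reach the target."""
    total_units = 0
    # Product type is irrelevant: units of every product count towards one total.
    for units in units_sold_by_product.values():
        total_units += units
    return total_units >= daily_target


# The two scenarios from the daily_target description, plus one that misses.
print(hits_daily_target({"product_0": 100_000}, 100_000))  # True
print(hits_daily_target({"product_1": 30_000, "product_2": 70_000}, 100_000))  # True
print(hits_daily_target({"product_1": 30_000}, 100_000))  # False
```

The explicit loop is deliberate: in a notebook that runs `from pyspark.sql.functions import *`, the built-in `sum` is shadowed by Spark's column aggregate of the same name.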

Import data

```python
# S3 prefix holding the three Parquet datasets
data_url = "s3://emr-studio-dependencies-emrstudiostoragebucket-1adgwjaceu08j/e-93SMUU57YC2V27EKSRI9R72NC/sales-analysis-aws-spark/data/"
sellers = spark.read.parquet(data_url + "sellers_parquet")
products = spark.read.parquet(data_url + "products2_parquet")
sales = spark.read.parquet(data_url + "sales_parquet")
```

Register the DataFrames as temporary views so they can be queried with Spark SQL

```python
sellers.createOrReplaceTempView("sellers")
products.createOrReplaceTempView("products")
sales.createOrReplaceTempView("sales")
```

## Analysis

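```python
# The wildcard import keeps later cells short, but it shadows Python
# built-ins such as sum, min and max with Spark column functions.
from pyspark.sql.functions import *
```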

### How many orders, products and sellers are there in the data?

```python
products_num = products.count()
sellers_num = sellers.count()
sales_num = sales.count()

print(f"""
Products: \t{products_num:,d}
Sellers: \t{sellers_num:,d}
Orders: \t{sales_num:,d}
""")
```

```
Products: 	75,000,000
Sellers: 	10
Orders: 	20,000,040
```

### How many products have been sold at least once?

```python
sales.printSchema()
```

```
root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: string (nullable = true)
 |-- bill_raw_text: string (nullable = true)
```

Approach 1

```python
unique_products_sold = sales.select('product_id').distinct().count()
print(f"Products sold at least once: {unique_products_sold:,d}")
```

```
Products sold at least once: 993,299
```

Approach 2

```python
sales.agg(countDistinct('product_id')).show()
```

```
+-----------------+
|count(product_id)|
+-----------------+
|           993299|
+-----------------+
```

Approach 3

```python
spark.sql("SELECT COUNT(DISTINCT product_id) FROM sales").show()
```

```
+--------------------------+
|count(DISTINCT product_id)|
+--------------------------+
|                    993299|
+--------------------------+
```

### Which product appears in the most orders?

spark.