## Case Study: Analyzing Sales Data

Imagine you're working with a dataset of sales transactions, and you want to perform various analytical tasks using Spark SQL, including calculating running totals, ranking products by sales, and summarizing sales by different categories.

In [1]:
!pip install pyspark



## Creating Example DataFrames
We will create two DataFrames, `salesDF` and `productsDF`, to represent sales transactions and product information.

In [28]:
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("SparkSQLExample").getOrCreate()


# Sample data for sales transactions:

sales_data = [
    Row(transaction_id=1, product_id=101, sale_date = '2023-01-01', quantity = 3, price = 50.0),
    Row(transaction_id=2, product_id=102, sale_date = '2023-01-02', quantity = 2, price=30.0),
    Row(transaction_id=3, product_id=101, sale_date='2023-01-03', quantity=4, price=60.0),
    Row(transaction_id=4, product_id=103, sale_date='2023-01-04', quantity=5, price=25.0),
    Row(transaction_id=5, product_id=104, sale_date='2023-01-05', quantity=2, price=40.0),
    Row(transaction_id=6, product_id=102, sale_date='2023-01-06', quantity=3, price=30.0),
    Row(transaction_id=7, product_id=104, sale_date='2023-01-07', quantity=1, price=40.0),
    Row(transaction_id=8, product_id=105, sale_date='2023-01-08', quantity=2, price=50.0),
    Row(transaction_id=9, product_id=101, sale_date='2023-01-09', quantity=3, price=60.0),
    Row(transaction_id=10, product_id=102, sale_date='2023-01-10', quantity=4, price=30.0),
    Row(transaction_id=11, product_id=103, sale_date='2023-01-11', quantity=2, price=25.0),
    Row(transaction_id=12, product_id=104, sale_date='2023-01-12', quantity=1, price=40.0),
    Row(transaction_id=13, product_id=105, sale_date='2023-01-13', quantity=5, price=50.0),
    Row(transaction_id=14, product_id=101, sale_date='2023-01-14', quantity=2, price=60.0),
    Row(transaction_id=15, product_id=102, sale_date='2023-01-15', quantity=3, price=30.0),
    Row(transaction_id=16, product_id=103, sale_date='2023-01-16', quantity=1, price=25.0),
]


products_data = [
    Row(product_id=101, product_category='Electronics'),
    Row(product_id=102, product_category='Clothing'),
    Row(product_id=103, product_category='Electronics'),
    Row(product_id=104, product_category='Clothing'),
    Row(product_id=105, product_category='Electronics'),
    Row(product_id=106, product_category='Books'),
    Row(product_id=107, product_category='Books'),
    Row(product_id=108, product_category='Clothing'),
    Row(product_id=109, product_category='Electronics'),
    Row(product_id=110, product_category='Electronics'),
    Row(product_id=111, product_category='Books'),
    Row(product_id=112, product_category='Clothing'),
    Row(product_id=113, product_category='Books'),
    Row(product_id=114, product_category='Books'),
    Row(product_id=115, product_category='Clothing'),
    Row(product_id=116, product_category='Clothing'),
    Row(product_id=117, product_category='Electronics'),
    Row(product_id=118, product_category='Books'),
    Row(product_id=119, product_category='Books'),

]

In [29]:
#Defining the Schema for each data frame:

sales_schema = StructType([
    StructField("transaction_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("sale_date", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", FloatType(), True),
])

products_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_category", StringType(), True),
])

In [30]:
# Create DataFrames
salesDF = spark.createDataFrame(sales_data, sales_schema)
productsDF = spark.createDataFrame(products_data, products_schema)


# Note: sale_date is still a string at this point. Calling to_date() on the
# Python Row values would only build unevaluated Column expressions, so the
# conversion to DateType is applied to the DataFrame itself (salesDF.withColumn)
# in the next cell.


# Register DataFrames as temporary SQL tables
salesDF.createOrReplaceTempView("sales")
productsDF.createOrReplaceTempView("products")

In [31]:
#verifying the schema for each dataframe

salesDF = salesDF.withColumn("sale_date", to_date("sale_date", 'yyyy-MM-dd'))

salesDF.printSchema()

print()

productsDF.printSchema()

salesDF.createOrReplaceTempView("sales")
productsDF.createOrReplaceTempView("products")

root
 |-- transaction_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- sale_date: date (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: float (nullable = true)


root
 |-- product_id: integer (nullable = true)
 |-- product_category: string (nullable = true)



## Example 1: Calculating the Running Total

In this example, we use a window function to calculate the running total of sales for each transaction.

In [39]:
totalDF = spark.sql("""

  SELECT
      transaction_id,
      product_id,
      sale_date,
      quantity,
      price,
      SUM(price * quantity) OVER (PARTITION BY transaction_id ORDER BY sale_date) AS running_total

  FROM sales



""")
totalDF.show(truncate=False)

+--------------+----------+----------+--------+-----+-------------+
|transaction_id|product_id|sale_date |quantity|price|running_total|
+--------------+----------+----------+--------+-----+-------------+
|1             |101       |2023-01-01|3       |50.0 |150.0        |
|2             |102       |2023-01-02|2       |30.0 |60.0         |
|3             |101       |2023-01-03|4       |60.0 |240.0        |
|4             |103       |2023-01-04|5       |25.0 |125.0        |
|5             |104       |2023-01-05|2       |40.0 |80.0         |
|6             |102       |2023-01-06|3       |30.0 |90.0         |
|7             |104       |2023-01-07|1       |40.0 |40.0         |
|8             |105       |2023-01-08|2       |50.0 |100.0        |
|9             |101       |2023-01-09|3       |60.0 |180.0        |
|10            |102       |2023-01-10|4       |30.0 |120.0        |
|11            |103       |2023-01-11|2       |25.0 |50.0         |
|12            |104       |2023-01-12|1       |4

## Explanation:

`SUM(price * quantity)` adds up the sale amount (price times quantity) over the rows in the window.
The `OVER` clause turns the aggregate into a window function and defines which rows it sees.
`PARTITION BY transaction_id` splits the rows into one partition per transaction, and the sum restarts in each partition. Because every `transaction_id` in this dataset is unique, each partition holds a single row.
`ORDER BY sale_date` orders the rows within each partition chronologically, so the sum accumulates from the earliest row up to the current one.
The result keeps the transaction columns and adds `running_total`. With single-row partitions it equals that row's own `price * quantity`, as the output shows; a total that accumulates across transactions needs a different partitioning, such as omitting `PARTITION BY` or partitioning by `product_id`.
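
To make the effect of the partitioning concrete, here is a minimal sketch that computes a total accumulating across transactions, plus a separate running total per product. It runs on Python's built-in `sqlite3` purely so that it is self-contained (an assumption: the bundled SQLite is 3.25 or newer, which is when window functions were added), and it uses only the first four rows of the sales data. The names `sales_rows`, `RUNNING_TOTAL_SQL`, and `product_running_total` are illustrative and not part of the notebook.

```python
import sqlite3

# First four sales rows from the notebook:
# (transaction_id, product_id, sale_date, quantity, price)
sales_rows = [
    (1, 101, "2023-01-01", 3, 50.0),
    (2, 102, "2023-01-02", 2, 30.0),
    (3, 101, "2023-01-03", 4, 60.0),
    (4, 103, "2023-01-04", 5, 25.0),
]

RUNNING_TOTAL_SQL = """
SELECT
    transaction_id,
    product_id,
    price * quantity AS amount,
    -- No PARTITION BY: accumulates over all transactions in date order
    SUM(price * quantity) OVER (ORDER BY sale_date) AS running_total,
    -- PARTITION BY product_id: a separate running total per product
    SUM(price * quantity) OVER (
        PARTITION BY product_id ORDER BY sale_date
    ) AS product_running_total
FROM sales
ORDER BY sale_date
"""

# In-memory database standing in for the Spark temp view "sales"
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sales (transaction_id INTEGER, product_id INTEGER, "
    "sale_date TEXT, quantity INTEGER, price REAL)"
)
conn.executemany("INSERT INTO sales VALUES (?, ?, ?, ?, ?)", sales_rows)
running_totals = conn.execute(RUNNING_TOTAL_SQL).fetchall()
conn.close()

for row in running_totals:
    print(row)
```

The two window clauses are standard SQL, so the same `SELECT` should also work when passed to `spark.sql(...)` against the `sales` view. Note that Spark logs a warning when a window has no `PARTITION BY`, because all rows are then processed in a single partition.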

## Example 2: Rank Products by Sales

In this example, we use a window function to rank products by their total sales.


In [40]:
df_ranked_products = spark.sql("""

    SELECT
        product_id,
        SUM(price * quantity) AS total_sales,
        RANK() OVER (ORDER BY SUM(price * quantity) DESC) AS sales_rank

    FROM sales

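    GROUP BY product_id

""")
# show() returns None, so keep the DataFrame in the variable and display it separately
df_ranked_products.show(truncate=False)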

+----------+-----------+----------+
|product_id|total_sales|sales_rank|
+----------+-----------+----------+
|101       |690.0      |1         |
|102       |360.0      |2         |
|105       |350.0      |3         |
|103       |200.0      |4         |
|104       |160.0      |5         |
+----------+-----------+----------+



## Explanation:

`SUM(price * quantity)` combined with `GROUP BY product_id` calculates the total sales amount for each product.
`RANK()` is a window function that assigns a rank to each product based on that total. It is evaluated after the grouping, which is why the aggregate can appear inside its `ORDER BY`.
`ORDER BY SUM(price * quantity) DESC` orders the products by descending total sales, so the product with the highest sales gets rank 1.
The result shows the `product_id`, `total_sales`, and `sales_rank` columns: the total sales for each product and its position in the ranking.
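
The sales data above contains no ties, so it does not show how `RANK()` behaves when two products have the same total. The sketch below illustrates this with hypothetical totals (products `A`, `B`, `C` are made up and not part of the dataset) and puts `DENSE_RANK()` next to it for comparison. It uses Python's built-in `sqlite3` only to stay self-contained, assuming SQLite 3.25 or newer.

```python
import sqlite3

# Hypothetical per-product totals with a deliberate tie (not from the notebook's data)
totals = [("A", 300.0), ("B", 300.0), ("C", 100.0)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE product_totals (product TEXT, total_sales REAL)")
conn.executemany("INSERT INTO product_totals VALUES (?, ?)", totals)

ranked = conn.execute("""
    SELECT
        product,
        total_sales,
        -- RANK leaves a gap after tied rows
        RANK() OVER (ORDER BY total_sales DESC) AS sales_rank,
        -- DENSE_RANK continues with the next consecutive number
        DENSE_RANK() OVER (ORDER BY total_sales DESC) AS dense_sales_rank
    FROM product_totals
    ORDER BY total_sales DESC, product
""").fetchall()
conn.close()

for row in ranked:
    print(row)
```

Both tied products receive rank 1. `RANK()` then skips to 3 for the next product, while `DENSE_RANK()` continues with 2.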

## Example 3: Summarize Sales by Category

In this example, we join the sales and products tables and use `GROUP BY` to total the sales of each product alongside its category.

In [45]:
df_sales_by_category = spark.sql("""

    SELECT
        p.product_id,
        p.product_category,
        SUM(s.price * s.quantity) As total_sales

    FROM sales AS s
    INNER JOIN products p ON s.product_id = p.product_id
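    GROUP BY p.product_id, p.product_category

""")
# show() returns None, so keep the DataFrame in the variable and display it separately
df_sales_by_category.show(truncate=False)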

+----------+----------------+-----------+
|product_id|product_category|total_sales|
+----------+----------------+-----------+
|101       |Electronics     |690.0      |
|102       |Clothing        |360.0      |
|103       |Electronics     |200.0      |
|104       |Clothing        |160.0      |
|105       |Electronics     |350.0      |
+----------+----------------+-----------+



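## Explanation:

We perform an inner join between the `sales` and `products` tables to attach the product category to each sale. Products without any sales (106 to 119) have no matching rows and therefore do not appear in the result.
`SUM(s.price * s.quantity)` calculates the total sales for each group.
We group the results by both `product_id` and `product_category`, so each output row is one product, labeled with its category.
The result includes columns for `product_id`, `product_category`, and `total_sales`. Note that `total_sales` is the total per product, not per category; a per-category figure requires grouping by `product_category` alone.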
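
As a complement, the sketch below groups by `product_category` alone to obtain one total per category. It reuses the quantities and prices of all 16 transactions and the categories of the five products that have sales. It runs on Python's built-in `sqlite3` only so that it is self-contained; the variable names are illustrative.

```python
import sqlite3

# (product_id, quantity, price) for all 16 transactions in the notebook
sales_rows = [
    (101, 3, 50.0), (102, 2, 30.0), (101, 4, 60.0), (103, 5, 25.0),
    (104, 2, 40.0), (102, 3, 30.0), (104, 1, 40.0), (105, 2, 50.0),
    (101, 3, 60.0), (102, 4, 30.0), (103, 2, 25.0), (104, 1, 40.0),
    (105, 5, 50.0), (101, 2, 60.0), (102, 3, 30.0), (103, 1, 25.0),
]

# Categories of the products that appear in the sales data
product_rows = [
    (101, "Electronics"), (102, "Clothing"), (103, "Electronics"),
    (104, "Clothing"), (105, "Electronics"),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (product_id INTEGER, quantity INTEGER, price REAL)")
conn.execute("CREATE TABLE products (product_id INTEGER, product_category TEXT)")
conn.executemany("INSERT INTO sales VALUES (?, ?, ?)", sales_rows)
conn.executemany("INSERT INTO products VALUES (?, ?)", product_rows)

# Group by category only, so each output row is one category
category_totals = conn.execute("""
    SELECT
        p.product_category,
        SUM(s.price * s.quantity) AS total_sales
    FROM sales AS s
    INNER JOIN products AS p ON s.product_id = p.product_id
    GROUP BY p.product_category
    ORDER BY p.product_category
""").fetchall()
conn.close()

for row in category_totals:
    print(row)
```

The two totals, 520.0 for Clothing and 1240.0 for Electronics, are the sums of the per-product figures in the table above.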

## Example 4: Calculate the Monthly Sales Growth

In this example, we'll calculate the monthly sales growth for each product category. We'll use a CTE to aggregate sales per category and month, and the `LAG` window function to compare each month with the previous one.

In [47]:
resultDF = spark.sql("""

WITH MonthlySales AS(

    SELECT
        p.product_category,
        MONTH(s.sale_date) AS sale_month,
        SUM(s.price * s.quantity) AS monthly_sales
    FROM sales AS s
    INNER JOIN products p ON s.product_id = p.product_id
    GROUP BY p.product_category, sale_month
)

    SELECT
        product_category,
        sale_month,
        monthly_sales,
        LAG(monthly_sales, 1 , 0) OVER(PARTITION BY product_category ORDER BY sale_month) AS prev_month_sales,
        (monthly_sales - LAG(monthly_sales, 1, 0) OVER (PARTITION BY product_category ORDER BY sale_month)) AS sales_growth

    FROM MonthlySales
    ORDER BY product_category, sale_month

""")
# show() returns None, so keep the DataFrame in the variable and display it separately
resultDF.show(truncate=False)

+----------------+----------+-------------+----------------+------------+
|product_category|sale_month|monthly_sales|prev_month_sales|sales_growth|
+----------------+----------+-------------+----------------+------------+
|Clothing        |1         |520.0        |0.0             |520.0       |
|Electronics     |1         |1240.0       |0.0             |1240.0      |
+----------------+----------+-------------+----------------+------------+



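## Explanation:

We use a CTE (Common Table Expression) called `MonthlySales` to group sales data by product category and month, calculating monthly sales totals. `MONTH(s.sale_date)` extracts only the month number, so data spanning several years would also need the year in the grouping.
In the main query, `LAG(monthly_sales, 1, 0)` returns the previous month's sales within the same category, or the default `0` when there is no previous row. Subtracting it from `monthly_sales` gives the sales growth.
The result includes columns for `product_category`, `sale_month`, `monthly_sales`, `prev_month_sales`, and `sales_growth`. Because the sample data covers only January 2023, each category has a single row, `prev_month_sales` is `0.0`, and `sales_growth` equals `monthly_sales`.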
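
With only one month of data, the output above cannot show `LAG` picking up a previous value. The sketch below extends the Clothing category with two hypothetical months (the figures for months 2 and 3 are made up for illustration; only January's 520.0 comes from the notebook) and applies the same `LAG` expressions. It runs on Python's built-in `sqlite3`, assuming SQLite 3.25 or newer, with a plain table standing in for the `MonthlySales` CTE.

```python
import sqlite3

# January's 520.0 is the Clothing total from the notebook;
# months 2 and 3 are hypothetical values added for illustration
monthly_rows = [
    ("Clothing", 1, 520.0),
    ("Clothing", 2, 600.0),
    ("Clothing", 3, 450.0),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE MonthlySales "
    "(product_category TEXT, sale_month INTEGER, monthly_sales REAL)"
)
conn.executemany("INSERT INTO MonthlySales VALUES (?, ?, ?)", monthly_rows)

growth = conn.execute("""
    SELECT
        product_category,
        sale_month,
        monthly_sales,
        -- Previous month's value in the same category, 0 if there is none
        LAG(monthly_sales, 1, 0) OVER (
            PARTITION BY product_category ORDER BY sale_month
        ) AS prev_month_sales,
        monthly_sales - LAG(monthly_sales, 1, 0) OVER (
            PARTITION BY product_category ORDER BY sale_month
        ) AS sales_growth
    FROM MonthlySales
    ORDER BY product_category, sale_month
""").fetchall()
conn.close()

for row in growth:
    print(row)
```

The first month falls back to the default of 0, so its growth equals its sales. Later months show the difference from the month before, which is negative when sales drop.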

## Example 5: Find Top-Selling Products by Category

In this example, we'll find the top-selling product within each product category.

In [48]:
df_top_selling_product = spark.sql("""

  WITH ProductRank AS (

    SELECT
        p.product_category,
        s.product_id,
        SUM(s.price * s.quantity) AS total_sales,
        RANK() OVER (PARTITION BY p.product_category ORDER BY SUM(s.price * s.quantity) DESC) AS rank

    FROM sales s
    INNER JOIN products p ON s.product_id = p.product_id
    GROUP BY p.product_category, s.product_id
  )

  SELECT
      product_category,
      product_id,
      total_sales
  FROM ProductRank
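  WHERE rank = 1

""")
# show() returns None, so keep the DataFrame in the variable and display it separately
df_top_selling_product.show(truncate=False)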

+----------------+----------+-----------+
|product_category|product_id|total_sales|
+----------------+----------+-----------+
|Clothing        |102       |360.0      |
|Electronics     |101       |690.0      |
+----------------+----------+-----------+



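## Explanation:

We use a CTE called `ProductRank` to calculate the total sales for each product within each product category and to assign a rank based on those sales. `PARTITION BY p.product_category` restarts the ranking at 1 in every category.
In the main query, we filter the results to include only the products with a rank of 1, which is the top-selling product in each category. If two products in a category tie for the highest total, `RANK()` gives both rank 1 and both are returned.
The result includes columns for `product_category`, `product_id`, and `total_sales`.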