# Spark SQL Sales Performance Analysis




This script contains a set of Spark SQL queries focused on analyzing sales performance over time within a Databricks environment. The queries leverage Spark's distributed computing to explore temporal trends, product performance, customer segmentation, and category contributions, providing actionable insights into business operations.
## Features

### Temporal Sales Analysis:

Aggregate sales, customer counts, and quantities by year and month using EXTRACT and DATE_FORMAT for flexible date formatting.
Calculate a running total of yearly sales and a cumulative average of yearly prices using window functions.


### Product Performance:

Analyze yearly product sales, comparing current sales to historical averages and year-over-year performance using window functions (AVG, LAG).
Segment products into cost ranges (e.g., Below 100, 100-500) to understand cost distribution.


### Customer Segmentation:

Categorize customers into VIP, Regular, and New segments based on spending behavior (€5,000 threshold) and lifespan (12+ months).
Count customers in each segment to assess customer base composition.


### Category Contribution:

Identify top-performing product categories by total sales and their percentage contribution to overall sales.


### Database Schema
The queries assume a Delta Lake with the following key tables in the dwh_project.gold schema:

- dim_customers: Contains customer information (e.g., customer_key).
- dim_products: Contains product details (e.g., product_key, product_name, category, cost).
- fact_sales: Contains sales transactions (e.g., order_date, customer_key, product_key, sales_amount, quantity, price).

### Key Queries

- Sales Over Time: Aggregate sales, customers, and quantities by year/month using Spark SQL date functions.
- Running Totals and Averages: Calculate cumulative sales and the cumulative average price per year with window functions.
- Yearly Product Performance: Compare each product's yearly sales to its historical average and to the previous year's sales.
- Product Cost Segmentation: Group products by cost ranges and count products in each range.
- Customer Segmentation: Classify customers into VIP, Regular, and New based on spending and lifespan.
- Category Sales Contribution: Rank categories by sales and calculate their percentage of total sales.

In [0]:
## Analyse sales performance over time

df = spark.sql("""SELECT
    EXTRACT(YEAR FROM order_date) AS order_year,
    EXTRACT(MONTH FROM order_date) AS order_month,
    SUM(sales_amount) AS total_sales,
    COUNT(DISTINCT customer_key) AS total_customers,
    SUM(quantity) AS total_quantity
FROM dwh_project.gold.fact_sales
WHERE order_date IS NOT NULL
GROUP BY EXTRACT(YEAR FROM order_date), EXTRACT(MONTH FROM order_date)
ORDER BY EXTRACT(YEAR FROM order_date), EXTRACT(MONTH FROM order_date)""")
display(df)


order_year,order_month,total_sales,total_customers,total_quantity
2010,12,43419,14,14
2011,1,469795,144,144
2011,2,466307,144,144
2011,3,485165,150,150
2011,4,502042,157,157
2011,5,561647,174,174
2011,6,737793,230,230
2011,7,596710,188,188
2011,8,614516,193,193
2011,9,603047,185,185


In [0]:
## Same monthly aggregation, keyed by a single 'yyyy-MM' string built with DATE_FORMAT()

df = spark.sql("""SELECT
    DATE_FORMAT(order_date, 'yyyy-MM') AS order_month,
    SUM(sales_amount) AS total_sales,
    COUNT(DISTINCT customer_key) AS total_customers,
    SUM(quantity) AS total_quantity
FROM dwh_project.gold.fact_sales
WHERE order_date IS NOT NULL
GROUP BY DATE_FORMAT(order_date, 'yyyy-MM')
ORDER BY order_month""")
display(df)

order_month,total_sales,total_customers,total_quantity
2010-12,43419,14,14
2011-01,469795,144,144
2011-02,466307,144,144
2011-03,485165,150,150
2011-04,502042,157,157
2011-05,561647,174,174
2011-06,737793,230,230
2011-07,596710,188,188
2011-08,614516,193,193
2011-09,603047,185,185
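
The grouping logic of the monthly aggregation can be checked without a cluster using a plain-Python sketch. The rows below are hypothetical sample data, not values from `fact_sales`, and `monthly_summary` is an illustrative helper name. `strftime("%Y-%m")` plays the role of `DATE_FORMAT(order_date, 'yyyy-MM')`; because the key is zero-padded, sorting it as a string also sorts it chronologically.

```python
from collections import defaultdict
from datetime import date

# Hypothetical sample rows: (order_date, customer_key, sales_amount, quantity)
rows = [
    (date(2011, 1, 5), 1, 3400, 1),
    (date(2011, 1, 20), 2, 3375, 1),
    (date(2011, 1, 28), 1, 699, 1),
    (date(2010, 12, 29), 3, 3578, 1),
    (None, 4, 10, 1),  # skipped, like WHERE order_date IS NOT NULL
]


def monthly_summary(rows):
    """Group rows by 'YYYY-MM' and aggregate like the query above."""
    groups = defaultdict(lambda: {"sales": 0, "customers": set(), "quantity": 0})
    for order_date, customer_key, sales_amount, quantity in rows:
        if order_date is None:
            continue
        group = groups[order_date.strftime("%Y-%m")]
        group["sales"] += sales_amount
        group["customers"].add(customer_key)  # a set mimics COUNT(DISTINCT ...)
        group["quantity"] += quantity
    # Sorting the string keys mirrors ORDER BY order_month
    return [
        (month, g["sales"], len(g["customers"]), g["quantity"])
        for month, g in sorted(groups.items())
    ]


summary = monthly_summary(rows)
for line in summary:
    print(line)
```

Customer 1 orders twice in January 2011 but is counted once, which is the behavior `COUNT(DISTINCT customer_key)` gives in the query.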


In [0]:
## Calculate the total sales per year, the running total of sales, and the cumulative average price over time

df = spark.sql("""SELECT
    order_year,
    total_sales,
    SUM(total_sales) OVER (ORDER BY order_year) AS running_total_sales,
    AVG(avg_price) OVER (ORDER BY order_year) AS moving_average_price
FROM
(
    SELECT 
        DATE_FORMAT(order_date, 'yyyy') AS order_year,
        SUM(sales_amount) AS total_sales,
        AVG(price) AS avg_price
    FROM dwh_project.gold.fact_sales
    WHERE order_date IS NOT NULL
    GROUP BY DATE_FORMAT(order_date, 'yyyy')
) t
ORDER BY order_year""")
display(df)

order_year,total_sales,running_total_sales,moving_average_price
2010,43419,43419,3101.3571428571427
2011,7075088,7118507,3147.0431923671995
2012,5842231,12960738,2671.302369634261
2013,16344878,29305616,2080.8911324983264
2014,45642,29351258,1669.3466115824176
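
When a window has an `ORDER BY` but no explicit frame, the default frame runs from the first row up to the current row, so `running_total_sales` is a cumulative sum and `moving_average_price` is a cumulative average of the yearly average prices rather than a fixed-width moving average. The sketch below reproduces the running total from the yearly figures in the table above and contrasts the two kinds of average; `cumulative_average` and `trailing_average` are illustrative helper names, not part of the notebook.

```python
from itertools import accumulate

# Yearly totals copied from the result table above
years = ["2010", "2011", "2012", "2013", "2014"]
total_sales = [43419, 7075088, 5842231, 16344878, 45642]

# SUM(total_sales) OVER (ORDER BY order_year): cumulative sum up to the current row
running_total = list(accumulate(total_sales))


def cumulative_average(values):
    """AVG(x) OVER (ORDER BY ...): mean of all rows up to the current one."""
    return [s / n for n, s in enumerate(accumulate(values), start=1)]


def trailing_average(values, window):
    """Fixed-width moving average over the last `window` rows, for contrast."""
    result = []
    for i in range(len(values)):
        chunk = values[max(0, i - window + 1): i + 1]
        result.append(sum(chunk) / len(chunk))
    return result


for year, total, running in zip(years, total_sales, running_total):
    print(year, total, running)
```

If a true moving average were wanted in the query, an explicit frame such as `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW` would be the SQL counterpart of `trailing_average(values, 3)`.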


In [0]:
## Analyze the yearly performance of products by comparing their sales to both the product's average sales and the previous year's sales

df = spark.sql("""WITH yearly_product_sales AS (
    SELECT
        EXTRACT(YEAR FROM f.order_date) AS order_year,
        p.product_name,
        SUM(f.sales_amount) AS current_sales
    FROM dwh_project.gold.fact_sales f
    LEFT JOIN dwh_project.gold.dim_products p
        ON f.product_key = p.product_key
    WHERE f.order_date IS NOT NULL
    GROUP BY 
        EXTRACT(YEAR FROM f.order_date),
        p.product_name
)
SELECT
    order_year,
    product_name,
    current_sales,
    AVG(current_sales) OVER (PARTITION BY product_name) AS avg_sales,
    current_sales - AVG(current_sales) OVER (PARTITION BY product_name) AS difference_avg,
    CASE 
        WHEN current_sales - AVG(current_sales) OVER (PARTITION BY product_name) > 0 THEN 'Above Avg'
        WHEN current_sales - AVG(current_sales) OVER (PARTITION BY product_name) < 0 THEN 'Below Avg'
        ELSE 'Avg'
    END AS avg_change,
    -- Year-over-Year Analysis
    LAG(current_sales) OVER (PARTITION BY product_name ORDER BY order_year) AS previous_year_sales,
    current_sales - LAG(current_sales) OVER (PARTITION BY product_name ORDER BY order_year) AS difference_previous_year,
    CASE 
        WHEN current_sales - LAG(current_sales) OVER (PARTITION BY product_name ORDER BY order_year) > 0 THEN 'Increase'
        WHEN current_sales - LAG(current_sales) OVER (PARTITION BY product_name ORDER BY order_year) < 0 THEN 'Decrease'
        ELSE 'No Change'
    END AS previous_year_change
FROM yearly_product_sales
ORDER BY product_name, order_year""")
display(df)

order_year,product_name,current_sales,avg_sales,difference_avg,avg_change,previous_year_sales,difference_previous_year,previous_year_change
2012,AWC Logo Cap,72,6570.0,-6498.0,Below Avg,,,No Change
2013,AWC Logo Cap,18891,6570.0,12321.0,Above Avg,72.0,18819.0,Increase
2014,AWC Logo Cap,747,6570.0,-5823.0,Below Avg,18891.0,-18144.0,Decrease
2012,All-Purpose Bike Stand,159,13197.0,-13038.0,Below Avg,,,No Change
2013,All-Purpose Bike Stand,37683,13197.0,24486.0,Above Avg,159.0,37524.0,Increase
2014,All-Purpose Bike Stand,1749,13197.0,-11448.0,Below Avg,37683.0,-35934.0,Decrease
2013,Bike Wash - Dissolver,6960,3636.0,3324.0,Above Avg,,,No Change
2014,Bike Wash - Dissolver,312,3636.0,-3324.0,Below Avg,6960.0,-6648.0,Decrease
2013,Classic Vest- L,11968,6240.0,5728.0,Above Avg,,,No Change
2014,Classic Vest- L,512,6240.0,-5728.0,Below Avg,11968.0,-11456.0,Decrease
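
The two window calculations can be traced by hand with a short plain-Python sketch, using a few rows copied from the table above. `yearly_performance` and `label` are illustrative names. One deliberate difference, labelled here as an assumption about the desired behavior: in the query, the first year of each product has no `LAG` value, so the comparison is NULL and the `CASE` falls through to `'No Change'`; the sketch returns `None` for that row instead.

```python
from itertools import groupby

# (order_year, product_name, current_sales) copied from the result table above
rows = [
    (2012, "AWC Logo Cap", 72),
    (2013, "AWC Logo Cap", 18891),
    (2014, "AWC Logo Cap", 747),
    (2013, "Bike Wash - Dissolver", 6960),
    (2014, "Bike Wash - Dissolver", 312),
]


def label(diff, up, down, flat):
    """Map a difference to a label; None (nothing to compare with) stays None."""
    if diff is None:
        return None
    if diff > 0:
        return up
    if diff < 0:
        return down
    return flat


def yearly_performance(rows):
    result = []
    ordered = sorted(rows, key=lambda r: (r[1], r[0]))  # product, then year
    for product, group in groupby(ordered, key=lambda r: r[1]):
        group = list(group)
        # AVG(current_sales) OVER (PARTITION BY product_name)
        avg_sales = sum(sales for _, _, sales in group) / len(group)
        previous = None  # LAG() has no value on the first row of a partition
        for year, _, sales in group:
            diff_prev = None if previous is None else sales - previous
            result.append({
                "order_year": year,
                "product_name": product,
                "current_sales": sales,
                "avg_sales": avg_sales,
                "avg_change": label(sales - avg_sales, "Above Avg", "Below Avg", "Avg"),
                "previous_year_sales": previous,
                "difference_previous_year": diff_prev,
                "previous_year_change": label(diff_prev, "Increase", "Decrease", "No Change"),
            })
            previous = sales
    return result


performance = yearly_performance(rows)
for row in performance:
    print(row)
```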


In [0]:
## Segment products into cost ranges and count how many products fall into each segment

df = spark.sql("""WITH product_segments AS (
    SELECT
        product_key,
        product_name,
        cost,
        CASE 
            WHEN cost < 100 THEN 'Below 100'
            WHEN cost BETWEEN 100 AND 500 THEN '100-500'
            WHEN cost BETWEEN 500 AND 1000 THEN '500-1000'
            ELSE 'Above 1000'
        END AS cost_range
    FROM dwh_project.gold.dim_products
)
SELECT 
    cost_range,
    COUNT(product_key) AS total_products
FROM product_segments
GROUP BY cost_range
ORDER BY total_products DESC""")
display(df)

cost_range,total_products
Below 100,110
100-500,101
500-1000,45
Above 1000,39
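
The boundaries of the `CASE` expression are easy to misread, because `BETWEEN` is inclusive on both ends and the branches are tested from top to bottom. The following plain-Python sketch mirrors that logic so the edge cases can be checked directly; `cost_range` is an illustrative function name, and the handling of `None` reflects how SQL treats comparisons with NULL.

```python
def cost_range(cost):
    """Mirror the CASE expression: the first matching branch wins."""
    if cost is None:
        # Every SQL comparison with NULL is unknown, so the ELSE branch applies
        return "Above 1000"
    if cost < 100:
        return "Below 100"
    if 100 <= cost <= 500:   # BETWEEN 100 AND 500, inclusive
        return "100-500"
    if 500 <= cost <= 1000:  # a cost of exactly 500 already matched above
        return "500-1000"
    return "Above 1000"


for cost in (99, 100, 500, 1000, 1001, None):
    print(cost, "->", cost_range(cost))
```

A cost of exactly 500 lands in `100-500`, a cost of exactly 1000 lands in `500-1000`, and products with a missing cost are counted under `Above 1000`.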


In [0]:

## Group customers into three segments based on their spending behavior:
##   VIP: customers with at least 12 months of history who spent more than €5,000
##   Regular: customers with at least 12 months of history who spent €5,000 or less
##   New: customers with a lifespan of less than 12 months
## Then find the total number of customers in each group


df = spark.sql("""WITH customer_spending AS (
    SELECT
        c.customer_key,
        SUM(f.sales_amount) AS total_spending,
        MIN(f.order_date) AS first_order,
        MAX(f.order_date) AS last_order,
        CAST(
            MONTHS_BETWEEN(MAX(f.order_date), MIN(f.order_date))
            AS INT
        ) AS lifespan
    FROM dwh_project.gold.fact_sales f
    LEFT JOIN dwh_project.gold.dim_customers c
        ON f.customer_key = c.customer_key
    GROUP BY c.customer_key
)
SELECT 
    customer_segment,
    COUNT(customer_key) AS total_customers
FROM (
    SELECT 
        customer_key,
        CASE 
            WHEN lifespan >= 12 AND total_spending > 5000 THEN 'VIP'
            WHEN lifespan >= 12 AND total_spending <= 5000 THEN 'Regular'
            ELSE 'New'
        END AS customer_segment
    FROM customer_spending
) AS segmented_customers
GROUP BY customer_segment
ORDER BY total_customers DESC""")
display(df)

customer_segment,total_customers
New,14828
Regular,2037
VIP,1619
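
The segmentation rule itself is small enough to sketch in plain Python, which makes the threshold behavior explicit. The customers below are hypothetical, `customer_segment` is an illustrative name, and the lifespan is assumed to be the whole number of months between first and last order, as produced by the `CAST(MONTHS_BETWEEN(...) AS INT)` expression in the query.

```python
from collections import Counter


def customer_segment(lifespan_months, total_spending):
    """Apply the same rule as the CASE expression in the query."""
    if lifespan_months >= 12 and total_spending > 5000:
        return "VIP"
    if lifespan_months >= 12 and total_spending <= 5000:
        return "Regular"
    return "New"


# Hypothetical customers: (customer_key, lifespan in whole months, total spending)
customers = [
    (1, 24, 8200),
    (2, 12, 5000),  # exactly on both thresholds: Regular
    (3, 11, 9000),  # high spending but short history: New
    (4, 0, 35),
    (5, 30, 5001),
]

segment_counts = Counter(
    customer_segment(months, spending) for _, months, spending in customers
)
print(segment_counts)
```

Customer 3 shows why the `New` segment is so large in the results: any customer with less than 12 months between first and last order is classified as `New`, regardless of how much they spent.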


In [0]:
## Which categories contribute the most to overall sales?

df = spark.sql("""WITH category_sales AS (
    SELECT
        p.category,
        SUM(f.sales_amount) AS total_sales
    FROM dwh_project.gold.fact_sales f
    LEFT JOIN dwh_project.gold.dim_products p
        ON p.product_key = f.product_key
    GROUP BY p.category
)
SELECT
    category,
    total_sales,
    SUM(total_sales) OVER () AS overall_sales,
    ROUND((total_sales::NUMERIC / SUM(total_sales) OVER ()) * 100, 2) AS percentage_of_total
FROM category_sales
ORDER BY total_sales DESC""")
display(df)

category,total_sales,overall_sales,percentage_of_total
Bikes,28316272,29356250,96.46
Accessories,700262,29356250,2.39
Clothing,339716,29356250,1.16
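
The percentage column can be verified by hand from the figures in the table above. In this plain-Python sketch, `sum(...)` over all categories stands in for `SUM(total_sales) OVER ()`, and `round(..., 2)` stands in for `ROUND(..., 2)`; the variable names are illustrative.

```python
# Category totals copied from the result table above
category_sales = {
    "Bikes": 28316272,
    "Accessories": 700262,
    "Clothing": 339716,
}

# SUM(total_sales) OVER (): one grand total repeated on every row
overall_sales = sum(category_sales.values())

# Share of each category in the grand total, as a percentage with 2 decimals
percentage_of_total = {
    category: round(sales / overall_sales * 100, 2)
    for category, sales in category_sales.items()
}

for category, sales in sorted(category_sales.items(), key=lambda kv: kv[1], reverse=True):
    print(category, sales, overall_sales, percentage_of_total[category])
```

The rounded shares add up to 100.01 rather than 100 because each one is rounded independently.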
