In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

In [None]:
# Directory that the CSV paths in the load cell are built from.
INPUT_ROOT = "/content"

# Obtain the session (getOrCreate returns an existing one when available).
session_builder = SparkSession.builder.appName("Retail-Data-PySpark")
spark = session_builder.getOrCreate()

In [None]:
def _nullable_schema(*columns):
    """Return a StructType built from (name, data_type) pairs, every field nullable."""
    return StructType([StructField(name, data_type, True) for name, data_type in columns])


# Explicit schemas for the six CSV inputs; column order follows the file layout
# declared here because the files are read without a header row.
orders_schema = _nullable_schema(
    ("order_id", IntegerType()),
    ("order_date", TimestampType()),
    ("order_customer_id", IntegerType()),
    ("order_status", StringType()),
)

order_items_schema = _nullable_schema(
    ("order_item_id", IntegerType()),
    ("order_item_order_id", IntegerType()),
    ("order_item_product_id", IntegerType()),
    ("order_item_quantity", IntegerType()),
    ("order_item_subtotal", DoubleType()),
    ("order_item_product_price", DoubleType()),
)

customers_schema = _nullable_schema(
    ("customer_id", IntegerType()),
    ("customer_fname", StringType()),
    ("customer_lname", StringType()),
    ("customer_email", StringType()),
    ("customer_password", StringType()),
    ("customer_street", StringType()),
    ("customer_city", StringType()),
    ("customer_state", StringType()),
    ("customer_zipcode", StringType()),
)

categories_schema = _nullable_schema(
    ("category_id", IntegerType()),
    ("category_department_id", IntegerType()),
    ("category_name", StringType()),
)

products_schema = _nullable_schema(
    ("product_id", IntegerType()),
    ("product_category_id", IntegerType()),
    ("product_name", StringType()),
    ("product_description", StringType()),
    ("product_price", DoubleType()),
    ("product_image", StringType()),
)

departments_schema = _nullable_schema(
    ("department_id", IntegerType()),
    ("department_name", StringType()),
)

In [None]:
# Load each headerless CSV under INPUT_ROOT with its explicit schema.
orders = spark.read.csv(
    f"{INPUT_ROOT}/orders.csv", schema=orders_schema, header=False
)
order_items = spark.read.csv(
    f"{INPUT_ROOT}/order_items.csv", schema=order_items_schema, header=False
)
customers = spark.read.csv(
    f"{INPUT_ROOT}/customers.csv", schema=customers_schema, header=False
)
categories = spark.read.csv(
    f"{INPUT_ROOT}/categories.csv", schema=categories_schema, header=False
)
products = spark.read.csv(
    f"{INPUT_ROOT}/products.csv", schema=products_schema, header=False
)
departments = spark.read.csv(
    f"{INPUT_ROOT}/departments.csv", schema=departments_schema, header=False
)


In [None]:
# Expose every DataFrame to Spark SQL under the name the queries below use.
for view_name, frame in (
    ("orders", orders),
    ("order_items", order_items),
    ("customers", customers),
    ("categories", categories),
    ("products", products),
    ("departments", departments),
):
    frame.createOrReplaceTempView(view_name)

# Retail SQL → Spark SQL Queries

Each query is explained with a Markdown cell and executed in Spark SQL using `spark.sql()`.

### 1. Distinct order status
Select the distinct order statuses from the `orders` table, sorted in ascending order.

In [None]:
# Every distinct order_status value in the orders view, in ascending order.
order_status_values = spark.sql("""
SELECT DISTINCT order_status
FROM orders
ORDER BY order_status
""")
order_status_values.show()

### 2. Orders with COMPLETE status

In [None]:
# All columns of the orders whose status is exactly 'COMPLETE'.
complete_orders = spark.sql("SELECT * FROM orders WHERE order_status='COMPLETE'")
complete_orders.show()

### 3. Orders with CLOSED status

In [None]:
# All columns of the orders whose status is exactly 'CLOSED'.
closed_orders = spark.sql("SELECT * FROM orders WHERE order_status='CLOSED'")
closed_orders.show()

### 4. Orders with CLOSED or COMPLETE status

In [None]:
# Orders whose status is either 'CLOSED' or 'COMPLETE'.
closed_or_complete_orders = spark.sql(
    "SELECT * FROM orders WHERE order_status IN ('CLOSED','COMPLETE')"
)
closed_or_complete_orders.show()

### 5. Count of orders

In [None]:
# Total number of rows in the orders view.
order_count_sql = "SELECT COUNT(*) AS order_count FROM orders"
spark.sql(order_count_sql).show()

### 6. Count of order items

In [None]:
# Total number of rows in the order_items view.
order_items_count_sql = "SELECT COUNT(*) AS order_items_count FROM order_items"
spark.sql(order_items_count_sql).show()

### 7. Count of distinct order statuses

In [None]:
# How many different order_status values appear in the orders view.
distinct_status_count_sql = (
    "SELECT COUNT(DISTINCT order_status) AS distinct_status_count FROM orders"
)
spark.sql(distinct_status_count_sql).show()

### 8. Order revenue per order

In [None]:
# Revenue per order: the line-item subtotals of each order summed and rounded
# to two decimals, listed by order id.
revenue_per_order_sql = """
SELECT order_item_order_id,
       ROUND(SUM(order_item_subtotal),2) AS order_revenue
FROM order_items
GROUP BY order_item_order_id
ORDER BY order_item_order_id
"""
spark.sql(revenue_per_order_sql).show()

### 9. Dates with >= 120 COMPLETE or CLOSED orders

In [None]:
# Count COMPLETE/CLOSED orders per calendar day and keep the days that have at
# least 120 of them, busiest first.
# order_date is declared as a timestamp in orders_schema, so it is truncated
# with to_date() (the same way the daily revenue views do) to guarantee that
# rows differing only in time-of-day are counted in the same daily group.
spark.sql("""
SELECT to_date(order_date) AS order_date, COUNT(*) AS order_count
FROM orders
WHERE order_status IN ('COMPLETE','CLOSED')
GROUP BY to_date(order_date)
HAVING COUNT(*) >= 120
ORDER BY order_count DESC
""").show()

### 10. Orders with revenue >=2000

In [None]:
# Orders whose rounded total revenue is at least 2000, highest revenue first.
high_revenue_orders = spark.sql("""
SELECT order_item_order_id,
       ROUND(SUM(order_item_subtotal),2) AS order_revenue
FROM order_items
GROUP BY order_item_order_id
HAVING ROUND(SUM(order_item_subtotal),2) >= 2000
ORDER BY order_revenue DESC
""")
high_revenue_orders.show()

### 11. Inner Join Orders + Order Items

In [None]:
# Inner join: one row per order item, paired with the date of its order.
# Orders that have no items do not appear.
orders_with_items_sql = """
SELECT o.order_date, oi.order_item_product_id, oi.order_item_subtotal
FROM orders o
JOIN order_items oi
ON o.order_id = oi.order_item_order_id
"""
spark.sql(orders_with_items_sql).show()

### 12. Left Outer Join Orders + Order Items

In [None]:
# Left outer join: every order is kept; the order_item_* columns are NULL for
# orders without any matching item. Output is sorted by order id.
orders_left_join_items = spark.sql("""
SELECT o.order_id, o.order_date,
       oi.order_item_id, oi.order_item_product_id, oi.order_item_subtotal
FROM orders o
LEFT OUTER JOIN order_items oi
ON o.order_id = oi.order_item_order_id
ORDER BY o.order_id
""")
orders_left_join_items.show()

### 13. Daily Revenue temp view

In [None]:
# Define the daily_revenue temp view: revenue of COMPLETE/CLOSED orders summed
# per calendar day and rounded to two decimals.
daily_revenue_view_sql = """
CREATE OR REPLACE TEMP VIEW daily_revenue AS
SELECT to_date(o.order_date) AS order_date,
       ROUND(SUM(oi.order_item_subtotal),2) AS order_revenue
FROM orders o
JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE','CLOSED')
GROUP BY to_date(o.order_date)
"""
spark.sql(daily_revenue_view_sql)

# Preview the view in date order.
daily_revenue_preview = spark.sql("SELECT * FROM daily_revenue ORDER BY order_date")
daily_revenue_preview.show()

### 14. Daily Product Revenue temp view

In [None]:
# Define the daily_product_revenue temp view: revenue of COMPLETE/CLOSED orders
# summed per (calendar day, product) and rounded to two decimals.
daily_product_revenue_view_sql = """
CREATE OR REPLACE TEMP VIEW daily_product_revenue AS
SELECT to_date(o.order_date) AS order_date,
       oi.order_item_product_id,
       ROUND(SUM(oi.order_item_subtotal),2) AS order_revenue
FROM orders o
JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE','CLOSED')
GROUP BY to_date(o.order_date), oi.order_item_product_id
"""
spark.sql(daily_product_revenue_view_sql)

# Preview the view: by date, best-selling product first within each date.
daily_product_revenue_preview = spark.sql(
    "SELECT * FROM daily_product_revenue ORDER BY order_date, order_revenue DESC"
)
daily_product_revenue_preview.show()

### 15. Monthly Revenue with Window

In [None]:
# For every day in daily_revenue, show that day's revenue next to the total
# revenue of its month, computed with a window partitioned by 'yyyy-MM'.
# order_revenue is a DOUBLE, so re-summing it can produce floating-point noise
# (e.g. ...0000001); the window total is rounded to two decimals like every
# other revenue aggregate in this notebook.
spark.sql("""
SELECT date_format(dr.order_date,'yyyy-MM') AS order_month,
       dr.order_date,
       dr.order_revenue,
       ROUND(SUM(dr.order_revenue) OVER (PARTITION BY date_format(dr.order_date,'yyyy-MM')),2) AS monthly_order_revenue
FROM daily_revenue dr
ORDER BY dr.order_date
""").show()

### 16. Total Revenue with Window

In [None]:
# Attach the grand total of order_revenue to every daily_revenue row using an
# unpartitioned window (OVER ()), listed by date.
# order_revenue is a DOUBLE, so the grand total is rounded to two decimals to
# avoid floating-point noise and to match the other revenue aggregates here.
spark.sql("""
SELECT dr.*,
       ROUND(SUM(order_revenue) OVER (),2) AS total_order_revenue
FROM daily_revenue dr
ORDER BY dr.order_date
""").show()

### 17. Top 5 Products per Day in January 2014 (Window)

In [None]:
# Rank products by revenue within each day of January 2014 and keep the rows
# whose dense rank is 5 or better. Both RANK and DENSE_RANK are returned;
# because the filter is on DENSE_RANK, ties can yield more than five rows per day.
top_daily_products_sql = """
WITH daily_product_revenue_ranks AS (
  SELECT order_date,
         order_item_product_id,
         order_revenue,
         RANK() OVER (PARTITION BY order_date ORDER BY order_revenue DESC) AS rnk,
         DENSE_RANK() OVER (PARTITION BY order_date ORDER BY order_revenue DESC) AS drnk
  FROM daily_product_revenue
  WHERE date_format(order_date,'yyyy-MM')='2014-01'
)
SELECT *
FROM daily_product_revenue_ranks
WHERE drnk <= 5
ORDER BY order_date, order_revenue DESC
"""
spark.sql(top_daily_products_sql).show()