# Retail data with PySpark — schemas, loads, and core operations
Reads retail CSV files from `/content` **without headers**, defines explicit schemas per dataset, and demonstrates common PySpark DataFrame operations and simple analytics.

**Note:** Inputs have no header row, so every `.csv()` read uses `option('header', False)` and relies on the **column order** defined in each schema.

## Imports

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

## Environment

In [None]:
# Directory holding the headerless retail CSVs:
# departments.csv, categories.csv, products.csv, customers.csv, orders.csv, order_items.csv
INPUT_ROOT = "/content"

# Obtain the session used by every cell below (an already-active one is reused).
spark = (
    SparkSession.builder
    .appName("Retail-Data-PySpark")
    .getOrCreate()
)

## What are `StructType` and `StructField`?
- **StructType** — the full table schema (an ordered list of fields). It is the blueprint Spark uses to interpret rows and enforce column names and data types when creating a DataFrame.
- **StructField** — one column definition inside a StructType. It specifies the **name**, **data type** (e.g., `IntegerType`, `StringType`, `DoubleType`, `TimestampType`), whether the value may be **nullable**, and optional **metadata**.

Because the inputs lack headers, **the field order in each schema must match the CSV column order**.

## Create the orders schema

In [None]:
# Column layout of orders.csv, listed in file order; every field is nullable.
orders_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("order_id", IntegerType()),
        ("order_date", TimestampType()),
        ("order_customer_id", IntegerType()),
        ("order_status", StringType()),
    )
])

## Load the orders dataset

In [None]:
# Read the headerless orders file using the explicit schema, then inspect it.
orders = spark.read.csv(
    f"{INPUT_ROOT}/orders.csv",
    schema=orders_schema,
    header=False,
)
orders.printSchema()
orders.show(n=5, truncate=False)

### Orders — to_date

In [None]:
# Reduce each order timestamp to its calendar date.
(
    orders
    .select(F.to_date(F.col("order_date")).alias("order_day"))
    .show(n=5, truncate=False)
)

### Orders — date_format yyyy-MM

In [None]:
# Render each order timestamp as a "yyyy-MM" string.
(
    orders
    .select(F.date_format(F.col("order_date"), "yyyy-MM").alias("year_month"))
    .show(n=5, truncate=False)
)

### Orders — extract year

In [None]:
# Pull the calendar year out of the order timestamp.
(
    orders
    .select(F.year(F.col("order_date")).alias("year"))
    .show(n=5, truncate=False)
)

### Orders — extract month

In [None]:
# Pull the month number out of the order timestamp.
(
    orders
    .select(F.month(F.col("order_date")).alias("month"))
    .show(n=5, truncate=False)
)

### Orders — extract day of month

In [None]:
# Pull the day-of-month out of the order timestamp.
(
    orders
    .select(F.dayofmonth(F.col("order_date")).alias("day"))
    .show(n=5, truncate=False)
)

## Create the order items schema

In [None]:
# Column layout of order_items.csv, listed in file order; every field is nullable.
order_items_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("order_item_id", IntegerType()),
        ("order_item_order_id", IntegerType()),
        ("order_item_product_id", IntegerType()),
        ("order_item_quantity", IntegerType()),
        ("order_item_subtotal", DoubleType()),
        ("order_item_product_price", DoubleType()),
    )
])

## Load the order items dataset

In [None]:
# Read the headerless order-items file using the explicit schema, then inspect it.
order_items = spark.read.csv(
    f"{INPUT_ROOT}/order_items.csv",
    schema=order_items_schema,
    header=False,
)
order_items.printSchema()
order_items.show(n=5, truncate=False)

### Order items — derive line_amount

In [None]:
# Add line_amount = quantity * unit price to every order item.
oi = order_items.withColumn(
    "line_amount",
    F.col("order_item_quantity") * F.col("order_item_product_price"),
)
(
    oi
    .select(["order_item_id", "order_item_order_id", "line_amount"])
    .show(n=5, truncate=False)
)

### Order items — filter quantity and price > 0

In [None]:
# Keep items with at least one unit and a strictly positive unit price.
# Two chained filters are equivalent to a single predicate joined with `&`.
(
    order_items
    .filter(F.col("order_item_quantity") >= 1)
    .filter(F.col("order_item_product_price") > 0)
    .show(n=5, truncate=False)
)

### Order items — between on subtotal (10.0–50.0)

In [None]:
# Keep items whose subtotal lies in the inclusive range [10.0, 50.0].
(
    order_items
    .filter(F.col("order_item_subtotal").between(10.0, 50.0))
    .select(["order_item_id", "order_item_subtotal"])
    .show(n=5, truncate=False)
)

### Order items — sort by line_amount desc

In [None]:
# Largest line amounts first.
(
    oi
    .orderBy(F.col("line_amount").desc())
    .select(["order_item_id", "order_item_order_id", "line_amount"])
    .show(n=5, truncate=False)
)

### Order items — groupBy revenue per order

In [None]:
# Sum line amounts per order (rounded to 2 decimals), highest revenue first.
(
    oi
    .groupBy("order_item_order_id")
    .agg(F.round(F.sum("line_amount"), 2).alias("order_revenue"))
    .orderBy(F.col("order_revenue").desc())
    .show(n=5, truncate=False)
)

## Create the customers schema

In [None]:
# Column layout of customers.csv, built field by field in file order;
# every field is nullable.
customers_schema = (
    StructType()
    .add("customer_id", IntegerType(), True)
    .add("customer_fname", StringType(), True)
    .add("customer_lname", StringType(), True)
    .add("customer_email", StringType(), True)
    .add("customer_password", StringType(), True)
    .add("customer_street", StringType(), True)
    .add("customer_city", StringType(), True)
    .add("customer_state", StringType(), True)
    .add("customer_zipcode", StringType(), True)
)

## Load the customers dataset

In [None]:
# Read the headerless customers file using the explicit schema, then inspect it.
customers = spark.read.csv(
    f"{INPUT_ROOT}/customers.csv",
    schema=customers_schema,
    header=False,
)
customers.printSchema()
customers.show(n=5, truncate=False)

### Customers — select & col (customer_id, customer_city)

In [None]:
# Project two columns using explicit Column objects.
(
    customers
    .select([F.col(name) for name in ("customer_id", "customer_city")])
    .show(n=5, truncate=False)
)

### Customers — selectExpr uppercase state

In [None]:
# Use SQL expressions inside the projection: pass the id through and
# upper-case the state under a new name.
(
    customers
    .selectExpr(
        "customer_id",
        "upper(customer_state) as state_up",
    )
    .show(n=5, truncate=False)
)

### Customers — withColumnRenamed first/last name

In [None]:
# Rename the first/last name columns, then show only the renamed pair.
(
    customers
    .withColumnRenamed("customer_fname", "first_name")
    .withColumnRenamed("customer_lname", "last_name")
    .select(["first_name", "last_name"])
    .show(n=5, truncate=False)
)

### Customers — concatenate full name

In [None]:
# Join first and last name with a single space into full_name.
(
    customers
    .select(
        F.concat_ws(" ", "customer_fname", "customer_lname").alias("full_name")
    )
    .show(n=5, truncate=False)
)

### Customers — drop a single column (customer_password)

In [None]:
# Show customers without the password column.
(
    customers
    .drop("customer_password")
    .show(n=5, truncate=False)
)

### Customers — drop rows where email is null

In [None]:
# Discard rows whose email is null; other columns are not checked.
(
    customers
    .dropna(subset=["customer_email"])
    .show(n=5, truncate=False)
)

## Create the categories schema

In [None]:
# Column layout of categories.csv, listed in file order; every field is nullable.
categories_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("category_id", IntegerType()),
        ("category_department_id", IntegerType()),
        ("category_name", StringType()),
    )
])

## Load the categories dataset

In [None]:
# Read the headerless categories file using the explicit schema, then inspect it.
categories = spark.read.csv(
    f"{INPUT_ROOT}/categories.csv",
    schema=categories_schema,
    header=False,
)
categories.printSchema()
categories.show(n=5, truncate=False)

## Create the products schema

In [None]:
# Column layout of products.csv, built field by field in file order;
# every field is nullable.
products_schema = (
    StructType()
    .add("product_id", IntegerType(), True)
    .add("product_category_id", IntegerType(), True)
    .add("product_name", StringType(), True)
    .add("product_description", StringType(), True)
    .add("product_price", DoubleType(), True)
    .add("product_image", StringType(), True)
)

## Load the products dataset

In [None]:
# Read the headerless products file using the explicit schema, then inspect it.
products = spark.read.csv(
    f"{INPUT_ROOT}/products.csv",
    schema=products_schema,
    header=False,
)
products.printSchema()
products.show(n=5, truncate=False)

## Create the departments schema

In [None]:
# Column layout of departments.csv, listed in file order; every field is nullable.
departments_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("department_id", IntegerType()),
        ("department_name", StringType()),
    )
])

## Load the departments dataset

In [None]:
# Read the headerless departments file using the explicit schema, then inspect it.
departments = spark.read.csv(
    f"{INPUT_ROOT}/departments.csv",
    schema=departments_schema,
    header=False,
)
departments.printSchema()
departments.show(n=5, truncate=False)

### Orders — filter where status == 'COMPLETE'

In [None]:
# Orders whose status is exactly 'COMPLETE'.
(
    orders
    .filter(F.col("order_status") == "COMPLETE")
    .select(["order_id", "order_status"])
    .show(n=5, truncate=False)
)

### Orders — filter where status != 'COMPLETE'

In [None]:
# Orders whose status differs from 'COMPLETE'.
# Note: rows with a NULL status are not returned either, because the
# comparison evaluates to NULL rather than true.
(
    orders
    .filter(F.col("order_status") != "COMPLETE")
    .select(["order_id", "order_status"])
    .show(n=5, truncate=False)
)

### Orders — filter where status IN ('CLOSED','COMPLETE')

In [None]:
# Orders whose status is one of the listed values. Column.isin() expresses the
# SQL IN predicate directly, instead of OR-ing one equality test per value,
# and returns the same rows.
(
    orders
    .where(F.col("order_status").isin("CLOSED", "COMPLETE"))
    .select("order_id", "order_status")
    .show(5, truncate=False)
)

## Analytics — revenue for a specific order (order_id = 2)

In [None]:
# Recompute line_amount so this cell can run on its own.
oi = order_items.withColumn(
    "line_amount",
    F.col("order_item_quantity") * F.col("order_item_product_price"),
)
# Total revenue of order 2, rounded to 2 decimals.
(
    oi
    .where(F.col("order_item_order_id") == 2)
    .agg(F.round(F.sum("line_amount"), 2).alias("revenue_for_order_2"))
    .show(truncate=False)
)

## Analytics — revenue by category

In [None]:
# Recompute line_amount so this cell can run on its own.
oi = order_items.withColumn(
    "line_amount",
    F.col("order_item_quantity") * F.col("order_item_product_price"),
)
# order item -> product -> category, then total revenue per category.
# Left joins keep items whose product or category has no match.
rev_by_cat = (
    oi
    .join(products, on=oi.order_item_product_id == products.product_id, how="left")
    .join(categories, on=products.product_category_id == categories.category_id, how="left")
    .groupBy("category_id", "category_name")
    .agg(F.round(F.sum("line_amount"), 2).alias("revenue"))
    .orderBy(F.col("revenue").desc())
)
rev_by_cat.show(n=10, truncate=False)

## Analytics — top products by revenue

In [None]:
# Recompute line_amount so this cell can run on its own.
oi = order_items.withColumn(
    "line_amount",
    F.col("order_item_quantity") * F.col("order_item_product_price"),
)
# Attach product details to each item, then rank products by total revenue.
top_products = (
    oi
    .join(products, on=oi.order_item_product_id == products.product_id, how="left")
    .groupBy("product_id", "product_name")
    .agg(F.round(F.sum("line_amount"), 2).alias("revenue"))
    .orderBy(F.col("revenue").desc())
)
top_products.show(n=10, truncate=False)

## Analytics — top customers by spend

In [None]:
# Recompute line_amount so this cell can run on its own.
oi = order_items.withColumn(
    "line_amount",
    F.col("order_item_quantity") * F.col("order_item_product_price"),
)
# order item -> order -> customer, then per customer: total spend (rounded to
# 2 decimals) and number of distinct orders, biggest spenders first.
cust_spend = (
    oi
    .join(orders, on=oi.order_item_order_id == orders.order_id, how="left")
    .join(customers, on=orders.order_customer_id == customers.customer_id, how="left")
    .groupBy(
        "customer_id",
        F.concat_ws(" ", F.col("customer_fname"), F.col("customer_lname")).alias("customer_name"),
    )
    .agg(
        F.round(F.sum("line_amount"), 2).alias("total_spend"),
        F.countDistinct("order_id").alias("orders"),
    )
    .orderBy(F.col("total_spend").desc())
)
cust_spend.show(n=10, truncate=False)