# Module 5: PySpark Window Functions - Logistics & Time Analysis
**Scenario:** You work for a logistics company (e.g., FedEx, DHL, Maersk).

**Objective:** Analyze "journey time": how long does a package take to move from Point A to Point B?

**The Challenge:**
A standard `groupBy` collapses each group into a single aggregate row, so it cannot compare one row with another.
*   Row 1: Package A scanned at Warehouse (10:00 AM)
*   Row 2: Package A scanned at Truck (12:00 PM)
*   **Question:** What was the delay? (Row 2 Time - Row 1 Time).
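
The arithmetic behind that question can be checked in plain Python, independent of Spark. This is a minimal sketch; the calendar date is an assumed placeholder, since the bullets above give only clock times.

```python
from datetime import datetime

# Two scans of the same package; the date itself is an assumption for illustration.
warehouse_scan = datetime(2023, 1, 1, 10, 0)  # Row 1: 10:00 AM
truck_scan = datetime(2023, 1, 1, 12, 0)      # Row 2: 12:00 PM

# Subtracting two datetimes yields a timedelta; convert it to hours.
delay_hours = (truck_scan - warehouse_scan).total_seconds() / 3600
print(delay_hours)  # 2.0
```

The rest of the module is about getting both timestamps onto the same row so that this subtraction can run for every scan at once.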

**The Solution:** Window functions (`lag`, `lead`, `rank`).
They compute a value for each row from related rows in the same partition without collapsing them, which makes them well suited to time-series data in SQL and Spark.

---
## 1. Setup Environment

In [None]:
# Setup PySpark
try:
    import pyspark
    print("PySpark is already installed")
except ImportError:
    print("Installing PySpark...")
    !pip install pyspark findspark

from pyspark.sql import SparkSession
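# Note: this wildcard import shadows Python built-ins such as round, sum, and max
# with Spark's column-based versions, which is what the cells below rely on.
from pyspark.sql.functions import *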
from pyspark.sql.window import Window
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("Logistics_Window_Analysis") \
    .master("local[*]") \
    .getOrCreate()

print("Spark Session Ready")

## 2. Load Logistics Tracking Data
We track 2 packages (`PKG_123` and `PKG_456`).
Each row is a "Scan Event".

*   `PKG_123`: Warehouse -> Truck -> Distribution Center -> Delivered.
*   `PKG_456`: Warehouse -> Truck (reaches the truck only after a long wait, with no further scans).

In [None]:
# --- Logistics Tracking Data ---
logistics_data = [
    # Package 123 Journey
    ("PKG_123", "Warehouse_A", "2023-01-01 08:00:00"),
    ("PKG_123", "Truck_Dispatch", "2023-01-01 10:00:00"), # 2 hours later
    ("PKG_123", "Distribution_Center", "2023-01-01 15:00:00"), # 5 hours later
    ("PKG_123", "Delivered", "2023-01-01 18:00:00"),      # 3 hours later

    # Package 456 Journey (Problematic)
    ("PKG_456", "Warehouse_B", "2023-01-02 09:00:00"),
    ("PKG_456", "Truck_Dispatch", "2023-01-02 20:00:00")  # 11 hours delay!
]

schema = ["package_id", "location", "scan_timestamp"]
df_tracking = spark.createDataFrame(logistics_data, schema)

# Convert string to TimestampType (Crucial for time math)
df_tracking = df_tracking.withColumn("scan_timestamp", to_timestamp("scan_timestamp"))

print("--- Raw Tracking Data ---")
df_tracking.show(truncate=False)

## 3. Window Function: Calculate "Time Since Last Checkpoint"
We need to peek at the **Previous Row**'s timestamp.
1.  **Partition:** Group by `package_id` (Packages are independent).
2.  **Order By:** `scan_timestamp` (Chronological order).
3.  **Function:** `lag("scan_timestamp")` retrieves the value from one row earlier in the same partition (null for the first row).
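
To make the three steps concrete, here is a plain-Python emulation of what the window computes. It is only a sketch of the semantics, not how Spark executes it; the helper name `with_previous_scan` and the shuffled sample rows are hypothetical.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical scan events in arbitrary order: (package_id, location, scan_time)
scans = [
    ("PKG_123", "Truck_Dispatch", datetime(2023, 1, 1, 10, 0)),
    ("PKG_456", "Warehouse_B", datetime(2023, 1, 2, 9, 0)),
    ("PKG_123", "Warehouse_A", datetime(2023, 1, 1, 8, 0)),
    ("PKG_456", "Truck_Dispatch", datetime(2023, 1, 2, 20, 0)),
]

def with_previous_scan(events):
    """Emulate lag(scan_time, 1) partitioned by package_id, ordered by scan_time."""
    # 1. Partition: bucket the events by package_id.
    partitions = defaultdict(list)
    for event in events:
        partitions[event[0]].append(event)

    result = []
    for package_id in sorted(partitions):
        # 2. Order By: sort each partition chronologically.
        ordered = sorted(partitions[package_id], key=lambda e: e[2])
        # 3. Function: carry the previous row's timestamp forward.
        previous = None  # the first row of a partition has no predecessor
        for _, location, scan_time in ordered:
            result.append((package_id, location, scan_time, previous))
            previous = scan_time
    return result

rows = with_previous_scan(scans)
for row in rows:
    print(row)
```

Note that `previous` resets to `None` at the start of every partition, which is why the first scan of each package has no previous timestamp.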

In [None]:
# 1. Define the Window
# Partition by ID (so PKG_123 rows never mix with PKG_456 rows)
# Order by time (so each row's predecessor is the chronologically previous scan)
windowSpec = Window.partitionBy("package_id").orderBy("scan_timestamp")

# 2. Get Previous Timestamp using Lag()
df_with_lag = df_tracking.withColumn("previous_scan_time", lag("scan_timestamp", 1).over(windowSpec))

# 3. Calculate Delay (Current Time - Previous Time)
# unix_timestamp converts a timestamp to seconds since the Unix epoch,
# so the difference is in seconds; dividing by 3600 gives hours.
df_delays = df_with_lag.withColumn(
    "delay_hours",
    round(
        (unix_timestamp("scan_timestamp") - unix_timestamp("previous_scan_time")) / 3600,
        2
    )
).fillna(0.0, subset=["delay_hours"]) # lag() is null for each package's first scan; treat it as 0 hours

print("--- Tracking Analysis with Delays ---")
df_delays.show(truncate=False)

# Look at PKG_456: an 11-hour gap before its Truck_Dispatch scan. Management needs to see this.

## 4. Business Insight: Bottleneck Identification
We want to flag any operational step that takes more than 6 hours as a **"Major Delay"**.

*   This is called **KPI Monitoring** (Key Performance Indicator).
*   Service companies sell dashboards that turn red when a threshold like this is breached.
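
The flagging rule itself is a strict greater-than comparison. As a minimal plain-Python sketch (the constant `MAJOR_DELAY_HOURS` and the helper `is_major_delay` are hypothetical names; the delay values are the ones annotated in the tracking data above):

```python
MAJOR_DELAY_HOURS = 6  # threshold from the text above

def is_major_delay(delay_hours, threshold=MAJOR_DELAY_HOURS):
    """Return True only when the delay strictly exceeds the threshold."""
    return delay_hours > threshold

# (package_id, location) -> hours since the previous scan
delays = {
    ("PKG_123", "Warehouse_A"): 0.0,
    ("PKG_123", "Truck_Dispatch"): 2.0,
    ("PKG_123", "Distribution_Center"): 5.0,
    ("PKG_123", "Delivered"): 3.0,
    ("PKG_456", "Warehouse_B"): 0.0,
    ("PKG_456", "Truck_Dispatch"): 11.0,
}

flagged = [step for step, hours in delays.items() if is_major_delay(hours)]
print(flagged)
```

Because the comparison is strict, a step that takes exactly 6 hours is not flagged; use `>=` instead if the business rule should include the boundary.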

In [None]:
# Filter for Major Delays
df_major_delays = df_delays.filter(col("delay_hours") > 6) \
    .select("package_id", "location", "delay_hours", "previous_scan_time")

print("--- ALERT: Major Fulfillment Delays (> 6 Hours) ---")
df_major_delays.show(truncate=False)

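# delay_hours on a row measures the time since the previous scan, so the 11 hours on
# PKG_456's Truck_Dispatch row elapsed between the Warehouse_B scan and the truck scan.
# Someone forgot to load the truck!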