# Enterprise Fleet Analytics Pipeline

This project focuses on the business outcome (analytics) and the domain (fleet/logistics).

![logistics](logistics_project.png)

Download the data from the Google Drive folder below and upload it into the catalog:
https://drive.google.com/drive/folders/1J3AVJIPLP7CzT15yJIpSiWXshu1iLXKn?usp=drive_link

## **1. Data Munging**

#### 1. Visually/manually open the files and capture a couple of data patterns (Manual Exploratory Data Analysis)

In [0]:
%sql
create catalog if not exists EnterpriseFleetAnalytics; 
create database if not exists EnterpriseFleetAnalytics.Data_Ingestion_DB; 
create volume if not exists EnterpriseFleetAnalytics.Data_Ingestion_DB.Data_DE_VL;

#### 2. Programmatically find a couple of data patterns by applying the EDA steps below (File: logistics_source1)
1. Apply inferSchema and toDF to create a DF and analyse the actual data.
2. Analyse the schema, datatypes, columns, etc.
3. Analyse the duplicate records count and summary of the dataframe.

In [0]:
rawdf1=spark.read.csv("/Volumes/enterprisefleetanalytics/data_ingestion_db/data_de_vl/logistics_source1",header=True,inferSchema=True)
#rawdf1.show(20,False)
display(rawdf1.take(20))
display(rawdf1.sample(.1))
#inferSchema=True -> Spark scans the data and assigns data types automatically
#sample(.1) -> randomly selects roughly 10% of rows for data inspection
#take(20) -> returns the first 20 rows (an action)
rawdf2=spark.read.csv("/Volumes/enterprisefleetanalytics/data_ingestion_db/data_de_vl/logistics_source2",header=True,inferSchema=True)
#rawdf2.show(20,False)
display(rawdf2.take(20))
display(rawdf2.sample(.1))

2.Analyse the schema, datatypes, columns etc.

In [0]:
#Important passive EDA structure functions we can use
rawdf1.printSchema() #The id & age columns contain some non-numeric values (they are supposed to be numeric)
rawdf2.printSchema() 
#Column names in exact order
print(rawdf1.columns) #Shows the column count/order and the column names

#Column name + datatype mapping
print(rawdf1.dtypes) #Shows the datatype of every column (also useful for programmatic column & type identification)

3.Analyse the duplicate records count and summary of the dataframe.

In [0]:
#Total row count (baseline)
print("actual count of the data",rawdf1.count()) 

#De-duplicate across all columns of the dataframe (SQL-style DISTINCT)
print("de-duplicated record (all columns) count sqlstyle",rawdf1.distinct().count())

#De-duplicate across all columns of the dataframe (DataFrame-specific API)
print("de-duplicated record (all columns) count DF api",rawdf1.dropDuplicates().count())

#De-duplicate based on specific columns (here: shipment_id)
print("de-duplicated count on shipment_id column",rawdf1.dropDuplicates(['shipment_id']).count())

#describe() provides basic statistics like count, mean, stddev, min, and max
display(rawdf1.describe())

#summary() extends this by adding percentile-based distribution metrics such as the median and quartiles, making it more suitable for deeper data quality analysis.
display(rawdf1.summary())

###a. Passive Data Munging -  (File: logistics_source1  and logistics_source2)
Without modifying the data, identify:<br>
Shipment IDs that appear in both master_v1 and master_v2<br>
Records where:<br>
1. shipment_id is non-numeric
2. age is not an integer<br>

Count rows having:
3. fewer columns than expected
4. more columns than expected

In [0]:
#Create a Spark Session Object
from pyspark.sql.session import SparkSession
spark= SparkSession.builder.getOrCreate()
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
#Without modifying the data, identify:Shipment IDs that appear in both master_v1 and master_v2
#Set operations (INTERSECT, UNION) require same datatype

common_shipmentsdf = (rawdf1.select(col("shipment_id").cast("string"))
          .intersect(rawdf2.select(col("shipment_id").cast("string")))
           )

display(common_shipmentsdf)
common_shipmentsdf.count()

**Records where shipment_id is non-numeric**
- `rlike("^[0-9]+$")` is a regular-expression (regex) check used in Spark to test whether a value is purely numeric
- Keeps rows like: 123, 4567
- ❌ Drops rows like: ten, 12A, 12.5, NULL
- `~col("shipment_id").rlike("^[0-9]+$")` negates the check, so it shows 'ten'
- For NULL, `rlike` returns NULL, so the row is dropped by both the check and its negation
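
The regex check above can be tried outside Spark. This is a minimal plain-Python sketch, assuming Python's `re` module behaves like Spark's Java regex for this simple pattern. The helper name `is_numeric_id` and the sample values are hypothetical, and `None` stands in for SQL NULL.

```python
import re

# Same character class as the rlike() pattern; fullmatch() anchors both ends,
# which avoids Python's special handling of a trailing newline with "$".
NUMERIC_ID = re.compile(r"[0-9]+")

def is_numeric_id(value):
    """Return True when value consists only of digits; None (NULL) is never numeric."""
    return value is not None and NUMERIC_ID.fullmatch(value) is not None

samples = ["123", "4567", "ten", "12A", "12.5", None]

# Mirror the negated filter: NULL rows are excluded, just as Spark drops them.
non_numeric = [v for v in samples if v is not None and not is_numeric_id(v)]
print(non_numeric)
```

The explicit `None` handling matters because a NULL shipment_id will not show up in either the numeric or the non-numeric result in Spark; count NULLs separately if they need to be reported.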

In [0]:
#Records where shipment_id is non-numeric (both datasets)
#Cast to string on both sides so the union and the regex check see the same type
non_numeric_shipment_Bothdf = (
    rawdf1.select(col("shipment_id").cast("string"))
    .unionByName(rawdf2.select(col("shipment_id").cast("string")))) #combined DataFrame for shipment

non_numeric_df=(non_numeric_shipment_Bothdf
    .filter(~col("shipment_id").rlike("^[0-9]+$"))) #find non-numeric shipment IDs
display(non_numeric_df)

#age is not an integer
invalid_age_df = (
    rawdf1.select(col("age").cast("string"))
    .unionByName(rawdf2.select(col("age").cast("string")))) #combined DataFrame for age
invalid_age_df=invalid_age_df.filter(~col("age").rlike("^[0-9]+$")) #age is not an integer
display(invalid_age_df)

Count rows having:

- 1. fewer columns than expected
- 2. more columns than expected
- `split(col("value"), ",")` splits the raw CSV line on commas and converts the string into an array
- `size()` counts how many elements are in the array

In [0]:
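from pyspark.sql.functions import split, size, col
#Split and count columns per row
#NOTE: this attempt fails on rawdf1. A DataFrame read with spark.read.csv has no "value" column,
#so Spark raises an AnalysisException. It is kept commented out to show why a raw-text read is needed.
#rawdf1_with_count = rawdf1.withColumn(
#    "col_count", size(split(col("value"), ","))
#)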




- Once data is loaded into a DataFrame:
- Spark enforces the schema
- Every row has the same number of columns
- 👉 So you CANNOT detect “fewer or more columns per row” from rawdf1 directly.
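
To see why the count has to come from the raw text, here is a minimal plain-Python sketch that counts fields per line before any schema is applied. The sample lines are hypothetical (not taken from the real file), and the `csv` module is used only to show how a quoted comma changes the count compared with a naive comma split.

```python
import csv
import io

# Hypothetical sample: a header plus four data lines.
raw_text = (
    "shipment_id,first_name,age\n"
    "1,Asha,30\n"
    "2,Ravi\n"
    '3,"Kumar, R",41\n'
    "4,Meena,29,extra\n"
)

rows = list(csv.reader(io.StringIO(raw_text)))
expected = len(rows[0])  # the header defines the expected column count

# Quote-aware counts
fewer = sum(1 for r in rows[1:] if len(r) < expected)
more = sum(1 for r in rows[1:] if len(r) > expected)

# Naive comma split, comparable to size(split(value, ","))
naive_more = sum(
    1 for line in raw_text.splitlines()[1:] if len(line.split(",")) > expected
)
print(expected, fewer, more, naive_more)
```

The naive split reports one extra "too many columns" row because it splits inside the quoted name. If the source files can contain quoted commas, the comma-split count should be treated as an upper bound.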

In [0]:
expected_col_countdf = len(rawdf1.columns)
print("Expected column count:", expected_col_countdf)

raw_text_df = spark.read.text(
    "/Volumes/enterprisefleetanalytics/data_ingestion_db/data_de_vl/logistics_source1"
)
rawdf1_with_count = raw_text_df.withColumn(
    "col_count", size(split(col("value"), ","))
)
from pyspark.sql.functions import *
#Note: a plain comma split miscounts fields that contain quoted commas
# 3. Fewer columns than expected
fewer_columns_count = (
    rawdf1_with_count.where(col("col_count") < expected_col_countdf)
        .count()
)
print("Rows with fewer columns than expected:", fewer_columns_count)
# 4. More columns than expected
more_columns_count = (
    rawdf1_with_count
        .where(col("col_count") > expected_col_countdf)
        .count()
)
print("Rows with more columns than expected:", more_columns_count)

###**b. Active Data Munging** File: logistics_source1 and logistics_source2

#####1.Combining Data + Schema Merging (Structuring)
1. Read both files without enforcing schema
2. Align them into a single canonical schema: shipment_id, first_name, last_name, age, role, hub_location, vehicle_type, data_source
3. Add data_source column with values as: system1, system2 in the respective dataframes
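
The alignment idea (union by column name, with missing columns filled by NULL) can be sketched in plain Python. The rows and the helper name `align` are hypothetical and only illustrate the behaviour; they are not the Spark implementation.

```python
# Hypothetical rows standing in for the two sources (values are illustrative only).
system1 = [{"shipment_id": "1", "first_name": "Asha", "age": "30"}]
system2 = [{"shipment_id": "2", "first_name": "Ravi", "hub_location": "Pune"}]

canonical_cols = ["shipment_id", "first_name", "last_name", "age", "role",
                  "hub_location", "vehicle_type", "data_source"]

def align(rows, source):
    """Tag each row with its source and fill columns the source lacks with None."""
    aligned = []
    for row in rows:
        tagged = {**row, "data_source": source}
        aligned.append({c: tagged.get(c) for c in canonical_cols})
    return aligned

merged = align(system1, "system1") + align(system2, "system2")
for row in merged:
    print(row)
```

Every merged row ends up with the same eight keys in the same order, which is what makes the later cleansing rules safe to apply to both sources at once.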

In [0]:
#1.Combining Data + Schema Merging (Structuring)
#Read both files without enforcing schema
rawdf1 = spark.read.csv(
    "/Volumes/enterprisefleetanalytics/data_ingestion_db/data_de_vl/logistics_source1",
    header=True,
    inferSchema=False
)

rawdf2 = spark.read.csv(
    "/Volumes/enterprisefleetanalytics/data_ingestion_db/data_de_vl/logistics_source2",
    header=True,
    inferSchema=False
)

#Add data_source column with values as: system1, system2 in the respective dataframes
df_system1 = rawdf1.withColumn("data_source", lit("system1"))
df_system2 = rawdf2.withColumn("data_source", lit("system2"))
df_system1.printSchema()
df_system2.printSchema()
#merged df
rawdf_merged = df_system1.unionByName(
    df_system2,
    allowMissingColumns=True
) #Missing columns auto-filled with NULL

#Align them into a single canonical schema: shipment_id, first_name, last_name, age, role, hub_location, vehicle_type, data_source
canonical_cols = ["shipment_id","first_name","last_name","age","role","hub_location","vehicle_type","data_source"]
final_df = rawdf_merged.select(canonical_cols)
final_df.printSchema()
display(final_df)

#####2. Cleansing, Scrubbing: 
Cleansing (removal of unwanted datasets)<br>
1. Mandatory Column Check - Drop any record where any of the following columns is NULL:shipment_id, role<br>
2. Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name<br>
3. Join Readiness Rule - Drop records where the join key is null: shipment_id<br>

Scrubbing (convert raw to tidy)<br>
4. Age Defaulting Rule - Fill NULL values in the age column with: -1<br>
5. Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN<br>
6. Invalid Age Replacement - Replace the following values in age:
"ten" to -1
"" to -1<br>
7. Vehicle Type Normalization - Replace inconsistent vehicle types: 
truck to LMV
bike to TwoWheeler

In [0]:
#Mandatory Column Check - Drop any record where any of the following columns is NULL:shipment_id, role
print("RawDF1 count (before cleaning):", final_df.count())
cleanseddf1 = final_df.na.drop(how="any",subset=["shipment_id", "role"])
print("RawDF1 count (after cleaning):", cleanseddf1.count())
display(cleanseddf1)
#Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name
#how="all" drops a row only when every column in the subset is NULL (how="any" would also drop rows missing just one name)
print("RawDF1 count (before cleaning):", cleanseddf1.count())
cleanseddf2 = cleanseddf1.na.drop(how="all",subset=["first_name", "last_name"])
print("RawDF1 count (after cleaning):", cleanseddf2.count())
display(cleanseddf2)
#Join Readiness Rule - Drop records where the join key is null: shipment_id
print("RawDF1 count (before cleaning):", cleanseddf2.count())
cleanseddf3 = cleanseddf2.na.drop(how="any",subset=["shipment_id"])
print("RawDF1 count (after cleaning):", cleanseddf3.count())
display(cleanseddf3)

**Note:** `na.fill` is fast and simple for replacing true NULLs, but it cannot handle malformed or empty values, so it should be used only after data normalization.

- NULL → replaced
- "" → NOT replaced
- "null" → NOT replaced
- "NA" → NOT replaced
- "ten" → NOT replaced

### 🔹 na.fill(): fill NULL values only
- What it does
- Replaces NULL values
- Does not touch empty strings "" or other values
### 🔹 na.replace(): replace specific values
- df.na.replace({old_value: new_value}, subset=["col"])
- What it does
- Replaces exact values you specify
- Works on non-NULL values
- Does not replace NULLs

| Scenario             | Use              |
| -------------------- | ---------------- |
| NULL values          | `na.fill()`      |
| Known invalid values | `na.replace()`   |
| Both                 | `fill → replace` |
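
The difference between the two operations can be illustrated with a small plain-Python sketch. The helpers `fill_null` and `replace_exact` are hypothetical stand-ins that mimic the behaviour described above; `None` plays the role of NULL.

```python
ages = [None, "", "null", "NA", "ten", "42"]

def fill_null(values, default):
    """Mimic na.fill: only None (NULL) is replaced."""
    return [default if v is None else v for v in values]

def replace_exact(values, mapping):
    """Mimic na.replace: only exact, non-NULL matches are replaced."""
    return [mapping.get(v, v) if v is not None else v for v in values]

filled = fill_null(ages, "-1")
cleaned = replace_exact(filled, {"ten": "-1", "": "-1"})
print(filled)
print(cleaned)
```

After the fill step only the true NULL has changed; the replace step then handles the known bad literals. Values such as "null" and "NA" survive both steps unless they are added to the mapping.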


In [0]:

#Age Defaulting Rule - Fill NULL values in the age column with: -1
#cleanseddf4=cleanseddf3.na.fill(-1,subset=["age"])
#display(cleanseddf4) #an integer fill value is ignored for string columns, and na.fill never touches "", "null", "NA" or "ten"
#Since age is a string column, fill it with "-1" (string)
cleanseddf5 = cleanseddf3.na.fill( {"age": "-1"} )
display(cleanseddf5)
#Because the dataset was ingested without schema inference, numeric fields like age are strings, so NULLs are defaulted with a string value to avoid implicit casting.

#Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN
cleanseddf6 = cleanseddf5.na.fill({"vehicle_type": "UNKNOWN"}
)
display(cleanseddf6)
#Invalid Age Replacement - Replace the following values in age: "ten" to -1 "" to -1
cleanseddf7 = cleanseddf6.na.replace({"ten": "-1", "": "-1"},subset=["age"])
display(cleanseddf7)
#Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV bike to TwoWheeler
cleanseddf8 = cleanseddf7.na.replace({"truck": "LMV", "bike": "TwoWheeler"},subset=["vehicle_type"])
display(cleanseddf8)

####3. Standardization, De-Duplication and Replacement / Deletion of Data to make it in a usable format

Creating the shipment details DataFrame <br>
1. Create a DF by Reading Data from logistics_shipment_detail.json
2. As this is clean JSON data, it doesn't require any cleansing or scrubbing.

In [0]:

#Use multiline = true with .json() because each JSON record in the file spans multiple lines
shipment_df = (
    spark.read
        .option("multiline", "true")   # ⭐ critical
        .json("/Volumes/enterprisefleetanalytics/data_ingestion_db/data_de_vl/logistics_shipment_detail_3000.json")
)
shipment_df.printSchema()


Standardizations:<br>

1. Add columns<br> 
Source File: DF of logistics_shipment_detail_3000.json<br>
Add `domain` as 'Logistics', the current timestamp as `ingestion_timestamp`, and 'False' as `is_expedited`
2. Column Uniformity: 
role - Convert to lowercase<br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
vehicle_type - Convert values to UPPERCASE<br>
Source Files: DF of logistics_shipment_detail_3000.json
hub_location - Convert values to initcap case<br>
Source Files: DF of merged(logistics_source1 & logistics_source2)<br>
3. Format Standardization:<br>
Source Files: DF of logistics_shipment_detail_3000.json<br>
Convert shipment_date to yyyy-MM-dd<br>
Ensure shipment_cost has 2 decimal precision<br>
4. Data Type Standardization<br>
Standardizing column data types to fix schema drift and enable mathematical operations.<br>
Source File: DF of merged(logistics_source1 & logistics_source2) <br>
age: Cast String to Integer<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
shipment_weight_kg: Cast to Double<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
is_expedited: Cast to Boolean<br>
5. Naming Standardization <br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
Rename: first_name to staff_first_name<br>
Rename: last_name to staff_last_name<br>
Rename: hub_location to origin_hub_city<br>
6. Reordering columns logically in a better standard format:<br>
Source File: DF of Data from all 3 files<br>
shipment_id (Identifier), staff_first_name (Dimension), staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)

In [0]:
#Add columns. Source File: DF of logistics_shipment_detail_3000.json
#domain as 'Logistics', current timestamp as 'ingestion_timestamp' and False as 'is_expedited'
shipment_df1 = (shipment_df
                .withColumn("domain", lit("Logistics"))
                .withColumn("ingestion_timestamp", current_timestamp())
                .withColumn("is_expedited", lit(False))
            )
shipment_df1.printSchema()
#Column Uniformity: role - Convert to lowercase
#Source File: logistics_source1 & logistics_source2
#vehicle_type - Convert values to UPPERCASE
cleanseddf9= (
        cleanseddf8.withColumn("role", lower(col("role")))
        .withColumn("vehicle_type", upper(col("vehicle_type")))
) # or latest cleansed df for source1
display(cleanseddf9)
#Source Files: logistics_shipment_detail_3000.json (and the merged master files) hub_location - Convert values to initcap case
cleanseddf10 = cleanseddf9.withColumn("hub_location", initcap(col("hub_location"))) #new york → New York

- Format Standardization:
- Source Files: DF of logistics_shipment_detail_3000.json
- Convert shipment_date to yyyy-MM-dd
- Ensure shipment_cost has 2 decimal precision

`to_date(col("shipment_date"), "dd-MM-yy")`
> What it does: parses the STRING into a Spark DATE using the explicit format dd-MM-yy

`date_format(…, "yyyy-MM-dd")`
> What it does: takes a DATE and formats it as a STRING in the specified format yyyy-MM-dd

- STRING (shipment_date)
-    ↓ to_date("dd-MM-yy")
- DATE
-    ↓ date_format("yyyy-MM-dd")
- STRING (formatted)

| Step   | Function        | Purpose       |
| ------ | --------------- | ------------- |
| Parse  | `to_date()`     | STRING → DATE |
| Format | `date_format()` | DATE → STRING |
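
The same parse-then-format round trip can be sketched with Python's `datetime`, purely as an analogy. The helper name `standardize_date` and the sample input are hypothetical. Python's `%d-%m-%y` directives are assumed to correspond to Spark's `dd-MM-yy` pattern here, although the two systems may resolve two-digit years differently for older dates.

```python
from datetime import datetime

def standardize_date(raw):
    """Parse a dd-MM-yy string into a date, then format it as yyyy-MM-dd."""
    parsed = datetime.strptime(raw, "%d-%m-%y").date()  # STRING -> DATE
    return parsed.strftime("%Y-%m-%d")                  # DATE -> STRING

print(standardize_date("23-04-24"))
```

Keeping the intermediate value as a real date (rather than rearranging substrings) means invalid inputs such as "31-02-24" fail loudly instead of producing a plausible-looking but wrong string.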


In [0]:
##Convert to STRING formatted as yyyy-MM-dd

shipment_df2 = shipment_df1.withColumn("shipment_date",date_format(to_date(col("shipment_date"), "dd-MM-yy"),"yyyy-MM-dd"))
display(shipment_df2)
#Ensure shipment_cost has 2 decimal precision
shipment_df3 = shipment_df2.withColumn(
    "shipment_cost",
    round(col("shipment_cost"), 2)
)


Data Type Standardization
- Standardizing column data types to fix schema drift and enable mathematical operations.
- Source File: DF of merged(logistics_source1 & logistics_source2)
- age: Cast String to Integer

In [0]:
#age: Cast String to Integer
cleanseddf11 = cleanseddf10.withColumn("age",col("age").cast("int"))
cleanseddf11.printSchema()

#Source File: DF of logistics_shipment_detail_3000.json
#shipment_weight_kg: Cast to Double
shipment_df4 = shipment_df3.withColumn(
    "shipment_weight_kg",
    col("shipment_weight_kg").cast("double")
)
#is_expedited: Cast to Boolean
shipment_df5 = shipment_df4.withColumn(
    "is_expedited",
    col("is_expedited").cast("boolean")
)
shipment_df5.printSchema()
display(shipment_df5)

In [0]:
#withColumnRenamed #Rename: first_name to staff_first_name #Rename: last_name to staff_last_name
#Rename: hub_location to origin_hub_city
cleanseddf12 = (
    cleanseddf11
        .withColumnRenamed("first_name", "staff_first_name")
        .withColumnRenamed("last_name", "staff_last_name")
        .withColumnRenamed("hub_location", "origin_hub_city")
)

cleanseddf12.printSchema()
display(cleanseddf12)

- Reordering columns logically in a better standard format:
- Source File: DF of Data from all 3 files
- shipment_id (Identifier), staff_first_name (Dimension), staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)

In [0]:
from pyspark.sql.functions import col
cleanseddf12.printSchema()
shipment_df5.printSchema()
cleanseddf12_str = cleanseddf12.withColumn(
    "shipment_id",
    col("shipment_id").cast("string")
)

shipment_df5_str = shipment_df5.withColumn(
    "shipment_id",
    col("shipment_id").cast("string")
)

union_df = cleanseddf12_str.unionByName(shipment_df5_str, allowMissingColumns=True)
final_ordered_df = union_df.select(
    "shipment_id",          # Identifier
    "staff_first_name",     # Dimension
    "staff_last_name",      # Dimension
    "role",                 # Dimension
    "origin_hub_city",      # Location
    "shipment_cost",        # Metric
    "ingestion_timestamp"   # Audit
)

final_ordered_df.printSchema()
display(final_ordered_df)



Deduplication:
1. Apply Record Level De-Duplication
2. Apply Column Level De-Duplication (Primary Key Enforcement)
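
The two levels of de-duplication can be sketched in plain Python. The records below are hypothetical, and integer timestamps are used only to keep the example short; the point is the difference between removing identical rows and enforcing one row per key.

```python
records = [
    {"shipment_id": "500001", "status": "IN_TRANSIT", "ingestion_timestamp": 1},
    {"shipment_id": "500001", "status": "IN_TRANSIT", "ingestion_timestamp": 1},  # exact duplicate
    {"shipment_id": "500001", "status": "DELIVERED", "ingestion_timestamp": 2},   # same key, newer
    {"shipment_id": "500002", "status": "DELAYED", "ingestion_timestamp": 1},
]

# Record level: drop rows that are identical in every column (order preserved).
seen = set()
record_level = []
for r in records:
    fingerprint = tuple(sorted(r.items()))
    if fingerprint not in seen:
        seen.add(fingerprint)
        record_level.append(r)

# Key level: keep the row with the latest ingestion_timestamp per shipment_id.
latest = {}
for r in record_level:
    current = latest.get(r["shipment_id"])
    if current is None or r["ingestion_timestamp"] > current["ingestion_timestamp"]:
        latest[r["shipment_id"]] = r
key_level = list(latest.values())

print(len(records), len(record_level), len(key_level))
```

Record-level de-duplication alone leaves two rows for shipment 500001; only the key-level step, with an explicit rule for which row wins, guarantees a usable primary key.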

In [0]:
#Apply Record Level De-Duplication
recordleveldup_df = union_df.dropDuplicates()
#Apply Column Level De-Duplication (Primary Key Enforcement)
#dropDuplicates on a key keeps an arbitrary row per shipment_id
duplicate_removed= recordleveldup_df.dropDuplicates(["shipment_id"])
#Alternative: use a window function to control which row survives
from pyspark.sql.window import Window
pk_window = (
    Window
    .partitionBy("shipment_id")
    .orderBy(col("ingestion_timestamp").desc())
)
# Keep the latest record per shipment_id. Start from recordleveldup_df, because
# duplicate_removed already holds one row per key and the window would have nothing to rank.
final_df = (
    recordleveldup_df
    .withColumn("rn", row_number().over(pk_window))
    .filter(col("rn") == 1)
    .drop("rn")
)

##2. Data Enrichment - Detailing of data
Makes your data rich and detailed <br>

###### Adding of Columns (Data Enrichment)
*Creating new derived attributes to enhance traceability and analytical capability.*

**1. Add Audit Timestamp (`load_dt`)**
Source File: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** We need to track exactly when this record was ingested into our Data Lakehouse for auditing purposes.
* **Action:** Add a column `load_dt` using the function `current_timestamp()`.

**2. Create Full Name (`full_name`)**
Source File: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** The reporting dashboard requires a single field for the driver's name instead of separate columns.
* **Action:** Create `full_name` by concatenating `first_name` and `last_name` with a space separator.
* **Result:** "Rajesh" + " " + "Kumar" -> **"Rajesh Kumar"**

**3. Define Route Segment (`route_segment`)**
Source File: DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** The logistics team wants to analyze performance based on specific transport lanes (Source to Destination).
* **Action:** Combine `source_city` and `destination_city` with a hyphen.
* **Result:** "Chennai" + "-" + "Pune" -> **"Chennai-Pune"**

**4. Generate Vehicle Identifier (`vehicle_identifier`)**
Source File: DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** We need a unique tracking code that immediately tells us the vehicle type and the shipment ID.
* **Action:** Combine `vehicle_type` and `shipment_id` to create a composite key.
* **Result:** "Truck" + "_" + "500001" -> **"Truck_500001"**

In [0]:
# Add a column load_dt using the function current_timestamp()
cleanseddf13 = cleanseddf12_str.withColumn(
    "load_dt",
    current_timestamp()
)
#Create full_name by concatenating first_name and last_name with a space separator.
#concat_ws() skips NULL inputs, whereas concat() returns NULL when any input is NULL,
#so concat_ws() is the safer choice for concatenating nullable columns
cleanseddf14 = cleanseddf13.withColumn(
    "full_name",
    concat_ws(" ", col("staff_first_name"), col("staff_last_name"))
)
display(cleanseddf14)
#Scenario: The logistics team wants to analyze performance based on specific transport lanes (Source to Destination).
#Result: "Chennai" + "-" + "Pune" -> "Chennai-Pune"
shipment_df6 = shipment_df5_str.withColumn(
    "route_segment",
    concat_ws("-", col("source_city"), col("destination_city"))
)
#Result: "Truck" + "_" + "500001" -> "Truck_500001"
shipment_df7 = shipment_df6.withColumn(
    "vehicle_identifier",
    concat_ws("_", col("vehicle_type"), col("shipment_id"))
)


###### Deriving of Columns (Time Intelligence)
*Extracting temporal features from dates to enable period-based analysis and reporting.*<br>
Source File: logistics_shipment_detail_3000.json<br>
**1. Derive Shipment Year (`shipment_year`)**
* **Scenario:** Management needs an annual performance report to compare growth year-over-year.
* **Action:** Extract the year component from `shipment_date`.
* **Result:** "2024-04-23" -> **2024**

**2. Derive Shipment Month (`shipment_month`)**
* **Scenario:** Analysts want to identify seasonal peaks (e.g., increased volume in December).
* **Action:** Extract the month component from `shipment_date`.
* **Result:** "2024-04-23" -> **4** (April)

**3. Flag Weekend Operations (`is_weekend`)**
* **Scenario:** The Operations team needs to track shipments handled during weekends to calculate overtime pay or analyze non-business day capacity.
* **Action:** Flag as **'True'** if the `shipment_date` falls on a Saturday or Sunday.

**4. Flag shipment status (`is_expedited`)**
* **Scenario:** The Operations team needs to track whether a shipment is IN_TRANSIT or DELIVERED.
* **Action:** Flag as **'True'** if the `shipment_status` is IN_TRANSIT or DELIVERED.
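
The weekend rule depends on how days are numbered. Spark's `dayofweek` returns 1 for Sunday through 7 for Saturday, while Python's `weekday()` returns 0 for Monday through 6 for Sunday. The sketch below, with hypothetical helper names, maps one numbering onto the other so the `isin(1, 7)` style check can be reasoned about in plain Python.

```python
from datetime import date

def spark_style_dayofweek(d):
    """Map Python's weekday() (Mon=0..Sun=6) to the 1=Sunday..7=Saturday numbering."""
    return (d.weekday() + 1) % 7 + 1

def is_weekend(d):
    """True when the date falls on a Saturday (7) or Sunday (1)."""
    return spark_style_dayofweek(d) in (1, 7)

for d in (date(2024, 4, 23), date(2024, 4, 27), date(2024, 4, 28)):
    print(d.isoformat(), d.year, d.month, spark_style_dayofweek(d), is_weekend(d))
```

Mixing up the two conventions is an easy mistake: checking for 6 and 7 in Spark would flag Friday and Saturday instead of Saturday and Sunday.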

In [0]:
#Result: "2024-04-23" -> 2024
#Build on shipment_df7 from the enrichment step so that route_segment and vehicle_identifier are kept
shipment_df6 = shipment_df7.withColumn(
    "shipment_year",
    year(to_date(col("shipment_date"), "yyyy-MM-dd"))
)
shipment_df6.printSchema()

#Result: "2024-04-23" -> 4 (use date_format(..., "MMM") instead for Jan, Feb style names)
shipment_df7 = shipment_df6.withColumn(
    "shipment_month",
    month(to_date(col("shipment_date"), "yyyy-MM-dd"))
)
display(shipment_df7)
shipment_df8 = shipment_df7.withColumn("is_weekend",
    when(dayofweek(to_date(col("shipment_date"), "yyyy-MM-dd")).isin(1, 7),True).otherwise(False))
display(shipment_df8)
shipment_df9 = shipment_df8.withColumn("is_expedited",
    col("shipment_status").isin("IN_TRANSIT", "DELIVERED")
)
display(shipment_df9)


###### Enrichment/Business Logics (Calculated Fields)
*Deriving new metrics and financial indicators using mathematical and date-based operations.*<br>
Source File: logistics_shipment_detail_3000.json<br>

**1. Calculate Unit Cost (`cost_per_kg`)**
* **Scenario:** The Finance team wants to analyze the efficiency of shipments by determining the cost incurred per unit of weight.
* **Action:** Divide `shipment_cost` by `shipment_weight_kg`.
* **Logic:** `shipment_cost / shipment_weight_kg`

**2. Track Shipment Age (`days_since_shipment`)**
* **Scenario:** The Operations team needs to monitor how long it has been since a shipment was dispatched to identify potential delays.
* **Action:** Calculate the difference in days between the `current_date` and the `shipment_date`.
* **Logic:** `datediff(current_date(), shipment_date)`

**3. Compute Tax Liability (`tax_amount`)**
* **Scenario:** For invoicing and compliance, we must calculate the Goods and Services Tax (GST) applicable to each shipment.
* **Action:** Calculate 18% GST on the total `shipment_cost`.
* **Logic:** `shipment_cost * 0.18`
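
The three calculations can be checked by hand with a small plain-Python sketch. The function name `enrich` and the sample values are hypothetical; the zero-weight guard is an added assumption so that the division never fails.

```python
from datetime import date

def enrich(cost, weight_kg, shipment_date, today):
    """Return cost_per_kg, days_since_shipment and tax_amount for one shipment."""
    # Guard against zero or missing weight instead of dividing blindly.
    cost_per_kg = cost / weight_kg if weight_kg and weight_kg > 0 else None
    days_since_shipment = (today - shipment_date).days
    tax_amount = round(cost * 0.18, 2)  # 18% GST on the shipment cost
    return cost_per_kg, days_since_shipment, tax_amount

print(enrich(1000.0, 250.0, date(2024, 4, 23), date(2024, 5, 3)))
```

Passing `today` explicitly keeps the sketch deterministic; the Spark version uses `current_date()`, so `days_since_shipment` changes every day the notebook is rerun.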

In [0]:
# Calculate Unit Cost (cost_per_kg)
shipment_df10 = shipment_df9.withColumn(
    "cost_per_kg",
    when(col("shipment_weight_kg") > 0,
         col("shipment_cost") / col("shipment_weight_kg")
    ).otherwise(None)
)
#Track Shipment Age (days_since_shipment)
shipment_df11 = shipment_df10.withColumn(
    "days_since_shipment",
    datediff(current_date(), col("shipment_date"))
)
shipment_df12 = shipment_df11.withColumn(
    "tax_amount",
    round(col("shipment_cost") * 0.18, 2)
)
display(shipment_df12)

###### Remove/Eliminate (drop, select, selectExpr)
*Excluding unnecessary or redundant columns to optimize storage and privacy.*<br>
Source File: DF of logistics_source1 and logistics_source2<br>

**1. Remove Redundant Name Columns**
* **Scenario:** Since we have already created the `full_name` column in the Enrichment step, the individual name columns are now redundant and clutter the dataset.
* **Action:** Drop the `staff_first_name` and `staff_last_name` columns (renamed from `first_name` and `last_name` in the standardization step).
* **Logic:** `df.drop("staff_first_name", "staff_last_name")`

In [0]:
cleanseddf15 = cleanseddf14.drop(
    "staff_first_name",
    "staff_last_name"
)
cleanseddf15.printSchema()
display(cleanseddf15)

##### Splitting & Merging/Melting of Columns
*Reshaping columns to extract hidden values or combine fields for better analysis.*<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
**1. Splitting (Extraction)**
*Breaking one column into multiple to isolate key information.*
* **Split Order Code:**
  * **Action:** Split `order_id` ("ORD100000") into two new columns:
    * `order_prefix` ("ORD")
    * `order_sequence` ("100000")
* **Split Date:**
  * **Action:** Split `shipment_date` into three separate columns for partitioning:
    * `ship_year` (2024)
    * `ship_month` (4)
    * `ship_day` (23)

**2. Merging (Concatenation)**
*Combining multiple columns into a single unique identifier or description.*
* **Create Route ID:**
  * **Action:** Merge `source_city` ("Chennai") and `destination_city` ("Pune") to create a descriptive route key:
    * `route_lane` ("Chennai->Pune")

- How **translate()** works
- **translate(column, chars_to_remove, "")**
- Removes all matching characters from the string
- Works character-by-character, not pattern-based

In [0]:
#translate(column, chars_to_remove, "")
#Action: Split order_id ("ORD100000") into two new columns
shipment_df13 = (
    shipment_df12.withColumn("order_prefix",translate(col("order_id"), "0123456789", ""))
        .withColumn("order_sequence",translate(col("order_id"), "ABCDEFGHIJKLMNOPQRSTUVWXYZ", ""))
)



- 🧠 How regexp_extract() works
- Uses regular expressions
- Extracts substrings matching a pattern
- Index 0 = full regex match
- Regex explained
- `^[A-Za-z]+` → letters at the start
- `[0-9]+$` → digits at the end
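
The two splitting approaches can be compared side by side in plain Python. This is an analogy only: `str.translate` stands in for Spark's `translate`, and `re.search` for `regexp_extract` with index 0. The input value comes from the example above.

```python
import re
import string

order_id = "ORD100000"

# Character-by-character removal, in the spirit of translate(col, chars, "")
prefix_by_translate = order_id.translate(str.maketrans("", "", string.digits))
sequence_by_translate = order_id.translate(str.maketrans("", "", string.ascii_uppercase))

# Pattern-based extraction, in the spirit of regexp_extract(col, pattern, 0)
prefix_match = re.search(r"^[A-Za-z]+", order_id)
sequence_match = re.search(r"[0-9]+$", order_id)
prefix_by_regex = prefix_match.group(0) if prefix_match else ""
sequence_by_regex = sequence_match.group(0) if sequence_match else ""

print(prefix_by_translate, sequence_by_translate, prefix_by_regex, sequence_by_regex)
```

Both approaches agree on well-formed IDs. They diverge on irregular ones: character removal keeps every remaining character wherever it sits, while the anchored patterns only accept letters at the start and digits at the end.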

In [0]:
shipment_df14 = (
    shipment_df13.withColumn(
            "order_prefix",
            regexp_extract(col("order_id"), "^[A-Za-z]+", 0)
        )
        .withColumn("order_sequence",
            regexp_extract(col("order_id"), "[0-9]+$", 0)
        )
)

In [0]:
#Action: Split shipment_date into three separate columns for partitioning:
shipment_df15 = shipment_df14.withColumn(
    "shipment_date",
    to_date(col("shipment_date"), "yyyy-MM-dd")
)
#Action: Split shipment_date into three separate columns for partitioning
shipment_df16 = (
    shipment_df15
        .withColumn("ship_year", year(col("shipment_date")))
        .withColumn("ship_month", month(col("shipment_date")))
        .withColumn("ship_day", dayofmonth(col("shipment_date")))
)
#Action: Merge source_city ("Chennai") and destination_city ("Pune") to create a descriptive route key
shipment_df17 = shipment_df16.withColumn(
    "route_lane",
    concat_ws("->", col("source_city"), col("destination_city"))
)
display(shipment_df17)

## 3. Data Customization & Processing - Application of Tailored Business Specific Rules

### **UDF1: Complex Incentive Calculation**
**Scenario:** The Logistics Head wants to calculate a "Performance Bonus" for drivers based on tenure and role complexity.

**Action:** Create a Python function `calculate_bonus(role, age)` and register it as a Spark UDF.

**Logic:**
* **IF** `Role` == 'Driver' **AND** `Age` > 50:
  * `Bonus` = 15% of Salary (Reward for Seniority)
* **IF** `Role` == 'Driver' **AND** `Age` < 30:
  * `Bonus` = 5% of Salary (Encouragement for Juniors)
* **ELSE**:
  * `Bonus` = 0

**Result:** A new derived column `projected_bonus` is generated for every row in the dataset.

---

### **UDF2: PII Masking (Privacy Compliance)**
**Scenario:** For the analytics dashboard, we must hide the full identity of the staff to comply with privacy laws (GDPR/DPDP), while keeping names recognizable for internal managers.

**Business Rule:** Show the first 2 letters, mask the middle characters with `****`, and show the last letter.

**Action:** Create a UDF `mask_identity(name)`.

**Example:**
* **Input:** `"Rajesh"`
* **Output:** `"Ra****h"`
<br>
**Note: Convert the above UDF logic to a built-in-function-based transformation to improve performance.**

**Why Python UDFs are slow**
> 1. JVM ↔ Python serialization
- Spark runs in the JVM.
- Python UDFs run in Python workers.
> 2. Spark cannot optimize UDF logic
- Spark SQL functions:
- Are Catalyst-optimized
- Use code generation
- Can be reordered, pushed down, pruned
> 3. Python UDFs:
- Treated as black boxes
- No predicate pushdown
- No column pruning
- No whole-stage codegen

In [0]:
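#UDF1 Action: Create a Python function calculate_bonus(role, age) and register it as a Spark UDF.
#The dataset has no salary column, so the function returns the bonus rate (0.15 = 15% of salary).
def calculate_bonus(role, age):
    #NULL role/age would raise a TypeError on comparison; age -1 is the "unknown" default set during scrubbing
    if role is None or age is None or age < 0:
        return 0.0
    if role == "driver" and age > 50:
        return 0.15
    elif role == "driver" and age < 30:
        return 0.05
    else:
        return 0.0
#from pyspark.sql.functions import udf
#Declare the return type; without it udf() defaults to StringType
calculate_bonus_udf = udf(calculate_bonus, DoubleType())
cleanseddf16 = cleanseddf15.withColumn(
    "projected_bonus",
    calculate_bonus_udf(col("role"), col("age"))
)
display(cleanseddf16)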

In [0]:
#Input  → "Rajesh"
#Output → "Ra****h"
def mask_identity(name):
    if name is None:
        return None
    if len(name) <= 3:
        return name  # too short to mask meaningfully
    return name[:2] + "****" + name[-1]
mask_identity_udf = udf(mask_identity, StringType())

cleanseddf17 = cleanseddf16.withColumn(
    "masked_name",
    mask_identity_udf(col("full_name"))
)
display(cleanseddf17)

## 4. Data Core Curation & Processing (Pre-Wrangling)
*Applying business logic to focus, filter, and summarize data before final analysis.*

**1. Select (Projection)**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** The Driver App team only needs location data, not sensitive HR info.
* **Action:** Select only `first_name`, `role`, and `hub_location`.

**2. Filter (Selection)**<br>
Source File: DF of json<br>
* **Scenario:** We need a report on active operational problems.
* **Action:** Filter rows where `shipment_status` is **'DELAYED'** or **'RETURNED'**.
* **Scenario:** Insurance audit for senior staff.
* **Action:** Filter rows where `age > 50`.

**3. Derive Flags & Columns (Business Logic)**<br>
Source File: DF of json<br>
* **Scenario:** Identify high-value shipments for security tracking.
* **Action:** Create flag `is_high_value` = **True** if `shipment_cost > 50,000`.
* **Scenario:** Flag weekend operations for overtime calculation.
* **Action:** Create flag `is_weekend` = **True** if day is Saturday or Sunday.

**4. Format (Standardization)**<br>
Source File: DF of json<br>
* **Scenario:** Finance requires readable currency formats.
* **Action:** Format `shipment_cost` to string like **"₹30,695.80"**.
* **Scenario:** Standardize city names for reporting.
* **Action:** Format `source_city` to Uppercase (e.g., "chennai" → **"CHENNAI"**).

**5. Group & Aggregate (Summarization)**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** Regional staffing analysis.
* **Action:** Group by `hub_location` and **Count** the number of staff.
* **Scenario:** Fleet capacity analysis.
* **Action:** Group by `vehicle_type` and **Sum** the `shipment_weight_kg`.

**6. Sorting (Ordering)**<br>
Source File: DF of json<br>
* **Scenario:** Prioritize the most expensive shipments.
* **Action:** Sort by `shipment_cost` in **Descending** order.
* **Scenario:** Organize daily dispatch schedule.
* **Action:** Sort by `shipment_date` (Ascending).

**7. Limit (Top-N Analysis)**<br>
Source File: DF of json<br>
* **Scenario:** Dashboard snapshot of critical delays.
* **Action:** Filter for 'DELAYED', Sort by Cost, and **Limit to top 10** rows.
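
The weekend flag in item 3 depends on how the day of the week is numbered. Spark's `dayofweek` returns 1 for Sunday through 7 for Saturday, so a weekend is the set `{1, 7}`. The plain-Python sketch below mirrors that numbering; `spark_dayofweek` and `is_weekend` are hypothetical helper names used only for illustration.

```python
from datetime import date

def spark_dayofweek(d: date) -> int:
    """Mimic Spark's dayofweek numbering: 1 = Sunday ... 7 = Saturday."""
    # isoweekday() is 1 = Monday ... 7 = Sunday, so shift it by one day
    return d.isoweekday() % 7 + 1

def is_weekend(d: date) -> bool:
    return spark_dayofweek(d) in (1, 7)

# 2024-01-06 was a Saturday, 2024-01-07 a Sunday, 2024-01-08 a Monday
flags = [is_weekend(date(2024, 1, day)) for day in (6, 7, 8)]
```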

In [0]:
cleanseddf17_driver_view = cleanseddf11.select(
    "first_name",
    "role",
    "hub_location"
)
display(cleanseddf17_driver_view)
#shipment_df17
#Filter rows where shipment_status is 'DELAYED' or 'RETURNED'.
operational_issues_df = shipment_df17.filter(
    col("shipment_status").isin("DELAYED", "RETURNED")
)
#Filter rows where age > 50
senior_staff_df = cleanseddf17.filter(
    col("age") > 50
)
display(senior_staff_df)
#is_high_value = True if shipment_cost > 50,000, else False
shipment_df17 = shipment_df17.withColumn(
    "is_high_value",
    when(col("shipment_cost") > 50000, True).otherwise(False)
)

#is_weekend = True if shipment day is Saturday or Sunday
shipment_df17 = shipment_df17.withColumn(
    "is_weekend",
    when(dayofweek(col("shipment_date")).isin(1, 7), True).otherwise(False)
)
display(shipment_df17)



- `format_number(col, 2)` → adds thousands separators and rounds to 2 decimals
- `concat()` → prepends the currency symbol
- The result is a STRING (suited to reports, dashboards, and exports, not to further arithmetic)
- `groupBy().agg(count())` → count per group
- `groupBy().agg(sum())` → total per group
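
To see what the formatted value looks like without running Spark, the same shape can be produced in plain Python. This is only a sketch of the formatting rule; `format_inr` is a hypothetical helper, not part of the notebook.

```python
def format_inr(amount):
    """Format a number with thousands separators, 2 decimals, and a rupee sign."""
    if amount is None:
        return None  # Spark's concat() also yields NULL when any input is NULL
    return "₹" + f"{amount:,.2f}"

formatted = format_inr(30695.8)
city_std = "chennai".upper()
```

The grouping here is in thousands (30,695.80), not the Indian lakh/crore grouping.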

In [0]:
from pyspark.sql.functions import *
#Format shipment_cost to a string like: ₹30,695.80
#chennai → CHENNAI
df = (shipment_df17.withColumn(
            "shipment_cost_fmt",
            concat(lit("₹"), format_number(col("shipment_cost"), 2))
        )
        .withColumn(
            "source_city_std",
            upper(col("source_city"))
        )
)
display(df)
#Group by hub_location and count the number of staff
#Option 1: count() on a column skips NULLs in that column
staff_by_hub = cleanseddf11.groupBy("hub_location") \
    .agg(
        count(col("first_name")).alias("staff_count")
    )
#Option 2: groupBy().count() counts every row in the group
staff_by_hub = cleanseddf11.groupBy("hub_location") \
    .count() \
    .withColumnRenamed("count", "staff_count")
display(staff_by_hub)
#Group by vehicle_type and sum shipment_weight_kg
fleet_capacity = shipment_df17.groupBy("vehicle_type") \
    .agg(sum("shipment_weight_kg").alias("total_weight_kg"))

expensive_shipments_df = shipment_df17.orderBy(col("shipment_cost").desc())
dispatch_schedule_df = shipment_df17.orderBy("shipment_date")
#Action: Filter for 'DELAYED', Sort by Cost, and Limit to top 10 rows.
critical_delays_df = (
    shipment_df17
        .filter(col("shipment_status") == "DELAYED")
        .orderBy(col("shipment_cost").desc())
        .limit(10)
)


## 5. Data Wrangling - Transformation & Analytics
*Combining, modeling, and analyzing data to answer complex business questions.*

### **1. Joins**
Source Files:<br>
Left Side (staff_df):<br> DF of logistics_source1 & logistics_source2<br>
Right Side (shipments_df):<br> DF of logistics_shipment_detail_3000.json<br>
#### **1.1 Frequently Used Simple Joins (Inner, Left)**
* **Inner Join (Performance Analysis):**
  * **Scenario:** We only want to analyze *completed work*. Connect Staff to the Shipments they handled.
  * **Action:** Join `staff_df` and `shipments_df` on `shipment_id`.
  * **Result:** Returns only rows where a staff member is assigned to a valid shipment.
* **Left Join (Idle Resource check):**
  * **Scenario:** Find out which staff members are currently *idle* (not assigned to any shipment).
  * **Action:** Join `staff_df` (Left) with `shipments_df` (Right) on `shipment_id`. Filter where `shipments_df.shipment_id` is NULL.
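
The difference between the two joins can be sketched in plain Python with a handful of made-up rows. The sketch assumes `shipment_id` is unique on the shipments side, and every name and value in it is hypothetical.

```python
staff = [
    {"staff_id": 1, "first_name": "Asha", "shipment_id": "S1"},
    {"staff_id": 2, "first_name": "Ravi", "shipment_id": "S2"},
    {"staff_id": 3, "first_name": "Meena", "shipment_id": None},  # no assignment
]
shipments = [
    {"shipment_id": "S1", "shipment_status": "DELIVERED"},
    {"shipment_id": "S9", "shipment_status": "DELAYED"},
]

# Index the right side by key (assumes shipment_id is unique there)
ship_by_id = {s["shipment_id"]: s for s in shipments}

def left_join(left_rows, right_index, key):
    """Keep every left row; attach the matching right row or None."""
    return [(row, right_index.get(row[key])) for row in left_rows]

joined = left_join(staff, ship_by_id, "shipment_id")

# Inner join: only the pairs where a match exists
inner = [(l, r) for l, r in joined if r is not None]

# Left join + NULL filter: left rows without any match
idle = [l["first_name"] for l, r in joined if r is None]
```

Ravi lands in the idle list even though he carries a `shipment_id`, because "S2" does not exist in the shipments data. The NULL filter therefore catches both unassigned staff and staff pointing at an unknown shipment.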

#### **1.2 Infrequent Simple Joins (Self, Right, Full, Cartesian)**
* **Self Join (Peer Finding):**
  * **Scenario:** Find all pairs of employees working in the same `hub_location`.
  * **Action:** Join `staff_df` to itself on `hub_location`, filtering where `staff_id_A != staff_id_B`.
* **Right Join (Orphan Data Check):**
  * **Scenario:** Identify shipments in the system that have *no valid driver* assigned (Data Integrity Issue).
  * **Action:** Join `staff_df` (Left) with `shipments_df` (Right). Focus on NULLs on the left side.
* **Full Outer Join (Reconciliation):**
  * **Scenario:** A complete audit to find *both* idle drivers AND unassigned shipments in one view.
  * **Action:** Perform a Full Outer Join on `shipment_id`.
* **Cartesian/Cross Join (Capacity Planning):**
  * **Scenario:** Generate a schedule of *every possible* driver assignment to *every* pending shipment to run an optimization algorithm.
  * **Action:** Cross Join `drivers_df` and `pending_shipments_df`.
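
A self join that lists each pair once and a cross join both have direct counterparts in Python's `itertools`, which makes the expected row counts easy to reason about. The rows below are made up for illustration.

```python
from itertools import combinations, product

staff = [
    {"staff_id": 1, "hub_location": "Pune"},
    {"staff_id": 2, "hub_location": "Pune"},
    {"staff_id": 3, "hub_location": "Delhi"},
    {"staff_id": 4, "hub_location": "Pune"},
]
pending_shipments = ["S1", "S2"]

# Self join: pairs in the same hub, each pair listed once
peer_pairs = [
    (a["staff_id"], b["staff_id"])
    for a, b in combinations(staff, 2)
    if a["hub_location"] == b["hub_location"]
]

# Cross join: every driver paired with every pending shipment
assignments = list(product([s["staff_id"] for s in staff], pending_shipments))
```

A cross join of 4 drivers and 2 shipments yields 8 rows. On real data the output grows as the product of both row counts, so it is usually applied to pre-filtered inputs.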

#### **1.3 Advanced Joins (Semi and Anti)**
* **Left Semi Join (Existence Check):**
  * **Scenario:** "Show me the details of Drivers who have *at least one* shipment." (Standard filtering).
  * **Action:** `staff_df.join(shipments_df, "shipment_id", "left_semi")`.
  * **Benefit:** Performance optimization; it stops scanning the right table once a match is found.
* **Left Anti Join (Negation Check):**
  * **Scenario:** "Show me the details of Drivers who have *never* touched a shipment."
  * **Action:** `staff_df.join(shipments_df, "shipment_id", "left_anti")`.

### **2. Lookup**<br>
Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF)<br>
* **Scenario:** Validation. Check if the `hub_location` in the staff file exists in the corporate `Master_City_List`.
* **Action:** Compare values against a reference list.

### **3. Lookup & Enrichment**<br>
Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF)<br>
* **Scenario:** Geo-Tagging.
* **Action:** Lookup `hub_location` ("Pune") in a Master Latitude/Longitude table and enrich the dataset by adding `lat` and `long` columns for map plotting.

### **4. Schema Modeling (Denormalization)**<br>
Source Files: DF of All 3 Files (logistics_source1, logistics_source2, logistics_shipment_detail_3000.json)<br>
* **Scenario:** Creating a "Gold Layer" Table for PowerBI/Tableau.
* **Action:** Flatten the Star Schema. Join `Staff`, `Shipments`, and `Vehicle_Master` into one wide table (`wide_shipment_history`) so analysts don't have to perform joins during reporting.

### **5. Windowing (Ranking & Trends)**<br>
Source Files:<br>
DF of logistics_source2: Provides hub_location (Partition Key).<br>
logistics_shipment_detail_3000.json: Provides shipment_cost (Ordering Key)<br>
* **Scenario:** "Who are the Top 3 Drivers by Cost in *each* Hub?"
* **Action:**
  1. Partition by `hub_location`.
  2. Order by `total_shipment_cost` Descending.
  3. Apply `dense_rank()` and `row_number()`.
  4. Filter where the rank or row number is `<= 3`.

### **6. Analytical Functions (Lead/Lag)**<br>
Source File: <br>
DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** Idle Time Analysis.
* **Action:** For each driver, calculate the days elapsed since their *previous* shipment.

### **7. Set Operations**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Union:** Combining `Source1` (Legacy) and `Source2` (Modern) into one dataset (Already done in Active Munging).
* **Intersect:** Identifying Staff IDs that appear in *both* Source 1 and Source 2 (Duplicate/Migration Check).
* **Except (Difference):** Identifying Staff IDs present in Source 2 but *missing* from Source 1 (New Hires).

### **8. Grouping & Aggregations (Advanced)**<br>
Source Files:<br>
DF of logistics_source2: Provides hub_location and vehicle_type (Grouping Dimensions).<br>
DF of logistics_shipment_detail_3000.json: Provides shipment_cost (Aggregation Metric).<br>
* **Scenario:** The CFO wants a subtotal report at multiple levels:
  1. Total Cost by Hub.
  2. Total Cost by Hub AND Vehicle Type.
  3. Grand Total.
* **Action:** Use `cube("hub_location", "vehicle_type")` or `rollup()` to generate all these subtotals in a single query.

1.1 Frequently Used Simple Joins (Inner, Left)

In [0]:
#Scenario: We only want to analyze completed work. Connect Staff to the Shipments they handled.
#Result: Returns only rows where a staff member is assigned to a valid shipment.
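#Note: cleanseddf12_str is created in the self-join cell further down (it adds staff_id); run that cell first
staff_df = cleanseddf12_str.alias("staff")
shipments_df = shipment_df5_str.alias("ship")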
display(staff_df)
display(shipments_df)
performance_df = staff_df.join(
    shipments_df,
    staff_df.shipment_id == shipments_df.shipment_id,
    "inner"
)
display(performance_df)
#Scenario: Find out which staff members are currently idle (not assigned to any shipment).
#Action: Join staff_df (Left) with shipments_df (Right) on shipment_id. Filter where shipments_df.shipment_id is NULL.
idle_staff_df = (staff_df.join(shipments_df,staff_df.shipment_id == shipments_df.shipment_id,"left")
                .filter(col("ship.shipment_id").isNull()))
display(idle_staff_df)



1.2 Infrequent Simple Joins (Self, Right, Full, Cartesian)

Self Join (Peer Finding):
- Scenario: Find all pairs of employees working in the same hub_location.
- Join type: SELF JOIN
- Conditions:
  - Same hub_location
  - Different employees → staff_id_A != staff_id_B

Normal code flow if the data already has a staff_id column:
- `a = staff_df.alias("a")`
- `b = staff_df.alias("b")`
- `peer_df = (a.join(b, on=col("a.hub_location") == col("b.hub_location"), how="inner").filter(col("a.staff_id") != col("b.staff_id")))`

> If you only want unique pairs (Alice–Bob but not Bob–Alice):
> `.filter(col("a.staff_id") < col("b.staff_id"))`

The dataset has no staff_id column, so one has to be generated. Two ways to do it:

| Method                        | Output         |
| ----------------------------- | -------------- |
| row_number()                  | 1,2,3,4        |
| monotonically_increasing_id() | 0,1,8589934592 |

In [0]:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

cleanseddf11_str = cleanseddf11.withColumn(
    "shipment_id",
    col("shipment_id").cast("string")
)
#cleanseddf99_str = cleanseddf11_str.withColumn("staff_id", monotonically_increasing_id()) #IDs are unique but not consecutive
#display(cleanseddf99_str)
#A window without partitionBy moves all rows into a single partition; acceptable for small data only
window_spec = Window.orderBy("shipment_id")
cleanseddf12_str = cleanseddf11_str.withColumn("staff_id", row_number().over(window_spec))
display(cleanseddf12_str)
staff_A = cleanseddf12_str.alias("A") #self join
staff_B = cleanseddf12_str.alias("B")
#The dataset has no staff_id column, so the generated staff_id is used to tell employees apart
peer_df = (staff_A.join(staff_B,col("A.hub_location") == col("B.hub_location"),"inner")
         .filter(col("A.staff_id") != col("B.staff_id")))
display(peer_df)
#Avoid mirrored duplicate pairs (A–B and B–A) by keeping only A.staff_id < B.staff_id
peer_df1 = (staff_A.join(staff_B,col("A.hub_location") == col("B.hub_location"),"inner")
         .filter(col("A.staff_id") < col("B.staff_id")))
display(peer_df1)

- Right Join (Orphan Data Check):
- Scenario: Identify shipments in the system that have no valid driver assigned (Data Integrity Issue).
- Action: Join staff_df (Left) with shipments_df (Right). Focus on NULLs on the left side.

In [0]:
window_spec = Window.orderBy("shipment_id")

shipment_df5_str = shipment_df5_str.withColumn(
    "driver_id",
    row_number().over(window_spec) #assign driver_id
)
display(shipment_df5_str)
#Column references such as col("staff.staff_id") resolve against aliases set with .alias(), not against Python variable names.
staff = cleanseddf12_str.alias("staff")
ship  = shipment_df5_str.alias("ship")

orphan_shipments_df = (
    staff
    .join(
        ship,
        col("staff.staff_id") == col("ship.driver_id"),
        how="right"
    )
    .filter(col("staff.staff_id").isNull())
    .select(
        col("ship.driver_id"),
        col("staff.staff_id")
    )
)

display(orphan_shipments_df)

- Full Outer Join (Reconciliation):
- Scenario: A complete audit to find both idle drivers AND unassigned shipments in one view.
- Action: Perform a Full Outer Join on shipment_id.

In [0]:
#Create DataFrame aliases
staff_df = cleanseddf12_str.alias("staff")
shipments_df = shipment_df5_str.alias("ship")
display(shipments_df)
shipments_df.printSchema()
display(staff_df)
staff_df.printSchema()
#Perform the FULL OUTER JOIN
full_join_df = (
    staff_df.join(shipments_df,
        col("staff.shipment_id") == col("ship.shipment_id"),how="full")
)
#Add reconciliation status column
recon_df = (
    full_join_df
    .withColumn(
        "reconciliation_status",
        when(col("staff.staff_id").isNull(), "UNASSIGNED_SHIPMENT")
        .when(col("ship.shipment_id").isNull(), "IDLE_DRIVER")
        .otherwise("OK")
    )
)
#select
final_reconciliation_df = recon_df.select(
    col("staff.staff_id"),
    col("staff.shipment_id").alias("staff_shipment_id"),
    col("ship.shipment_id").alias("ship_shipment_id"),
    col("ship.driver_id"),
    col("reconciliation_status")
)
display(final_reconciliation_df)
  

Cartesian/Cross Join (Capacity Planning):

In [0]:
drivers_df = cleanseddf12_str.select(
    "staff_id",
    "first_name",
    "hub_location"
).alias("d")

pending_shipments_df = shipment_df5_str.select(
    "shipment_id",
    "driver_id",
    "shipment_cost"
).alias("s")
capacity_matrix_df = drivers_df.join(pending_shipments_df, how="cross")
display(capacity_matrix_df)

1.3 Advanced Joins (Semi and Anti)

Left Semi Join (Existence Check):
- Scenario: "Show me the details of Drivers who have at least one shipment." (Standard filtering).
- Action: staff_df.join(shipments_df, "shipment_id", "left_semi").
- Benefit: Performance optimization; it stops scanning the right table once a match is found.

What is a LEFT SEMI JOIN?
- Returns only rows from the LEFT table
- Keeps a row if at least one matching row exists in the RIGHT table
- Does NOT return any columns from the right table
- Think of it as: “Filter the left table based on existence in the right table”

In [0]:
drivers_with_shipments_df = (staff_df.join(shipments_df,  staff_df.staff_id == shipments_df.driver_id,
                                           how="left_semi"))

display(drivers_with_shipments_df)

What is a LEFT ANTI JOIN?
- Returns only rows from the LEFT table
- Keeps rows ONLY if NO matching row exists in the RIGHT table
- Returns no columns from the right table
- Think of it as: “Give me rows from the left that do NOT exist on the right”
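
Semi and anti joins are both existence filters on the left table, which a plain-Python sketch makes concrete. The rows are made up for illustration; the point to notice is that a driver with two shipments still appears only once in the semi-join result.

```python
staff = [
    {"staff_id": 1, "first_name": "Asha"},
    {"staff_id": 2, "first_name": "Ravi"},
    {"staff_id": 3, "first_name": "Meena"},
]
shipments = [
    {"shipment_id": "S1", "driver_id": 1},
    {"shipment_id": "S2", "driver_id": 1},  # second shipment for the same driver
    {"shipment_id": "S3", "driver_id": 3},
]

# Only existence matters, so collect the right-side keys into a set
driver_ids = {s["driver_id"] for s in shipments}

# Left semi: left rows with at least one match, left columns only, no duplication
semi = [row for row in staff if row["staff_id"] in driver_ids]

# Left anti: left rows with no match at all
anti = [row for row in staff if row["staff_id"] not in driver_ids]
```

An inner join on the same data would return Asha twice (once per shipment), which is the main practical difference from a semi join.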

In [0]:
drivers_without_shipments_df = (staff_df.join(
        shipments_df,
        staff_df.staff_id == shipments_df.driver_id,
        how="left_anti"
    )
)

display(drivers_without_shipments_df)

**Step 2: VALID lookup (LEFT SEMI JOIN)**
- ✔ Keeps only staff rows
- ✔ Includes staff whose hub_location exists in master
- ✔ No unnecessary columns
- ✔ Stops scanning master after first match

**Step 3: INVALID lookup (LEFT ANTI JOIN)**
- ✔ Returns staff whose hub_location does NOT exist in master
- ✔ Well suited to data quality failure detection
- ✔ Cleaner than LEFT JOIN + IS NULL

In [0]:
#1. Lookup
staff_df = staff_df.alias("staff")          # merged staff DataFrame
master_city_df = shipment_df.alias("master")  # stand-in for Master_City_List: its source_city values act as the reference
#Step 2: VALID lookup (hub_location exists in master)
valid_staff_df = (
    staff_df
    .join(master_city_df,staff_df.hub_location == master_city_df.source_city,
        how="left_semi"
    )
)

display(valid_staff_df)
#Step 3: INVALID lookup (hub_location NOT in master)
#Use LEFT ANTI JOIN (lookup failure)
invalid_staff_df = (
    staff_df.join(master_city_df,staff_df.hub_location == master_city_df.source_city,
        how="left_anti"
    )
)
display(invalid_staff_df)

3. Lookup & Enrichment
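
No code accompanies this step in the notebook. As a minimal plain-Python sketch of the idea, enrichment is a left lookup of `hub_location` against a master table that adds `lat` and `long`, leaving them empty when the city is unknown. The master entries are approximate coordinates included purely for illustration, and `MASTER_GEO` and `enrich_with_geo` are hypothetical names. In Spark the same shape would be a left join against a master latitude/longitude DataFrame.

```python
# Illustrative master table: approximate city-centre coordinates (assumed values)
MASTER_GEO = {
    "Pune": (18.52, 73.86),
    "Delhi": (28.61, 77.21),
}

def enrich_with_geo(rows, master=MASTER_GEO):
    """Return copies of rows with lat/long added; None when the hub is not in master."""
    enriched = []
    for row in rows:
        lat, long_ = master.get(row["hub_location"], (None, None))
        enriched.append({**row, "lat": lat, "long": long_})
    return enriched

staff = [
    {"first_name": "Asha", "hub_location": "Pune"},
    {"first_name": "Ravi", "hub_location": "Nagpur"},  # not in the master table
]
geo_staff = enrich_with_geo(staff)
```

Keeping unmatched rows with empty coordinates (rather than dropping them) mirrors the left-join behaviour and keeps the staff row count unchanged.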

**4. Schema Modeling (Denormalization)**

Denormalization means intentionally combining multiple related tables into a single wide table by duplicating dimension attributes.

**Design Principle (IMPORTANT)**
- Shipments = FACT table
- Staff = DIMENSION
- Use LEFT JOIN so no shipment is lost
- Gold tables are denormalized and analytics-ready

**How Denormalization is Done (Technically)**

Step-by-step pattern:
1. Choose the fact table (Shipments)
2. LEFT JOIN all dimensions (Staff, Vehicle, City, etc.)
3. Select the required columns
4. Rename columns for business clarity
5. Persist as a Gold table

In [0]:
#Step 1: Prepare & Alias DataFrames
from pyspark.sql.functions import col

staff_df = cleanseddf12_str.alias("staff")
shipments_df = shipment_df5_str.alias("ship")
#Step 2: Join Shipments → Staff (Driver Dimension)
shipment_staff_df = (
    shipments_df
    .join(
        staff_df,
        col("ship.driver_id") == col("staff.staff_id"),
        how="left"
    )
)
#Step 3: Flatten into a Gold (Wide) Schema
wide_shipment_history_df = shipment_staff_df.select(
    # ── Shipment facts ──
    col("ship.shipment_id"),
    col("ship.order_id"),
    col("ship.shipment_date"),
    col("ship.shipment_status"),
    col("ship.shipment_cost"),
    col("ship.shipment_weight_kg"),
    col("ship.cargo_type"),
    col("ship.payment_mode"),
    col("ship.is_expedited"),
    col("ship.source_city"),
    col("ship.destination_city"),
    col("ship.vehicle_type"),

    # ── Driver / Staff attributes ──
    col("staff.staff_id"),
    col("staff.first_name").alias("driver_first_name"),
    col("staff.last_name").alias("driver_last_name"),
    col("staff.age").alias("driver_age"),
    col("staff.role").alias("driver_role"),
    col("staff.hub_location").alias("driver_hub"),

    # ── Audit columns ──
    col("ship.domain"),
    col("ship.ingestion_timestamp")
)

display(wide_shipment_history_df)
#Step 4: Persist as Gold Layer Table
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")
wide_shipment_history_df.write.mode("overwrite").saveAsTable(
    "gold.wide_shipment_history"
)
spark.sql("SHOW TABLES IN gold").show()

5. Windowing (Ranking & Trends)
- Source Files:
- DF of logistics_source2: Provides hub_location (Partition Key).
- logistics_shipment_detail_3000.json: Provides shipment_cost (Ordering Key)
- Scenario: "Who are the Top 3 Drivers by Cost in each Hub?"
- For dense_rank: “I use dense_rank() when the business wants to keep ties, even if it returns more than N results.”
- For row_number: “I use row_number() when I need a strict Top-N result per partition.”
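
The difference between the two ranking functions shows up only when there are ties. The plain-Python sketch below re-implements both for a single partition that is already sorted in descending order; the cost figures are made up for illustration.

```python
def row_number(values_desc):
    """1, 2, 3, ... in order, regardless of ties."""
    return list(range(1, len(values_desc) + 1))

def dense_rank(values_desc):
    """Equal values share a rank; the next distinct value gets the next rank (no gaps)."""
    ranks, current = [], 0
    previous = object()  # sentinel that equals nothing
    for value in values_desc:
        if value != previous:
            current += 1
            previous = value
        ranks.append(current)
    return ranks

# Hypothetical total_shipment_cost per driver in one hub, already sorted descending
costs = [900, 700, 700, 500, 400]

top3_strict = [c for c, n in zip(costs, row_number(costs)) if n <= 3]
top3_with_ties = [c for c, r in zip(costs, dense_rank(costs)) if r <= 3]
```

With `row_number()` the order among tied rows is arbitrary unless a tie-breaker column is added to the window's `orderBy`, so a strict Top-N may differ between runs on tied data.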

In [0]:
#Step 1: Join Staff + Shipments (to get cost per driver)
staff_df = cleanseddf12_str.alias("staff")
shipments_df = shipment_df5_str.alias("ship")

staff_shipments_df = (
    staff_df
    .join(
        shipments_df,
        col("staff.staff_id") == col("ship.driver_id"),
        how="inner"
    )
)
#Step 2: Aggregate total shipment cost per driver per hub
driver_cost_df = (
    staff_shipments_df
    .groupBy(
        col("staff.hub_location"),
        col("staff.staff_id"),
        col("staff.first_name"),
        col("staff.last_name")
    )
    .agg(
        sum("ship.shipment_cost").alias("total_shipment_cost")
    )
)
#Step 3: Define Window (Partition + Order)
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, row_number

window_spec = (
    Window
    .partitionBy("hub_location")
    .orderBy(col("total_shipment_cost").desc())
)
#Step 4A: Use dense_rank() (ties allowed)
ranked_dense_df = (
    driver_cost_df
    .withColumn("dense_rank", dense_rank().over(window_spec))
)
#Filter Top 3
top3_dense_df = ranked_dense_df.filter(col("dense_rank") <= 3)
display(top3_dense_df)
#Step 4B: Use row_number() (strict Top 3)
ranked_row_df = (
    driver_cost_df
    .withColumn("row_number", row_number().over(window_spec))
)
#Filter Top 3
top3_row_df = ranked_row_df.filter(col("row_number") <= 3)
display(top3_row_df)

**6. Analytical Functions – Lead / Lag**
- Scenario: For each driver, calculate the number of days elapsed since their previous shipment.
- Source DF: logistics_shipment_detail_3000.json
- Key: driver_id
- Ordering column: shipment_date
- “Idle time per driver is calculated using lag() over a window partitioned by driver and ordered by shipment date.”
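
The lag-over-window idea can be sketched in plain Python: sort by driver and date, then carry the previous date forward within each driver. The shipments below are made up, and `idle_days_per_driver` is a hypothetical helper.

```python
from datetime import date
from itertools import groupby

# Hypothetical shipments: (driver_id, shipment_date)
shipments = [
    (1, date(2024, 1, 10)),
    (2, date(2024, 1, 3)),
    (1, date(2024, 1, 1)),
    (1, date(2024, 1, 4)),
]

def idle_days_per_driver(rows):
    """For each shipment, days since the same driver's previous shipment (None for the first)."""
    result = []
    ordered = sorted(rows)  # sorts by driver_id, then shipment_date
    for driver_id, group in groupby(ordered, key=lambda r: r[0]):
        previous = None  # lag() has no earlier row at the start of a partition
        for _, shipped in group:
            idle = (shipped - previous).days if previous is not None else None
            result.append((driver_id, shipped, idle))
            previous = shipped
    return result

idle = idle_days_per_driver(shipments)
```

The first shipment of every driver has no predecessor, so its idle value is empty, just as `lag()` returns NULL for the first row of each partition.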

In [0]:
#idle_days = current_shipment_date - previous_shipment_date
from pyspark.sql.functions import col, to_date, datediff
shipments_df.select("shipment_date").distinct().show(10, truncate=False)
#Step 1: Convert shipment_date to a date type
shipments_df = shipment_df5_str.withColumn(
    "shipment_date",
    to_date(col("shipment_date"), "yyyy-MM-dd")
)
#Step 2: Define Window (per driver, ordered by date)
from pyspark.sql.window import Window

window_spec = (
    Window
    .partitionBy("driver_id")
    .orderBy("shipment_date")
)
#Step 3: Use lag() to get previous shipment date
from pyspark.sql.functions import lag

shipments_with_prev_df = shipments_df.withColumn(
    "prev_shipment_date",
    lag("shipment_date").over(window_spec)
)
#Step 4: Calculate idle days
idle_time_df = shipments_with_prev_df.withColumn(
    "idle_days",
    datediff(col("shipment_date"), col("prev_shipment_date"))
)
idle_time_final_df = idle_time_df.select(
    "driver_id",
    "shipment_id",
    "shipment_date",
    "prev_shipment_date",
    "idle_days"
)
#If a driver has only one shipment there is no previous row, so idle_days is NULL; that is the correct output.
display(idle_time_final_df)

- Option 2: Compute idle time per HUB instead (useful alternative)
- If the business wants hub activity gaps instead of driver gaps:

In [0]:
from pyspark.sql.functions import lag, datediff, to_date, col
from pyspark.sql.window import Window

df = shipment_df5_str.withColumn(
    "shipment_date",
    to_date(col("shipment_date"))
)

window_spec = (
    Window
    .partitionBy("source_city")
    .orderBy("shipment_date")
)

hub_idle_df = df.withColumn(
    "prev_shipment_date",
    lag("shipment_date").over(window_spec)
).withColumn(
    "idle_days",
    datediff(col("shipment_date"), col("prev_shipment_date"))
)

display(hub_idle_df.select(
    "source_city",
    "shipment_id",
    "shipment_date",
    "prev_shipment_date",
    "idle_days"
))

**Set Operations (Union / Intersect / Except)**
- Union: “Union combines datasets vertically.”
- Intersect: “Intersect identifies common rows across datasets.”
- Except: “Except finds rows of one dataset that are missing from the other.”
- Semi Join: “Semi joins perform optimized existence checks.”
- Anti Join: “Anti joins detect non-matching records efficiently.”

Quick reference:
1. Union → combine
2. Intersect → common
3. Except → difference
4. Semi Join → exists
5. Anti Join → not exists
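
The three set operations map directly onto Python list and set operations, which gives a quick way to reason about the expected output. The key values below are made up for illustration.

```python
# Hypothetical key values from each source
source1_ids = ["S1", "S2", "S3"]          # legacy
source2_ids = ["S2", "S3", "S4", "S5"]    # modern

# Union: stack both lists; like Spark's union, duplicates are kept
union_all = source1_ids + source2_ids

# Intersect: keys present in both sources
in_both = sorted(set(source1_ids) & set(source2_ids))

# Except: keys in source 2 that are missing from source 1
only_in_source2 = sorted(set(source2_ids) - set(source1_ids))
```

Except is directional: swapping the two operands would instead return the keys that exist only in source 1.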

In [0]:
df_system1 = rawdf1.withColumn("data_source", lit("system1"))  # Legacy
df_system2 = rawdf2.withColumn("data_source", lit("system2"))  # Modern
#UNION / UNION BY NAME (keeps duplicates; columns missing on one side are filled with NULL)
rawdf_merged = df_system1.unionByName(
    df_system2,
    allowMissingColumns=True
)
#INTERSECT: shipment_id is used as the key here because the staff data has no staff_id column
staff_keys_sys1 = df_system1.select("shipment_id")
staff_keys_sys2 = df_system2.select("shipment_id")
common_staff_df = staff_keys_sys1.intersect(staff_keys_sys2)
display(common_staff_df)
#EXCEPT (DIFFERENCE): keys in system2 that are missing from system1
#exceptAll keeps duplicate keys; use subtract() for a distinct result
new_hires_df = staff_keys_sys2.exceptAll(staff_keys_sys1)
display(new_hires_df)

Grouping & Aggregations (Advanced)
> CUBE / ROLLUP generate multiple aggregation levels in a single query, instead of writing separate GROUP BYs.
- CUBE: “Cube generates all possible subtotals across dimensions, including the grand total.”
- ROLLUP: “Rollup generates hierarchical subtotals and a grand total.”

- Scenario: The CFO wants a subtotal report at multiple levels:
- Total Cost by Hub.
- Total Cost by Hub AND Vehicle Type.
- Grand Total.
- Action: Use cube("hub_location", "vehicle_type") or rollup() to generate all these subtotals in a single query.

- GROUP BY (Baseline)

| hub_location | vehicle_type | total_cost |
| ------------ | ------------ | ---------- |
| Pune         | Truck        | 300        |
| Pune         | Van          | 150        |
| Delhi        | Truck        | 300        |
| Delhi        | Van          | 100        |

- ROLLUP(hub_location, vehicle_type)
- ROLLUP gives hierarchical totals, from left → right.

| hub_location | vehicle_type | total_cost | Meaning         |
| ------------ | ------------ | ---------- | --------------- |
| Pune         | Truck        | 300        | Pune + Truck    |
| Pune         | Van          | 150        | Pune + Van      |
| Pune         | NULL         | 450        | **Total Pune**  |
| Delhi        | Truck        | 300        | Delhi + Truck   |
| Delhi        | Van          | 100        | Delhi + Van     |
| Delhi        | NULL         | 400        | **Total Delhi** |
| NULL         | NULL         | 850        | **Grand Total** |

- CUBE(hub_location, vehicle_type)
- CUBE gives ALL possible combinations of the dimensions.

| hub_location | vehicle_type | total_cost | Meaning         |
| ------------ | ------------ | ---------- | --------------- |
| Pune         | Truck        | 300        | Pune + Truck    |
| Pune         | Van          | 150        | Pune + Van      |
| Pune         | NULL         | 450        | **Total Pune**  |
| Delhi        | Truck        | 300        | Delhi + Truck   |
| Delhi        | Van          | 100        | Delhi + Van     |
| Delhi        | NULL         | 400        | **Total Delhi** |
| NULL         | Truck        | 600        | **Truck Total** |
| NULL         | Van          | 250        | **Van Total**   |
| NULL         | NULL         | 850        | **Grand Total** |
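
The subtotal rows in the tables above can be reproduced in plain Python by summing the baseline rows once per grouping set. This is a sketch of the semantics only, not of how Spark executes the query; `subtotals` is a hypothetical helper.

```python
from collections import defaultdict

# The baseline GROUP BY rows from the tables above
rows = [
    ("Pune", "Truck", 300),
    ("Pune", "Van", 150),
    ("Delhi", "Truck", 300),
    ("Delhi", "Van", 100),
]

def subtotals(rows, keep_masks):
    """Sum cost for each grouping set; a dropped dimension is reported as None (NULL)."""
    totals = defaultdict(int)
    for hub, vehicle, cost in rows:
        for keep_hub, keep_vehicle in keep_masks:
            key = (hub if keep_hub else None, vehicle if keep_vehicle else None)
            totals[key] += cost
    return dict(totals)

# ROLLUP(hub, vehicle): (hub, vehicle), (hub), ()
rollup = subtotals(rows, [(True, True), (True, False), (False, False)])

# CUBE(hub, vehicle): every combination, so (vehicle) alone is added
cube = subtotals(rows, [(True, True), (True, False), (False, True), (False, False)])
```

ROLLUP yields 7 rows and CUBE 9 for this data. The CFO scenario (hub totals, hub plus vehicle totals, grand total) needs only the ROLLUP rows; CUBE additionally returns the per-vehicle totals across hubs.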


In [0]:
#Step 1: Join Staff + Shipments (bring dimensions + metric together)
staff_df = cleanseddf12_str.alias("staff")
shipments_df = shipment_df5_str.alias("ship")

fact_df = (
    staff_df
    .join(
        shipments_df,
        col("staff.staff_id") == col("ship.driver_id"),
        how="inner"
    )
)
display(fact_df)
#Step 2A: Use CUBE (ALL possible subtotal combinations)
cube_df = (
    fact_df
    .cube(
        col("staff.hub_location"),
        col("staff.vehicle_type")
    )
    .agg(
        sum(col("ship.shipment_cost")).alias("total_shipment_cost")
    )
)

display(cube_df)

#Step 2B: Use ROLLUP (Hierarchical subtotals)
rollup_df = (
    fact_df
    .rollup(
        col("staff.hub_location"),
        col("staff.vehicle_type")
    )
    .agg(
        sum(col("ship.shipment_cost")).alias("total_shipment_cost")
    )
)
display(rollup_df)

##6. Data Persistence (LOAD) -> Data Publishing & Consumption<br>

Store the results of the inner join, lookup and enrichment, schema modeling, windowing, analytical functions, set operations, and grouping and aggregation steps in Delta tables.

In [0]:

#Persist the hub / vehicle-type cost subtotals built in the ROLLUP step
rollup_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold.cost_subtotals_by_hub_vehicle")
wide_shipment_history_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold.wide_shipment_history")
spark.sql("SHOW TABLES IN gold").show(truncate=False) 
spark.table("gold.wide_shipment_history").printSchema()
display(spark.sql("""SELECT * FROM gold.wide_shipment_history"""))   

##7. Take a copy of the above notebook and write the equivalent SQL wherever applicable.