# Enterprise Fleet Analytics Pipeline
*Focuses on the business outcome (analytics) and the domain (fleet/logistics).*

![logistics](https://raw.githubusercontent.com/mohamedirfankader/databricks-code-repo/main/4_logistics_usecase/logistics_project.png)

Download the data from the Google Drive folder below and upload it into the catalog volume:
https://drive.google.com/drive/folders/1J3AVJIPLP7CzT15yJIpSiWXshu1iLXKn?usp=drive_link

## **1. Data Munging**

#### 1. Visually/manually open the file and capture a couple of data patterns (Manual Exploratory Data Analysis)

In [0]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("BB2_Use_Case2").getOrCreate()
#RawDf=spark.read.csv("/Volumes/we47catalog/we47schema/we47_volume/logistics_source1",header=True,inferSchema=True)
#display(RawDf)
# Manual analysis findings:
#   1. first_name, last_name, age and role should not be NULL
#   2. A row should not be entirely NULL
#   3. shipment_id and age should be numeric, not strings


#### 2. Programmatically find a couple of data patterns by applying the EDA below (File: logistics_source1)
1. Apply inferSchema and toDF to create a DF and analyse the actual data.
2. Analyse the schema, data types, columns, etc.
3. Analyse the duplicate-record count and a summary of the DataFrame.

In [0]:
from pyspark.sql.types import *
# An explicit schema is used here in place of inferSchema; values that don't fit
# the declared type (e.g. age "ten") are read as NULL
schema = StructType([StructField("Shipment_id",IntegerType(),True),
                     StructField("First_Name",StringType(),True),
                     StructField("Last_Name",StringType(),True),
                     StructField("Age",IntegerType(),True),
                     StructField("Role",StringType(),True)])
# header=True skips the header row instead of parsing it as a data row
RawDf=spark.read.schema(schema).csv("/Volumes/we47catalog/we47schema/we47_volume/logistics_source1",header=True).toDF("Shipment_id","First_Name","Last_Name","Age","Role")
RawDf.printSchema()
display(RawDf)
print('RawCount->',RawDf.count())
print('Distinct RawCount->',RawDf.distinct().count())
print('Deduplicated on Shipment_id->',RawDf.dropDuplicates(["Shipment_id"]).count())
print(RawDf.columns)
print(RawDf.dtypes)
print(RawDf.schema)
# Count, mean, stddev, min, quartiles and max per column
display(RawDf.summary())

### a. Passive Data Munging (Files: logistics_source1 and logistics_source2)
Without modifying the data, identify:<br>
Shipment IDs that appear in both master_v1 (logistics_source1) and master_v2 (logistics_source2)<br>
Records where:<br>
1. shipment_id is non-numeric
2. age is not an integer<br>

Count rows having:<br>

3. fewer columns than expected
4. more columns than expected
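The passive checks above can be prototyped in plain Python before running them in Spark. This is an illustration only: `profile_rows`, `common_ids` and the `SAMPLE` rows are hypothetical, and the sketch assumes the files are simple comma-separated text with a header row and no quoted commas. Like the Spark version, it only reports problems and never changes the data.

```python
import csv
import io

# Hypothetical sample input; the real files are in the catalog volume
SAMPLE = """shipment_id,first_name,last_name,age,role
500001,Rajesh,Kumar,34,Driver
ABC12,Priya,Shah,ten,Supervisor
500003,Arun,Das
500004,Meena,Iyer,29,Driver,extra
"""


def profile_rows(text):
    """Report data-quality issues without modifying any row (passive munging)."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    expected = len(header)
    id_idx, age_idx = header.index("shipment_id"), header.index("age")
    report = {"non_numeric_id": [], "non_integer_age": [], "fewer_cols": 0, "more_cols": 0}
    for row in reader:
        # Column-count checks against the header width
        if len(row) < expected:
            report["fewer_cols"] += 1
        elif len(row) > expected:
            report["more_cols"] += 1
        # Value checks, only when the field is present in this row
        if len(row) > id_idx and not row[id_idx].isdigit():
            report["non_numeric_id"].append(row[id_idx])
        if len(row) > age_idx and not row[age_idx].isdigit():
            report["non_integer_age"].append(row[age_idx])
    return report


def common_ids(text_a, text_b):
    """Shipment IDs present in both sources (an intersect on shipment_id)."""
    def ids(text):
        return {rec["shipment_id"] for rec in csv.DictReader(io.StringIO(text))}
    return ids(text_a) & ids(text_b)


print(profile_rows(SAMPLE))
```

Counting fields on the raw lines matters because a CSV DataFrame reader in permissive mode pads short rows with NULLs and drops extra fields, which hides both problems.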

In [0]:
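from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark=SparkSession.builder.appName("BB2_Use_Case2").getOrCreate()
path1="/Volumes/we47catalog/we47schema/we47_volume/logistics_source1"
path2="/Volumes/we47catalog/we47schema/we47_volume/logistics_source2"
# Read without a schema so every value stays a string and nothing is altered
Src1=spark.read.csv(path1,header=True)
Src2=spark.read.csv(path2,header=True)
# Shipment IDs that appear in both master_v1 (source1) and master_v2 (source2)
commonIdsDf=Src1.select("shipment_id").intersect(Src2.select("shipment_id"))
#display(commonIdsDf)
# 1. shipment_id is non-numeric (anything other than digits only)
filterShipNonNumericDf=Src1.filter(~col("shipment_id").rlike("^[0-9]+$"))
#display(filterShipNonNumericDf)
# 2. age is not an integer (e.g. "ten" or "12.5"); NULL ages are not reported
filterAgeNonNumericDf=Src1.filter(~col("age").rlike("^[0-9]+$"))
#display(filterAgeNonNumericDf)
# 3./4. Column-count checks: the CSV reader pads short rows and drops extra fields,
# so count comma-separated fields on the raw text lines (assumes no quoted commas)
for name,path,df in [("logistics_source1",path1,Src1),("logistics_source2",path2,Src2)]:
    expected_cols=len(df.columns)
    fieldCountDf=spark.read.text(path).withColumn("field_count",size(split(col("value"),",")))
    print(name,"fewer columns:",fieldCountDf.filter(col("field_count")<expected_cols).count(),
          "| more columns:",fieldCountDf.filter(col("field_count")>expected_cols).count())
print("Total rows in logistics_source1:",Src1.count())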


### **b. Active Data Munging** (Files: logistics_source1 and logistics_source2)

##### 1. Combining Data + Schema Merging (Structuring)
1. Read both files without enforcing a schema
2. Align them into a single canonical schema: shipment_id, first_name, last_name, age, role, hub_location, vehicle_type, data_source
3. Add a data_source column with the values system1 and system2 in the respective DataFrames
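A minimal plain-Python sketch of what `unionByName(..., allowMissingColumns=True)` followed by the canonical `select` does to each record. `CANONICAL`, `align` and `union_by_name` are illustrative names, and records are modelled as dicts rather than Spark rows.

```python
CANONICAL = ["shipment_id", "first_name", "last_name", "age", "role",
             "hub_location", "vehicle_type", "data_source"]


def align(records, source_tag):
    """Project each record onto CANONICAL; columns a source lacks become None."""
    aligned = []
    for rec in records:
        row = {col: rec.get(col) for col in CANONICAL}
        row["data_source"] = source_tag  # lineage tag: system1 / system2
        aligned.append(row)
    return aligned


def union_by_name(system1_rows, system2_rows):
    """Rough analogue of unionByName(..., allowMissingColumns=True) plus the select."""
    return align(system1_rows, "system1") + align(system2_rows, "system2")
```

Matching by name rather than by position is what keeps `hub_location` and `vehicle_type` from landing in the wrong column when one source lacks them.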

In [0]:
#Read both files without enforcing schema
from pyspark.sql.functions import *
Src1=spark.read.csv("/Volumes/we47catalog/we47schema/we47_volume/logistics_source1",header=True)
#display(Src1)
Src2=spark.read.csv("/Volumes/we47catalog/we47schema/we47_volume/logistics_source2",header=True)
#display(Src2)
# Tag each source, union by column name (columns missing on one side become NULL),
# then project onto the canonical schema
Src=(Src1.withColumn("data_source",lit("system1"))
     .unionByName(Src2.withColumn("data_source",lit("system2")),allowMissingColumns=True)
     .select("shipment_id","first_name","last_name","age","role","hub_location","vehicle_type","data_source"))
display(Src)


##### 2. Cleansing, Scrubbing
Cleansing (removal of unwanted records)<br>
1. Mandatory Column Check - Drop any record where any of the following columns is NULL: shipment_id, role<br>
2. Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name<br>
3. Join Readiness Rule - Drop records where the join key is NULL: shipment_id<br>

Scrubbing (convert raw to tidy)<br>
4. Age Defaulting Rule - Fill NULL values in the age column with: -1<br>
5. Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN<br>
6. Invalid Age Replacement - Replace the following values in age: "ten" to -1, "" to -1<br>
7. Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV, bike to TwoWheeler
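The rules above are easy to get backwards: rule 1 drops a row if *either* column is NULL, while rule 2 drops it only if *both* are. The hypothetical `cleanse_and_scrub` function below applies the rules to dict records so the logic can be unit-tested without a cluster. It assumes raw values are strings or `None`, and it generalises rule 6 so that every non-integer age (not only "ten" and "") becomes -1.

```python
import re

INT_PATTERN = re.compile(r"^-?[0-9]+$")
VEHICLE_MAP = {"truck": "LMV", "bike": "TwoWheeler"}


def cleanse_and_scrub(records):
    """Apply cleansing rules 1-3 and scrubbing rules 4-7 to dict records."""
    tidy = []
    for rec in records:
        # Rules 1 and 3: drop if shipment_id (the join key) OR role is NULL
        if rec.get("shipment_id") is None or rec.get("role") is None:
            continue
        # Rule 2: drop only when BOTH name columns are NULL
        if rec.get("first_name") is None and rec.get("last_name") is None:
            continue
        row = dict(rec)  # copy so the input record is not mutated
        # Rules 4 and 6: NULL, "ten", "" (any non-integer text) -> "-1"
        age = row.get("age")
        row["age"] = age if age is not None and INT_PATTERN.match(age) else "-1"
        # Rule 5: default missing vehicle types; rule 7: normalise case-insensitively
        vehicle = row.get("vehicle_type")
        row["vehicle_type"] = "UNKNOWN" if vehicle is None else VEHICLE_MAP.get(vehicle.lower(), vehicle)
        tidy.append(row)
    return tidy
```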

In [0]:
#1. Mandatory Column Check - drop records where shipment_id OR role is NULL
#3. Join Readiness Rule - shipment_id (the join key) must not be NULL; covered by the same filter
Src=Src.filter(col("shipment_id").isNotNull() & col("role").isNotNull())
#display(Src)
#2. Name Completeness Rule - drop records only when BOTH first_name and last_name are NULL
Src=Src.filter(col("first_name").isNotNull() | col("last_name").isNotNull())
#display(Src)

#Scrubbing (convert raw to tidy)
#4. Age Defaulting Rule - fill NULL values in age with -1 (age is still a string column here)
Src=Src.withColumn("age",coalesce(col("age"),lit("-1")))
#5. Vehicle Type Default Rule - fill NULL values in vehicle_type with UNKNOWN
Src=Src.withColumn("vehicle_type",coalesce(col("vehicle_type"),lit("UNKNOWN")))
#6. Invalid Age Replacement - "ten", "" and any other non-integer text become -1
Src=Src.withColumn("age",when(~col("age").rlike("^-?[0-9]+$"),lit("-1")).otherwise(col("age")))
#display(Src)
#7. Vehicle Type Normalization - truck -> LMV, bike -> TwoWheeler (case-insensitive match)
Src=Src.withColumn("vehicle_type",
                   when(lower(col("vehicle_type"))=="truck","LMV")
                   .when(lower(col("vehicle_type"))=="bike","TwoWheeler")
                   .otherwise(col("vehicle_type")))
#display(Src)

#### 3. Standardization, De-Duplication and Replacement/Deletion of Data to Make It Usable

Creating the Shipment Details DataFrame<br>
1. Create a DF by reading data from logistics_shipment_detail.json
2. As this is clean JSON data, it doesn't require any cleansing or scrubbing.

In [0]:
#1.Create a DF by Reading Data from logistics_shipment_detail.json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
spark=SparkSession.builder.appName("LogisticsShipmentDetails").getOrCreate()
schema=StructType([StructField("shipment_id",IntegerType(),True),
                   StructField("order_id",StringType(),True),
                   StructField("source_city",StringType(),True),
                   StructField("destination_city",StringType(),True),
                   StructField("shipment_status",StringType(),True),
                   StructField("cargo_type",StringType(),True),
                   StructField("vehicle_type",StringType(),True),
                   StructField("payment_mode",StringType(),True),
                   StructField("shipment_weight_kg",DoubleType(),True),
                   StructField("shipment_cost",DoubleType(),True),
                   StructField("shipment_date",StringType(),True)])


SrcJsonDf=spark.read.option("multiline","true").schema(schema).json("/Volumes/we47catalog/we47schema/we47_volume/logistics_shipment_detail.json")
#display(SrcJsonDf)


Standardizations:<br>

1. Add columns<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
Add `domain` = 'Logistics', `ingestion_timestamp` = current timestamp and `is_expedited` = 'False'
2. Column Uniformity:<br>
role - Convert values to lowercase (Source File: DF of merged(logistics_source1 & logistics_source2))<br>
vehicle_type - Convert values to UPPERCASE (Source File: DF of logistics_shipment_detail_3000.json)<br>
hub_location - Convert values to initcap case (Source File: DF of merged(logistics_source1 & logistics_source2))<br>
3. Format Standardization:<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
Convert shipment_date to yyyy-MM-dd<br>
Ensure shipment_cost has 2-decimal precision<br>
4. Data Type Standardization<br>
Standardize column data types to fix schema drift and enable mathematical operations.<br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
age: Cast String to Integer<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
shipment_weight_kg: Cast to Double<br>
is_expedited: Cast to Boolean<br>
5. Naming Standardization<br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
Rename: first_name to staff_first_name<br>
Rename: last_name to staff_last_name<br>
Rename: hub_location to origin_hub_city<br>
6. Reorder columns logically into a standard format:<br>
Source File: DF of data from all 3 files<br>
shipment_id (Identifier), staff_first_name (Dimension), staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)
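The format, type and naming rules (items 3 to 5) and the reordering in item 6 can be expressed per row as in the plain-Python sketch below. It rests on stated assumptions: raw `shipment_date` values are either `yy-MM-dd` (the pattern the Spark cells in this notebook parse with) or already `yyyy-MM-dd`, and money is rounded half-up, which is the rounding mode of Spark's `round()` (`bround()` is the half-even variant). All names here are hypothetical.

```python
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP

RENAMES = {"first_name": "staff_first_name", "last_name": "staff_last_name",
           "hub_location": "origin_hub_city"}
COLUMN_ORDER = ["shipment_id", "staff_first_name", "staff_last_name", "role",
                "origin_hub_city", "shipment_cost", "ingestion_timestamp"]


def to_iso_date(raw):
    """Format standardization: return the date as yyyy-MM-dd, or None if unparseable."""
    for fmt in ("%Y-%m-%d", "%y-%m-%d"):  # 4-digit year first, then 2-digit
        try:
            return datetime.strptime(raw, fmt).strftime("%Y-%m-%d")
        except (TypeError, ValueError):
            continue
    return None


def to_money(value):
    """Format standardization: exactly 2 decimal places, rounding half-up."""
    return Decimal(str(value)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


def to_bool(flag):
    """Data type standardization: 'True'/'False' strings -> real booleans."""
    return str(flag).strip().lower() == "true"


def rename_and_reorder(row):
    """Naming standardization plus the target column order (missing -> None)."""
    renamed = {RENAMES.get(key, key): value for key, value in row.items()}
    return {col: renamed.get(col) for col in COLUMN_ORDER}
```

In Spark the same steps map onto `date_format(...)`, `round(col, 2)`, `.cast("int")` / `.cast("double")` / `.cast("boolean")`, `withColumnRenamed` and a final `select` in the target order.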

In [0]:
'''1.Add a column
    Source File: DF of logistics_shipment_detail_3000.json
    : domain as 'Logistics', current timestamp 'ingestion_timestamp' and 'False' as 'is_expedited'''
from pyspark.sql.functions import *
SrcJsonDf = SrcJsonDf.withColumns({"domain":lit('Logistics'),"ingestion_timestamp":current_timestamp(),"is_expedited":lit('False')})
#display(SrcJsonDf)

In [0]:

#2. Column Uniformity: role - convert values to lowercase (merged staff DF)
Src=Src.withColumn("role",lower(col("role")))
#display(Src)

In [0]:
#2.1 Source File: DF of logistics_shipment_detail_3000.json
#vehicle_type - Convert values to UPPERCASE
SrcJsonDf=SrcJsonDf.withColumn("vehicle_type",upper(col("vehicle_type")))
#display(SrcJsonDf)

In [0]:
#2.2 Source File: DF of merged(logistics_source1 & logistics_source2): hub_location - convert values to initcap case
Src=Src.withColumn("hub_location",initcap(col("hub_location")))
#display(Src)

Deduplication:
1. Apply Record Level De-Duplication
2. Apply Column Level De-Duplication (Primary Key Enforcement)

In [0]:
#Apply Record Level De-Duplication
print(Src.count())
print(Src.distinct().count())
Src=Src.distinct()
#display(Src)
print(SrcJsonDf.count())
print(SrcJsonDf.distinct().count())
SrcJsonDf=SrcJsonDf.distinct()
#display(SrcJsonDf)

In [0]:
#2.Apply Column Level De-Duplication (Primary Key Enforcement)
print('Src Total shipment_id Cnt',Src.select("shipment_id").count())
print('Src Unique shipment_id Cnt',Src.select("shipment_id").distinct().count())
Src=Src.dropDuplicates(subset=["shipment_id"])
#display(Src)
print('SrcJsonDF Total shipment_id Cnt',SrcJsonDf.select("shipment_id").count())
print('SrcJsonDF Unique shipment_id Cnt',SrcJsonDf.select("shipment_id").distinct().count())
SrcJsonDf=SrcJsonDf.dropDuplicates(subset=["shipment_id"])
#display(SrcJsonDf)

## 2. Data Enrichment - Detailing of Data
Makes your data rich and detailed <br>

###### Adding of Columns (Data Enrichment)
*Creating new derived attributes to enhance traceability and analytical capability.*

**1. Add Audit Timestamp (`load_dt`)**
Source File: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** We need to track exactly when this record was ingested into our Data Lakehouse for auditing purposes.
* **Action:** Add a column `load_dt` using the function `current_timestamp()`.

**2. Create Full Name (`full_name`)**
Source File: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** The reporting dashboard requires a single field for the driver's name instead of separate columns.
* **Action:** Create `full_name` by concatenating `first_name` and `last_name` with a space separator.
* **Result:** "Rajesh" + " " + "Kumar" -> **"Rajesh Kumar"**

**3. Define Route Segment (`route_segment`)**
Source File: DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** The logistics team wants to analyze performance based on specific transport lanes (Source to Destination).
* **Action:** Combine `source_city` and `destination_city` with a hyphen.
* **Result:** "Chennai" + "-" + "Pune" -> **"Chennai-Pune"**

**4. Generate Vehicle Identifier (`vehicle_identifier`)**
Source File: DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** We need a unique tracking code that immediately tells us the vehicle type and the shipment ID.
* **Action:** Combine `vehicle_type` and `shipment_id` to create a composite key.
* **Result:** "Truck" + "_" + "500001" -> **"Truck_500001"**
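All three derived columns above rely on string concatenation, and the choice of function matters when a part is NULL: Spark's `concat_ws` skips NULL inputs, while `concat` returns NULL if any input is NULL. The plain-Python mimics below are hypothetical helpers (not Spark's implementation) that make the difference testable; with `concat_ws`, a missing `last_name` still yields a usable `full_name`.

```python
def concat_ws(sep, *parts):
    """Mimics Spark's concat_ws: NULL (None) parts are skipped."""
    return sep.join(str(p) for p in parts if p is not None)


def concat(*parts):
    """Mimics Spark's concat: any NULL part makes the whole result NULL."""
    if any(p is None for p in parts):
        return None
    return "".join(str(p) for p in parts)


# The three enrichment columns from the examples above
full_name = concat_ws(" ", "Rajesh", "Kumar")         # "Rajesh Kumar"
route_segment = concat_ws("-", "Chennai", "Pune")     # "Chennai-Pune"
vehicle_identifier = concat_ws("_", "Truck", 500001)  # "Truck_500001"
```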

In [0]:
'''1. Add Audit Timestamp (load_dt) Source File: DF of logistics_source1 and logistics_source2
Scenario: We need to track exactly when this record was ingested into our Data Lakehouse for auditing purposes.
Action: Add a column load_dt using the function current_timestamp().'''
#Src1=Src1.withColumn("load_dt",current_timestamp())
#display(Src1)
#Src2=Src2.withColumn("load_dt",current_timestamp())
#display(Src2)
Src=Src.withColumn("load_dt",current_timestamp())
#display(Src)

In [0]:
'''2. Create Full Name (full_name) Source File: DF of logistics_source1 and logistics_source2
Scenario: The reporting dashboard requires a single field for the driver's name instead of separate columns.
Action: Create full_name by concatenating first_name and last_name with a space separator.
Result: "Rajesh" + " " + "Kumar" -> "Rajesh Kumar"'''
Src=Src.withColumn("full_name",concat_ws(" ",col("first_name"),col("last_name")))
#display(Src)


In [0]:
'''3. Define Route Segment (route_segment) Source File: DF of logistics_shipment_detail_3000.json
Scenario: The logistics team wants to analyze performance based on specific transport lanes (Source to Destination).
Action: Combine source_city and destination_city with a hyphen.
Result: "Chennai" + "-" + "Pune" -> "Chennai-Pune"'''
SrcJsonDf=SrcJsonDf.withColumn("route_segment",concat_ws("-",col("source_city"),col("destination_city")))
#display(SrcJsonDf)

In [0]:
'''4. Generate Vehicle Identifier (vehicle_identifier) Source File: DF of logistics_shipment_detail_3000.json
Scenario: We need a unique tracking code that immediately tells us the vehicle type and the shipment ID.
Action: Combine vehicle_type and shipment_id to create a composite key.
Result: "Truck" + "_" + "500001" -> "Truck_500001"'''

SrcJsonDf=SrcJsonDf.withColumn("vehicle_identifier",concat_ws("_",col("vehicle_type"),col("shipment_id")))
#display(SrcJsonDf)

###### Deriving of Columns (Time Intelligence)
*Extracting temporal features from dates to enable period-based analysis and reporting.*<br>
Source File: logistics_shipment_detail_3000.json<br>
**1. Derive Shipment Year (`shipment_year`)**
* **Scenario:** Management needs an annual performance report to compare growth year-over-year.
* **Action:** Extract the year component from `shipment_date`.
* **Result:** "2024-04-23" -> **2024**

**2. Derive Shipment Month (`shipment_month`)**
* **Scenario:** Analysts want to identify seasonal peaks (e.g., increased volume in December).
* **Action:** Extract the month component from `shipment_date`.
* **Result:** "2024-04-23" -> **4** (April)

**3. Flag Weekend Operations (`is_weekend`)**
* **Scenario:** The Operations team needs to track shipments handled during weekends to calculate overtime pay or analyze non-business day capacity.
* **Action:** Flag as **'True'** if the `shipment_date` falls on a Saturday or Sunday.

**4. Flag shipment status (`is_expedited`)**
* **Scenario:** The Operations team needs to track shipments that are IN_TRANSIT or DELIVERED.
* **Action:** Flag as **'True'** if `shipment_status` is IN_TRANSIT or DELIVERED.
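The four derived columns above reduce to simple date and membership logic. The sketch below uses Python's `datetime.date`, whose `weekday()` numbers Monday as 0 and Sunday as 6; Spark's `dayofweek` instead returns 1 for Sunday through 7 for Saturday, so a Spark-side weekend test could also be written as `dayofweek(d).isin(1, 7)`. `time_features` is a hypothetical helper that assumes `shipment_date` has already been parsed into a date.

```python
from datetime import date


def time_features(shipment_date, shipment_status):
    """Derive year, month, weekend flag and status flag for one shipment."""
    return {
        "shipment_year": shipment_date.year,
        "shipment_month": shipment_date.month,
        # date.weekday(): Monday == 0 ... Saturday == 5, Sunday == 6
        "is_weekend": shipment_date.weekday() >= 5,
        "is_expedited": shipment_status in ("IN_TRANSIT", "DELIVERED"),
    }


# 2024-04-23 is a Tuesday, so is_weekend is False
print(time_features(date(2024, 4, 23), "DELAYED"))
```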

In [0]:
'''1. Derive Shipment Year (shipment_year)
Scenario: Management needs an annual performance report to compare growth year-over-year.
Action: Extract the year component from shipment_date.
Result: "2024-04-23" -> 2024'''
SrcJsonDf=SrcJsonDf.withColumn("shipment_year",year(try_to_date(col("shipment_date"),'yy-MM-dd')))
#display(SrcJsonDf)

In [0]:
'''2. Derive Shipment Month (shipment_month)
Scenario: Analysts want to identify seasonal peaks (e.g., increased volume in December).
Action: Extract the month component from shipment_date.
Result: "2024-04-23" -> 4 (April)'''
SrcJsonDf=SrcJsonDf.withColumn("shipment_month",month(try_to_date(col("shipment_date"),'yy-MM-dd')))
#display(SrcJsonDf)

In [0]:
'''3. Flag Weekend Operations (is_weekend)
Scenario: The Operations team needs to track shipments handled during weekends to calculate overtime pay or analyze non-business day capacity.
Action: Flag as 'True' if the shipment_date falls on a Saturday or Sunday.'''

SrcJsonDf=SrcJsonDf.withColumn("is_weekend",date_format(try_to_date(col("shipment_date"),"yy-MM-dd"),"EEE").isin("Sat","Sun"))
#display(SrcJsonDf)

In [0]:
'''4. Flag shipment status (is_expedited)
Scenario: The Operations team needs to track shipments that are IN_TRANSIT or DELIVERED.
Action: Flag as 'True' if shipment_status is IN_TRANSIT or DELIVERED.'''
SrcJsonDf=SrcJsonDf.withColumn("is_expedited",when(col("shipment_status").isin('IN_TRANSIT','DELIVERED'),lit('True')).otherwise(lit('False')))
#display(SrcJsonDf)

###### Enrichment/Business Logics (Calculated Fields)
*Deriving new metrics and financial indicators using mathematical and date-based operations.*<br>
Source File: logistics_shipment_detail_3000.json<br>

**1. Calculate Unit Cost (`cost_per_kg`)**
* **Scenario:** The Finance team wants to analyze the efficiency of shipments by determining the cost incurred per unit of weight.
* **Action:** Divide `shipment_cost` by `shipment_weight_kg`.
* **Logic:** `shipment_cost / shipment_weight_kg`

**2. Track Shipment Age (`days_since_shipment`)**
* **Scenario:** The Operations team needs to monitor how long it has been since a shipment was dispatched to identify potential delays.
* **Action:** Calculate the difference in days between the `current_date` and the `shipment_date`.
* **Logic:** `datediff(current_date(), shipment_date)`

**3. Compute Tax Liability (`tax_amount`)**
* **Scenario:** For invoicing and compliance, we must calculate the Goods and Services Tax (GST) applicable to each shipment.
* **Action:** Calculate 18% GST on the total `shipment_cost`.
* **Logic:** `shipment_cost * 0.18`
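A plain-Python sketch of the three calculated fields, assuming `shipment_date` is already parsed and passing `today` in explicitly so the result is deterministic. Like Spark's `try_divide`, the unit-cost calculation returns `None` instead of failing when the weight is zero or missing. `calculated_fields` and `GST_RATE` are illustrative names; Python's built-in `round` resolves exact ties to even, whereas Spark's `round` rounds half-up.

```python
from datetime import date

GST_RATE = 0.18  # 18% GST, as stated above


def calculated_fields(shipment_cost, weight_kg, shipment_date, today):
    """cost_per_kg, days_since_shipment and tax_amount for one shipment."""
    # Like try_divide: a zero or missing weight gives None instead of an error
    cost_per_kg = None if shipment_cost is None or not weight_kg else shipment_cost / weight_kg
    return {
        "cost_per_kg": cost_per_kg,
        # Same as datediff(today, shipment_date): positive for past shipments
        "days_since_shipment": (today - shipment_date).days,
        "tax_amount": None if shipment_cost is None else round(shipment_cost * GST_RATE, 2),
    }


print(calculated_fields(1000.0, 250.0, date(2024, 4, 23), today=date(2024, 5, 3)))
```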

In [0]:
'''1. Calculate Unit Cost (cost_per_kg)

Scenario: The Finance team wants to analyze the efficiency of shipments by determining the cost incurred per unit of weight.
Action: Divide shipment_cost by shipment_weight_kg.
Logic: shipment_cost / shipment_weight_kg'''

SrcJsonDf=SrcJsonDf.withColumn("cost_per_kg",try_divide(col("shipment_cost"),(col("shipment_weight_kg"))))
#display(SrcJsonDf)

In [0]:
'''2. Track Shipment Age (days_since_shipment)
Scenario: The Operations team needs to monitor how long it has been since a shipment was dispatched to identify potential delays.
Action: Calculate the difference in days between the current_date and the shipment_date.
Logic: datediff(current_date(), shipment_date)'''

SrcJsonDf=SrcJsonDf.withColumn("days_since_shipment",datediff(current_date(),try_to_date(col("shipment_date"),'yy-MM-dd')))
#display(SrcJsonDf)

In [0]:
'''3. Compute Tax Liability (tax_amount)
Scenario: For invoicing and compliance, we must calculate the Goods and Services Tax (GST) applicable to each shipment.
Action: Calculate 18% GST on the total shipment_cost.
Logic: shipment_cost * 0.18'''

SrcJsonDf=SrcJsonDf.withColumn("tax_amount",round(col("shipment_cost") * 0.18,2))
#display(SrcJsonDf)

###### Remove/Eliminate (drop, select, selectExpr)
*Excluding unnecessary or redundant columns to optimize storage and privacy.*<br>
Source File: DF of logistics_source1 and logistics_source2<br>

**1. Remove Redundant Name Columns**
* **Scenario:** Since we have already created the `full_name` column in the Enrichment step, the individual name columns are now redundant and clutter the dataset.
* **Action:** Drop the `first_name` and `last_name` columns.
* **Logic:** `df.drop("first_name", "last_name")`

In [0]:
'''1. Remove Redundant Name Columns
Scenario: Since we have already created the full_name column in the Enrichment step, the individual name columns are now redundant and clutter the dataset.
Action: Drop the first_name and last_name columns.
Logic: df.drop("first_name", "last_name")'''

Src=Src.drop("first_name","last_name")
#Src=Src.select("shipment_id","age","role","hub_location","vehicle_type","data_Source","load_dt","full_name")
#Src=Src.selectExpr("shipment_id","age","role","hub_location","vehicle_type","data_Source","load_dt","full_name")
#display(Src)


##### Splitting & Merging/Melting of Columns
*Reshaping columns to extract hidden values or combine fields for better analysis.*<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
**1. Splitting (Extraction)**
*Breaking one column into multiple to isolate key information.*
* **Split Order Code:**
  * **Action:** Split `order_id` ("ORD100000") into two new columns:
    * `order_prefix` ("ORD")
    * `order_sequence` ("100000")
* **Split Date:**
  * **Action:** Split `shipment_date` into three separate columns for partitioning:
    * `ship_year` (2024)
    * `ship_month` (4)
    * `ship_day` (23)

**2. Merging (Concatenation)**
*Combining multiple columns into a single unique identifier or description.*
* **Create Route ID:**
  * **Action:** Merge `source_city` ("Chennai") and `destination_city` ("Pune") to create a descriptive route key:
    * `route_lane` ("Chennai->Pune")
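The splitting and merging rules above can be checked in plain Python with the same regular expression the Spark cell uses. As with `regexp_extract`, a non-matching `order_id` yields empty strings rather than an error. `split_order_id`, `split_date` and `route_lane` are hypothetical helper names.

```python
import re
from datetime import date

# Same pattern as the Spark cell: letter/underscore prefix, then digits
ORDER_PATTERN = re.compile(r"^([A-Z_]+?)([0-9]+)$")


def split_order_id(order_id):
    """Return (order_prefix, order_sequence); ('', '') when there is no match."""
    match = ORDER_PATTERN.match(order_id or "")
    return (match.group(1), match.group(2)) if match else ("", "")


def split_date(shipment_date):
    """Split a parsed date into year, month and day columns."""
    return {"ship_year": shipment_date.year, "ship_month": shipment_date.month,
            "ship_day": shipment_date.day}


def route_lane(source_city, destination_city):
    """Merge two cities into a descriptive route key."""
    return f"{source_city}->{destination_city}"
```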

In [0]:
'''1. Splitting (Extraction) Breaking one column into multiple to isolate key information.
Split Order Code:
Action: Split order_id ("ORD100000") into two new columns:
order_prefix ("ORD")
order_sequence ("100000")'''
#SrcJsonDf=SrcJsonDf.withColumn("order_prefix",substring(col("order_id"),1,3))
SrcJsonDf=SrcJsonDf.withColumn("order_prefix",regexp_extract(col("order_id"), r"^([A-Z_]+?)[0-9]+$", 1))
#SrcJsonDf=SrcJsonDf.withColumn("order_sequence",substring(col("order_id"),4,6))
SrcJsonDf = SrcJsonDf.withColumn("order_sequence", regexp_extract(col("order_id"), r"^([A-Z_]+?)([0-9]+)$", 2))
#display(SrcJsonDf)

In [0]:
'''Split Date:
Action: Split shipment_date into three separate columns for partitioning:
ship_year (2024)
ship_month (4)
ship_day (23)'''
SrcJsonDf=SrcJsonDf.withColumns({"ship_year":year(try_to_date(col("shipment_date"),'yy-MM-dd')),
                               "ship_month":month(try_to_date(col("shipment_date"),'yy-MM-dd')),
                               "ship_day":day(try_to_date(col("shipment_date"),'yy-MM-dd'))}
                               )
#display(SrcJsonDf)

In [0]:
'''2. Merging (Concatenation) Combining multiple columns into a single unique identifier or description.

Create Route ID:
Action: Merge source_city ("Chennai") and destination_city ("Pune") to create a descriptive route key:
route_lane ("Chennai->Pune")'''
SrcJsonDf=SrcJsonDf.withColumn("route_lane",concat_ws('->',col("source_city"),col("destination_city")))
#display(SrcJsonDf)

## 3. Data Customization & Processing - Application of Tailored Business Specific Rules

### **UDF1: Complex Incentive Calculation**
**Scenario:** The Logistics Head wants to calculate a "Performance Bonus" for drivers based on tenure and role complexity.

**Action:** Create a Python function `calculate_bonus(role, age)` and register it as a Spark UDF.

**Logic:**
* **IF** `Role` == 'Driver' **AND** `Age` > 50:
  * `Bonus` = 15% of Salary (Reward for Seniority)
* **IF** `Role` == 'Driver' **AND** `Age` < 30:
  * `Bonus` = 5% of Salary (Encouragement for Juniors)
* **ELSE**:
  * `Bonus` = 0

**Result:** A new derived column `projected_bonus` is generated for every row in the dataset.

---

### **UDF2: PII Masking (Privacy Compliance)**
**Scenario:** For the analytics dashboard, we must hide the full identity of the staff to comply with privacy laws (GDPR/DPDP), while keeping names recognizable for internal managers.

**Business Rule:** Show the first 2 letters, mask the middle characters with `****`, and show the last letter.

**Action:** Create a UDF `mask_identity(name)`.

**Example:**
* **Input:** `"Rajesh"`
* **Output:** `"Ra****h"`
<br>
**Note: Convert the above UDF logic to a transformation built on Spark's built-in functions to improve performance.**

In [0]:
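from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType

def calculate_bonus(role, age):
    # Returns the bonus as a fraction of salary (the dataset has no salary column)
    try:
        age = int(age)
    except (TypeError, ValueError):
        return 0.0
    # age -1 is the "unknown" default applied during scrubbing, so no bonus
    if age < 0 or role is None or role.lower() != 'driver':
        return 0.0
    if age > 50:
        return 0.15  # reward for seniority
    if age < 30:
        return 0.05  # encouragement for juniors
    return 0.0

calculate_bonus_udf = udf(calculate_bonus, DoubleType())
#Src=Src.withColumn("projected_bonus",calculate_bonus_udf(col("role"),col("age")))
#Src.show(5)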

In [0]:
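from pyspark.sql.functions import udf, col, when, length, concat, substring, lit
def mask_identity(name):
    # First 2 letters + fixed "****" + last letter, e.g. "Rajesh" -> "Ra****h"
    if name is None or len(name) < 3:
        return name
    return name[:2] + "****" + name[-1]
mask_identity_udf = udf(mask_identity)
#Src=Src.withColumn("masked_name",mask_identity_udf(col("full_name")))
# Built-in equivalent (no Python UDF round-trip), as requested in the note above
Src=Src.withColumn("masked_name",
                   when(length(col("full_name"))>=3,
                        concat(substring(col("full_name"),1,2),lit("****"),substring(col("full_name"),-1,1)))
                   .otherwise(col("full_name")))
#Src.show(2)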

## 4. Data Core Curation & Processing (Pre-Wrangling)
*Applying business logic to focus, filter, and summarize data before final analysis.*

**1. Select (Projection)**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** The Driver App team only needs location data, not sensitive HR info.
* **Action:** Select only `first_name`, `role`, and `hub_location`.

**2. Filter (Selection)**<br>
Source File: DF of json<br>
* **Scenario:** We need a report on active operational problems.
* **Action:** Filter rows where `shipment_status` is **'DELAYED'** or **'RETURNED'**.
* **Scenario:** Insurance audit for senior staff.
* **Action:** Filter rows where `age > 50`.

**3. Derive Flags & Columns (Business Logic)**<br>
Source File: DF of json<br>
* **Scenario:** Identify high-value shipments for security tracking.
* **Action:** Create flag `is_high_value` = **True** if `shipment_cost > 50,000`.
* **Scenario:** Flag weekend operations for overtime calculation.
* **Action:** Create flag `is_weekend` = **True** if day is Saturday or Sunday.

**4. Format (Standardization)**<br>
Source File: DF of json<br>
* **Scenario:** Finance requires readable currency formats.
* **Action:** Format `shipment_cost` to string like **"₹30,695.80"**.
* **Scenario:** Standardize city names for reporting.
* **Action:** Format `source_city` to Uppercase (e.g., "chennai" → **"CHENNAI"**).

**5. Group & Aggregate (Summarization)**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** Regional staffing analysis.
* **Action:** Group by `hub_location` and **Count** the number of staff.
* **Scenario:** Fleet capacity analysis (Source File: DF of json, which holds `shipment_weight_kg`).
* **Action:** Group by `vehicle_type` and **Sum** the `shipment_weight_kg`.

**6. Sorting (Ordering)**<br>
Source File: DF of json<br>
* **Scenario:** Prioritize the most expensive shipments.
* **Action:** Sort by `shipment_cost` in **Descending** order.
* **Scenario:** Organize daily dispatch schedule.
* **Action:** Sort by `shipment_date` (Ascending) then `priority_flag` (Descending).

**7. Limit (Top-N Analysis)**<br>
Source File: DF of json<br>
* **Scenario:** Dashboard snapshot of critical delays.
* **Action:** Filter for 'DELAYED', Sort by Cost, and **Limit to top 10** rows.
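A few of the curation steps above, sketched in plain Python over lists of dict records (hypothetical field values). The currency format uses Python's `,` grouping option, which, like Spark's `format_number`, groups digits in thousands (30,695.80) rather than in the Indian lakh style. The Top-N helper sorts by cost in descending order so the most expensive delays come first. All function names are illustrative.

```python
from collections import Counter


def format_inr(amount):
    """'₹' prefix, thousands separators, 2 decimals."""
    return f"₹{amount:,.2f}"


def top_delayed(shipments, n=10):
    """Filter DELAYED, sort by cost (highest first) and keep the top n."""
    delayed = [s for s in shipments if s["shipment_status"] == "DELAYED"]
    return sorted(delayed, key=lambda s: s["shipment_cost"], reverse=True)[:n]


def staff_per_hub(staff):
    """Group by hub_location and count staff members."""
    return Counter(member["hub_location"] for member in staff)
```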

In [0]:
'''1. Select (Projection)
Source Files: DF of logistics_source1 and logistics_source2

Scenario: The Driver App team only needs location data, not sensitive HR info.
Action: Select only first_name, role, and hub_location.'''
#Src=Src.select("full_name","role","hub_location")
#display(Src)

In [0]:
'''2. Filter (Selection)
Source File: DF of json

Scenario: We need a report on active operational problems.
Action: Filter rows where shipment_status is 'DELAYED' or 'RETURNED'.

Scenario: Insurance audit for senior staff.
Action: Filter rows where age > 50.'''

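# Keep the results in new DataFrames so later steps still see all rows
problemShipmentsDf=SrcJsonDf.filter(col("shipment_status").isin("DELAYED","RETURNED"))
#display(problemShipmentsDf)
# age is still a string column, so cast before the numeric comparison
seniorStaffDf=Src.filter(col("age").cast("int")>50)
#display(seniorStaffDf)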

In [0]:
'''3. Derive Flags & Columns (Business Logic)
Source File: DF of json

Scenario: Identify high-value shipments for security tracking.
Action: Create flag is_high_value = True if shipment_cost > 50,000.
Scenario: Flag weekend operations for overtime calculation.
Action: Create flag is_weekend = True if day is Saturday or Sunday.'''

SrcJsonDf=SrcJsonDf.withColumn("is_high_value",when(col("shipment_cost")>50000,True).otherwise(False))
display(SrcJsonDf)
# Same logic as the is_weekend column derived in Time Intelligence; recomputed here for this step
SrcJsonDf=SrcJsonDf.withColumn("is_weekend",date_format(try_to_date(col("shipment_date"),"yy-MM-dd"),"EEE").isin("Sat","Sun"))
display(SrcJsonDf)

In [0]:
'''4. Format (Standardization)
Source File: DF of json

Scenario: Finance requires readable currency formats.
Action: Format shipment_cost to string like "₹30,695.80".
Scenario: Standardize city names for reporting.
Action: Format source_city to Uppercase (e.g., "chennai" → "CHENNAI").'''

SrcJsonDf=SrcJsonDf.withColumn("formatted_amount",concat(lit("₹"), format_number(col("shipment_cost"), 2)))
#display(SrcJsonDf)

SrcJsonDf=SrcJsonDf.withColumn("source_city",upper(col("source_city")))
#display(SrcJsonDf)


In [0]:
'''5. Group & Aggregate (Summarization)
Source Files: DF of logistics_source1 and logistics_source2

Scenario: Regional staffing analysis.
Action: Group by hub_location and Count the number of staff.
Scenario: Fleet capacity analysis.
Action: Group by vehicle_type and Sum the shipment_weight_kg.'''

grpSrc=Src.groupBy("hub_location").agg(count("role").alias("Staff_count"))
#display(grpSrc)

GrpJsonDf=SrcJsonDf.groupBy("vehicle_type").agg(sum("shipment_weight_kg").alias("Capacity_Analysis"))
display(GrpJsonDf)

In [0]:
'''6. Sorting (Ordering)
Source File: DF of json

Scenario: Prioritize the most expensive shipments.
Action: Sort by shipment_cost in Descending order.
Scenario: Organize daily dispatch schedule.
Action: Sort by shipment_date (Ascending) then priority_flag (Descending). '''
costSortedDf=SrcJsonDf.orderBy(col("shipment_cost").desc())
#display(costSortedDf)
# There is no priority_flag column, so is_high_value serves as the priority flag
dispatchScheduleDf=SrcJsonDf.orderBy(col("shipment_date").asc(),col("is_high_value").desc())
display(dispatchScheduleDf)

In [0]:
'''7. Limit (Top-N Analysis)
Source File: DF of json

Scenario: Dashboard snapshot of critical delays.
Action: Filter for 'DELAYED', Sort by Cost, and Limit to top 10 rows.'''

# Most expensive delays first
topDelayedDf=SrcJsonDf.filter(col("shipment_status")=="DELAYED").orderBy(col("shipment_cost").desc()).limit(10)
display(topDelayedDf)

## 5. Data Wrangling - Transformation & Analytics
*Combining, modeling, and analyzing data to answer complex business questions.*

### **1. Joins**
Source Files:<br>
Left Side (staff_df):<br> DF of logistics_source1 & logistics_source2<br>
Right Side (shipments_df):<br> DF of logistics_shipment_detail_3000.json<br>
#### **1.1 Frequently Used Simple Joins (Inner, Left)**
* **Inner Join (Performance Analysis):**
  * **Scenario:** We only want to analyze *completed work*. Connect Staff to the Shipments they handled.
  * **Action:** Join `staff_df` and `shipments_df` on `shipment_id`.
  * **Result:** Returns only rows where a staff member is assigned to a valid shipment.
* **Left Join (Idle Resource check):**
  * **Scenario:** Find out which staff members are currently *idle* (not assigned to any shipment).
  * **Action:** Join `staff_df` (Left) with `shipments_df` (Right) on `shipment_id`. Filter where `shipments_df.shipment_id` is NULL.

#### **1.2 Infrequent Simple Joins (Self, Right, Full, Cartesian)**
* **Self Join (Peer Finding):**
  * **Scenario:** Find all pairs of employees working in the same `hub_location`.
  * **Action:** Join `staff_df` to itself on `hub_location`, filtering where `staff_id_A != staff_id_B`.
* **Right Join (Orphan Data Check):**
  * **Scenario:** Identify shipments in the system that have *no valid driver* assigned (Data Integrity Issue).
  * **Action:** Join `staff_df` (Left) with `shipments_df` (Right). Focus on NULLs on the left side.
* **Full Outer Join (Reconciliation):**
  * **Scenario:** A complete audit to find *both* idle drivers AND unassigned shipments in one view.
  * **Action:** Perform a Full Outer Join on `shipment_id`.
* **Cartesian/Cross Join (Capacity Planning):**
  * **Scenario:** Generate a schedule of *every possible* driver assignment to *every* pending shipment to run an optimization algorithm.
  * **Action:** Cross Join `drivers_df` and `pending_shipments_df`.

#### **1.3 Advanced Joins (Semi and Anti)**
* **Left Semi Join (Existence Check):**
  * **Scenario:** "Show me the details of Drivers who have *at least one* shipment." (Standard filtering).
  * **Action:** `staff_df.join(shipments_df, "shipment_id", "left_semi")`.
  * **Benefit:** Returns only `staff_df` columns and each staff row at most once, even when several shipments match; Spark can stop probing for a staff row as soon as one match is found.
* **Left Anti Join (Negation Check):**
  * **Scenario:** "Show me the details of Drivers who have *never* touched a shipment."
  * **Action:** `staff_df.join(shipments_df, "shipment_id", "left_anti")`.

### **2. Lookup**<br>
Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF)<br>
* **Scenario:** Validation. Check if the `hub_location` in the staff file exists in the corporate `Master_City_List`.
* **Action:** Compare values against a reference list.

### **3. Lookup & Enrichment**<br>
Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF)<br>
* **Scenario:** Geo-Tagging.
* **Action:** Lookup `hub_location` ("Pune") in a Master Latitude/Longitude table and enrich the dataset by adding `lat` and `long` columns for map plotting.

### **4. Schema Modeling (Denormalization)**<br>
Source Files: DF of All 3 Files (logistics_source1, logistics_source2, logistics_shipment_detail_3000.json)<br>
* **Scenario:** Creating a "Gold Layer" Table for PowerBI/Tableau.
* **Action:** Flatten the Star Schema. Join `Staff`, `Shipments`, and `Vehicle_Master` into one wide table (`wide_shipment_history`) so analysts don't have to perform joins during reporting.

### **5. Windowing (Ranking & Trends)**<br>
Source Files:<br>
DF of logistics_source2: Provides hub_location (Partition Key).<br>
logistics_shipment_detail_3000.json: Provides shipment_cost (Ordering Key)<br>
* **Scenario:** "Who are the Top 3 Drivers by Cost in *each* Hub?"
* **Action:**
  1. Partition by `hub_location`.
  2. Order by `total_shipment_cost` Descending.
  3. Apply `dense_rank()` and `row_number()`.
  4. Filter where `dense_rank <= 3` (keeps ties) or `row_number <= 3` (exactly three rows per hub).

### **6. Analytical Functions (Lead/Lag)**<br>
Source File: <br>
DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** Idle Time Analysis.
* **Action:** For each driver, calculate the days elapsed since their *previous* shipment.

### **7. Set Operations**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Union:** Combining `Source1` (Legacy) and `Source2` (Modern) into one dataset (Already done in Active Munging).
* **Intersect:** Identifying Staff IDs that appear in *both* Source 1 and Source 2 (Duplicate/Migration Check).
* **Except (Difference):** Identifying Staff IDs present in Source 2 but *missing* from Source 1 (New Hires).

### **8. Grouping & Aggregations (Advanced)**<br>
Source Files:<br>
DF of logistics_source2: Provides hub_location and vehicle_type (Grouping Dimensions).<br>
DF of logistics_shipment_detail_3000.json: Provides shipment_cost (Aggregation Metric).<br>
* **Scenario:** The CFO wants a subtotal report at multiple levels:
  1. Total Cost by Hub.
  2. Total Cost by Hub AND Vehicle Type.
  3. Grand Total.
* **Action:** Use `rollup("hub_location", "vehicle_type")` to generate all three levels in a single query; `cube("hub_location", "vehicle_type")` also works but additionally returns subtotals by `vehicle_type` alone.

In [0]:
'''1. Joins
Source Files:
Left Side (staff_df):
DF of logistics_source1 & logistics_source2
Right Side (shipments_df):
DF of logistics_shipment_detail_3000.json'''

staff_df=Src
#display(staff_df)
shipments_df=SrcJsonDf



In [0]:
'''1.1 Frequently Used Simple Joins (Inner, Left)
Inner Join (Performance Analysis):
Scenario: We only want to analyze completed work. Connect Staff to the Shipments they handled.
Action: Join staff_df and shipments_df on shipment_id.
Result: Returns only rows where a staff member is assigned to a valid shipment.
Left Join (Idle Resource check):
Scenario: Find out which staff members are currently idle (not assigned to any shipment).
Action: Join staff_df (Left) with shipments_df (Right) on shipment_id. Filter where shipments_df.shipment_id is NULL.'''

Inner_df=staff_df.join(shipments_df,how='inner',on='shipment_id')
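#display(Inner_df)
# Explicit join condition keeps both shipment_id columns, so the right-side key can be tested for NULL
left_df=staff_df.alias('st').join(shipments_df.alias('sh'),col('st.shipment_id')==col('sh.shipment_id'),'left').filter(col("sh.shipment_id").isNull()).select('st.*')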
#display(left_df)


In [0]:
'''1.2 Infrequent Simple Joins (Self, Right, Full, Cartesian)
Self Join (Peer Finding):
Scenario: Find all pairs of employees working in the same hub_location.
Action: Join staff_df to itself on hub_location, filtering where staff_id_A != staff_id_B.
Right Join (Orphan Data Check):
Scenario: Identify shipments in the system that have no valid driver assigned (Data Integrity Issue).
Action: Join staff_df (Left) with shipments_df (Right). Focus on NULLs on the left side.
Full Outer Join (Reconciliation):
Scenario: A complete audit to find both idle drivers AND unassigned shipments in one view.
Action: Perform a Full Outer Join on shipment_id.
Cartesian/Cross Join (Capacity Planning):
Scenario: Generate a schedule of every possible driver assignment to every pending shipment to run an optimization algorithm.
Action: Cross Join drivers_df and pending_shipments_df.'''
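# Self join: pair staff members who share a hub_location (shipment_id is used as the staff identifier here)
selfstaff_df = (staff_df.alias("A")
                .join(staff_df.alias("B"), col("A.hub_location")==col("B.hub_location"), how='inner')
                .filter(col("A.shipment_id")!=col("B.shipment_id"))
                .select(col("A.shipment_id").alias("staff_id_A"), col("B.shipment_id").alias("staff_id_B"), col("A.hub_location")))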
#display(selfstaff_df)
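# Explicit join condition keeps the left-side key so unmatched shipments can be detected
rightjoin = staff_df.alias('st').join(shipments_df.alias('sh'),col('st.shipment_id')==col('sh.shipment_id'),how='right').filter(col("st.shipment_id").isNull()).select('sh.*')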
#display(rightjoin)
fulljoin = staff_df.alias('st').join(shipments_df.alias('sh'),how='full',on='shipment_id')
#display(fulljoin)
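# Cartesian product: every staff row paired with every shipment row (no join key).
# Output size is rows(staff) x rows(shipments), so filter to drivers and pending shipments first in practice.
crossjoin = staff_df.alias('st').crossJoin(shipments_df.alias('sh'))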
#display(crossjoin)


In [0]:
'''1.3 Advanced Joins (Semi and Anti)
Left Semi Join (Existence Check):
Scenario: "Show me the details of Drivers who have at least one shipment." (Standard filtering).
Action: staff_df.join(shipments_df, "shipment_id", "left_semi").
Benefit: Returns only staff_df columns and each staff row at most once, even when several shipments match; Spark can stop probing for a staff row as soon as one match is found.
Left Anti Join (Negation Check):
Scenario: "Show me the details of Drivers who have never touched a shipment."
Action: staff_df.join(shipments_df, "shipment_id", "left_anti").'''

leftsemidf = staff_df.join(shipments_df, "shipment_id", "left_semi")
#display(leftsemidf)
leftantidf = staff_df.join(shipments_df, "shipment_id", "left_anti")
display(leftantidf)

##6. Data Persistence (LOAD) -> Data Publishing & Consumption<br>

Store the results of the inner join, lookup and enrichment, schema modeling, windowing, analytical functions, set operations, and grouping and aggregation steps in Delta tables.

##7. Take a copy of the above notebook and write the equivalent SQL wherever applicable.