# Enterprise Fleet Analytics Pipeline

This pipeline focuses on the business outcome (analytics) and the domain (fleet/logistics).

![](./logistics_project.png)

## **1. Data Munging**

#### 1. Visually/manually open the files and capture a couple of data patterns (Manual Exploratory Data Analysis)

**Source 1: Logistics Shipment Data (JSON Format)**
- Data is received from the source system in JSON (semi-structured format)
- key–value pairs

**Source 2: Logistics Data (CSV Format – 4 Columns)**
- Data is received in CSV format with 4 columns
- Header present, no footer
- Null columns and null records are present
- Data format inconsistencies observed, e.g. age contains string values
- Includes additional column(s)

**Source 3: Logistics Data (CSV Format – 7 Columns)**
- Data is received in CSV format with 7 columns
- Header present, no footer
- Contains duplicate records
- Null columns and null records are present
- Data format inconsistencies observed, e.g. age contains string values
- Includes additional column(s)

#### 2. Programmatically find a couple of data patterns by applying the EDA steps below (File: logistics_source1)

1. Apply inferSchema and toDF to create a DF and analyse the actual data.
2. Analyse the schema, datatypes, columns etc.,
3. Analyse the duplicate records count and summary of the dataframe.

In [0]:
source1_df=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/logistics_data_analysis/Silver_data/logistics_source1",header=True,inferSchema=True).toDF("Shipment_id","First_Name","Last_Name","Age","Role")

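source1_df.printSchema()  # printSchema() prints directly and returns None, so it needs no print() wrapper

source1_df.show(10, False)  # show() also prints directly and returns None
display(source1_df.columns)
display(source1_df.dtypes)  # Age is in string format and Shipment_id is in string type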
dedup_df = source1_df.distinct()
removed_rows = source1_df.exceptAll(dedup_df)
removed_rows.show(truncate=False)

print("Original count:", source1_df.count())
print("After distinct:", dedup_df.count())
print("Duplicates removed:", source1_df.count() - dedup_df.count())
print("de-duplicated given id column count:",source1_df.dropDuplicates(['Shipment_id']).count())


source2_df=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/logistics_data_analysis/Silver_data/logistics_source2",header=True)
source2_df.show(10, False)  # show() prints directly and returns None
display(source2_df.columns)
display(source2_df.dtypes)  # no inferSchema here, so every column (including Age and Shipment_id) is read as string
dedup_df1 = source2_df.distinct()
removed_rows1 = source2_df.exceptAll(dedup_df1)
removed_rows1.show(truncate=False)

print("Original count:", source2_df.count())
print("After distinct:", dedup_df1.count())
print("Duplicates removed:", source2_df.count() - dedup_df1.count())
print("de-duplicated given id column count:",source2_df.dropDuplicates(['Shipment_id']).count())

display(source1_df.summary())
display(source2_df.summary())


### a. Passive Data Munging (Files: logistics_source1 and logistics_source2)
Without modifying the data, identify:<br>
Shipment IDs that appear in both master_v1 and master_v2<br>
Records where:<br>
1. shipment_id is non-numeric
2. age is not an integer<br>

Count rows having:<br>
3. fewer columns than expected<br>
4. more columns than expected<br>

In [0]:
# A value counts as non-numeric when it is not made up of digits only
find_df1 = source1_df.where("NOT (Shipment_id rlike '^[0-9]+$')")
print("shipment_id is non-numeric from source1_df:")
display(find_df1)
source1_df.printSchema()

find_df2 = source2_df.where("NOT (Shipment_id rlike '^[0-9]+$')")
print("shipment_id is non-numeric from source2_df:")
display(find_df2)
source2_df.printSchema()
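
The cell above covers only the non-numeric `shipment_id` check. As a rough sketch of the remaining passive checks (IDs present in both sources, and `age` values that are not integers), the rules can be prototyped in plain Python before porting them to Spark. The helper names and sample values below are hypothetical and are not taken from the source files.

```python
import re

# Hypothetical helpers; the sample values further down are made up for illustration.
_DIGITS_ONLY = re.compile(r"[0-9]+")

def is_numeric_id(value):
    """True when the shipment_id consists of digits only."""
    return value is not None and _DIGITS_ONLY.fullmatch(value.strip()) is not None

def is_integer_age(value):
    """True when age parses as a whole number, e.g. '42' but not 'ten' or '4.5'."""
    if value is None:
        return False
    try:
        int(value.strip())
        return True
    except ValueError:
        return False

def ids_in_both(ids_a, ids_b):
    """Shipment IDs present in both sources, ignoring NULLs."""
    return {i for i in ids_a if i is not None} & {i for i in ids_b if i is not None}

source1_ids = ["5000001", "5000002", "5000ABC", None]
source2_ids = ["5000002", "5000003"]

print(ids_in_both(source1_ids, source2_ids))
print([i for i in source1_ids if not is_numeric_id(i)])
print([a for a in ["32", "ten", "", None] if not is_integer_age(a)])
```

In Spark, the same checks would roughly map to `rlike` for the ID pattern, a cast-and-null test for `age`, and an inner join or `intersect` on `shipment_id`.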

Count rows having:<br>
3. fewer columns than expected<br>
4. more columns than expected<br>

In [0]:
from pyspark.sql.functions import size,col,split,when
expected_cols = 5   # change as per your file
delimiter = ","
print("Source df1 Count rows:")
raw_df1=spark.read.text("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/logistics_data_analysis/Silver_data/logistics_source1")
df_with_col_count=raw_df1.withColumn('actual_col_count',size(split(col('value'),delimiter)))

df_flagged=df_with_col_count.withColumn("column_status",when(col("actual_col_count")<expected_cols,"FEWER_COLUMNS").when(col("actual_col_count")>expected_cols,"MORE_COLUMNS").otherwise("EXPECTED_COLUMNS"))
display(df_flagged)

df_bad_record = df_with_col_count.where(col("actual_col_count") != expected_cols).groupBy("actual_col_count").count()
display(df_bad_record)

print("Source df2 Count rows:")
raw_df2=spark.read.text("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/logistics_data_analysis/Silver_data/logistics_source2")
df_with_col_count1=raw_df2.withColumn("actual_column_count",size(split(col("value"),delimiter)))
df_flagged1=df_with_col_count1.withColumn("column_status",when(col("actual_column_count")<7,"FEWER_COLUMNS").when(col("actual_column_count")>7,"MORE_COLUMNS").otherwise("EXPECTED_COLUMNS"))
display(df_flagged1)
df_bad_record1=df_flagged1.where(col("actual_column_count")!=7).groupBy("actual_column_count").count()
display(df_bad_record1)
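
Splitting each raw line on `,` miscounts any row that contains a quoted comma, and the header line is counted like a data row. A minimal plain-Python sketch of a quoting-aware count, using the standard `csv` module, is shown here. `count_fields`, `classify_row` and the sample lines are hypothetical.

```python
import csv

def count_fields(line, delimiter=","):
    """Count the fields in one raw line, honouring double-quoted values."""
    return len(next(csv.reader([line], delimiter=delimiter)))

def classify_row(line, expected_cols, delimiter=","):
    """Label a raw line with the same three statuses as the column_status flag."""
    actual = count_fields(line, delimiter)
    if actual < expected_cols:
        return "FEWER_COLUMNS"
    if actual > expected_cols:
        return "MORE_COLUMNS"
    return "EXPECTED_COLUMNS"

# Made-up sample lines, not taken from the source files
sample_lines = [
    '5000001,Rajesh,Kumar,34,driver',
    '5000002,"Kumar, Rajesh",Singh,41,driver',  # quoted comma: still 5 fields
    '5000003,Anita,Rao',                         # truncated row
    '5000004,Vijay,Nair,29,driver,Chennai',      # extra column
]
for line in sample_lines:
    print(classify_row(line, expected_cols=5), "<-", line)
```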

In [0]:
#Create a Spark Session Object
from pyspark.sql.session import SparkSession
spark=SparkSession.builder.appName("Logistic_analysis").getOrCreate()

### **b. Active Data Munging** (Files: logistics_source1 and logistics_source2)

##### 1. Combining Data + Schema Merging (Structuring)
1. Read both files without enforcing schema<br>
2. Align them into a single canonical schema: 
- shipment_id,<br>
- first_name,<br>
- last_name,<br>
- age,<br>
- role,<br>
- hub_location,<br>
- vehicle_type,<br>
- data_source<br>
3. Add data_source column with values as: system1, system2 in the respective dataframes<br>

Source 1 (System A): id, fname, lname, age<br>
Source 2 (System B): shipment_id, full_name, years<br>
Canonical schema (decided by you):- shipment_id, first_name, last_name, age<br>

All source data is reshaped into this structure before further use.

In [0]:
from pyspark.sql.functions import col, lit, expr

source1_raw = spark.read \
    .option("header", True) \
    .option("mode", "permissive") \
    .csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/logistics_data_analysis/Silver_data/logistics_source1")

source2_raw = spark.read \
    .option("header", True) \
    .option("mode", "permissive") \
    .csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/logistics_data_analysis/Silver_data/logistics_source2")

source1_canonical = source1_raw.select(
    col("shipment_id").cast("string").alias("shipment_id"),
    col("first_name"),
    col("last_name"),
    col("age").cast("string"),
    col("role"),
    lit(None).cast("string").alias("hub_location"),
    lit(None).cast("string").alias("vehicle_type"),
    lit("system1").alias("data_source")
)
source2_canonical = source2_raw.select(
    col("shipment_id").cast("string").alias("shipment_id"),
    col("first_name"),
    col("last_name"),
    col("age").cast("string"),
    col("role"),
    col("hub_location"),
    col("vehicle_type"),
    lit("system2").alias("data_source")
)
canonical_df = source1_canonical.unionByName(source2_canonical)
display(canonical_df)
canonical_df.printSchema()  # printSchema() prints directly and returns None


##### 2. Cleansing
Cleansing (removal of unwanted records)<br>
1. Mandatory Column Check - Drop any record where any of the following columns is NULL: shipment_id, role<br>
2. Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name<br>
3. Join Readiness Rule - Drop records where the join key is NULL: shipment_id<br>

In [0]:
#Mandatory Column Check - Drop any record where any of the following columns is NULL: shipment_id, role
print("Before mandatory column check:", canonical_df.count())
canonical_df1 = canonical_df.na.drop(how='any',subset=["shipment_id","role"])
print("After mandatory column check:", canonical_df1.count())


#Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name
print("Before name completeness check:", canonical_df1.count())
canonical_df2 = canonical_df1.na.drop(how='all',subset=["first_name","last_name"])
print("After name completeness check:", canonical_df2.count())


Join Readiness Rule
A record must have a valid join key (shipment_id) to participate in downstream joins.
If the join key is NULL, the record is not usable and must be dropped.

In [0]:
#Join Readiness Rule - Drop records where the join key is null: shipment_id
from pyspark.sql.functions import col
print("Before Join Readiness check:", canonical_df2.count())
canonical_df_join_ready = canonical_df2.filter(col("shipment_id").isNotNull())
print("After Join Readiness check:", canonical_df_join_ready.count())

#Equivalent alternative:

#canonical_df_join_ready = canonical_df2.na.drop(subset=["shipment_id"])



##### 3. Scrubbing (convert raw to tidy)
4. Age Defaulting Rule - Fill NULL values in the age column with: -1<br>
5. Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN<br>
6. Invalid Age Replacement - Replace the following values in age:
"ten" to -1<br>
"" to -1<br>
7. Vehicle Type Normalization - Replace inconsistent vehicle types: 
truck to LMV<br>
bike to TwoWheeler<br>

In [0]:
#Age Defaulting Rule - Fill NULL values in the age column with: -1
cleaned_df=canonical_df_join_ready.na.fill("-1",subset=["age"])

#Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN
cleaned_df1=cleaned_df.na.fill("UNKNOWN",subset=["vehicle_type"])

#Invalid Age Replacement - Replace the following values in age: "ten" to -1 and "" to -1
replacedata={'ten':'-1','':'-1'}
cleaned_df2=cleaned_df1.na.replace(replacedata,subset=["age"])
cleaned_df2.printSchema()

#Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV and bike to TwoWheeler
replacedata1={'Truck':'LMV','Bike':'TwoWheeler'}
cleaned_df3=cleaned_df2.na.replace(replacedata1,subset=["vehicle_type"])
display(cleaned_df3)

#### 3. Standardization, De-Duplication and Replacement / Deletion of Data to make it usable

Creating the shipment details DataFrame<br>
1. Create a DF by reading data from logistics_shipment_detail_3000.json
2. As this is clean JSON data, it doesn't require any cleansing or scrubbing.

In [0]:
json_clean_df=spark.read.option("multiLine", True).json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/logistics_data_analysis/Silver_data/logistics_shipment_detail_3000.json")
display(json_clean_df.limit(5))


Standardizations:<br>

1. Add columns<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
Add domain as 'Logistics', ingestion_timestamp as the current timestamp, and is_expedited as 'False'
2. Column Uniformity: 
role - Convert to lowercase<br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
vehicle_type - Convert values to UPPERCASE<br>
Source Files: DF of logistics_shipment_detail_3000.json
hub_location - Convert values to initcap case<br>
Source Files: DF of merged(logistics_source1 & logistics_source2)<br>
3. Format Standardization:<br>
Source Files: DF of logistics_shipment_detail_3000.json<br>
Convert shipment_date to yyyy-MM-dd<br>
Ensure shipment_cost has 2 decimal precision<br>
4. Data Type Standardization<br>
Standardizing column data types to fix schema drift and enable mathematical operations.<br>
Source File: DF of merged(logistics_source1 & logistics_source2) <br>
age: Cast String to Integer<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
shipment_weight_kg: Cast to Double<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
is_expedited: Cast to Boolean<br>
5. Naming Standardization <br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
Rename: first_name to staff_first_name<br>
Rename: last_name to staff_last_name<br>
Rename: hub_location to origin_hub_city<br>
6. Reordering columns logically in a better standard format:<br>
Source File: DF of Data from all 3 files<br>
shipment_id (Identifier), staff_first_name (Dimension), staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)

In [0]:
from pyspark.sql.functions import lower,upper,initcap,current_timestamp
#1. Add columns to the DF of logistics_shipment_detail_3000.json:
#   domain as 'Logistics', ingestion_timestamp as the current timestamp and is_expedited as 'False'
json_clean_df1=json_clean_df.withColumn("domain",lit("Logistics")).withColumn("ingestion_timestamp",current_timestamp()).withColumn("is_expedited",lit('False'))
display(json_clean_df1.limit(5))


#Column Uniformity: role - Convert to lowercase
cleaned_df4=cleaned_df3.withColumn("role",lower(col("role")))


#vehicle_type - Convert values to UPPERCASE
cleaned_df5=cleaned_df4.withColumn("vehicle_type",upper(col("vehicle_type")))


#Source Files: logistics_shipment_detail_3000.json (and the merged master files) hub_location - Convert values to initcap case
cleaned_df6=cleaned_df5.withColumn("hub_location",initcap(col("hub_location")))
display(cleaned_df6.limit(100))

3. Format Standardization:<br>
Source Files: DF of logistics_shipment_detail_3000.json<br>
Convert shipment_date to yyyy-MM-dd<br>
Ensure shipment_cost has 2 decimal precision<br>

In [0]:
from pyspark.sql.functions import to_date,round

#Convert shipment_date to a DateType column (rendered as yyyy-MM-dd)
#The pattern passed to to_date describes the format of the incoming string
json_clean_df2=json_clean_df1.withColumn("shipment_date",to_date(col("shipment_date"),'yy-MM-dd'))


#Ensure shipment_cost has 2 decimal precision
json_clean_df3=json_clean_df2.withColumn("shipment_cost",round(col("shipment_cost"),2))
display(json_clean_df3.limit(15))

4. Data Type Standardization<br>
Standardizing column data types to fix schema drift and enable mathematical operations.<br>
Source File: DF of merged(logistics_source1 & logistics_source2) <br>
age: Cast String to Integer<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
shipment_weight_kg: Cast to Double<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
is_expedited: Cast to Boolean<br>

In [0]:
#Source File: logistics_source1 & logistics_source2
#age: Cast String to Integer
from pyspark.sql.functions import col

cleaned_df6.printSchema()
cleaned_df7 = cleaned_df6.withColumn("age",col("age").cast("int"))
cleaned_df7.printSchema()

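#shipment_weight_kg: Cast to Double
#is_expedited: Cast to Boolean
json_clean_df3.printSchema()

json_clean_df4=json_clean_df3.withColumn("shipment_weight_kg",col("shipment_weight_kg").cast("double")).withColumn("is_expedited",col("is_expedited").cast("boolean"))
json_clean_df4.printSchema()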


5. Naming Standardization <br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
Rename: first_name to staff_first_name<br>
Rename: last_name to staff_last_name<br>
Rename: hub_location to origin_hub_city<br>

In [0]:
#Naming Standardization
#Source File: logistics_source1 & logistics_source2
#Rename: first_name to staff_first_name
#Rename: last_name to staff_last_name
cleaned_df8=cleaned_df7.withColumnsRenamed({"first_name":"staff_first_name","last_name":"staff_last_name"})

#Rename: hub_location to origin_hub_city
cleaned_df9=cleaned_df8.withColumnRenamed("hub_location","origin_hub_city")
cleaned_df9.printSchema()

6. Reordering columns logically in a better standard format:<br>
Source File: DF of Data from all 3 files<br>
shipment_id (Identifier), staff_first_name (Dimension), staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)

In [0]:
cleaned_df10 = cleaned_df9.where("NOT Shipment_id rlike '[A-Za-z]'")
complete_df=cleaned_df10.unionByName(json_clean_df4,allowMissingColumns=True)

cleaned_df11= complete_df.select("shipment_id","staff_first_name","staff_last_name","role","origin_hub_city","shipment_cost","ingestion_timestamp")
display(cleaned_df11)

Deduplication:
1. Apply Record Level De-Duplication
2. Apply Column Level De-Duplication (Primary Key Enforcement)

In [0]:
#Record-level de-duplication: drop rows that are identical across all columns
cleaned_df12=cleaned_df11.distinct()
#Column-level de-duplication (primary key enforcement): keep one row per shipment_id
cleaned_df13=cleaned_df12.dropDuplicates(subset=["shipment_id"])
display(cleaned_df13)

## 2. Data Enrichment - Detailing of data
Makes your data rich and detailed <br>

1. Add Audit Timestamp (load_dt) Source File: DF of logistics_source1 and logistics_source2

Scenario: We need to track exactly when this record was ingested into our Data Lakehouse for auditing purposes.
Action: Add a column load_dt using the function current_timestamp().

In [0]:
cleaned_df11=cleaned_df10.withColumn("load_dt",current_timestamp())
display(cleaned_df11.limit(5))

2. Create Full Name (full_name) Source File: DF of logistics_source1 and logistics_source2

Scenario: The reporting dashboard requires a single field for the driver's name instead of separate columns.
Action: Create full_name by concatenating first_name and last_name with a space separator.
Result: "Rajesh" + " " + "Kumar" -> "Rajesh Kumar"

In [0]:
from pyspark.sql.functions import concat, col

cleaned_df12=cleaned_df11.withColumn("Full_Name", concat(col("staff_first_name"), lit(' '), col("staff_last_name")))
display(cleaned_df12.limit(3))

3. Define Route Segment (route_segment) Source File: DF of logistics_shipment_detail_3000.json

Scenario: The logistics team wants to analyze performance based on specific transport lanes (Source to Destination).
Action: Combine source_city and destination_city with a hyphen.
Result: "Chennai" + "-" + "Pune" -> "Chennai-Pune"

In [0]:
json_clean_df5=json_clean_df4.withColumn("route_segment",concat(col("source_city"),lit('-'),col("destination_city")))
display(json_clean_df5.limit(5))

4. Generate Vehicle Identifier (vehicle_identifier) Source File: DF of logistics_shipment_detail_3000.json

Scenario: We need a unique tracking code that immediately tells us the vehicle type and the shipment ID.
Action: Combine vehicle_type and shipment_id to create a composite key.
Result: "Truck" + "_" + "500001" -> "Truck_500001"

In [0]:
json_clean_df6=json_clean_df5.withColumn("vehicle_identifier",concat(col("vehicle_type"),lit('_'),col("shipment_id")))
display(json_clean_df6.limit(5))

###### Deriving of Columns (Time Intelligence)
*Extracting temporal features from dates to enable period-based analysis and reporting.*<br>
Source File: logistics_shipment_detail_3000.json<br>

**1. Derive Shipment Year (`shipment_year`)**
* **Scenario:** Management needs an annual performance report to compare growth year-over-year.
* **Action:** Extract the year component from `shipment_date`.
* **Result:** "2024-04-23" -> **2024**

In [0]:
from pyspark.sql.functions import year
json_clean_df7=json_clean_df6.withColumn("shipment_year",year(col("shipment_date")))
display(json_clean_df7.limit(5))

**2. Derive Shipment Month (`shipment_month`)**
* **Scenario:** Analysts want to identify seasonal peaks (e.g., increased volume in December).
* **Action:** Extract the month component from `shipment_date`.
* **Result:** "2024-04-23" -> **4** (April)

In [0]:
from pyspark.sql.functions import month
json_clean_df8=json_clean_df7.withColumn("shipment_month",month(col("shipment_date")))
display(json_clean_df8.limit(5))

**3. Flag Weekend Operations (`is_weekend`)**
* **Scenario:** The Operations team needs to track shipments handled during weekends to calculate overtime pay or analyze non-business day capacity.
* **Action:** Flag as **'True'** if the `shipment_date` falls on a Saturday or Sunday.

In [0]:
from pyspark.sql.functions import dayofweek
json_clean_df9=json_clean_df8.withColumn("is_weekend",when(dayofweek(col("shipment_date")).isin(1,7),True).otherwise(False))
display(json_clean_df9.limit(5))
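
Spark's `dayofweek` numbers the days from 1 (Sunday) to 7 (Saturday), which is why the cell above tests for 1 and 7. A small plain-Python cross-check of that numbering can help when spot-checking a few dates by hand. The helper names here are hypothetical.

```python
from datetime import date

def spark_dayofweek(d):
    """Mirror Spark's dayofweek numbering: 1 = Sunday ... 7 = Saturday."""
    # isoweekday() is 1 = Monday ... 7 = Sunday, so shift it by one day.
    return d.isoweekday() % 7 + 1

def is_weekend(d):
    """True for Saturday (7) and Sunday (1) in Spark's numbering."""
    return spark_dayofweek(d) in (1, 7)

for d in (date(2024, 4, 23), date(2024, 4, 27), date(2024, 4, 28)):
    print(d, spark_dayofweek(d), is_weekend(d))
```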

**4. Flag shipment status (`is_expedited`)**
* **Scenario:** The Operations team needs to track shipments that are IN_TRANSIT or DELIVERED.
* **Action:** Flag as **'True'** if the `shipment_status` is IN_TRANSIT or DELIVERED.

In [0]:
from pyspark.sql.functions import when, col
json_clean_df10=json_clean_df9.withColumn("is_expedited",when(col("shipment_status")=="IN_TRANSIT",True).when(col("shipment_status")=="DELIVERED",True).otherwise(False))
display(json_clean_df10.limit(5))

###### Enrichment/Business Logics (Calculated Fields)
*Deriving new metrics and financial indicators using mathematical and date-based operations.*<br>
Source File: logistics_shipment_detail_3000.json<br>

**1. Calculate Unit Cost (`cost_per_kg`)**
* **Scenario:** The Finance team wants to analyze the efficiency of shipments by determining the cost incurred per unit of weight.
* **Action:** Divide `shipment_cost` by `shipment_weight_kg`.
* **Logic:** `shipment_cost / shipment_weight_kg`

In [0]:
from pyspark.sql.functions import try_divide,round
json_clean_df11=json_clean_df10.withColumn("cost_per_kg",round(try_divide(col("shipment_cost"),col("shipment_weight_kg")),2))
display(json_clean_df11.limit(5))

**2. Track Shipment Age (`days_since_shipment`)**
* **Scenario:** The Operations team needs to monitor how long it has been since a shipment was dispatched to identify potential delays.
* **Action:** Calculate the difference in days between the `current_date` and the `shipment_date`.
* **Logic:** `datediff(current_date(), shipment_date)`

In [0]:
from pyspark.sql.functions import date_diff,current_date
json_clean_df12=json_clean_df11.withColumn("days_since_shipment",date_diff(current_date(), "shipment_date"))
display(json_clean_df12.limit(5))

**3. Compute Tax Liability (`tax_amount`)**
* **Scenario:** For invoicing and compliance, we must calculate the Goods and Services Tax (GST) applicable to each shipment.
* **Action:** Calculate 18% GST on the total `shipment_cost`.
* **Logic:** `shipment_cost * 0.18`

In [0]:

json_clean_df13=json_clean_df12.withColumn("tax_amount",round(col("shipment_cost")*0.18,2))
display(json_clean_df13.limit(5))
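
The calculated fields above can be sanity-checked with plain arithmetic. The sketch below assumes half-up rounding to two decimals (Spark's `round` uses HALF_UP) and mirrors `try_divide` by returning `None` when the weight is zero or missing. The function names and the sample weight are illustrative only.

```python
from decimal import Decimal, ROUND_HALF_UP

TWO_PLACES = Decimal("0.01")
GST_RATE = Decimal("0.18")  # 18% GST, as stated in the rule

def _round2(value):
    """Round a Decimal to 2 places, half-up."""
    return value.quantize(TWO_PLACES, rounding=ROUND_HALF_UP)

def cost_per_kg(shipment_cost, shipment_weight_kg):
    """Unit cost rounded to 2 decimals; None when it cannot be computed."""
    if shipment_cost is None or shipment_weight_kg is None:
        return None
    weight = Decimal(str(shipment_weight_kg))
    if weight == 0:
        return None  # same outcome as try_divide on a zero divisor
    return _round2(Decimal(str(shipment_cost)) / weight)

def tax_amount(shipment_cost):
    """18% GST on the shipment cost, rounded to 2 decimals."""
    return _round2(Decimal(str(shipment_cost)) * GST_RATE)

print(cost_per_kg("30695.80", "250"))  # 122.78
print(cost_per_kg("30695.80", "0"))    # None
print(tax_amount("30695.80"))          # 5525.24
```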

###### Remove/Eliminate (drop, select, selectExpr)
*Excluding unnecessary or redundant columns to optimize storage and privacy.*<br>
Source File: DF of logistics_source1 and logistics_source2<br>

**1. Remove Redundant Name Columns**
* **Scenario:** Since we have already created the `full_name` column in the Enrichment step, the individual name columns are now redundant and clutter the dataset.
* **Action:** Drop the `staff_first_name` and `staff_last_name` columns (the renamed `first_name` and `last_name`).
* **Logic:** `df.drop("staff_first_name", "staff_last_name")`

In [0]:
display(cleaned_df12.limit(5))
cleaned_df13=cleaned_df12.drop("staff_first_name", "staff_last_name")
display(cleaned_df13.limit(5))

###### Splitting & Merging/Melting of Columns
*Reshaping columns to extract hidden values or combine fields for better analysis.*<br>
Source File: DF of logistics_shipment_detail_3000.json<br>

**1. Splitting (Extraction)**
*Breaking one column into multiple to isolate key information.*
* **Split Order Code:**
  * **Action:** Split `order_id` ("ORD100000") into two new columns:
    * `order_prefix` ("ORD")
    * `order_sequence` ("100000")
* **Split Date:**
  * **Action:** Split `shipment_date` into three separate columns for partitioning:
    * `ship_year` (2024)
    * `ship_month` (4)
    * `ship_day` (23)

In [0]:
from pyspark.sql.functions import substring,day
json_clean_df14=json_clean_df13.withColumn("order_prefix",substring(col("order_id"),1,3)).withColumn("order_sequence",substring(col("order_id"),4,6))
display(json_clean_df14.limit(5))

json_clean_df15=json_clean_df14.withColumn("ship_year",year(col("shipment_date"))).withColumn("ship_month",month(col("shipment_date"))).withColumn("ship_day",day(col("shipment_date")))
display(json_clean_df15.limit(5))
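
`substring` with fixed positions works as long as every `order_id` has a three-letter prefix followed by a six-digit sequence. If that assumption might not hold, a pattern-based split is one alternative. The plain-Python sketch below shows the idea with a hypothetical `split_order_id` helper (in Spark, `regexp_extract` could play the same role).

```python
import re

# Letters followed by digits, e.g. "ORD100000"
_ORDER_ID = re.compile(r"([A-Za-z]+)([0-9]+)")

def split_order_id(order_id):
    """Split 'ORD100000' into ('ORD', '100000'); (None, None) if the shape is unexpected."""
    if order_id is None:
        return (None, None)
    match = _ORDER_ID.fullmatch(order_id.strip())
    if match is None:
        return (None, None)
    return (match.group(1), match.group(2))

print(split_order_id("ORD100000"))
print(split_order_id("ORDER42"))
print(split_order_id("100000"))
```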

**2. Merging (Concatenation)**
*Combining multiple columns into a single unique identifier or description.*
* **Create Route ID:**
  * **Action:** Merge `source_city` ("Chennai") and `destination_city` ("Pune") to create a descriptive route key:
    * `route_lane` ("Chennai->Pune")

In [0]:
json_clean_df16=json_clean_df15.withColumn("route_lane",concat(col("source_city"),lit('->'),col("destination_city")))
display(json_clean_df16.limit(5))


## 3. Data Customization & Processing - Application of Tailored Business Specific Rules

### **UDF1: Complex Incentive Calculation**
**Scenario:** The Logistics Head wants to calculate a "Performance Bonus" for drivers based on tenure and role complexity.

**Action:** Create a Python function `calculate_bonus(role, age)` and register it as a Spark UDF.

In [0]:
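def calculate_bonus(role, age):
    # NULLs reach the UDF as None (comparing None with an int raises TypeError);
    # a negative age is the -1 placeholder for an unknown age, so no bonus applies
    if role is None or age is None or age < 0:
        return "0"
    if role == 'driver' and age > 50:
        bonus = "15% of Salary (Reward for Seniority)"
    elif role == 'driver' and age < 30:
        bonus = "5% of Salary (Encouragement for Juniors)"
    else:
        bonus = "0"  # keep the return type a string in every branch

    return bonus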



**Result:** A new derived column `projected_bonus` is generated for every row in the dataset.

In [0]:
from pyspark.sql.functions import udf
cleaned_df14=cleaned_df13.withColumn("projected_bonus",udf(calculate_bonus)(col("role"),col("age")))
display(cleaned_df14.limit(50))

### **UDF2: PII Masking (Privacy Compliance)**
**Scenario:** For the analytics dashboard, we must hide the full identity of the staff to comply with privacy laws (GDPR/DPDP), while keeping names recognizable for internal managers.

**Business Rule:** Show the first 2 letters, mask the middle characters with `****`, and show the last letter.

**Action:** Create a UDF `mask_identity(name)`.

**Example:**
* **Input:** `"Rajesh"`
* **Output:** `"Ra****h"`

In [0]:
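def mask_identity(name):
    if name is None:
        return None

    name = name.strip()

    # Nothing to mask in an empty string; indexing it would raise IndexError
    if not name:
        return name

    # Very short names: show only the first and last character
    if len(name) <= 3:
        return name[0] + "****" + name[-1]

    return name[:2] + "****" + name[-1]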


In [0]:
cleaned_df15=cleaned_df12.withColumn("mask_identity",udf(mask_identity)("staff_first_name"))
display(cleaned_df15.limit(5))

**Note: Convert the above UDF logic to a built-in function based transformation to improve performance.**

In [0]:
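from pyspark.sql.functions import trim,substring,concat,lit,col
name_trimmed=trim(col("staff_first_name"))
#First two characters + mask + last character; substring with position -1 reads from the end of the string.
#Unlike the UDF, names of 3 characters or fewer still show their first two characters.
cleaned_df15=cleaned_df12.withColumn("mask_identity",concat(substring(name_trimmed,1,2)
                                                            ,lit('****'),substring(name_trimmed,-1,1)))
display(cleaned_df15.limit(5))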
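
To spot-check that a masking implementation agrees with the business rule, a standalone plain-Python reference of the rule can be handy. This is a sketch only: `mask_name` is a hypothetical name, and the treatment of names with three characters or fewer is an assumption, since the rule does not cover them.

```python
def mask_name(name):
    """First 2 letters + '****' + last letter, e.g. 'Rajesh' -> 'Ra****h'."""
    if name is None:
        return None
    name = name.strip()
    if not name:
        return name
    # Assumption: for names of 3 characters or fewer, reveal only the first and last character.
    head = name[:2] if len(name) > 3 else name[0]
    return head + "****" + name[-1]

for sample in ["Rajesh", "  Anita ", "Raj", "A", "", None]:
    print(repr(sample), "->", repr(mask_name(sample)))
```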

## 4. Data Core Curation & Processing (Pre-Wrangling)
*Applying business logic to focus, filter, and summarize data before final analysis.*

**1. Select (Projection)**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** The Driver App team only needs location data, not sensitive HR info.
* **Action:** Select only `staff_first_name`, `role`, and `origin_hub_city` (the renamed `first_name` and `hub_location`).

In [0]:
cleaned_df16=cleaned_df15.select("staff_first_name","role","origin_hub_city")
#display(cleaned_df16)

**2. Filter (Selection)**<br>
Source File: DF of json<br>
* **Scenario:** We need a report on active operational problems.
* **Action:** Filter rows where `shipment_status` is **'DELAYED'** or **'RETURNED'**.
* **Scenario:** Insurance audit for senior staff.
* **Action:** Filter rows where `age > 50`.

In [0]:
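#Kept under its own name so that the later json_clean_df17 (is_high_value) does not overwrite it
delayed_returned_df=json_clean_df16.filter("shipment_status IN ('DELAYED', 'RETURNED')")
display(delayed_returned_df.limit(5))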

cleaned_df18=cleaned_df15.where("age>50")
display(cleaned_df18.limit(5))

**3. Derive Flags & Columns (Business Logic)**<br>
Source File: DF of json<br>
* **Scenario:** Identify high-value shipments for security tracking.
* **Action:** Create flag `is_high_value` = **True** if `shipment_cost > 40,000`.
* **Scenario:** Flag weekend operations for overtime calculation.
* **Action:** Create flag `is_weekend` = **True** if day is Saturday or Sunday.

In [0]:
json_clean_df17=json_clean_df16.withColumn("is_high_value",when(col("shipment_cost")>40000,True).otherwise(False))
display(json_clean_df17)

#Scenario: Flag weekend operations for overtime calculation.
#Action: Create flag is_weekend = True if day is Saturday or Sunday.

#This flag is already added in this DF previously

**4. Format (Standardization)**<br>
Source File: DF of json<br>
* **Scenario:** Finance requires readable currency formats.
* **Action:** Format `shipment_cost` to string like **"₹30,695.80"**.
* **Scenario:** Standardize city names for reporting.
* **Action:** Format `source_city` to Uppercase (e.g., "chennai" → **"CHENNAI"**).

In [0]:

from pyspark.sql.functions import col, concat, lit, format_number,upper

json_clean_df18 = json_clean_df17.withColumn("shipment_cost",concat(lit("₹"),format_number(col("shipment_cost"),2)))


json_clean_df19=json_clean_df18.withColumn("source_city",upper("source_city"))
display(json_clean_df19.limit(5))
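
Once `shipment_cost` is formatted it becomes a string, so sorting or comparing it no longer behaves numerically. A plain-Python sketch of the same formatting, and of recovering the number again, is shown here. The helper names and the second sample amount are hypothetical.

```python
def format_inr(amount):
    """Format a number like '₹30,695.80' (thousands separators, 2 decimals)."""
    return "₹" + format(amount, ",.2f")

def parse_inr(text):
    """Recover the numeric value from a string produced by format_inr."""
    return float(text.replace("₹", "").replace(",", ""))

costs = [30695.80, 9000.00]
formatted = [format_inr(c) for c in costs]

print(formatted)
print(sorted(formatted))                 # string order: '₹30,695.80' sorts before '₹9,000.00'
print(sorted(formatted, key=parse_inr))  # numeric order: '₹9,000.00' comes first
```

One design option is to keep the numeric column and write the formatted value to a separate display column, so later numeric steps are unaffected.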




**5. Group & Aggregate (Summarization)**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** Regional staffing analysis.
* **Action:** Group by `hub_location` and **Count** the number of staff.
* **Scenario:** Fleet capacity analysis.
* **Action:** Group by `vehicle_type` and **Sum** the `shipment_weight_kg`.

In [0]:
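cleaned_df16=cleaned_df15.groupBy("origin_hub_city").count()
display(cleaned_df16)

#Scenario: Fleet capacity analysis.
#Action: Group by vehicle_type and Sum the shipment_weight_kg.
#shipment_weight_kg exists only in the JSON shipment data, so the aggregation runs on that DF
from pyspark.sql.functions import sum as spark_sum
fleet_capacity_df=json_clean_df19.groupBy("vehicle_type").agg(spark_sum("shipment_weight_kg").alias("total_weight_kg"))
display(fleet_capacity_df)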

**6. Sorting (Ordering)**<br>
Source File: DF of json<br>
* **Scenario:** Prioritize the most expensive shipments.
* **Action:** Sort by `shipment_cost` in **Descending** order.
* **Scenario:** Organize daily dispatch schedule.
* **Action:** Sort by `shipment_date` (Ascending) then `priority_flag` (Descending).

In [0]:
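from pyspark.sql.functions import col,regexp_replace
#shipment_cost was formatted as a "₹30,695.80" style string in the Format step, so sort on its numeric value
cost_value=regexp_replace(col("shipment_cost"),"[₹,]","").cast("double")
json_clean_df20=json_clean_df19.orderBy(cost_value.desc())
display(json_clean_df20.limit(20))

#priority_flag does not exist in the JSON data, so the dispatch schedule is sorted by shipment_date only
dispatch_schedule_df=json_clean_df19.orderBy(col("shipment_date").asc())
display(dispatch_schedule_df.limit(20))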



**7. Limit (Top-N Analysis)**<br>
Source File: DF of json<br>
* **Scenario:** Dashboard snapshot of critical delays.
* **Action:** Filter for 'DELAYED', Sort by Cost, and **Limit to top 10** rows.

In [0]:
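from pyspark.sql.functions import col,regexp_replace
#Top 10 most expensive delayed shipments: sort descending on the numeric cost, not on the formatted string
json_clean_df21=json_clean_df20.filter(col("shipment_status")=='DELAYED').orderBy(regexp_replace(col("shipment_cost"),"[₹,]","").cast("double").desc()).limit(10)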
display(json_clean_df21)

## 5. Data Wrangling - Transformation & Analytics
*Combining, modeling, and analyzing data to answer complex business questions.*

### **1. Joins**
Source Files:<br>
Left Side (staff_df):<br> DF of logistics_source1 & logistics_source2<br>
Right Side (shipments_df):<br> DF of logistics_shipment_detail_3000.json<br>

#### **1.1 Frequently Used Simple Joins (Inner, Left)**
* **Inner Join (Performance Analysis):**
  * **Scenario:** We only want to analyze *completed work*. Connect Staff to the Shipments they handled.
  * **Action:** Join `staff_df` and `shipments_df` on `shipment_id`.
  * **Result:** Returns only rows where a staff member is assigned to a valid shipment.
* **Left Join (Idle Resource check):**
  * **Scenario:** Find out which staff members are currently *idle* (not assigned to any shipment).
  * **Action:** Join `staff_df` (Left) with `shipments_df` (Right) on `shipment_id`. Filter where `shipments_df.shipment_id` is NULL.

In [0]:
innerdf1=cleaned_df15.join(json_clean_df20,how='inner',on="shipment_id")
display(innerdf1.limit(10))

In [0]:
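leftdf1=cleaned_df15.alias("staff_df").join(json_clean_df20.alias("shipments_df"),how='left',on=col("staff_df.shipment_id")==col("shipments_df.shipment_id"))
#Filter on the right-side key before projecting, while both aliases are still in scope
resultdf=leftdf1.where(col("shipments_df.shipment_id").isNull()).select("staff_df.*")
display(resultdf.limit(10))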

#### **1.2 Infrequent Simple Joins (Self, Right, Full, Cartesian)**

* **Self Join (Peer Finding):**
  * **Scenario:** Find all pairs of employees working in the same `hub_location`.
  * **Action:** Join `staff_df` to itself on `hub_location`, filtering where `staff_id_A != staff_id_B`.

In [0]:
selfdf1=cleaned_df15.alias("staff_df1").join(cleaned_df15.alias("staff_df2"),how='inner',on=col("staff_df1.origin_hub_city")==col("staff_df2.origin_hub_city"))
resultdf = (
    selfdf1
        .select(
            col("staff_df1.origin_hub_city").alias("hub_location"),

            col("staff_df1.shipment_id").alias("emp1_shipment_id"),
            col("staff_df1.staff_first_name").alias("emp1_first_name"),
            col("staff_df1.staff_last_name").alias("emp1_last_name"),
            col("staff_df1.age").alias("emp1_age"),
            col("staff_df1.role").alias("emp1_role"),
            col("staff_df1.vehicle_type").alias("emp1_vehicle_type"),

            col("staff_df2.shipment_id").alias("emp2_shipment_id"),
            col("staff_df2.staff_first_name").alias("emp2_first_name"),
            col("staff_df2.staff_last_name").alias("emp2_last_name"),
            col("staff_df2.age").alias("emp2_age"),
            col("staff_df2.role").alias("emp2_role"),
            col("staff_df2.vehicle_type").alias("emp2_vehicle_type")
        )
        .where(col("staff_df1.shipment_id")!= col("staff_df2.shipment_id"))
)

display(resultdf.limit(10))

* **Right Join (Orphan Data Check):**
  * **Scenario:** Identify shipments in the system that have *no valid driver* assigned (Data Integrity Issue).
  * **Action:** Join `staff_df` (Left) with `shipments_df` (Right). Focus on NULLs on the left side.

In [0]:
RightJoinDf=cleaned_df15.alias("staff_df").join(json_clean_df20.alias("shipments_df"),how='right',on=col("staff_df.shipment_id")==col("shipments_df.shipment_id"))
resultdf=RightJoinDf.where(col("staff_df.shipment_id").isNull())
display(resultdf.limit(10))

* **Full Outer Join (Reconciliation):**
  * **Scenario:** A complete audit to find *both* idle drivers AND unassigned shipments in one view.
  * **Action:** Perform a Full Outer Join on `shipment_id`.

In [0]:
from pyspark.sql.functions import col, when

fullOuterJoin=cleaned_df15.alias("staff_df").join(json_clean_df20.alias("shipments_df"),how='full',on=col("staff_df.shipment_id")==col("shipments_df.shipment_id"))

reconciliation_df = fullOuterJoin.withColumn(
    "audit_status",
    when(
        col("staff_df.shipment_id").isNull(),
        "UNASSIGNED_SHIPMENT"
    ).when(
        col("shipments_df.shipment_id").isNull(),
        "IDLE_DRIVER"
    ).otherwise(
        "VALID_ASSIGNMENT"
    )
)

display(reconciliation_df.limit(10))


* **Cartesian/Cross Join (Capacity Planning):**
  * **Scenario:** Generate a schedule of *every possible* driver assignment to *every* pending shipment to run an optimization algorithm.
  * **Action:** Cross Join `drivers_df` and `pending_shipments_df`

In [0]:
json_clean_df21=json_clean_df20.filter(col("shipment_status").isin("CREATED","DELAYED"))
crossJoinDf=cleaned_df15.alias("staff_df").join(json_clean_df21.alias("shipments_df"),how="cross")
display(crossJoinDf.limit(5))

#### **1.3 Advanced Joins (Semi and Anti)**

* **Left Semi Join (Existence Check):**
  * **Scenario:** "Show me the details of Drivers who have *at least one* shipment." (Standard filtering).
  * **Action:** `staff_df.join(shipments_df, "shipment_id", "left_semi")`.
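  * **Benefit:** Returns only the `staff_df` columns and emits each staff row at most once, however many shipments match, so no `distinct()` is needed afterwards and Spark does not have to produce every matching pair.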

In [0]:
SemiJoinDf=cleaned_df15.alias("staff_df").join(json_clean_df20.alias("shipments_df"),how='left_semi',on=col("staff_df.shipment_id")==col("shipments_df.shipment_id"))
display(SemiJoinDf.limit(5))

* **Left Anti Join (Negation Check):**
  * **Scenario:** "Show me the details of Drivers who have *never* touched a shipment."
  * **Action:** `staff_df.join(shipments_df, "shipment_id", "left_anti")`.

In [0]:
leftantijoin=cleaned_df15.alias("staff_df").join(json_clean_df20.alias("shipments_df"),how='left_anti',on=col("staff_df.shipment_id")==col("shipments_df.shipment_id"))
display(leftantijoin.limit(5))

### **3. Lookup & Enrichment**<br>
Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF) and Master_City_List.csv dataframe<br>
* **Scenario:** Geo-Tagging.
* **Action:** Look up `hub_location` (e.g. "Pune") in the Master_City_List.csv DataFrame, which holds latitude and longitude per city, and enrich the merged logistics_source DataFrame by adding `lat` and `long` columns for map plotting.

In [0]:
staffdf=cleaned_df15
master_city_df=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/logistics_data_analysis/Silver_data/Master_City_List.csv",header=True,inferSchema=True)

geo_tagged_df = cleaned_df15.alias("staff_df").join(master_city_df.alias("city_df"),col("staff_df.origin_hub_city")== col("city_df.city_name"),"left")
#display(geo_tagged_df)

### **4. Schema Modeling (Denormalization)**<br>
Source Files: DF of All 3 Files (logistics_source1, logistics_source2, logistics_shipment_detail_3000.json)<br>
* **Scenario:** Creating a "Gold Layer" Table for PowerBI/Tableau.
* **Action:** Flatten the Star Schema. Join `Staff`, `Shipments`, and `Vehicle_Master` into one wide table (`wide_shipment_history`) so analysts don't have to perform joins during reporting.

* Staff (dimension)
* Shipments (fact)
* Vehicle_Master (dimension)

In [0]:
staff_df = cleaned_df15
shipment_df = json_clean_df20
city_df = master_city_df

#Join Shipments with Staff
from pyspark.sql.functions import col

shipment_staff_df = shipment_df.alias("s") \
    .join(
        staff_df.alias("st"),
        col("s.shipment_id") == col("st.shipment_id"),
        "left"
    ) \
    .select(
        col("s.shipment_id").alias("shipment_id"),
        col("s.shipment_cost"),
        col("s.shipment_date"),
        col("st.origin_hub_city").alias("origin_hub_city"),
        col("s.vehicle_type"),
        col("st.staff_first_name"),
        col("st.staff_last_name"),
        col("st.role")
    )



#Join the city master (Master_City_List.csv) to add country, latitude and longitude
wide_shipment_history = shipment_staff_df.alias("ssf") \
    .join(
        city_df.alias("c"),
        col("ssf.origin_hub_city") == col("c.city_name"),
        "left"
    ) \
    .select(
        col("ssf.shipment_id"),
        col("ssf.shipment_cost"),
        col("ssf.shipment_date"),
        col("ssf.origin_hub_city"),
        col("ssf.vehicle_type"),
        col("ssf.staff_first_name"),
        col("ssf.staff_last_name"),
        col("ssf.role"),
        col("c.city_name").alias("hub_city_name"),
        col("c.country"),
        col("c.latitude"),
        col("c.longitude")
    )


wide_shipment_history.write.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/logistics_data_analysis/Gold",header=True,mode="overwrite")

display(wide_shipment_history.limit(20))


### **5. Windowing (Ranking & Trends)**<br>
Source Files:<br>
DF of logistics_source2: Provides hub_location (Partition Key).<br>
logistics_shipment_detail_3000.json: Provides shipment_cost (Ordering Key)<br>
* **Scenario:** "Who are the Top 3 Drivers by Cost in *each* Hub?"
* **Action:**
  1. Partition by `hub_location`.
  2. Order by `total_shipment_cost` Descending.
  3. Apply `dense_rank()` and `row_number()`.
  4. Filter where the rank or row number is `<= 3`.

In [0]:
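# desc was used below without being imported; sum is aliased so it does not shadow Python's built-in sum
from pyspark.sql.functions import col, regexp_replace, dense_rank, desc, sum as spark_sum
from pyspark.sql.window import Window

# Strip the currency symbol and thousands separators, then cast the cost to a number
clean_df = wide_shipment_history.withColumn(
    "shipment_cost",
    regexp_replace(col("shipment_cost"), "[₹,]", "").cast("double")
)

# Total shipment cost per driver within each hub
driver_cost_df = clean_df.groupBy(
    "origin_hub_city",
    "staff_first_name",
    "staff_last_name"
).agg(
    spark_sum("shipment_cost").alias("total_shipment_cost")
)

# Rank drivers inside each hub by total cost, highest first.
# dense_rank gives tied drivers the same rank, so a hub can return more than three rows.
hub_window = Window.partitionBy("origin_hub_city").orderBy(desc("total_shipment_cost"))
top3df = driver_cost_df.withColumn("Row_numberSeqNum", dense_rank().over(hub_window)).where("Row_numberSeqNum<=3")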
display(top3df)

### **6. Analytical Functions (Lead/Lag)**<br>
Source File: <br>
DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** Idle Time Analysis.
* **Action:** For each driver, calculate the days elapsed since their *previous* shipment.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, datediff

df_with_prev=json_clean_df15.withColumn("prev_shipment_date",lag("shipment_date").over(Window.partitionBy("vehicle_identifier").orderBy("shipment_date")))


idle_time_df = df_with_prev.withColumn("idle_days",datediff(col("shipment_date"), col("prev_shipment_date")))

from pyspark.sql.functions import when

idle_time_df = idle_time_df.withColumn(
    "idle_days",
    when(col("prev_shipment_date").isNull(), 0)
    .otherwise(col("idle_days"))
)

idle_time_df.select(
    "vehicle_identifier",
    "shipment_id",
    "shipment_date",
    "prev_shipment_date",
    "idle_days"
).orderBy("vehicle_identifier", "shipment_date").display()



### **7. Set Operations**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Union:** Combining `Source1` (Legacy) and `Source2` (Modern) into one dataset (Already done in Active Munging).
* **Intersect:** Identifying Staff IDs that appear in *both* Source 1 and Source 2 (Duplicate/Migration Check).
* **Except (Difference):** Identifying Staff IDs present in Source 2 but *missing* from Source 1 (New Hires).

In [0]:
uniondf=source1_canonical.union(source2_canonical)
print("Union Count of the data:",uniondf.count())
intersectdf=source1_canonical.intersect(source2_canonical)
print("intersect Count of the data:",intersectdf.count())
#EXCEPT (Distinct Difference): rows present in Source 2 but missing from Source 1
exceptdf=source2_canonical.subtract(source1_canonical)
print("except Count of the data:",exceptdf.count())



### **8. Grouping & Aggregations (Advanced)**<br>
Source Files:<br>
DF of logistics_source2: Provides hub_location and vehicle_type (Grouping Dimensions).<br>
DF of logistics_shipment_detail_3000.json: Provides shipment_cost (Aggregation Metric).<br>
* **Scenario:** The CFO wants a subtotal report at multiple levels:
  1. Total Cost by Hub.
  2. Total Cost by Hub AND Vehicle Type.
  3. Grand Total.
* **Action:** Use `rollup("hub_location", "vehicle_type")` to generate exactly these three levels in a single query. `cube("hub_location", "vehicle_type")` also works but adds a fourth level, Total Cost by Vehicle Type.

## 6. Data Persistence (LOAD) -> Data Publishing & Consumption<br>

Store the results of the inner join, lookup and enrichment, schema modeling, windowing, analytical functions, set operations, and grouping and aggregation steps in Delta tables.

In [0]:
innerdf1.printSchema()