# Enterprise Fleet Analytics Pipeline

Focuses on the business outcome (analytics) and the domain (fleet/logistics).

![logistics](logistics_project.png)

Download the data from the Google Drive folder below and upload it to the catalog volume:
https://drive.google.com/drive/folders/1J3AVJIPLP7CzT15yJIpSiWXshu1iLXKn?usp=drive_link

## **1. Data Munging**

#### 1. Visually/manually open the file and capture a couple of data patterns (Manual Exploratory Data Analysis)

In [0]:
%python
from pyspark.sql.types import *
##schema
#schema = StructType([StructField("shipment_id", IntegerType(), True),StructField("order_id", StringType(), True),StructField("source_city", StringType(), True),StructField("destination_city", StringType(), True),StructField("shipment_status", StringType(), True),StructField("cargo_type", StringType(), True),StructField("vehicle_type", StringType(), True),StructField("shipment_weight_kg", DoubleType(), True),StructField("shipment_cost", DoubleType(), True),StructField("shipment_date", DateType(), True)])

##dataframe creation
dfjson = (spark.read.option("mode", "PERMISSIVE").option("multiLine", "True").json("/Volumes/workspace/default/logistics_project_data/logistics_shipment_detail_3000.json", dateFormat="yyyy-MM-dd"))
dfjson.printSchema()
display(dfjson)

In [0]:
##logistics_source1 - read csv
dfls1=spark.read.csv("/Volumes/workspace/default/logistics_project_data/logistics_source1",header=True,inferSchema=True)
display(dfls1)
##logistics_source2 -read csv
dfls2=spark.read.csv("/Volumes/workspace/default/logistics_project_data/logistics_source2",header=True,inferSchema=True)
display(dfls2)  


#### Observations (Manual Exploratory Data Analysis)
- The data is structured and comma-separated (CSV)
- It has a header; there are no comments and no footer
- Total columns = number of separators + 1

#### Data Quality
- Null columns and null rows are present
- Duplicate rows and duplicate keys are present
- Format issues are present (age and shipment_id are not always numeric, e.g. "ten")
- No uniformity issues were found manually (e.g. Artist vs. artist)
- The number of columns is sometimes more or less than expected
- e.g. 5000006,John,Mathews,ten,Supervisor,Additionalcolumn (extra column, non-numeric age)
- e.g. 5000004,Suresh,,52,Loader (empty last name)
- Data types still need to be identified

#### 2. Programmatically find a couple of data patterns by applying the EDA steps below (File: logistics_source1)
1. Apply inferSchema and toDF to create a DF and analyse the actual data.
2. Analyse the schema, datatypes, columns etc.,
3. Analyse the duplicate records count and summary of the dataframe.

In [0]:
##Apply inferSchema and toDF to create a DF and analyse the actual data.
dfls1 = spark.read.csv("/Volumes/workspace/default/logistics_project_data/logistics_source1",header= True, inferSchema=True).toDF("id","firstname","lastname","age","profession")
dfls1.printSchema()
display(dfls1)

##Analyse the schema, datatypes, columns etc.
struct1=" shipment_id integer, firstname string, lastname string, age integer, profession string, corrupt_record string"
dfls1 = spark.read.schema(struct1).csv("/Volumes/workspace/default/logistics_project_data/logistics_source1",header= True ,  columnNameOfCorruptRecord='corrupt_record')
display(dfls1)
dfls1.printSchema()
##Analyse the duplicate records count and summary of the dataframe.
##Rows whose shipment_id is NULL after schema enforcement (values that could not be read as an integer)
dfls1nullid=dfls1.where(dfls1.shipment_id.isNull())
display(dfls1nullid)
print(dfls1nullid.count())


total_rows = dfls1.count()
total_columns = len(dfls1.columns)

print(f"Total Rows: {total_rows}")
print(f"Total Columns: {total_columns}")

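##Record-level duplicates: rows identical in every column
distinct_rows = dfls1.distinct().count()
duplicate_rows = total_rows - distinct_rows
##Key-level duplicates: repeated shipment_id values
distinct_keys = dfls1.select("shipment_id").distinct().count()
duplicate_keys = total_rows - distinct_keys

print(f"Distinct Rows: {distinct_rows}")
print(f"Duplicate Rows: {duplicate_rows}")
print(f"Duplicate Keys: {duplicate_keys}")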

display(dfls1.summary())






### a. Passive Data Munging (File: logistics_source1 and logistics_source2)
Without modifying the data, identify:<br>
Shipment IDs that appear in both master_v1 and master_v2<br>
Records where:<br>
1. shipment_id is non-numeric
2. age is not an integer<br>

Count rows having:
3. fewer columns than expected
4. more columns than expected
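
The four checks above can be prototyped on raw lines before any Spark read. The following is a minimal plain-Python sketch, assuming five expected columns (shipment_id, first_name, last_name, age, role). The helper name `classify_line` is hypothetical, and the two sample rows are the ones noted during the manual EDA.

```python
EXPECTED_COLUMNS = 5  # assumption: shipment_id, first_name, last_name, age, role

def classify_line(line):
    """Return the passive data-quality issues found in one raw CSV line."""
    fields = line.split(",")  # naive split: ignores quoted commas
    issues = []
    if len(fields) < EXPECTED_COLUMNS:
        issues.append("fewer_columns")
    elif len(fields) > EXPECTED_COLUMNS:
        issues.append("more_columns")
    # isdigit() is False for empty strings and for words such as "ten"
    if not fields[0].strip().isdigit():
        issues.append("non_numeric_shipment_id")
    if len(fields) > 3 and not fields[3].strip().isdigit():
        issues.append("non_integer_age")
    return issues

print(classify_line("5000006,John,Mathews,ten,Supervisor,Additionalcolumn"))
print(classify_line("5000004,Suresh,,52,Loader"))
```

Nothing is modified here; each line is only labelled, which matches the "passive" intent of this step.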

In [0]:
##shipment_id is non-numeric OR ## age is not an integer
#dfls1nnm=dfls1.where("id not rlike'[^0-9+$]'")##where("id not rlike'[0-9a-zA-Z]'")##where(dfls1.id.rlike(r'^[a-zA-Z]+$')

dfls11 = spark.read.csv("/Volumes/workspace/default/logistics_project_data/logistics_source1",header= True , inferSchema=False , columnNameOfCorruptRecord='corrupt_record')
display(dfls11)
dfls11nnmag=dfls11.where("shipment_id rlike '[^0-9]' OR age rlike '[^0-9]'")
display(dfls11nnmag)
##fewer columns than expected
dfls1 = spark.read.csv("/Volumes/workspace/default/logistics_project_data/logistics_source1",header= True , inferSchema=False , columnNameOfCorruptRecord='corrupt_record')
display(dfls1)
dfls1.where("shipment_id is null or age is null or first_name is null or last_name is null or role is null").show()



In [0]:
from pyspark.sql.functions import length, regexp_replace, col, split, size
##4. more columns than expected
struct1=" shipment_id integer, first_name string, last_name string, age integer, role string, corrupt_record string"
##inferSchema is not passed here because it is ignored when an explicit schema is supplied
dfls1 = spark.read.schema(struct1).csv("/Volumes/workspace/default/logistics_project_data/logistics_source1",header= True , columnNameOfCorruptRecord='corrupt_record')
display(dfls1)
dfls1.where("corrupt_record is not null ").show(truncate=False)##truncate=False - show full content of data

##method1
expected_commas = 4
extra_columns_df = dfls1.filter((col("corrupt_record").isNotNull()) &(length(col("corrupt_record")) -length(regexp_replace(col("corrupt_record"), ",", ""))> expected_commas))
extra_columns_df.show(truncate=False)

##method2
bad_records = dfls1.where(size(split("corrupt_record", ",")) > 5)
bad_records.show(truncate=False)
##method3 (filter is an alias of where, so this is equivalent to method2)
bad_records = dfls1.filter(size(split("corrupt_record", ",")) > 5)
bad_records.show(truncate=False)
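##fewer columns than expected (restricted to rows flagged in corrupt_record, because size() of a NULL array can return -1)
bad_records = dfls1.filter(col("corrupt_record").isNotNull() & (size(split("corrupt_record", ",")) < 5))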
bad_records.show(truncate=False)


### **b. Active Data Munging** (File: logistics_source1 and logistics_source2)

##### 1. Combining Data + Schema Merging (Structuring)
1. Read both files without enforcing a schema
2. Align them into a single canonical schema: shipment_id, first_name, last_name, age, role, hub_location, vehicle_type, data_source
3. Add a data_source column with the value system1 or system2 in the respective dataframes
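
To make the alignment step concrete, here is a small plain-Python sketch of what merging onto a canonical schema means: every record is projected onto the same column list, and columns a source does not provide become `None`. The helper `align` and the two sample rows are hypothetical and only illustrate the idea.

```python
CANONICAL_COLUMNS = ["shipment_id", "first_name", "last_name", "age",
                     "role", "hub_location", "vehicle_type", "data_source"]

def align(record, source_name):
    """Project a record onto the canonical schema, filling missing columns with None."""
    aligned = {column: record.get(column) for column in CANONICAL_COLUMNS}
    aligned["data_source"] = source_name
    return aligned

# Hypothetical rows: the first source lacks hub_location and vehicle_type
row1 = {"shipment_id": "5000001", "first_name": "Rajesh", "last_name": "Kumar",
        "age": "45", "role": "Driver"}
row2 = {"shipment_id": "5000002", "first_name": "Suresh", "last_name": None,
        "age": "52", "role": "Loader", "hub_location": "Chennai", "vehicle_type": "Truck"}

merged = [align(row1, "system1"), align(row2, "system2")]
for row in merged:
    print(row)
```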

In [0]:
##Read both files without enforcing schema
##struct1=" shipment_id integer, first_name string, last_name string, age integer, role string,corrupt_record string"
from pyspark.sql.functions import *
dfls1 = spark.read.csv("/Volumes/workspace/default/logistics_project_data/logistics_source1",header= True , inferSchema=False , columnNameOfCorruptRecord='corrupt_record').withColumn("data_source",lit("system1"))
display(dfls1)
##struct2= "shipment_id integer, first_name string, last_name string, age integer, role string, Hub_location string, vehicle_type string,corrupt_record string"
dfls2=spark.read.csv(path=["/Volumes/workspace/default/logistics_project_data/logistics_source2"],header= True , inferSchema=False , columnNameOfCorruptRecord='corrupt_record').withColumn("data_source",lit("system2"))
display(dfls2)

dfunion=dfls1.unionByName(dfls2,allowMissingColumns=True)
display(dfunion)
dfunion.printSchema()
dfunion.count()




##### 2. Cleansing, Scrubbing
Cleansing (removal of unwanted datasets)<br>
1. Mandatory Column Check - Drop any record where any of the following columns is NULL:shipment_id, role<br>
2. Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name<br>
3. Join Readiness Rule - Drop records where the join key is null: shipment_id<br>

Scrubbing (convert raw to tidy)<br>
4. Age Defaulting Rule - Fill NULL values in the age column with: -1<br>
5. Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN<br>
6. Invalid Age Replacement - Replace the following values in age: "ten" to -1, "" to -1<br>
7. Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV, bike to TwoWheeler
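
The seven rules can be read as two functions: one that decides whether a record survives (cleansing) and one that tidies the survivors (scrubbing). A minimal plain-Python sketch follows. The names `cleanse` and `scrub` are hypothetical, and the replacement maps use the lowercase spellings from the rule text, which is an assumption about how the values appear in the data.

```python
AGE_REPLACEMENTS = {"ten": "-1", "": "-1"}
VEHICLE_REPLACEMENTS = {"truck": "LMV", "bike": "TwoWheeler"}  # assumed spellings

def cleanse(record):
    """Return False when the record must be dropped (rules 1-3)."""
    if record.get("shipment_id") is None or record.get("role") is None:
        return False  # mandatory column / join key missing
    if record.get("first_name") is None and record.get("last_name") is None:
        return False  # both name parts missing
    return True

def scrub(record):
    """Return a tidied copy of the record (rules 4-7)."""
    tidy = dict(record)
    age = tidy.get("age")
    tidy["age"] = "-1" if age is None else AGE_REPLACEMENTS.get(age, age)
    vehicle = tidy.get("vehicle_type")
    tidy["vehicle_type"] = ("UNKNOWN" if vehicle is None
                            else VEHICLE_REPLACEMENTS.get(vehicle, vehicle))
    return tidy

sample = {"shipment_id": "5000006", "first_name": "John", "last_name": "Mathews",
          "age": "ten", "role": "Supervisor", "vehicle_type": None}
if cleanse(sample):
    print(scrub(sample))
```

Running cleansing before scrubbing matters: defaults such as -1 and UNKNOWN should only be applied to records that are kept.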

In [0]:
##Cleansing (removal of unwanted datasets)

##Mandatory Column Check - Drop any record where any of the following columns is NULL:shipment_id, role
dfdrop=dfunion.na.drop(how = "any",subset=["shipment_id","role"])
display(dfdrop)
##Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name
dfdrop=dfdrop.na.drop(how = "all",subset=["first_name","last_name"])
display(dfdrop)
##Join Readiness Rule - Drop records where the join key is null: shipment_id
dfdrop1=dfdrop.na.drop(how = "any",subset=["shipment_id"])
display(dfdrop1)

##Scrubbing (convert raw to tidy)
##Each step continues from the previous result so the cleansing above is not discarded
##Age Defaulting Rule - Fill NULL values in the age column with: -1
dfage=dfdrop1.na.fill("-1",subset=['age'])
display(dfage)
##Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN
dfvehicle=dfage.na.fill("UNKNOWN",subset=['vehicle_type'])
display(dfvehicle)
##Invalid Age Replacement - Replace the following values in age: "ten" to -1 "" to -1
dfage1rplc={'ten': '-1', '': '-1'}
dfage1=dfvehicle.na.replace(dfage1rplc,subset=['age'])
display(dfage1)
##Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV bike to TwoWheeler
##method1
dfvhclerplce=dfage1.na.replace("Truck","LMV",subset=['vehicle_type']).na.replace("Bike","TwoWheeler",subset=['vehicle_type'])
display(dfvhclerplce)
##method2 (same result as method1, using a dictionary)
dict1={"Truck":"LMV","Bike":"TwoWheeler"}
dfvhclerplce=dfage1.na.replace(dict1,subset=['vehicle_type'])
display(dfvhclerplce)


#### 3. Standardization, De-Duplication and Replacement / Deletion of Data to make it usable

Detail Dataframe creation <br>
1. Read data from logistics_shipment_detail_3000.json
2. As this is clean JSON data, it doesn't require any cleansing or scrubbing.

In [0]:
%python
from pyspark.sql.types import *
##schema
#schema = StructType([StructField("shipment_id", IntegerType(), True),StructField("order_id", StringType(), True),StructField("source_city", StringType(), True),StructField("destination_city", StringType(), True),StructField("shipment_status", StringType(), True),StructField("cargo_type", StringType(), True),StructField("vehicle_type", StringType(), True),StructField("shipment_weight_kg", DoubleType(), True),StructField("shipment_cost", DoubleType(), True),StructField("shipment_date", DateType(), True)])

##dataframe creation
dfjson = (spark.read.option("mode", "PERMISSIVE").option("multiLine", "True").json("/Volumes/workspace/default/logistics_project_data/logistics_shipment_detail_3000.json", dateFormat="yyyy-MM-dd"))
dfjson.printSchema()
display(dfjson)

Standardizations:<br>

1. Add a column<br>
Source File: logistics_shipment_detail_3000.json<br>
Add the column domain with the value 'Logistics'
2. Column Uniformity: 
role - Convert to lowercase<br>
Source File: logistics_source1 & logistics_source2<br>
vehicle_type - Convert values to UPPERCASE<br>
Source Files: (and the merged master files)
hub_location - Convert values to initcap case<br>
3. Format Standardization:<br>
Source Files: logistics_shipment_detail_3000.json
Convert shipment_ref to string<br>
Pad to 10 characters with leading zeros<br>
Convert dispatch_date to yyyy-MM-dd<br>
Ensure delivery_cost has 2 decimal precision<br>
4. Data Type Standardization<br>
Standardizing column data types to fix schema drift and enable mathematical operations.<br>
Source File: logistics_source1 & logistics_source2 <br>
age: Cast String to Integer<br>
Source File: logistics_shipment_detail_3000.json<br>
shipment_weight_kg: Cast to Double<br>
Source File: logistics_shipment_detail_3000.json<br>
is_expedited: Cast to Boolean<br>
5. Naming Standardization <br>
Source File: logistics_source1 & logistics_source2<br>
Rename: first_name to staff_first_name<br>
Rename: last_name to staff_last_name<br>
Rename: hub_location to origin_hub_city<br>
6. Reordering columns logically in a better standard format:<br>
Source File: All 3 files<br>
shipment_id (Identifier), staff_first_name (Dimension), staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)
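
Two of the format rules above (zero-padding to 10 characters and 2-decimal precision) are easy to check by hand. The plain-Python sketch below shows the expected shapes; the helper names are hypothetical and the half-up rounding mode is an assumption about the desired behaviour.

```python
from decimal import Decimal, ROUND_HALF_UP

def pad_reference(value, width=10):
    """Convert to string and left-pad with zeros to the given width."""
    return str(value).zfill(width)

def to_two_decimals(value):
    """Round to exactly 2 decimal places (half-up), keeping trailing zeros."""
    return Decimal(str(value)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

print(pad_reference(500001))     # 0000500001
print(to_two_decimals(30695.8))  # 30695.80
```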

In [0]:
##Add a column Source File: logistics_shipment_detail_3000.json:domain as 'Logistics'
from pyspark.sql.functions import *
dfjson=dfjson.withColumn("domain",lit("Logistics"))
display(dfjson)
##union logistics_source1 & logistics_source2
dfls1 = spark.read.csv("/Volumes/workspace/default/logistics_project_data/logistics_source1",header= True , inferSchema=False , columnNameOfCorruptRecord='corrupt_record').withColumn("data_source",lit("system1"))
display(dfls1)
##struct2= "shipment_id integer, first_name string, last_name string, age integer, role string, Hub_location string, vehicle_type string,corrupt_record string"
dfls2=spark.read.csv(path=["/Volumes/workspace/default/logistics_project_data/logistics_source2"],header= True , inferSchema=False , columnNameOfCorruptRecord='corrupt_record').withColumn("data_source",lit("system2"))
display(dfls2)

dfunion=dfls1.unionByName(dfls2,allowMissingColumns=True)
display(dfunion)
##Column Uniformity: role - Convert to lowercase Source File: logistics_source1 & logistics_source2
dfuniform=dfunion.withColumn("role",lower(col("role")))
display(dfuniform)

##vehicle_type - Convert values to UPPERCASE Source Files: logistics_shipment_detail_3000.json

dfjsonupper=dfjson.withColumn("vehicle_type",upper(col("vehicle_type")))
display(dfjsonupper)
##(and the merged master files)hub_location - Convert values to initcap case
dfhub=dfuniform.withColumn("hub_location",initcap(col("hub_location")))
display(dfhub)
##Format Standardization Source Files: logistics_shipment_detail_3000.json Convert shipment_ref to string Pad to 10 characters with leading zeros
##Convert shipment_ref to string
dfshipment_string=dfjson.withColumn("shipment_id", col("shipment_id").cast("string"))
display(dfshipment_string)
##Pad to 10 characters with leading zeros##
dfshipment_string1=dfshipment_string.withColumn("shipment_id", lpad(col("shipment_id"), 10, "0"))
display(dfshipment_string1)
##Convert dispatch_date to yyyy-MM-dd

#dfjsdt=dfjson.withColumn("shipment_date",date_format(col("shipment_date"),"yyyy-MM-dd")) -- date_format returns a string, not a date
##to_date parses the string into a DateType column; the pattern must match the source format (4-digit year)
dfjsdt=dfjson.withColumn("shipment_date", to_date(col("shipment_date"), "yyyy-MM-dd"))
display(dfjsdt)
dfjsdt.printSchema()
##Ensure delivery_cost has 2 decimal precision
dfdeci = dfjsdt.withColumn("shipment_cost",col("shipment_cost").cast("decimal(12,2)"))
display(dfdeci)
dfdeci.printSchema()



In [0]:
##union logistics_source1 & logistics_source2
dfls1 = spark.read.csv("/Volumes/workspace/default/logistics_project_data/logistics_source1",header= True , inferSchema=False , columnNameOfCorruptRecord='corrupt_record').withColumn("data_source",lit("system1"))

##struct2= "shipment_id integer, first_name string, last_name string, age integer, role string, Hub_location string, vehicle_type string,corrupt_record string"
dfls2=spark.read.csv(path=["/Volumes/workspace/default/logistics_project_data/logistics_source2"],header= True , inferSchema=False , columnNameOfCorruptRecord='corrupt_record').withColumn("data_source",lit("system2"))


dfunion=dfls1.unionByName(dfls2,allowMissingColumns=True)
display(dfunion)

#Data Type Standardization
#Standardizing column data types to fix schema drift and enable mathematical operations.
#Source File: logistics_source1 & logistics_source2
#age: Cast String to Integer
dfuncast=dfunion.withColumn("age", col("age").try_cast("int"))
#cast - raises an error on unparseable values such as 'ten' when ANSI mode is enabled (and returns NULL otherwise); try_cast - always returns NULL when the conversion fails and does NOT fail the job
display(dfuncast)

#Source File: logistics_shipment_detail_3000.json - shipment_weight_kg: Cast to Double
dfdouble = dfdeci.withColumn("shipment_weight_kg",col("shipment_weight_kg").cast("double"))
display(dfdouble)
dfdouble.printSchema()

#Source File: logistics_shipment_detail_3000.json - is_expedited: Cast to Boolean - derived from our own condition for reference
#lit(True)/lit(False) produce a BooleanType column; lit("True")/lit("False") would produce strings
dfbool = dfdouble.withColumn("is_expedited",when(col("shipment_date") < lit("2024-04-30"),lit(True)).otherwise(lit(False)))
display(dfbool)
dfbool.printSchema()



In [0]:
# 5. Naming Standardization <br>
# Source File: logistics_source1 & logistics_source2<br>
# Rename: first_name to staff_first_name<br>
# Method 1 - adds a copy of the column and drops the original; more verbose, and the renamed column moves to the end
df_fname = dfuncast.withColumn("staff_first_name", col("first_name")).drop("first_name")
display(df_fname)
# Method 2 - recommended to use
df_fname = dfuncast.withColumnRenamed("first_name", "staff_first_name")
display(df_fname)
# Rename: last_name to staff_last_name<br>
df_lname = df_fname.withColumnRenamed("last_name", "staff_last_name")
display(df_lname)
# Rename: hub_location to origin_hub_city<br>
df_hubname = df_lname.withColumnRenamed("hub_location", "origin_hub_city")
display(df_hubname)

In [0]:
# 6. Reordering columns logically in a better standard format:<br>
# Source File: All 3 files<br>
# shipment_id (Identifier), staff_first_name (Dimension), staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)
from pyspark.sql.functions import *
df_reorder = df_hubname.select("shipment_id","staff_first_name","staff_last_name","role","origin_hub_city","age").withColumn("ingestion_timestamp",lit(current_timestamp()))
display(df_reorder)

df_json_reorder = dfbool.select("shipment_id","order_id","shipment_status", "cargo_type", "vehicle_type", "payment_mode","source_city", "destination_city","shipment_weight_kg", "shipment_cost","shipment_date")
display(df_json_reorder)






Deduplication:
1. Apply Record Level De-Duplication
2. Apply Column Level De-Duplication (Primary Key Enforcement)
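
The difference between the two levels is easiest to see on a tiny example. In the plain-Python sketch below (hypothetical helper names and sample rows), record-level de-duplication removes rows that match in every column, while key-level de-duplication keeps a single row per key even when the other columns differ.

```python
def record_level_dedup(rows):
    """Drop rows that are identical in every column, keeping first occurrences."""
    seen, unique = set(), []
    for row in rows:
        fingerprint = tuple(sorted(row.items()))
        if fingerprint not in seen:
            seen.add(fingerprint)
            unique.append(row)
    return unique

def key_level_dedup(rows, key_column):
    """Keep one row per key value (the first occurrence wins in this sketch)."""
    kept = {}
    for row in rows:
        kept.setdefault(row[key_column], row)
    return list(kept.values())

rows = [  # hypothetical sample
    {"shipment_id": "5000001", "role": "Driver"},
    {"shipment_id": "5000001", "role": "Driver"},   # exact duplicate
    {"shipment_id": "5000001", "role": "Loader"},   # same key, different data
]
print(len(record_level_dedup(rows)))
print(len(key_level_dedup(rows, "shipment_id")))
```

Unlike this sketch, Spark's `dropDuplicates` on a subset of columns does not guarantee which of the conflicting rows is kept, so an explicit ordering rule is needed when that choice matters.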

In [0]:
# Apply Record Level De-Duplication
df_dd = df_reorder.distinct()
display(df_dd)

df_ddjs = df_json_reorder.distinct()
display(df_ddjs)

# Apply Column Level De-Duplication (Primary Key Enforcement)
df_pk = df_dd.dropDuplicates(["shipment_id"])
display(df_pk)

df_pkjs = df_ddjs.dropDuplicates(["order_id"])
display(df_pkjs)

## 2. Data Enrichment - Detailing of data
Makes the data richer and more detailed.<br>

###### Adding of Columns (Data Enrichment)
*Creating new derived attributes to enhance traceability and analytical capability.*

**1. Add Audit Timestamp (`load_dt`)**
Source File: logistics_source1 and logistics_source2<br>
* **Scenario:** We need to track exactly when this record was ingested into our Data Lakehouse for auditing purposes.
* **Action:** Add a column `load_dt` using the function `current_timestamp()`.

**2. Create Full Name (`full_name`)**
Source File: logistics_source1 and logistics_source2<br>
* **Scenario:** The reporting dashboard requires a single field for the driver's name instead of separate columns.
* **Action:** Create `full_name` by concatenating `first_name` and `last_name` with a space separator.
* **Result:** "Rajesh" + " " + "Kumar" -> **"Rajesh Kumar"**

**3. Define Route Segment (`route_segment`)**
Source File: logistics_shipment_detail_3000.json<br>
* **Scenario:** The logistics team wants to analyze performance based on specific transport lanes (Source to Destination).
* **Action:** Combine `source_city` and `destination_city` with a hyphen.
* **Result:** "Chennai" + "-" + "Pune" -> **"Chennai-Pune"**

**4. Generate Vehicle Identifier (`vehicle_identifier`)**
Source File: logistics_shipment_detail_3000.json<br>
* **Scenario:** We need a unique tracking code that immediately tells us the vehicle type and the shipment ID.
* **Action:** Combine `vehicle_type` and `shipment_id` to create a composite key.
* **Result:** "Truck" + "_" + "500001" -> **"Truck_500001"**

In [0]:
# 1. Add Audit Timestamp
df_reorder = df_hubname.select("shipment_id","staff_first_name","staff_last_name","role","origin_hub_city","age").withColumn("load_dt",lit(current_timestamp()))
display(df_reorder)

# 2. Create Full Name
df_fullname = df_reorder.withColumn("full_name",concat_ws(" ",col("staff_first_name"),col("staff_last_name")))
display(df_fullname)

# 3. Define Route Segment (route_segment) Source File: logistics_shipment_detail_3000.json
# Result: "Chennai" + "-" + "Pune" -> "Chennai-Pune"
df_rsegment = df_pkjs.withColumn("route_segment",concat_ws("-",col("source_city"),col("destination_city")))
display(df_rsegment)

# 4. Generate Vehicle Identifier (vehicle_identifier) - "Truck" + "_" + "500001" -> "Truck_500001"
df_vehicleid = df_rsegment.withColumn("vehicle_identifier",concat_ws("_",col("vehicle_type"),col("shipment_id")))
display(df_vehicleid)
df_vehicleid.printSchema()


###### Deriving of Columns (Time Intelligence)
*Extracting temporal features from dates to enable period-based analysis and reporting.*<br>
Source File: logistics_shipment_detail_3000.json<br>
**1. Derive Shipment Year (`shipment_year`)**
* **Scenario:** Management needs an annual performance report to compare growth year-over-year.
* **Action:** Extract the year component from `shipment_date`.
* **Result:** "2024-04-23" -> **2024**

**2. Derive Shipment Month (`shipment_month`)**
* **Scenario:** Analysts want to identify seasonal peaks (e.g., increased volume in December).
* **Action:** Extract the month component from `shipment_date`.
* **Result:** "2024-04-23" -> **4** (April)

**3. Flag Weekend Operations (`is_weekend`)**
* **Scenario:** The Operations team needs to track shipments handled during weekends to calculate overtime pay or analyze non-business day capacity.
* **Action:** Flag as **'True'** if the `shipment_date` falls on a Saturday or Sunday.
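
The weekend rule depends on the day-numbering convention. Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), which differs from Python's `isoweekday` (1 = Monday). The plain-Python sketch below mirrors the Spark numbering so the expected flags can be checked by hand; the helper names are hypothetical.

```python
from datetime import date

def spark_dayofweek(day):
    """Mirror Spark's dayofweek numbering: 1 = Sunday ... 7 = Saturday."""
    return day.isoweekday() % 7 + 1

def is_weekend(day):
    """True when the date falls on a Saturday or Sunday."""
    return spark_dayofweek(day) in (1, 7)

shipment_date = date(2024, 4, 23)  # the example date used above (a Tuesday)
print(shipment_date.year, shipment_date.month, is_weekend(shipment_date))
```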

In [0]:
from pyspark.sql.functions import year, month, dayofweek, when, col
# 1. Derive Shipment Year (shipment_year)
# Scenario: Management needs an annual performance report to compare growth year-over-year.
# Action: Extract the year component from shipment_date.
# Result: "2024-04-23" -> 2024
# 2. Derive Shipment Month 

df_vehicleid = df_vehicleid.\
withColumn("shipment_year", year("shipment_date")).\
withColumn("shipment_month", month("shipment_date"))
#year() / month() - extract the year / month from the given date/timestamp and return it as an integer
df_vehicleid.printSchema()
display(df_vehicleid)

# 3. Flag Weekend Operations (is_weekend)
# Scenario: The Operations team needs to track shipments handled during weekends to calculate overtime pay or analyze non-business day capacity.
# Action: Flag as 'True' if the shipment_date falls on a Saturday or Sunday.

df_weekend = df_vehicleid.withColumn("is_weekend",when(dayofweek(col("shipment_date")).isin(1, 7), True).otherwise(False))
#dayofweek returns the day of the week as an integer (1 = Sunday, 7 = Saturday); isin checks whether the value is in the given list
df_weekend.printSchema()
display(df_weekend)





###### Enrichment/Business Logics (Calculated Fields)
*Deriving new metrics and financial indicators using mathematical and date-based operations.*<br>
Source File: logistics_shipment_detail_3000.json<br>

**1. Calculate Unit Cost (`cost_per_kg`)**
* **Scenario:** The Finance team wants to analyze the efficiency of shipments by determining the cost incurred per unit of weight.
* **Action:** Divide `shipment_cost` by `shipment_weight_kg`.
* **Logic:** `shipment_cost / shipment_weight_kg`

**2. Track Shipment Age (`days_since_shipment`)**
* **Scenario:** The Operations team needs to monitor how long it has been since a shipment was dispatched to identify potential delays.
* **Action:** Calculate the difference in days between the `current_date` and the `shipment_date`.
* **Logic:** `datediff(current_date(), shipment_date)`

**3. Compute Tax Liability (`tax_amount`)**
* **Scenario:** For invoicing and compliance, we must calculate the Goods and Services Tax (GST) applicable to each shipment.
* **Action:** Calculate 18% GST on the total `shipment_cost`.
* **Logic:** `shipment_cost * 0.18`
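
The three formulas can be sanity-checked outside Spark. The sketch below is plain Python with hypothetical helper names. It assumes that a zero or missing weight should yield no unit cost rather than an error, and that monetary values are rounded half-up to 2 decimals.

```python
from datetime import date
from decimal import Decimal, ROUND_HALF_UP

GST_RATE = Decimal("0.18")
TWO_PLACES = Decimal("0.01")

def cost_per_kg(cost, weight_kg):
    """cost / weight rounded to 2 decimals; None when weight is zero or missing."""
    if not weight_kg:
        return None
    ratio = Decimal(str(cost)) / Decimal(str(weight_kg))
    return ratio.quantize(TWO_PLACES, rounding=ROUND_HALF_UP)

def days_since_shipment(shipment_date, today):
    """Whole days between the shipment date and the reference date."""
    return (today - shipment_date).days

def tax_amount(cost):
    """18% GST on the shipment cost, rounded to 2 decimals."""
    return (Decimal(str(cost)) * GST_RATE).quantize(TWO_PLACES, rounding=ROUND_HALF_UP)

print(cost_per_kg(1000, 8))
print(days_since_shipment(date(2024, 4, 23), date(2024, 4, 30)))
print(tax_amount(30695.80))
```

Passing the reference date in explicitly keeps the function deterministic; in the pipeline that value would come from the current date.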

In [0]:
# 1. Calculate Unit Cost (cost_per_kg) - Logic: shipment_cost / shipment_weight_kg
df_w=df_weekend.where(col("shipment_weight_kg") == 0)
display(df_w)
df_costperkg = df_weekend.withColumn("cost_per_kg",try_divide(col("shipment_cost"),col("shipment_weight_kg")).cast("decimal(12,2)"))
df_costperkg.printSchema()
display(df_costperkg)

# 2. Track Shipment Age (days_since_shipment) - Logic: datediff(current_date(), shipment_date)
df_age = df_costperkg.withColumn("days_since_shipment",datediff(current_date(),col("shipment_date")))
df_age.printSchema()
display(df_age)

# 3. Compute Tax Liability (tax_amount)-Logic: shipment_cost * 0.18
df_tax = df_age.withColumn("tax_amount",(col("shipment_cost")*lit(0.18)).cast("decimal(12,2)"))
df_tax.printSchema()
display(df_tax)


###### Remove/Eliminate (drop, select, selectExpr)
*Excluding unnecessary or redundant columns to optimize storage and privacy.*<br>
Source File: logistics_source1 and logistics_source2<br>

**1. Remove Redundant Name Columns**
* **Scenario:** Since we have already created the `full_name` column in the Enrichment step, the individual name columns are now redundant and clutter the dataset.
* **Action:** Drop the `first_name` and `last_name` columns.
* **Logic:** `df.drop("first_name", "last_name")`

In [0]:
# 1. Remove Redundant Name Columns - the name columns were renamed to staff_first_name / staff_last_name earlier, so those are the ones dropped
df_fullname = df_fullname.drop("staff_first_name", "staff_last_name")
display(df_fullname)

##### Splitting & Merging/Melting of Columns
*Reshaping columns to extract hidden values or combine fields for better analysis.*<br>
Source File: logistics_shipment_detail_3000.json<br>
**1. Splitting (Extraction)**
*Breaking one column into multiple to isolate key information.*
* **Split Order Code:**
  * **Action:** Split `order_id` ("ORD100000") into two new columns:
    * `order_prefix` ("ORD")
    * `order_sequence` ("100000")
* **Split Date:**
  * **Action:** Split `shipment_date` into three separate columns for partitioning:
    * `ship_year` (2024)
    * `ship_month` (4)
    * `ship_day` (23)

**2. Merging (Concatenation)**
*Combining multiple columns into a single unique identifier or description.*
* **Create Route ID:**
  * **Action:** Merge `source_city` ("Chennai") and `destination_city` ("Pune") to create a descriptive route key:
    * `route_lane` ("Chennai->Pune")
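
As a quick check of the expected results, the splitting and merging rules can be expressed in plain Python. The helper names are hypothetical, and the order code is assumed to be letters followed by digits, as in the "ORD100000" example.

```python
import re
from datetime import date

def split_order_id(order_id):
    """Split 'ORD100000' into ('ORD', '100000'); (None, None) if the shape differs."""
    match = re.fullmatch(r"([A-Za-z]+)(\d+)", order_id)
    return match.groups() if match else (None, None)

def split_date(iso_date):
    """Split 'yyyy-MM-dd' into (year, month, day) integers."""
    parsed = date.fromisoformat(iso_date)
    return parsed.year, parsed.month, parsed.day

def route_lane(source_city, destination_city):
    """Merge two cities into a descriptive route key."""
    return f"{source_city}->{destination_city}"

print(split_order_id("ORD100000"))
print(split_date("2024-04-23"))
print(route_lane("Chennai", "Pune"))
```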

In [0]:
# 1. Splitting (Extraction) Breaking one column into multiple to isolate key information.

# Split Order Code:
# Action: Split order_id ("ORD100000") into two new columns:
# order_prefix ("ORD")
# order_sequence ("100000")
# Method 1
df_split = df_tax.withColumn("order_prefix",split(col("order_id"), "(?<=\\D)(?=\\d)").getItem(0)).withColumn("order_sequence",split(col("order_id"), "(?<=\\D)(?=\\d)").getItem(1))

# Method 2
df_split = df_tax.\
withColumn("order_prefix",regexp_extract(col("order_id"), "([A-Za-z]+)", 1)  # all letters at start
).withColumn("order_sequence",regexp_extract(col("order_id"), "([0-9]+)", 1))    # all numbers

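# Method 3 - fixed-width substring (assumes a 3-letter prefix and a 6-digit sequence); each method overwrites df_split, so this is the version that is kept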
df_split = df_tax.\
withColumn("order_prefix",substring(col("order_id"), 1, 3)).\
withColumn("order_sequence",substring(col("order_id"), 4, 6))
display(df_split)


In [0]:
# Split Date:
# Action: Split shipment_date into three separate columns for partitioning:
# ship_year (2024)
# ship_month (4)
# ship_day (23)

df_split = df_split.withColumnsRenamed({"shipment_year":"ship_year", "shipment_month":"ship_month"}).withColumn("ship_day", dayofmonth("shipment_date"))
display(df_split)

# Merging (Concatenation) Combining multiple columns into a single unique identifier or description.
# Create Route ID:
# Action: Merge source_city ("Chennai") and destination_city ("Pune") to create a descriptive route key:
# route_lane ("Chennai->Pune")
df_route = df_split.withColumn("route_lane",concat_ws("->", col("source_city"), col("destination_city")))
df_route = df_route.drop("route_segment")
display(df_route)

## 3. Data Customization & Processing - Application of Tailored Business Specific Rules

### **UDF1: Complex Incentive Calculation**
**Scenario:** The Logistics Head wants to calculate a "Performance Bonus" for drivers based on tenure and role complexity.

**Action:** Create a Python function `calculate_bonus(role, age)` and register it as a Spark UDF.

**Logic:**
* **IF** `Role` == 'Driver' **AND** `Age` > 50:
  * `Bonus` = 15% of Salary (Reward for Seniority)
* **IF** `Role` == 'Driver' **AND** `Age` < 30:
  * `Bonus` = 5% of Salary (Encouragement for Juniors)
* **ELSE**:
  * `Bonus` = 0

**Result:** A new derived column `projected_bonus` is generated for every row in the dataset.

---

### **UDF2: PII Masking (Privacy Compliance)**
**Scenario:** For the analytics dashboard, we must hide the full identity of the staff to comply with privacy laws (GDPR/DPDP), while keeping names recognizable for internal managers.

**Business Rule:** Show the first 2 letters, mask the middle characters with `****`, and show the last letter.

**Action:** Create a UDF `mask_identity(name)`.

**Example:**
* **Input:** `"Rajesh"`
* **Output:** `"Ra****h"`
<br>
**Note:** Convert the above UDF logic to a built-in function based transformation to improve performance.
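
Before converting the rule to built-in functions, it helps to pin down the expected behaviour as an ordinary Python function. The sketch below follows the business rule as stated. How names of three characters or fewer are handled is not specified in the rule, so masking them entirely is an assumption.

```python
def mask_identity(name):
    """Keep the first 2 letters and the last letter; replace the middle with '****'."""
    if name is None:
        return None
    if len(name) <= 3:
        # Assumption: names this short have no middle to hide, so mask them entirely
        return "****"
    return f"{name[:2]}****{name[-1]}"

print(mask_identity("Rajesh"))  # Ra****h
```

A function like this can be wrapped with `udf(mask_identity, StringType())` for comparison against the built-in version.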

In [0]:
# UDF1: Complex Incentive Calculation
# Scenario: The Logistics Head wants to calculate a "Performance Bonus" for drivers based on tenure and role complexity.
# Action: Create a Python function calculate_bonus(role, age) and register it as a Spark UDF.
# IF Role == 'Driver' AND Age > 50:
# Bonus = 15% of Salary (Reward for Seniority)
# IF Role == 'Driver' AND Age < 30:
# Bonus = 5% of Salary (Encouragement for Juniors)
# ELSE:
# Bonus = 0
# Result: A new derived column projected_bonus is generated for every row in the dataset.
from pyspark.sql.functions import *

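def calculate_bonus(role, age):
  # Guard first: comparing None with an int raises TypeError inside the UDF
  if role != "Driver" or age is None:
    return 0.0
  if age > 50:
    return 0.15   # 15% of salary (reward for seniority)
  if age < 30:
    return 0.05   # 5% of salary (encouragement for juniors)
  return 0.0      # ELSE branch of the documented rule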
  
bonus_udf = udf(calculate_bonus, FloatType())
df_bonus = df_fullname.withColumn("projected_bonus",bonus_udf(col("role"),col("age")))
display(df_bonus)

df_bonus.printSchema()



In [0]:
# UDF2: PII Masking (Privacy Compliance)
# Scenario: For the analytics dashboard, we must hide the full identity of the staff to comply with privacy laws (GDPR/DPDP), while keeping names recognizable for internal managers.
# Business Rule: Show the first 2 letters, mask the middle characters with ****, and show the last letter.
# Action: Create a UDF mask_identity(name).
# Example: Input "Rajesh" -> Output "Ra****h"
# Note: The UDF logic is implemented with built-in functions below to improve performance.

from pyspark.sql.functions import col, concat, substring, lit

# substring with a negative position counts from the end, so (-1, 1) is the last character
df_masked = df_bonus.withColumn("masked_name",concat(substring("full_name", 1, 2),lit("****"),substring("full_name", -1, 1)))
display(df_masked)


In [0]:
%sql
CREATE OR REPLACE TEMP VIEW customers
using csv
options(
  path = "/Volumes/workspace/default/logistics_project_data/logistics_source1",
  header = true,
  inferSchema = true
);
SELECT
  CONCAT(
    SUBSTRING(first_name, 1, 2),
    '****',
    SUBSTRING(first_name, -1)
  ) AS masked_name
FROM customers;

## 4. Data Core Curation & Processing (Pre-Wrangling)
*Applying business logic to focus, filter, and summarize data before final analysis.*

**1. Select (Projection)**<br>
Source Files: logistics_source1 and logistics_source2<br>
* **Scenario:** The Driver App team only needs location data, not sensitive HR info.
* **Action:** Select only `first_name`, `role`, and `hub_location`.

**2. Filter (Selection)**<br>
Source File: json<br>
* **Scenario:** We need a report on active operational problems.
* **Action:** Filter rows where `shipment_status` is **'DELAYED'** or **'RETURNED'**.
* **Scenario:** Insurance audit for senior staff.
* **Action:** Filter rows where `age > 50`.

**3. Derive Flags & Columns (Business Logic)**<br>
Source File: json<br>
* **Scenario:** Identify high-value shipments for security tracking.
* **Action:** Create flag `is_high_value` = **True** if `shipment_cost > 50,000`.
* **Scenario:** Flag weekend operations for overtime calculation.
* **Action:** Create flag `is_weekend` = **True** if day is Saturday or Sunday.

**4. Format (Standardization)**<br>
Source File: json<br>
* **Scenario:** Finance requires readable currency formats.
* **Action:** Format `shipment_cost` to string like **"₹30,695.80"**.
* **Scenario:** Standardize city names for reporting.
* **Action:** Format `source_city` to Uppercase (e.g., "chennai" → **"CHENNAI"**).

**5. Group & Aggregate (Summarization)**<br>
Source Files: logistics_source1 and logistics_source2<br>
* **Scenario:** Regional staffing analysis.
* **Action:** Group by `hub_location` and **Count** the number of staff.
* **Scenario:** Fleet capacity analysis.
* **Action:** Group by `vehicle_type` and **Sum** the `shipment_weight_kg`.

**6. Sorting (Ordering)**<br>
Source File: json<br>
* **Scenario:** Prioritize the most expensive shipments.
* **Action:** Sort by `shipment_cost` in **Descending** order.
* **Scenario:** Organize daily dispatch schedule.
* **Action:** Sort by `shipment_date` (Ascending) then `priority_flag` (Descending).

**7. Limit (Top-N Analysis)**<br>
Source File: json<br>
* **Scenario:** Dashboard snapshot of critical delays.
* **Action:** Filter for 'DELAYED', Sort by Cost, and **Limit to top 10** rows.

In [0]:
# 1. Select (Projection) - Source Files: logistics_source1 and logistics_source2, Action: Select only first_name, role, and hub_location.
df_fname = dfuncast.select("first_name","role","hub_location")
display(df_fname)

# 2. Filter (Selection) - Source File: json, Action: Filter rows where shipment_status is 'DELAYED', 'RETURNED', or 'CANCELLED'.
df_shipment_status = df_route.filter(col("shipment_status").isin("DELAYED", "RETURNED", "CANCELLED"))
display(df_shipment_status)

# Action: Filter rows where age > 50.
df_age = df_bonus.filter(col("age") > 50)
display(df_age)

In [0]:
# 3. Derive Flags & Columns (Business Logic)
# Source File: json
# Scenario: Identify high-value shipments for security tracking.
# Action: Create flag is_high_value = True if shipment_cost > 50,000.
# Scenario: Flag weekend operations for overtime calculation.
# Action: Create flag is_weekend = True if day is Saturday or Sunday.
from pyspark.sql.functions import col, dayofweek, when

# dayofweek() returns the day of the week as an integer from 1 (Sunday) to 7 (Saturday),
# so isin([1, 7]) is True exactly for weekend dates.
df_dflag = df_route.withColumns(
    {
    "is_weekend": when(
        dayofweek(col("shipment_date")).isin([1, 7]),
        True
    ).otherwise(False),

    "is_high_value": when(
        col("shipment_cost") > 50000,
        True
    ).otherwise(False)
})

display(df_dflag)
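
Spark's `dayofweek()` numbers days from 1 (Sunday) to 7 (Saturday), which differs from Python's `isoweekday()` (1 = Monday to 7 = Sunday). The sketch below reproduces both flags in plain Python so the rules can be checked on single values; the helper names are illustrative and not part of the notebook.

```python
from datetime import date
from typing import Optional


def spark_dayofweek(d: date) -> int:
    """Day number in Spark's convention: 1 = Sunday, 2 = Monday, ..., 7 = Saturday."""
    return d.isoweekday() % 7 + 1


def is_weekend(d: date) -> bool:
    """True for Saturday (7) and Sunday (1), mirroring dayofweek(...).isin([1, 7])."""
    return spark_dayofweek(d) in (1, 7)


def is_high_value(shipment_cost: Optional[float], threshold: float = 50000) -> bool:
    """True only when the cost is known and strictly above the threshold."""
    return shipment_cost is not None and shipment_cost > threshold


print(is_weekend(date(2024, 4, 27)), is_weekend(date(2024, 4, 29)))  # True False
```

As in the `when(...).otherwise(False)` version, a missing cost yields `False` rather than a null flag.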


In [0]:
# **4. Format (Standardization)**<br>
# Source File: json<br>
# * **Scenario:** Finance requires readable currency formats.
# * **Action:** Format `shipment_cost` to string like **"₹30,695.80"**. 
df_dformat=df_dflag.withColumn("shipment_cost", concat(lit("₹"),format_number(col("shipment_cost"), 2)))
# format_number(col, d) rounds to d decimal places, groups digits in thousands (pattern '#,###,###.##'), and returns the result as a string.
display(df_dformat)

# * **Scenario:** Standardize city names for reporting.
# * **Action:** Format `source_city` to Uppercase (e.g., "chennai" → **"CHENNAI"**).
df_dformat=df_dformat.withColumn("source_city", upper(col("source_city")))
display(df_dformat)
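
To sanity-check the expected output of the two formatting rules on individual values, the same formatting can be sketched in plain Python. The function names are illustrative. Digits are grouped in thousands, so large amounts follow the `1,234,567.50` pattern rather than Indian lakh/crore grouping.

```python
from typing import Optional


def format_inr(amount: Optional[float]) -> Optional[str]:
    """Format an amount as a rupee string with thousands separators and 2 decimals."""
    if amount is None:
        return None
    return f"₹{amount:,.2f}"


def standardize_city(city: Optional[str]) -> Optional[str]:
    """Upper-case a city name for reporting."""
    return None if city is None else city.upper()


print(format_inr(30695.8))          # ₹30,695.80
print(standardize_city("chennai"))  # CHENNAI
```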

In [0]:
# 5. Group & Aggregate (Summarization)
# Source Files: logistics_source1 and logistics_source2
# Scenario: Regional staffing analysis.
# Action: Group by hub_location and Count the number of staff.
df_location = df_fullname.groupBy("origin_hub_city").agg(count("full_name").alias("staff_count"))
display(df_location)
# Scenario: Fleet capacity analysis.
# Action: Group by vehicle_type and Sum the shipment_weight_kg.
df_vehicle_type = df_route.groupBy("vehicle_type").agg(round(sum("shipment_weight_kg"),2).alias("total_weight"))
display(df_vehicle_type)


In [0]:

# 7. Limit (Top-N Analysis)
# Source File: json

# Scenario: Dashboard snapshot of critical delays.
# Action: Filter for 'DELAYED', Sort by Cost, and Limit to top 10 rows.
df_topn = df_route.select("shipment_id","order_id","shipment_cost","shipment_status").filter(col("shipment_status") == "DELAYED").orderBy(col("shipment_cost").desc()).limit(10)
display(df_topn)


In [0]:
# 6. Sorting (Ordering)
# Source File: json
# Scenario: Prioritize the most expensive shipments.
# Action: Sort by shipment_cost in Descending order.
df_json_sort = df_route.orderBy(col("shipment_cost").desc())
display(df_json_sort)
# Scenario: Organize daily dispatch schedule.
# Action: Sort by shipment_date (Ascending) then priority_flag (Descending).
df_route = df_route.withColumn("priority_flag",when((col("shipment_date") <= lit("2024-04-30")),True).otherwise(False))
df_priority_sort = df_route.orderBy(col("shipment_date").asc(), col("priority_flag").desc())
display(df_priority_sort)


## 5. Data Wrangling - Transformation & Analytics
*Combining, modeling, and analyzing data to answer complex business questions.*

### **1. Joins**
Source Files:<br>
Left Side (staff_df):<br> logistics_source1 & logistics_source2<br>
Right Side (shipments_df):<br> logistics_shipment_detail_3000.json<br>
#### **1.1 Frequently Used Simple Joins (Inner, Left)**
* **Inner Join (Performance Analysis):**
  * **Scenario:** We only want to analyze *completed work*. Connect Staff to the Shipments they handled.
  * **Action:** Join `staff_df` and `shipments_df` on `shipment_id`.
  * **Result:** Returns only rows where a staff member is assigned to a valid shipment.
* **Left Join (Idle Resource check):**
  * **Scenario:** Find out which staff members are currently *idle* (not assigned to any shipment).
  * **Action:** Join `staff_df` (Left) with `shipments_df` (Right) on `shipment_id`. Filter where `shipments_df.shipment_id` is NULL.

#### **1.2 Infrequent Simple Joins (Self, Right, Full, Cartesian)**
* **Self Join (Peer Finding):**
  * **Scenario:** Find all pairs of employees working in the same `hub_location`.
  * **Action:** Join `staff_df` to itself on `hub_location`, filtering where `staff_id_A != staff_id_B`.
* **Right Join (Orphan Data Check):**
  * **Scenario:** Identify shipments in the system that have *no valid driver* assigned (Data Integrity Issue).
  * **Action:** Join `staff_df` (Left) with `shipments_df` (Right). Focus on NULLs on the left side.
* **Full Outer Join (Reconciliation):**
  * **Scenario:** A complete audit to find *both* idle drivers AND unassigned shipments in one view.
  * **Action:** Perform a Full Outer Join on `shipment_id`.
* **Cartesian/Cross Join (Capacity Planning):**
  * **Scenario:** Generate a schedule of *every possible* driver assignment to *every* pending shipment to run an optimization algorithm.
  * **Action:** Cross Join `drivers_df` and `pending_shipments_df`.

#### **1.3 Advanced Joins (Semi and Anti)**
* **Left Semi Join (Existence Check):**
  * **Scenario:** "Show me the details of Drivers who have *at least one* shipment." (Standard filtering).
  * **Action:** `staff_df.join(shipments_df, "shipment_id", "left_semi")`.
  * **Benefit:** Returns only the columns of `staff_df` and at most one row per staff member, even when several shipments match; the join only has to establish that a match exists.
* **Left Anti Join (Negation Check):**
  * **Scenario:** "Show me the details of Drivers who have *never* touched a shipment."
  * **Action:** `staff_df.join(shipments_df, "shipment_id", "left_anti")`.

### **2. Lookup**<br>
Source File: logistics_source1 and logistics_source2 (merged into Staff DF)<br>
* **Scenario:** Validation. Check if the `hub_location` in the staff file exists in the corporate `Master_City_List`.
* **Action:** Compare values against a reference list.

### **3. Lookup & Enrichment**<br>
Source File: logistics_source1 and logistics_source2 (merged into Staff DF)<br>
* **Scenario:** Geo-Tagging.
* **Action:** Lookup `hub_location` ("Pune") in a Master Latitude/Longitude table and enrich the dataset by adding `lat` and `long` columns for map plotting.

### **4. Schema Modeling (Denormalization)**<br>
Source Files: All 3 Files (logistics_source1, logistics_source2, logistics_shipment_detail_3000.json)<br>
* **Scenario:** Creating a "Gold Layer" Table for PowerBI/Tableau.
* **Action:** Flatten the Star Schema. Join `Staff`, `Shipments`, and `Vehicle_Master` into one wide table (`wide_shipment_history`) so analysts don't have to perform joins during reporting.

### **5. Windowing (Ranking & Trends)**<br>
Source Files:<br>
logistics_source2: Provides hub_location (Partition Key).<br>
logistics_shipment_detail_3000.json: Provides shipment_cost (Ordering Key)<br>
* **Scenario:** "Who are the Top 3 Drivers by Cost in *each* Hub?"
* **Action:**
  1. Partition by `hub_location`.
  2. Order by `total_shipment_cost` Descending.
  3. Apply `dense_rank()` and `row_number()`.
  4. Filter where `rank or row_number <= 3`.

### **6. Analytical Functions (Lead/Lag)**<br>
Source File: <br>
logistics_shipment_detail_3000.json<br>
* **Scenario:** Idle Time Analysis.
* **Action:** For each driver, calculate the days elapsed since their *previous* shipment.

### **7. Set Operations**<br>
Source Files: logistics_source1 and logistics_source2<br>
* **Union:** Combining `Source1` (Legacy) and `Source2` (Modern) into one dataset (Already done in Active Munging).
* **Intersect:** Identifying Staff IDs that appear in *both* Source 1 and Source 2 (Duplicate/Migration Check).
* **Except (Difference):** Identifying Staff IDs present in Source 2 but *missing* from Source 1 (New Hires).

### **8. Grouping & Aggregations (Advanced)**<br>
Source Files:<br>
logistics_source2: Provides hub_location and vehicle_type (Grouping Dimensions).<br>
logistics_shipment_detail_3000.json: Provides shipment_cost (Aggregation Metric).<br>
* **Scenario:** The CFO wants a subtotal report at multiple levels:
  1. Total Cost by Hub.
  2. Total Cost by Hub AND Vehicle Type.
  3. Grand Total.
* **Action:** Use `cube("hub_location", "vehicle_type")` or `rollup()` to generate all these subtotals in a single query.

In [0]:
# 1. Joins
# Source Files:
# Left Side (staff_df):
# logistics_source1 & logistics_source2
# Right Side (shipments_df):
# logistics_shipment_detail_3000.json

# 1.1 Frequently Used Simple Joins (Inner, Left)
# Inner Join (Performance Analysis):
# Scenario: We only want to analyze completed work. Connect Staff to the Shipments they handled.
# Action: Join staff_df and shipments_df on shipment_id.
# Result: Returns only rows where a staff member is assigned to a valid shipment.

df_fullname=dfunion.withColumn("shipment_id", col("shipment_id").try_cast("int"))
staff_df=df_fullname
shipments_df=df_route
df_innerjoin = staff_df.join(shipments_df, on="shipment_id", how="inner").select("shipment_id", "first_name", "role", "data_source", "order_id")
display(df_innerjoin)

# Left Join (Idle Resource check):
# Scenario: Find out which staff members are currently idle (not assigned to any shipment).
# Action: Join staff_df (Left) with shipments_df (Right) on shipment_id. Filter where shipments_df.shipment_id is NULL.

df_leftjoin = staff_df.alias("staff_df").join(shipments_df.alias("shipments_df"), on="shipment_id", how="left").filter(col("shipments_df.shipment_id").isNull()).select(
    col("staff_df.shipment_id").alias('Csv_shipment_id'), 
       "first_name", 
       "role", 
       "data_source", 
       "order_id", 
       col("shipments_df.shipment_id").alias('Json_shipment_id'))
display(df_leftjoin)


In [0]:
# 1.2 Infrequent Simple Joins (Self, Right, Full, Cartesian)
# Self Join (Peer Finding):
# Scenario: Find all pairs of employees working in the same hub_location.
# Action: Join staff_df to itself on hub_location, filtering where staff_id_A != staff_id_B.

df_selfjoin = staff_df.alias("A").join(staff_df.alias("B"), on="hub_location", how="inner").filter(
    col("A.shipment_id") != col("B.shipment_id")).select(
    col("A.shipment_id").alias('staff_id_A'), 
       "A.first_name", 
       "A.last_name", 
       "A.hub_location", 
       col("B.shipment_id").alias('staff_id_B'), 
       col("B.first_name").alias('first_name_B'), 
       col("B.last_name").alias('last_name_B'))
display(df_selfjoin)

# Right Join (Orphan Data Check):
# Scenario: Identify shipments in the system that have no valid driver assigned (Data Integrity Issue).
# Action: Join staff_df (Left) with shipments_df (Right). Focus on NULLs on the left side.

df_rightjoin = staff_df.alias("staff_df").join(shipments_df.alias("shipments_df"), on="shipment_id", how="right").filter(col("staff_df.shipment_id").isNull()).select(
    col("staff_df.shipment_id").alias('Csv_shipment_id'), 
       "first_name", 
       "role", 
       "order_id", 
       col("shipments_df.shipment_id").alias('Json_shipment_id'),
       "shipment_status")
display(df_rightjoin)

# Full Outer Join (Reconciliation):
# Scenario: A complete audit to find both idle drivers AND unassigned shipments in one view.
# Action: Perform a Full Outer Join on shipment_id.

df_fullouterjoin = staff_df.alias("staff_df").join(shipments_df.alias("shipments_df"), on="shipment_id", how="fullouter").select(
    col("staff_df.shipment_id").alias('Csv_shipment_id'),
    "first_name", 
       "role", 
       col("shipments_df.shipment_id").alias('Json_shipment_id'))
display(df_fullouterjoin)

# Cartesian/Cross Join (Capacity Planning):
# Scenario: Generate a schedule of every possible driver assignment to every pending shipment to run an optimization algorithm.
# Action: Cross Join drivers_df and pending_shipments_df.
display(staff_df)
staff_driver_df = staff_df.filter(col("role") == "Driver")
pending_shipments_df = shipments_df.filter(col("shipment_status").isin("CREATED","DELAYED","IN_TRANSIT"))
df_capacity_plan = staff_driver_df.alias("Csv_Id").crossJoin(pending_shipments_df).select("role", col("Csv_Id.shipment_id"), "shipment_status")
display(df_capacity_plan)
display(pending_shipments_df.count())  # json: shipments in a pending status - 1776 rows
display(staff_driver_df.count())  # csv: staff with role Driver - 18 rows
display(df_capacity_plan.count())  # cross join: 18 x 1776 = 31968 rows


In [0]:
# 1.3 Advanced Joins (Semi and Anti)
# Left Semi Join (Existence Check):
# Scenario: "Show me the details of Drivers who have at least one shipment." (Standard filtering).
# Action: staff_df.join(shipments_df, "shipment_id", "left_semi").
# Benefit: Returns only staff_df columns and at most one row per staff member, even when several shipments match; the join only has to establish that a match exists.
df_leftsemi = staff_df.join(shipments_df, on="shipment_id", how="left_semi").filter(col("role") == "Driver").select(
       "shipment_id", 
       "first_name", 
       "role")   
display(df_leftsemi)  # A semi join returns only left-table columns, so no right-table columns can be selected. Spark has no right semi or right anti join type.

# Left Anti Join (Negation Check):
# Scenario: "Show me the details of Drivers who have never touched a shipment."
# Action: staff_df.join(shipments_df, "shipment_id", "left_anti").
df_leftanti = staff_df.join(shipments_df, on="shipment_id", how="left_anti").filter(col("role") == "Driver").select(
       "shipment_id", 
       "first_name", 
       "role")   
display(df_leftanti)
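
The semantics of the two joins above can be illustrated without Spark. The following plain-Python sketch uses made-up rows (the names and order IDs are hypothetical) and mirrors SQL behaviour in one respect that is easy to miss: a NULL key never matches, so such a row drops out of the semi join and shows up in the anti join.

```python
def _right_keys(right, key):
    # NULL (None) keys never match in a SQL join, so they are left out of the lookup set.
    return {row[key] for row in right if row[key] is not None}


def left_semi(left, right, key):
    """Left rows that have at least one match on the right; each left row appears once."""
    keys = _right_keys(right, key)
    return [row for row in left if row[key] in keys]


def left_anti(left, right, key):
    """Left rows that have no match on the right."""
    keys = _right_keys(right, key)
    return [row for row in left if row[key] not in keys]


# Hypothetical sample data, for illustration only.
staff = [
    {"shipment_id": 1, "first_name": "Asha"},
    {"shipment_id": 2, "first_name": "Ravi"},
    {"shipment_id": None, "first_name": "Meena"},
]
shipments = [
    {"shipment_id": 1, "order_id": "ORD-1"},
    {"shipment_id": 1, "order_id": "ORD-2"},
    {"shipment_id": 3, "order_id": "ORD-3"},
]

print([r["first_name"] for r in left_semi(staff, shipments, "shipment_id")])  # ['Asha']
print([r["first_name"] for r in left_anti(staff, shipments, "shipment_id")])  # ['Ravi', 'Meena']
```

Asha matches two shipments but is returned once, which is the difference from an inner join.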


In [0]:
# 2. Lookup
# Source File: logistics_source1 and logistics_source2 (merged into Staff DF)

# Scenario: Validation. Check if the hub_location in the staff file exists in the corporate Master_City_List.
# Action: Compare values against a reference list.

geo_data = [
    ("AbuDhabi", 24.4539, 54.3773),
    ("Amritsar", 31.6340, 74.8723),
    ("Bangalore", 12.9716, 77.5946),
    ("Birmingham", 52.4862, -1.8904),
    ("Boston", 42.3601, -71.0589),
    ("California", 36.7783, -119.4179),
    ("Chennai", 13.0827, 80.2707),
    ("Coimbatore", 11.0168, 76.9558),
    ("Delhi", 28.7041, 77.1025),
    ("Dubai", 25.2048, 55.2708),
    ("HongKong", 22.3193, 114.1694),
    ("Hyderabad", 17.3850, 78.4867),
    ("Indore", 22.7196, 75.8577),
    ("Jaipur", 26.9124, 75.7873),
    ("Kochi", 9.9312, 76.2673),
    ("London", 51.5074, -0.1278),
    ("Lucknow", 26.8467, 80.9462),
    ("Mumbai", 19.0760, 72.8777),
    ("NewYork", 40.7128, -74.0060),
    ("Pune", 18.5204, 73.8567),
    ("Scranton", 41.4089, -75.6624),
    ("Singapore", 1.3521, 103.8198),
    ("Tokyo", 35.6762, 139.6503)
]

Master_City_List = spark.createDataFrame(geo_data, ["master_hub_location", "latitude", "longitude"])
display(Master_City_List)

df_joined = staff_df.join(
    Master_City_List,
    staff_df.hub_location == Master_City_List.master_hub_location,
    "left"
)
df_lookup = (df_joined.withColumn(
    "location_status",
    when(col("master_hub_location").isNotNull(), "valid")
    .otherwise("invalid")).select(
        "shipment_id", "first_name", "role", "hub_location", "location_status")
)
display(df_lookup)
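
For a small reference list such as the master city list, the lookup is in effect a dictionary lookup. The sketch below shows the validation and the coordinate enrichment in plain Python, using three entries copied from `geo_data`; the staff rows and the function name are hypothetical.

```python
# Subset of the master list above: hub name -> (latitude, longitude).
MASTER_CITIES = {
    "Pune": (18.5204, 73.8567),
    "Chennai": (13.0827, 80.2707),
    "Mumbai": (19.0760, 72.8777),
}


def lookup_and_enrich(staff_rows, master):
    """Add location_status, lat and long to each staff row based on hub_location."""
    enriched = []
    for row in staff_rows:
        coords = master.get(row.get("hub_location"))
        lat, long = coords if coords is not None else (None, None)
        enriched.append({
            **row,
            "location_status": "valid" if coords is not None else "invalid",
            "lat": lat,
            "long": long,
        })
    return enriched


# Hypothetical staff rows, for illustration only.
rows = lookup_and_enrich(
    [{"first_name": "Asha", "hub_location": "Pune"},
     {"first_name": "Ravi", "hub_location": "Atlantis"}],
    MASTER_CITIES,
)
print([(r["hub_location"], r["location_status"], r["lat"]) for r in rows])
```

The match is exact and case-sensitive, just like the equality join condition, so hub names should be standardized before the lookup.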


In [0]:
# 3. Lookup & Enrichment
# Source File: logistics_source1 and logistics_source2 (merged into Staff DF)

# Scenario: Geo-Tagging.
# Action: Lookup hub_location ("Pune") in a Master Latitude/Longitude table and enrich the dataset by adding lat and long columns for map plotting.
# The left join in the lookup step already leaves latitude/longitude NULL for hubs missing from the master table,
# so the coordinates only need to be renamed to lat and long ("Pune" in the action is just an example hub).
df_enrichment = df_joined.select(
    "shipment_id", "first_name", "role", "hub_location",
    col("latitude").alias("lat"),
    col("longitude").alias("long"),
)
display(df_enrichment)

# 4. Schema Modeling (Denormalization)
# Source Files: All 3 Files (logistics_source1, logistics_source2, logistics_shipment_detail_3000.json)

# Scenario: Creating a "Gold Layer" Table for PowerBI/Tableau.
# Action: Flatten the Star Schema. Join Staff, Shipments, and Vehicle_Master into one wide table (wide_shipment_history) so analysts don't have to perform joins during reporting.
df_wide_shipment_history = staff_df.join(shipments_df, on = "shipment_id", how = "inner")
display(df_wide_shipment_history)



In [0]:
# 5. Windowing (Ranking & Trends)
# Source Files:
# logistics_source2: Provides hub_location (Partition Key).
# logistics_shipment_detail_3000.json: Provides shipment_cost (Ordering Key)
# Scenario: "Who are the Top 3 Drivers by Cost in each Hub?"
# Action:
# Partition by hub_location.
# Order by total_shipment_cost Descending.
# Apply dense_rank() and row_number().
# Filter where rank or row_number <= 3.
from pyspark.sql.functions import col, dense_rank, desc, row_number
from pyspark.sql.window import Window
dfls2=dfls2.withColumn("shipment_id", col("shipment_id").try_cast("int")).dropna(subset=["hub_location"])

# df_window_join = dfls2.join(shipments_df, on="shipment_id", how="inner")
df_window_join = (
    dfls2.alias("csv").join(shipments_df.alias("json"), on="shipment_id", how="inner")
        .select(
            col("shipment_id"),
            col("first_name"),
            col("last_name"),
            col("age"),
            col("role"),
            col("hub_location"),
            col("csv.vehicle_type").alias("driver_vehicle_type"),
            col("json.vehicle_type").alias("shipment_vehicle_type"),
            # col("csv.data_source"), 
            col("order_id"),
            col("shipment_status"),
            col("cargo_type"),
            col("payment_mode"),
            col("json.source_city"),
            col("json.destination_city"),
            col("shipment_weight_kg"),
            col("shipment_cost"),
            col("shipment_date"),
            col("vehicle_id"),
            col("ship_year"),
            col("ship_month"),
            col("is_weekend")
        )
)
# display(df_window)
window_spec = Window.partitionBy("hub_location").orderBy(desc("shipment_cost"))

df_partition_window = df_window_join.withColumn("seqnum", row_number().over(window_spec))
df_dense = df_partition_window.withColumn("dense_rank", dense_rank().over(window_spec))
df_ranked = df_dense.filter(col("seqnum") <= 3).select("hub_location","dense_rank", "seqnum","shipment_cost")
display(df_ranked)
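
`row_number()` and `dense_rank()` only differ when costs tie inside a hub, which affects how many rows a "top 3" filter keeps. The plain-Python sketch below reproduces both rankings on made-up `(hub_location, shipment_cost)` pairs to show the difference; the data and the function name are hypothetical.

```python
from itertools import groupby


def rank_within_hub(rows):
    """rows: (hub, cost) pairs. Returns (hub, cost, row_number, dense_rank),
    ordered by cost descending inside each hub."""
    ranked = []
    ordered = sorted(rows, key=lambda r: (r[0], -r[1]))
    for hub, group in groupby(ordered, key=lambda r: r[0]):
        dense, previous_cost = 0, None
        for row_number, (_, cost) in enumerate(group, start=1):
            if cost != previous_cost:  # dense_rank only advances when the value changes
                dense += 1
                previous_cost = cost
            ranked.append((hub, cost, row_number, dense))
    return ranked


# Hypothetical data with a tie at the top of the Pune hub.
sample = [("Pune", 900.0), ("Pune", 900.0), ("Pune", 700.0), ("Pune", 500.0), ("Delhi", 400.0)]
ranked = rank_within_hub(sample)

top3_by_row_number = [r for r in ranked if r[2] <= 3]
top3_by_dense_rank = [r for r in ranked if r[3] <= 3]
print(len(top3_by_row_number), len(top3_by_dense_rank))  # 4 5
```

Filtering on `row_number` always keeps at most three rows per hub, while filtering on `dense_rank` keeps every row that shares one of the three highest costs.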


In [0]:
# 6. Analytical Functions (Lead/Lag)
# Source File:
# logistics_shipment_detail_3000.json
# Scenario: Idle Time Analysis.
# Action: For each driver, calculate the days elapsed since their previous shipment.

# from pyspark.sql.functions import col, to_date
# shipments_df = shipments_df.withColumn(
#     "shipment_date",
#     to_date(col("shipment_date"), "yyyy-MM-dd")
# )

# driver_df = staff_df.filter("role == 'Driver'") #.select(col("shipment_id").alias("driver_id"))
# display(driver_df)
df_join_driver = staff_df.join(shipments_df, on="shipment_id", how="inner")
# display(df_join_driver)

window_spec = Window.partitionBy("shipment_id").orderBy("shipment_date")

shipments_lag = df_join_driver.withColumn(
    "previous_shipment_date",
    lag("shipment_date").over(window_spec)
)

idle_time_df = shipments_lag.withColumn(
    "idle_days",
    datediff(col("shipment_date"), col("previous_shipment_date"))
).select(
    "shipment_id",
    "shipment_date",
    "previous_shipment_date",
    "idle_days")
display(idle_time_df)
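
`lag()` only returns a value when the window partition contains an earlier row, so the partition key has to identify the driver and a driver needs more than one shipment. The plain-Python sketch below shows the intended calculation; `driver_id` is a hypothetical per-driver key and the dates are made up.

```python
from collections import defaultdict
from datetime import date


def idle_days(shipments):
    """shipments: (driver_id, shipment_date) pairs.
    Returns (driver_id, shipment_date, previous_shipment_date, idle_days) per shipment."""
    by_driver = defaultdict(list)
    for driver_id, shipped_on in shipments:
        by_driver[driver_id].append(shipped_on)

    result = []
    for driver_id in sorted(by_driver):
        previous = None  # the first shipment of a driver has no predecessor
        for shipped_on in sorted(by_driver[driver_id]):
            gap = (shipped_on - previous).days if previous is not None else None
            result.append((driver_id, shipped_on, previous, gap))
            previous = shipped_on
    return result


# Hypothetical data, for illustration only.
sample = [("D1", date(2024, 1, 10)), ("D1", date(2024, 1, 1)), ("D2", date(2024, 2, 1))]
for row in idle_days(sample):
    print(row)
```

The first shipment of each driver gets `None` for both the previous date and the gap, matching the NULL that `lag()` and `datediff()` produce.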


In [0]:
# 7. Set Operations
# Source Files: logistics_source1 and logistics_source2

# Union: Combining Source1 (Legacy) and Source2 (Modern) into one dataset (Already done in Active Munging).
# Intersect: Identifying Staff IDs that appear in both Source 1 and Source 2 (Duplicate/Migration Check).
# Except (Difference): Identifying Staff IDs present in Source 2 but missing from Source 1 (New Hires).

dfls1 = spark.read.csv("/Volumes/workspace/default/logistics_project_data/logistics_source1",header= True , inferSchema=False , columnNameOfCorruptRecord='corrupt_record')

dfls2=spark.read.csv(path=["/Volumes/workspace/default/logistics_project_data/logistics_source2"],header= True , inferSchema=False , columnNameOfCorruptRecord='corrupt_record')

file1 = ["shipment_id","first_name","last_name","age","role"]

df1 = dfls1.select(file1)
df2 = dfls2.select(file1)

dfunion_set=df1.union(df2).distinct()
display(dfunion_set)

df_intersect = df1.intersect(df2)
display(df_intersect)
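# subtract() is EXCEPT DISTINCT: rows of Source 2 that do not appear in Source 1, without duplicates.
df_except = df2.subtract(df1)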
display(df_except)
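
PySpark offers two difference operators, `subtract` (EXCEPT DISTINCT) and `exceptAll` (EXCEPT ALL), and they disagree as soon as a source contains duplicate rows. The plain-Python sketch below models both, plus `intersect`, on made-up staff rows to show which one suits a "missing from the other source" check; the data and function names are hypothetical.

```python
from collections import Counter


def intersect(a, b):
    """Distinct rows present in both inputs (like DataFrame.intersect)."""
    return sorted(set(a) & set(b))


def except_distinct(a, b):
    """Distinct rows of a that do not occur in b (like DataFrame.subtract)."""
    return sorted(set(a) - set(b))


def except_all(a, b):
    """Rows of a minus rows of b, counting duplicates (like DataFrame.exceptAll)."""
    return sorted((Counter(a) - Counter(b)).elements())


# Hypothetical rows (shipment_id, first_name); "102" is duplicated in source1.
source1 = [("101", "Asha"), ("102", "Ravi"), ("102", "Ravi")]
source2 = [("102", "Ravi"), ("103", "Meena")]

print(intersect(source1, source2))        # [('102', 'Ravi')]
print(except_distinct(source1, source2))  # [('101', 'Asha')]
print(except_all(source1, source2))       # [('101', 'Asha'), ('102', 'Ravi')]
```

With `except_all`, the extra copy of `102` is reported even though that row exists in both sources, so duplicates should be removed first if the goal is to list rows that are genuinely absent.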


In [0]:
# 8. Grouping & Aggregations (Advanced)
# Source Files:
# logistics_source2: Provides hub_location and vehicle_type (Grouping Dimensions).
# logistics_shipment_detail_3000.json: Provides shipment_cost (Aggregation Metric).

# Scenario: The CFO wants a subtotal report at multiple levels:
# Total Cost by Hub.
# Total Cost by Hub AND Vehicle Type.
# Grand Total.
# Action: Use cube("hub_location", "vehicle_type") or rollup() to generate all these subtotals in a single query.
# sr2=dfls2.alias("sr2")
# js=dfjson.alias("js")
# from pyspark.sql.functions import sum
# df_cube = df_window_join.cube(col("sr2.hub_location"), col("js.vehicle_type")).agg(
#     sum(col("js.shipment_cost").alias("total_cost")
# ))
# display(df_cube)

# df_cube = (
#     df_window_join
#     .cube("hub_location", "vehicle_type")
#     .agg(sum("shipment_cost").alias("total_cost"))
# )

# display(df_cube)
# df_clean = (
#     df_window_join
#     .withColumnRenamed("vehicle_type", "vehicle_type_src")
# )

# df_cube = (
#     df_clean
#     .cube("hub_location", "vehicle_type_src")
#     .agg(sum("shipment_cost").alias("total_cost"))
# )

# #
# df_cube = (
#     df_window_join
#         .cube("hub_location", "shipment_vehicle_type", "driver_vehicle_type")
#         .agg(sum("shipment_cost").alias("total_cost"))
# )
# display(df_cube)


In [0]:
dfls2 = dfls2.select("vehicle_type", "hub_location").dropDuplicates()
staff = dfls2.alias("staff")
# shipments = shipments_df.alias("shipments")
# staff = dfls2.withColumn("shipment_id", col("shipment_id").try_cast("int"))
df_window_join = staff.join(
    dfjson,
    on="vehicle_type",
    how="full_outer"
)

from pyspark.sql.functions import col, sum

df_cube = df_window_join.cube(
    col("hub_location"),  # grouping column from the staff side (logistics_source2)
    col("vehicle_type")   # join key, so it carries values from both sides
).agg(
    sum("shipment_cost").alias("total_cost")  # metric from the shipments JSON
)
df_cube = df_cube.orderBy("hub_location", "vehicle_type")
display(df_cube)
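
The three levels the CFO asked for (hub and vehicle type, hub only, grand total) are exactly the grouping sets of `rollup("hub_location", "vehicle_type")`; `cube` produces those plus a subtotal per vehicle type across all hubs. The plain-Python sketch below enumerates the grouping sets and aggregates made-up rows to make the difference visible; the data and function names are hypothetical.

```python
from collections import defaultdict
from itertools import combinations


def rollup_sets(cols):
    """Grouping sets of rollup: each prefix of the column list, down to the empty set."""
    return [tuple(cols[:i]) for i in range(len(cols), -1, -1)]


def cube_sets(cols):
    """Grouping sets of cube: every subset of the column list."""
    return [c for r in range(len(cols), -1, -1) for c in combinations(cols, r)]


def aggregate(rows, cols, grouping_sets, metric):
    """Sum `metric` per grouping set; columns outside the set are reported as None."""
    totals = defaultdict(float)
    for grouping_set in grouping_sets:
        for row in rows:
            key = tuple(row[c] if c in grouping_set else None for c in cols)
            totals[key] += row[metric]
    return dict(totals)


# Hypothetical data, for illustration only.
data = [
    {"hub_location": "Pune", "vehicle_type": "Truck", "shipment_cost": 100.0},
    {"hub_location": "Pune", "vehicle_type": "Van", "shipment_cost": 50.0},
    {"hub_location": "Delhi", "vehicle_type": "Truck", "shipment_cost": 30.0},
]
cols = ["hub_location", "vehicle_type"]
by_rollup = aggregate(data, cols, rollup_sets(cols), "shipment_cost")
by_cube = aggregate(data, cols, cube_sets(cols), "shipment_cost")

print(by_rollup[("Pune", None)], by_rollup[(None, None)])  # 150.0 180.0
print(sorted(set(by_cube) - set(by_rollup)))               # [(None, 'Truck'), (None, 'Van')]
```

As in Spark's output, a `None` in a key marks a subtotal level, so genuine nulls in the grouping columns would need to be told apart separately.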


## 6. Data Persistence (LOAD) -> Data Publishing & Consumption<br>

Store the outputs of the inner join, lookup and enrichment, schema modeling, windowing, analytical functions, set operations, and grouping and aggregation steps in Delta tables.
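
One possible way to do this is a small helper that writes each result DataFrame with `df.write.format("delta").mode(...).saveAsTable(...)`. This is a sketch under assumptions: the `workspace.default` schema is borrowed from the volume path used for the source files, and the table names in the usage comment are hypothetical.

```python
def persist_as_delta(tables, schema="workspace.default", mode="overwrite"):
    """Write each DataFrame in `tables` (table name -> DataFrame) to a Delta table.

    Returns the fully qualified names of the tables that were written.
    """
    written = []
    for name, df in tables.items():
        full_name = f"{schema}.{name}"
        # "overwrite" replaces the table contents on every run of the notebook.
        df.write.format("delta").mode(mode).saveAsTable(full_name)
        written.append(full_name)
    return written


# Usage inside the notebook (table names are hypothetical):
# persist_as_delta({
#     "staff_shipments_inner": df_innerjoin,
#     "staff_location_lookup": df_lookup,
#     "staff_geo_enriched": df_enrichment,
#     "wide_shipment_history": df_wide_shipment_history,
#     "hub_top_shipments": df_ranked,
#     "driver_idle_time": idle_time_df,
#     "staff_new_in_source2": df_except,
#     "cost_by_hub_vehicle": df_cube,
# })
```

Column names containing spaces or special characters would need to be renamed before writing, and `mode="append"` may suit incremental loads better than a full overwrite.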

## 7. Take a copy of the above notebook and write the equivalent SQL wherever applicable.