#1. Data Munging
1. Data Munging (Cleanup) - The process of transforming and mapping data from its raw form into a tidy (usable) format, to make it more appropriate and valuable for downstream purposes such as further transformation/enrichment, egress/outbound feeds, analytics, data science/AI applications and reporting.

In [0]:
#Create (or reuse) a SparkSession, the entry point to a Spark application; on Databricks getOrCreate() returns the session that already exists
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("Spark DataFrames bread and butter").getOrCreate()

In [0]:
#Data exploration (programmatically); toDF() renames the columns by position

rawdf=spark.read.csv("/Volumes/we47catalog1/we47db1/we47volume1/logistics_source1",header=True,inferSchema=True).toDF("shipment_id","first_name","last_name","age","role")
display(rawdf)

In [0]:
#Passive munging - explore the schema/structure without changing the data
rawdf.printSchema()
print(rawdf.schema)
print(rawdf.dtypes)
print(rawdf.columns)


In [0]:
#Passive munging - explore the data itself (nothing is modified)
#Patterns we look for in this data:
#1. Duplicate rows, on all columns and on selected key column(s)
#2. Ratio of NULL values in each column
#3. Distribution of the values in the numeric columns
#4. Min and Max values
#5. Standard deviation - how much the data varies from the average (mean)
#6. Percentiles - the 25%, 50% (median) and 75% quartiles, reported by summary()
print("actual count of the data",rawdf.count())
print("de-duplicated record count (all columns), distinct()",rawdf.distinct().count())
print("de-duplicated record count (all columns), dropDuplicates()",rawdf.dropDuplicates().count())
print("de-duplicated count on the shipment_id column only",rawdf.dropDuplicates(['shipment_id']).count())
display(rawdf.describe())
display(rawdf.summary())
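`describe()` reports count, mean, stddev, min and max, and `summary()` adds the 25%, 50% and 75% percentiles, but neither reports the NULL ratio from item 2. The plain-Python sketch below shows what that ratio means, using a few hypothetical rows rather than the real logistics files; in Spark the same idea is usually expressed as one aggregation that counts `isNull()` per column.

```python
# Plain-Python sketch of item 2 (NULL ratio per column).
# The rows are hypothetical samples, not the real logistics_source1 data.
from typing import Dict, List, Optional

rows: List[Dict[str, Optional[str]]] = [
    {"shipment_id": "500001", "first_name": "Rajesh", "age": "34", "role": "Driver"},
    {"shipment_id": "500002", "first_name": None, "age": "ten", "role": "Driver"},
    {"shipment_id": None, "first_name": "Anita", "age": None, "role": None},
    {"shipment_id": "500004", "first_name": "Kumar", "age": "51", "role": "Supervisor"},
]


def null_ratio(records: List[Dict[str, Optional[str]]]) -> Dict[str, float]:
    """Return the fraction of NULL (None) values for every column."""
    columns = {name for record in records for name in record}
    total = len(records)
    return {
        name: sum(1 for record in records if record.get(name) is None) / total
        for name in sorted(columns)
    }


for column, ratio in null_ratio(rows).items():
    print(f"{column}: {ratio:.0%}")
```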

###a. Passive Data Munging - (File: logistics_source1 and logistics_source2)
Without modifying the data, identify:
1. Shipment IDs that appear in both master_v1 and master_v2
2. Records where:
   - shipment_id is non-numeric
   - age is not an integer
3. The count of rows having fewer columns than expected
4. The count of rows having more columns than expected
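The four passive checks can be sketched in plain Python to make the logic explicit before expressing it in Spark (for example with `DataFrame.intersect` for item 1 and `Column.rlike("^[0-9]+$")` for item 2). The CSV text, the second header and `EXPECTED_COLUMNS` below are hypothetical stand-ins for the real files.

```python
# Plain-Python sketch of the passive checks; nothing is modified.
# The CSV text and EXPECTED_COLUMNS are hypothetical stand-ins for the real files.
import csv
import io

EXPECTED_COLUMNS = 5  # shipment_id, first_name, last_name, age, role

source1 = """shipment_id,first_name,last_name,age,role
500001,Rajesh,Kumar,34,Driver
ten,Anita,Rao,29,Driver
500003,Vikram,Singh,ten,Supervisor
500004,Meena,Iyer
"""
source2 = """shipment_id,first_name,last_name,age,role
500001,Rajesh,Kumar,34,Driver
500005,Suresh,Babu,41,Driver,extra
"""


def parse(text):
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header row
    return [row for row in reader if row]


rows1, rows2 = parse(source1), parse(source2)
all_rows = rows1 + rows2

# 1. Shipment IDs present in both sources
common_ids = {r[0] for r in rows1} & {r[0] for r in rows2}

# 2. Records with a non-numeric shipment_id, or an age that is not an integer
non_numeric_ids = [r for r in all_rows if not r[0].isdigit()]
bad_age = [r for r in all_rows if len(r) > 3 and not r[3].lstrip("-").isdigit()]

# 3 and 4. Rows with fewer / more columns than expected
fewer = sum(1 for r in all_rows if len(r) < EXPECTED_COLUMNS)
more = sum(1 for r in all_rows if len(r) > EXPECTED_COLUMNS)

print(common_ids, len(non_numeric_ids), len(bad_age), fewer, more)
```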

###**b. Active Data Munging** File: logistics_source1 and logistics_source2

#####1. Combining Data + Schema Merging (Structuring)
1. Read both files without enforcing a schema
2. Align them into a single canonical schema: shipment_id, first_name, last_name, age, role, hub_location, vehicle_type, data_source
3. Add a data_source column with the values system1 and system2 in the respective dataframes
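Aligning the two sources amounts to projecting every record onto the canonical column list, leaving missing columns NULL, and tagging each row with its source. The plain-Python sketch below shows that idea with hypothetical records; in Spark 3.1 and later, `DataFrame.unionByName(other, allowMissingColumns=True)` does the same projection directly, as an alternative to the Parquet round trip with `mergeSchema`.

```python
# Plain-Python sketch of aligning two sources to one canonical schema.
# The sample records are hypothetical; missing columns become None (NULL).
CANONICAL = ["shipment_id", "first_name", "last_name", "age", "role",
             "hub_location", "vehicle_type", "data_source"]

system1 = [{"shipment_id": "500001", "first_name": "Rajesh", "last_name": "Kumar",
            "age": "34", "role": "Driver"}]
system2 = [{"shipment_id": "500002", "first_name": "Anita", "last_name": "Rao",
            "age": "29", "role": "Driver", "hub_location": "chennai",
            "vehicle_type": "Truck"}]


def align(records, source_name):
    """Project each record onto CANONICAL and tag it with its data_source."""
    aligned = []
    for record in records:
        row = {name: record.get(name) for name in CANONICAL}
        row["data_source"] = source_name
        aligned.append(row)
    return aligned


combined = align(system1, "system1") + align(system2, "system2")
for row in combined:
    print(row)
```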

In [0]:
#Extraction (Ingestion) methodologies
from pyspark.sql.functions import lit,initcap

#All-string schema for source 1 (inferSchema is ignored once a schema is supplied, so it is not passed)
struct1="shipment_id string ,first_name string ,last_name string ,age string ,role string"
rawdf1=spark.read.schema(struct1).csv("/Volumes/we47catalog1/we47db1/we47volume1/logistics_source1",header=True).withColumn("data_source",lit("system1"))
display(rawdf1)
#Source 2 is read without inferSchema, so every column stays a string and the two sources merge without type conflicts
rawdf2=spark.read.csv("/Volumes/we47catalog1/we47db1/we47volume1/logistics_source2",header=True).withColumn("data_source",lit("system2"))


#display(rawdf2)
#Both dataframes are written to the same Parquet path; mergeSchema=True merges their Parquet schemas on read
rawdf1.write.parquet("/Volumes/we47catalog1/we47db1/we47volume1/logistics_source1.parquet",mode='overwrite')
rawdf2.write.parquet("/Volumes/we47catalog1/we47db1/we47volume1/logistics_source1.parquet",mode="append")
merge_df=spark.read.parquet("/Volumes/we47catalog1/we47db1/we47volume1/logistics_source1.parquet",mergeSchema=True)
display(merge_df)
#Transformation (Munging)

#####2. Cleansing, Scrubbing
Cleansing (removal of unwanted records)<br>
1. Mandatory Column Check - Drop any record where any of the following columns is NULL: shipment_id, role<br>
2. Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name<br>
3. Join Readiness Rule - Drop records where the join key is NULL: shipment_id<br>

Scrubbing (convert raw to tidy)<br>
4. Age Defaulting Rule - Fill NULL values in the age column with: -1<br>
5. Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN<br>
6. Invalid Age Replacement - Replace the following values in age: "ten" to -1, "" to -1<br>
7. Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV, bike to TwoWheeler
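The seven rules can be sketched in plain Python on a few hypothetical records to check the expected outcome before writing the Spark version. The age is kept as the string "-1" here because age is read as a string column, and Spark's `na.fill` ignores subset columns whose type does not match the fill value.

```python
# Plain-Python sketch of cleansing rules 1-3 and scrubbing rules 4-7.
# The records are hypothetical; None stands for a NULL value.
records = [
    {"shipment_id": "500001", "first_name": "Rajesh", "last_name": "Kumar",
     "age": None, "role": "Driver", "vehicle_type": "truck"},
    {"shipment_id": "500002", "first_name": None, "last_name": None,
     "age": "40", "role": "Driver", "vehicle_type": "bike"},       # dropped by rule 2
    {"shipment_id": None, "first_name": "Anita", "last_name": "Rao",
     "age": "29", "role": "Driver", "vehicle_type": None},        # dropped by rules 1 and 3
    {"shipment_id": "500004", "first_name": "Meena", "last_name": None,
     "age": "ten", "role": "Supervisor", "vehicle_type": None},
]

AGE_REPLACEMENTS = {"ten": "-1", "": "-1"}            # rule 6
VEHICLE_MAP = {"truck": "LMV", "bike": "TwoWheeler"}  # rule 7


def cleanse(rows):
    """Rules 1-3: yield only the records that pass every cleansing check."""
    for row in rows:
        if row["shipment_id"] is None or row["role"] is None:       # rules 1 and 3
            continue
        if row["first_name"] is None and row["last_name"] is None:  # rule 2
            continue
        yield row


def scrub(row):
    """Rules 4-7: return a tidied copy of one record."""
    row = dict(row)  # copy, so the input record is left unchanged
    age = "-1" if row["age"] is None else row["age"]                              # rule 4
    row["age"] = AGE_REPLACEMENTS.get(age, age)                                   # rule 6
    vehicle = "UNKNOWN" if row["vehicle_type"] is None else row["vehicle_type"]  # rule 5
    row["vehicle_type"] = VEHICLE_MAP.get(vehicle, vehicle)                       # rule 7
    return row


tidy = [scrub(row) for row in cleanse(records)]
for row in tidy:
    print(row)
```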

In [0]:
from pyspark.sql.types import *
struct1=StructType([StructField("shipment_id",StringType(),True),
                    StructField("first_name",StringType(),True),
                    StructField("last_name",StringType(),True),
                    StructField("age",StringType(),True),
                    StructField("role",StringType(),True)
                   ])
#Method 1: PERMISSIVE keeps every row; fields that cannot be parsed are set to NULL
df_method1=spark.read.schema(struct1).csv("/Volumes/we47catalog1/we47db1/we47volume1/logistics_source1",header=True,mode='PERMISSIVE')
#display(df_method1)
print("count() of data",df_method1.count())
print("rows returned by collect()",len(df_method1.collect()))

#Method 2: DROPMALFORMED drops rows that do not fit the schema.
#count() may not need to parse any column, so it can still include malformed rows; collect() parses them and drops them

dfmethod2=spark.read.schema(struct1).csv("/Volumes/we47catalog1/we47db1/we47volume1/logistics_source1",header=True,mode='DROPMALFORMED')
#display(dfmethod2)
print("count() of data",dfmethod2.count())
print("rows returned by collect()",len(dfmethod2.collect()))



In [0]:
#Method 3 (recommended for active munging): PERMISSIVE mode keeps every row, so nothing is lost silently;
#the next cell adds a corrupt-record column so malformed lines can be captured and reviewed
dfmethiod3=spark.read.schema(struct1).csv("/Volumes/we47catalog1/we47db1/we47volume1/logistics_source1",header=True,mode='PERMISSIVE')
#display(dfmethiod3)
print("count() of data",dfmethiod3.count())
print("rows returned by collect()",len(dfmethiod3.collect()))

In [0]:
#Before actively cleansing or scrubbing, define a rejection strategy: keep malformed lines in a corrupt-record column so they can be audited and reprocessed later
struct11=StructType([StructField("shipment_id",StringType(),True),
                    StructField("first_name",StringType(),True),
                    StructField("last_name",StringType(),True),
                    StructField("age",StringType(),True),
                    StructField("role",StringType(),True),
                    StructField("corruptdata",StringType(),True)
                   ])
#Malformed lines keep their raw text in corruptdata; cleanly parsed rows have NULL there
dfmethiod3=spark.read.schema(struct11).csv("/Volumes/we47catalog1/we47db1/we47volume1/logistics_source1",header=True,mode='PERMISSIVE',columnNameOfCorruptRecord="corruptdata")
#display(dfmethiod3)
print("count() of data",dfmethiod3.count())
print("rows returned by collect()",len(dfmethiod3.collect()))
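The difference between a PERMISSIVE read with a corrupt-record column and a DROPMALFORMED read can be sketched in plain Python. This is loosely modelled on the Spark options and is not Spark's parser: here a line counts as malformed when its field count differs from the schema, whereas Spark's exact treatment of short and long rows has varied between versions. `SCHEMA` and the sample lines are hypothetical.

```python
# Plain-Python sketch of two read modes, loosely modelled on Spark's CSV options;
# it is not Spark's parser. A line is "malformed" here when its field count
# differs from the schema. SCHEMA and RAW_LINES are hypothetical.
import csv

SCHEMA = ["shipment_id", "first_name", "last_name", "age", "role"]
RAW_LINES = [
    "500001,Rajesh,Kumar,34,Driver",
    "500002,Anita",                          # too few fields
    "500003,Vikram,Singh,45,Driver,extra",   # too many fields
]


def read(lines, mode="PERMISSIVE"):
    """Return (kept_rows, dropped_lines) for the given mode."""
    kept, dropped = [], []
    for line in lines:
        fields = next(csv.reader([line]))
        if len(fields) == len(SCHEMA):
            row = dict(zip(SCHEMA, fields))
            row["corruptdata"] = None
            kept.append(row)
        elif mode == "PERMISSIVE":
            # keep the row: pad/truncate to the schema and store the raw line
            padded = (fields + [None] * len(SCHEMA))[: len(SCHEMA)]
            row = dict(zip(SCHEMA, padded))
            row["corruptdata"] = line
            kept.append(row)
        else:  # "DROPMALFORMED": the line is discarded
            dropped.append(line)
    return kept, dropped


for mode in ("PERMISSIVE", "DROPMALFORMED"):
    kept, dropped = read(RAW_LINES, mode)
    print(mode, "kept:", len(kept), "dropped:", len(dropped))
```

Keeping the raw line is what makes a rejection strategy possible: the corrupt rows can be filtered out, written to a reject area and reprocessed, instead of disappearing silently.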

In [0]:
#Methods 1 and 2 cleanse by enforcing a strict structure at read time; with the PERMISSIVE read (method 3)
#we can instead cleanse and scrub in a controlled fashion by applying functions to the dataframe
#Important na functions for cleansing and scrubbing: na.drop, na.fill, na.replace
struct11=StructType([StructField("shipment_id",StringType(),True),
                    StructField("first_name",StringType(),True),
                    StructField("last_name",StringType(),True),
                    StructField("age",StringType(),True),
                    StructField("role",StringType(),True)
                   ])

clenaseddf=spark.read.schema(struct11).csv("/Volumes/we47catalog1/we47db1/we47volume1/logistics_source1",mode='PERMISSIVE',header=True)

print("count() of data",clenaseddf.count())
print("rows returned by collect()",len(clenaseddf.collect()))

In [0]:
#1. Mandatory Column Check - Drop records where shipment_id or role is NULL
clansed_df1=clenaseddf.na.drop("any",subset=["shipment_id","role"])
#display(clansed_df1.take(10))
print("count after the mandatory column check",clansed_df1.count())
print("count via collect()",len(clansed_df1.collect()))

In [0]:
#2. Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name
clen_df2=clansed_df1.na.drop("all",subset=["first_name","last_name"])
#display(clen_df2.take(10))
print("count after the name completeness rule",clen_df2.count())
print("count via collect()",len(clen_df2.collect()))

In [0]:
#3. Join Readiness Rule - Drop records where the join key is null: shipment_id
clen_df3=clen_df2.na.drop("any",subset=["shipment_id"])
#display(clen_df3.take(10))
print("count after the join readiness rule",clen_df3.count())
print("count via collect()",len(clen_df3.collect()))

#####Scrubbing (rules 4 to 7 above)

In [0]:
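#4. Age Defaulting Rule - age is a string column here, so the fill value must be the string "-1"
#(na.fill ignores subset columns whose type does not match the fill value)
clen_df4=clen_df3.na.fill(value="-1",subset=["age"])
#display(clen_df4.take(10))
print("Total number of rows after applying the age defaulting rule: ",clen_df4.count())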


In [0]:
rawdf2=spark.read.csv("/Volumes/we47catalog1/we47db1/we47volume1/logistics_source2",header='True')
display(rawdf2)

In [0]:
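#Apply the cleansing and scrubbing rules to the merged dataframe
merge_df1=merge_df.na.drop("any",subset=["shipment_id","role"])        #1. Mandatory Column Check
merge_df1=merge_df1.na.drop("any",subset=["shipment_id"])              #3. Join Readiness Rule
merge_df1=merge_df1.na.drop("all",subset=["first_name","last_name"])   #2. Name Completeness Rule
merge_df1=merge_df1.na.fill(value="-1",subset=["age"]).na.fill(value="UNKNOWN",subset=["vehicle_type"])  #4. and 5. (age is a string column)
#display(merge_df1)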

In [0]:
#6. Invalid Age Replacement - Replace "ten" and "" with "-1" in the age column only
#(Spark's CSV reader usually turns empty fields into NULL, which rule 4 already handled)
merge_df2=merge_df1.na.replace({'ten': '-1', '': '-1'}, subset=["age"])
display(merge_df2)

In [0]:
#7. Vehicle Type Normalization - truck to LMV, bike to TwoWheeler (lower- and title-case spellings are both mapped)
merge_df3=merge_df2.na.replace({"truck":"LMV","Truck":"LMV","bike":"TwoWheeler","Bike":"TwoWheeler"},subset=["vehicle_type"])
#display(merge_df3)

In [0]:
#Column Uniformity: role - Convert to lowercase
from pyspark.sql.functions import col, lower
#display(merge_df3)
merge_df4=merge_df3.withColumn("role",lower(col("role")))
display(merge_df4)

In [0]:
#Source File: DF of merged(logistics_source1 & logistics_source2)
from pyspark.sql.functions import *
from pyspark.sql.types import *

#vehicle_type - Convert values to UPPERCASE
merge_df4=merge_df4.withColumn("vehicle_type",upper(col("vehicle_type")))

#Source Files: hub_location - Convert values to initcap case
merge_df4=merge_df4.withColumn("hub_location",initcap(col("hub_location")))
#display(merge_df4)

#Source Files: DF of merged(logistics_source1 & logistics_source2)
merge_df4=merge_df4.withColumnRenamed("sourcesystem1","logistics_source1")
merge_df4=merge_df4.withColumnRenamed("sourcesystem2","logistics_source2")
#display(merge_df4)
#Standardizing column data types to fix schema drift and enable mathematical operations.
#Source File: DF of merged(logistics_source1 & logistics_source2)
#age: Cast String to Integer
#print(merge_df4.printSchema())
merge_df4=merge_df4.withColumn("age",col("age").cast(IntegerType()))
#print(merge_df4.printSchema())

#Rename: first_name to staff_first_name
#Rename: last_name to staff_last_name
#Rename: hub_location to origin_hub_city
merge_df5=merge_df4.withColumnRenamed("first_name","staff_first_name").withColumnRenamed("last_name","staff_last_name").withColumnRenamed("hub_location","origin_hub_city")
#display(merge_df5)
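#Target columns: shipment_id (Identifier), staff_first_name (Dimension), staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)
#shipment_cost and ingestion_timestamp come from the JSON shipment file, so only the staff columns are selected here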

merge_df6=merge_df5.select("shipment_id","staff_first_name","staff_last_name","role","origin_hub_city")

#Record-level de-duplication (distinct() and a no-argument dropDuplicates() are equivalent)
merge_df6=merge_df6.distinct()
#Column-level de-duplication (primary key enforcement): keep one row per shipment_id
merge_df6=merge_df6.dropDuplicates(["shipment_id"])
#Remove the invalid shipment_id value "ten"
merge_df6=merge_df6.filter(col("shipment_id") !="ten")
display(merge_df6)


####3. Standardization, De-Duplication and Replacement / Deletion of Data to make it usable
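Record-level and column-level de-duplication answer different questions: the first removes rows that are identical in every column, the second keeps one row per key. The plain-Python sketch below uses hypothetical rows. It keeps the first row seen per key, whereas Spark's `dropDuplicates(["shipment_id"])` does not guarantee which of the duplicate rows survives.

```python
# Plain-Python sketch of record-level vs column-level (key) de-duplication.
# Rows are hypothetical (shipment_id, first_name, role) tuples.
rows = [
    ("500001", "Rajesh", "driver"),
    ("500001", "Rajesh", "driver"),      # exact duplicate
    ("500001", "Rajesh", "supervisor"),  # same key, different attributes
    ("500002", "Anita", "driver"),
]

# Record level, like distinct(): remove rows identical in every column
record_level = list(dict.fromkeys(rows))

# Column level, like dropDuplicates(["shipment_id"]): one row per key.
# This sketch keeps the first row seen; Spark makes no such ordering promise.
key_level = {}
for row in record_level:
    key_level.setdefault(row[0], row)

print(len(rows), len(record_level), len(key_level))  # 4 3 2
```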

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
#A JSON schema is always inferred from the data when none is supplied, so the CSV-style inferSchema option is not needed
jsondf1 = spark.read \
    .option("multiline", "true") \
    .json("/Volumes/we47catalog1/we47db1/we47volume1/logistics_shipment_detail_3000.json")

#display(jsondf1)
jsondf2=jsondf1.withColumn("domain",lit("Logistics")).withColumn("ingestion_timestamp",current_timestamp()).withColumn("is_expedited",lit(False))
display(jsondf2.take(10))
#Format Standardization:
#Source Files: DF of logistics_shipment_detail_3000.json
#Parse shipment_date from its raw yy-MM-dd text into a DATE (displayed as yyyy-MM-dd)
jsondf3=jsondf2.withColumn("shipment_date",to_date(col("shipment_date"),"yy-MM-dd"))
#Ensure shipment_cost has 2 decimal precision
jsondf3=jsondf3.withColumn("shipment_cost",round(col("shipment_cost"),2))
display(jsondf3.take(10))
#Source File: DF of logistics_shipment_detail_3000.json
#shipment_weight_kg: Cast to Double
#print(jsondf3.printSchema())
jsondf4=jsondf3.withColumn("shipment_weight_kg",col("shipment_weight_kg").cast(DoubleType()))
#is_expedited: Cast to Boolean
jsondf4=jsondf4.withColumn("is_expedited",col("is_expedited").cast(BooleanType()))
#print(jsondf4.printSchema())

#Apply Record Level De-Duplication
jsondf4=jsondf4.distinct()
#Apply Column Level De-Duplication (Primary Key Enforcement)
jsondf4_dedup=jsondf4.dropDuplicates(["shipment_id"])
#display(jsondf4_dedup)
#shipment_cost (Metric), ingestion_timestamp (Audit)
jsondf4_select=jsondf4.select("shipment_cost","ingestion_timestamp")
#display(jsondf4_select)
#DataFrames are immutable, so each de-duplication result must be assigned back
jsondf4_select=jsondf4_select.distinct()
jsondf4_select=jsondf4_select.dropDuplicates(["shipment_cost"])
display(jsondf4_select)

In [0]:
#Source File: DF of merged(logistics_source1 & logistics_source2)
from pyspark.sql.functions import *
from pyspark.sql.types import *

#vehicle_type - Convert values to UPPERCASE
merge_df4=merge_df4.withColumn("vehicle_type",upper(col("vehicle_type")))


#Source Files: hub_location - Convert values to initcap case

merge_df4=merge_df4.withColumn("hub_location",initcap(col("hub_location")))
#display(merge_df4)

#Source Files: DF of merged(logistics_source1 & logistics_source2)
merge_df4=merge_df4.withColumnRenamed("sourcesystem1","logistics_source1")
merge_df4=merge_df4.withColumnRenamed("sourcesystem2","logistics_source2")
#display(merge_df4)
#Standardizing column data types to fix schema drift and enable mathematical operations.
#Source File: DF of merged(logistics_source1 & logistics_source2)
#age: Cast String to Integer
merge_df4.printSchema()
merge_df4=merge_df4.withColumn("age",col("age").cast(IntegerType()))
merge_df4.printSchema()

#Rename: first_name to staff_first_name
#Rename: last_name to staff_last_name
#Rename: hub_location to origin_hub_city
merge_df5=merge_df4.withColumnRenamed("first_name","staff_first_name").withColumnRenamed("last_name","staff_last_name").withColumnRenamed("hub_location","origin_hub_city")
display(merge_df5)
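#Target columns: shipment_id (Identifier), staff_first_name (Dimension), staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)
#shipment_cost and ingestion_timestamp come from the JSON shipment file, so only the staff columns are selected here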


merge_df6=merge_df5.select("shipment_id","staff_first_name","staff_last_name","role","origin_hub_city")

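#Record-level de-duplication (distinct() and a no-argument dropDuplicates() are equivalent)
merge_df6=merge_df6.distinct()
#Column-level de-duplication (primary key enforcement): keep one row per shipment_id
merge_df6=merge_df6.dropDuplicates(["shipment_id"])
#Remove the invalid shipment_id value "ten"
merge_df6=merge_df6.filter(col("shipment_id") !="ten")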
display(merge_df6)


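##2. Data Enrichment - Detailing of data
Adds derived columns (audit timestamp, full name, route segment, vehicle identifier, time-based and calculated fields) that make the data richer and more detailed <br>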
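Because the Name Completeness Rule only drops records where both names are NULL, a record can still arrive with one missing name part. In Spark, `concat` returns NULL when any input is NULL, while `concat_ws` skips NULL inputs. The plain-Python mimic below (hypothetical helper names, not Spark's implementation) shows the difference for the full_name column:

```python
# Plain-Python mimic of Spark's NULL handling in concat vs concat_ws.
# spark_concat / spark_concat_ws are illustrative helpers, not Spark APIs.
from typing import Optional


def spark_concat(*parts: Optional[str]) -> Optional[str]:
    """Like concat(): any NULL input makes the whole result NULL."""
    return None if any(p is None for p in parts) else "".join(parts)


def spark_concat_ws(sep: str, *parts: Optional[str]) -> str:
    """Like concat_ws(): NULL inputs are skipped."""
    return sep.join(p for p in parts if p is not None)


print(spark_concat("Meena", " ", None))     # None
print(spark_concat_ws(" ", "Meena", None))  # Meena
```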

In [0]:
#1. Add Audit Timestamp (load_dt) Source File: DF of logistics_source1 and logistics_source2
#display(merge_df)
from pyspark.sql.functions import *
from pyspark.sql.types import *
merge_df7=merge_df6.withColumn("load_dt",current_timestamp())
#display(merge_df7)

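#2. Create Full Name (full_name) Source File: DF of logistics_source1 and logistics_source2
#concat_ws skips NULLs, so a record with only one name part still gets a full_name (concat would return NULL)
merge_df8=merge_df7.withColumn("full_name",concat_ws(" ",col("staff_first_name"),col("staff_last_name")))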
#display(merge_df8)

In [0]:
#3. Define Route Segment (route_segment) Source File: DF of logistics_shipment_detail_3000.json
#Scenario: The logistics team wants to analyze performance based on specific transport lanes (Source to Destination).
#Action: Combine source_city and destination_city with a hyphen.
#Result: "Chennai" + "-" + "Pune" -> "Chennai-Pune"

#display(jsondf4)
jsondf5=jsondf4.withColumn("route_segment",concat(col("source_city"),lit("-"),col("destination_city")))
#display(jsondf5)

#4. Generate Vehicle Identifier (vehicle_identifier) Source File: DF of logistics_shipment_detail_3000.json

#Scenario: We need a unique tracking code that immediately tells us the vehicle type and the shipment ID.
#Action: Combine vehicle_type and shipment_id to create a composite key.
#Result: "Truck" + "_" + "500001" -> "Truck_500001"

jsondf6=jsondf5.withColumn("vehicle_identifier",concat(col("vehicle_type"),lit("_"),col("shipment_id")))
display(jsondf6)
#5. Generate Shipment Identifier (shipment_identifier)


#####Deriving of Columns (Time Intelligence)
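The weekend flag below depends on Spark's `dayofweek()` numbering, which runs from 1 = Sunday to 7 = Saturday, unlike Python's `isoweekday()` (1 = Monday to 7 = Sunday). This plain-Python sketch converts between the two so the flag can be checked on sample dates; the helper names are illustrative only.

```python
# Plain-Python mimic of Spark's dayofweek() numbering (1 = Sunday ... 7 = Saturday).
# spark_dayofweek and is_weekend are illustrative helpers, not Spark APIs.
from datetime import date


def spark_dayofweek(d: date) -> int:
    # isoweekday() is 1 = Monday ... 7 = Sunday; shift it to Spark's scheme
    return d.isoweekday() % 7 + 1


def is_weekend(d: date) -> bool:
    return spark_dayofweek(d) in (1, 7)


for d in (date(2024, 4, 23), date(2024, 4, 27), date(2024, 4, 28)):
    print(d, spark_dayofweek(d), is_weekend(d))
```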

In [0]:
#1. Derive Shipment Year (shipment_year)

#Extracting temporal features from dates to enable period-based analysis and reporting.
#Source File: logistics_shipment_detail_3000.json

#Scenario: Management needs an annual performance report to compare growth year-over-year.
#Action: Extract the year component from shipment_date.
#Result: "2024-04-23" -> 2024
jsondf7=jsondf6.withColumn("shipment_year",year(col("shipment_date").cast(DateType())))

#2. Derive Shipment Month (shipment_month)

jsondf7=jsondf7.withColumn("shipment_month",month(col("shipment_date")))

#3. Flag Weekend Operations (is_weekend)

#Scenario: The Operations team needs to track shipments handled during weekends to calculate overtime pay or analyze non-business day capacity.
#Action: Flag as True if the shipment_date falls on a Saturday or Sunday (dayofweek: 1 = Sunday, 7 = Saturday).
jsondf7=jsondf7.withColumn("is_weekend",when(dayofweek(col("shipment_date")).isin(1,7),True).otherwise(False))
display(jsondf7)


#4. Flag shipment status (is_expedited)

#Scenario: The Operations team needs to track shipments that are IN_TRANSIT or DELIVERED.
#Action: Flag as True if shipment_status is IN_TRANSIT or DELIVERED (this overwrites the is_expedited=False default set at ingestion).
jsondf7=jsondf7.withColumn("is_expedited",when(col("shipment_status").isin("IN_TRANSIT","DELIVERED"),True).otherwise(False))
display(jsondf7)

#####Enrichment/Business Logics (Calculated Fields)
Deriving new metrics and financial indicators using mathematical and date-based operations.
Source File: logistics_shipment_detail_3000.json
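The three calculated fields can be sketched in plain Python for one hypothetical shipment. The sketch returns None for a zero weight. Spark's `/` behaves like this (NULL) when ANSI mode is off and raises a division-by-zero error when ANSI mode is on, so it is worth knowing whether shipment_weight_kg can be 0 in this file.

```python
# Plain-Python sketch of the calculated fields for one hypothetical shipment.
from datetime import date
from typing import Optional

GST_RATE = 0.18  # 18% GST, from the tax-liability scenario


def cost_per_kg(cost: float, weight_kg: float) -> Optional[float]:
    """shipment_cost / shipment_weight_kg, with None (NULL) for a zero weight."""
    return None if weight_kg == 0 else cost / weight_kg


def days_since_shipment(shipment_date: date, today: date) -> int:
    """Same sign convention as datediff(current_date(), shipment_date)."""
    return (today - shipment_date).days


def tax_amount(cost: float) -> float:
    """shipment_cost * 0.18"""
    return cost * GST_RATE


print(cost_per_kg(30695.80, 1200.0))
print(days_since_shipment(date(2024, 4, 23), date(2024, 5, 3)))  # 10
print(tax_amount(30695.80))
```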

In [0]:
#1. Calculate Unit Cost (cost_per_kg)

#Scenario: The Finance team wants to analyze the efficiency of shipments by determining the cost incurred per unit of weight.
#Action: Divide shipment_cost by shipment_weight_kg.
#Logic: shipment_cost / shipment_weight_kg

jsondf8=jsondf7.withColumn("cost_per_kg",col("shipment_cost")/col("shipment_weight_kg"))
#display(jsondf8)

#2. Track Shipment Age (days_since_shipment)

#Scenario: The Operations team needs to monitor how long it has been since a shipment was dispatched to identify potential delays.
#Action: Calculate the difference in days between the current_date and the shipment_date.
#Logic: datediff(current_date(), shipment_date)

jsondf8=jsondf8.withColumn("days_since_shipment",datediff(current_date(),col("shipment_date")))
display(jsondf8)


#3. Compute Tax Liability (tax_amount)

#Scenario: For invoicing and compliance, we must calculate the Goods and Services Tax (GST) applicable to each shipment.
#Action: Calculate 18% GST on the total shipment_cost.
#Logic: shipment_cost * 0.18

jsondf9=jsondf8.withColumn("tax_amount",col("shipment_cost")*0.18)
display(jsondf9)


#####Remove/Eliminate (drop, select, selectExpr)
Excluding unnecessary or redundant columns to optimize storage and privacy.
Source File: DF of logistics_source1 and logistics_source2

In [0]:
#Remove Redundant Name Columns

#Scenario: Since we have already created the full_name column in the Enrichment step, the individual name columns are now redundant and clutter the dataset.
#Action: Drop the staff_first_name and staff_last_name columns (first_name and last_name before the rename).
#Logic: df.drop("staff_first_name", "staff_last_name")

merge_df8=merge_df8.drop("staff_first_name", "staff_last_name")
display(merge_df8)


#####Splitting & Merging/Melting of Columns
Reshaping columns to extract hidden values or combine fields for better analysis.
Source File: DF of logistics_shipment_detail_3000.json
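The order_id split relies on two regular expressions, `([A-Z]+)` for the prefix and `([0-9]+)` for the sequence. The plain-Python sketch below applies the same patterns; like Spark's `regexp_extract`, it returns an empty string when a pattern does not match. The helper name is illustrative.

```python
# Plain-Python sketch of the order_id split, using the same patterns as the
# regexp_extract calls: ([A-Z]+) for the prefix and ([0-9]+) for the sequence.
import re


def split_order_id(order_id: str) -> tuple:
    prefix = re.search(r"([A-Z]+)", order_id)
    sequence = re.search(r"([0-9]+)", order_id)
    # regexp_extract returns "" when there is no match; mirror that here
    return (prefix.group(1) if prefix else "", sequence.group(1) if sequence else "")


print(split_order_id("ORD100000"))  # ('ORD', '100000')
```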

In [0]:
#1. Splitting (Extraction) Breaking one column into multiple to isolate key information.
display(jsondf9)
#Split Order Code:
#Action: Split order_id ("ORD100000") into two new columns:
#order_prefix ("ORD")
#order_sequence ("100000")
jsondf10=jsondf9.withColumn("order_prefix",regexp_extract('order_id','([A-Z]+)',1))
jsondf10=jsondf10.withColumn("order_sequence",regexp_extract('order_id','([0-9]+)',1))
display(jsondf10)

#Split Date:
#Action: Split shipment_date into three separate columns for partitioning:
#ship_year (2024)
#ship_month (4)
#ship_day (23)

jsondf11=jsondf10.withColumn("ship_year",year(col("shipment_date")))
jsondf11=jsondf11.withColumn("ship_month",month(col("shipment_date")))
jsondf11=jsondf11.withColumn("ship_day",dayofmonth(col("shipment_date")))

display(jsondf11)


####2. Merging (Concatenation) Combining multiple columns into a single unique identifier or description.

In [0]:
#Create Route ID:
#Action: Merge source_city ("Chennai") and destination_city ("Pune") to create a descriptive route key:
#route_lane ("Chennai->Pune")
jsondf11=jsondf11.withColumn("route_lane",concat(col("source_city"),lit("->"),col("destination_city")))
display(jsondf11)

In [0]:
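#Days elapsed since the previous shipment of the same vehicle_type, using lag() over a window ordered by shipment_date
from pyspark.sql.functions import datediff, lag, col, try_to_date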
from pyspark.sql.window import Window

window_df = spark.read.json("/Volumes/we47catalog1/we47db1/we47volume1/logistics_shipment_detail_3000.json", multiLine=True)
display(window_df)

# Use try_to_date to convert shipment_date to DATE
window_df = window_df.withColumn("shipment_date", try_to_date(col("shipment_date"), "yy-MM-dd"))

lagdf = window_df.withColumn(
    "prev_shipment_date",
    lag("shipment_date", 1).over(Window.partitionBy(col("vehicle_type")).orderBy(col("shipment_date")))
)
lagdf = lagdf.withColumn(
    "days_elapsed",
    datediff(col("shipment_date"), col("prev_shipment_date"))
)
lagdf = lagdf.drop("prev_shipment_date")
display(lagdf)
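The window logic above (`lag` over `partitionBy(vehicle_type).orderBy(shipment_date)`, then `datediff`) can be sketched in plain Python: group the rows by vehicle type, sort each group by date, and compare every date with the one before it. The shipments are hypothetical, and NULL dates are not modelled.

```python
# Plain-Python sketch of lag() over a window partitioned by vehicle_type and
# ordered by shipment_date, followed by datediff(current, previous).
# The shipments are hypothetical; NULL dates are not modelled.
from collections import defaultdict
from datetime import date

shipments = [
    ("Truck", date(2024, 4, 23)),
    ("Bike", date(2024, 4, 20)),
    ("Truck", date(2024, 4, 20)),
    ("Truck", date(2024, 4, 30)),
]

by_vehicle = defaultdict(list)
for vehicle, shipped_on in shipments:
    by_vehicle[vehicle].append(shipped_on)

days_elapsed = []
for vehicle, dates in by_vehicle.items():
    previous = None  # lag() yields NULL for the first row of each partition
    for current in sorted(dates):
        gap = None if previous is None else (current - previous).days
        days_elapsed.append((vehicle, current, gap))
        previous = current

for row in days_elapsed:
    print(row)
```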


## 3. Data Customization & Processing - Application of Tailored Business Specific Rules

### **UDF1: Complex Incentive Calculation**
**Scenario:** The Logistics Head wants to calculate a "Performance Bonus" for drivers based on tenure and role complexity.

**Action:** Create a Python function `calculate_bonus(role, age)` and register it as a Spark UDF.

**Logic:**
* **IF** `Role` == 'Driver' **AND** `Age` > 50:
  * `Bonus` = 15% of Salary (Reward for Seniority)
* **IF** `Role` == 'Driver' **AND** `Age` < 30:
  * `Bonus` = 5% of Salary (Encouragement for Juniors)
* **ELSE**:
  * `Bonus` = 0

**Result:** A new derived column `projected_bonus` is generated for every row in the dataset.
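A plain-Python reference of the bonus rule is handy for unit-testing the logic before it is wrapped in a UDF or rewritten with built-ins. It returns the bonus rate, since the staff files contain no salary column, and it treats a missing role or age as "no bonus" (an assumption). With Spark built-ins, the same rule could be expressed roughly as `when((col("role") == "driver") & (col("age") > 50), 0.15).when((col("role") == "driver") & (col("age") < 30), 0.05).otherwise(0.0)`, assuming role is already lower-cased.

```python
# Plain-Python reference for the bonus rule; returns the bonus *rate*.
# Treating a missing role or age as "no bonus" is an assumption.
from typing import Optional


def bonus_rate(role: Optional[str], age: Optional[int]) -> float:
    if role is None or age is None:
        return 0.0
    if role.lower() == "driver" and age > 50:
        return 0.15  # reward for seniority
    if role.lower() == "driver" and age < 30:
        return 0.05  # encouragement for juniors
    return 0.0


print(bonus_rate("Driver", 55), bonus_rate("driver", 25), bonus_rate("Supervisor", 55))
```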

---

### **UDF2: PII Masking (Privacy Compliance)**
**Scenario:** For the analytics dashboard, we must hide the full identity of the staff to comply with privacy laws (GDPR/DPDP), while keeping names recognizable for internal managers.

**Business Rule:** Show the first 2 letters, mask the middle characters with `****`, and show the last letter.

**Action:** Create a UDF `mask_identity(name)`.

**Example:**
* **Input:** `"Rajesh"`
* **Output:** `"Ra****h"`
<br>
**Note: Convert the above UDF logic to a built-in function based transformation to improve performance.**
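A plain-Python reference of the masking rule makes the expected output easy to test. In Spark, the same rule should be expressible with built-ins as `concat(substring(col("staff_first_name"), 1, 2), lit("****"), substring(col("staff_first_name"), -1, 1))`, where a negative position counts from the end of the string (a sketch, not verified against these files).

```python
# Plain-Python reference for the masking rule: first 2 letters + "****" + last letter.
from typing import Optional


def mask_identity(name: Optional[str]) -> Optional[str]:
    """NULL (None) and empty names are passed through unchanged."""
    if not name:
        return name
    return name[:2] + "****" + name[-1]


print(mask_identity("Rajesh"))  # Ra****h
```

The rule as stated does not really hide names of three letters or fewer ("Ram" becomes "Ra****m"), so very short names may need a separate decision.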

In [0]:
#Scenario: The Logistics Head wants to calculate a "Performance Bonus" for drivers based on tenure and role complexity.

#Action: Create a Python function calculate_bonus(role, age) and register it as a Spark UDF.

#Logic:

#IF Role == 'Driver' AND Age > 50:
#Bonus = 15% of Salary (Reward for Seniority)
#IF Role == 'Driver' AND Age < 30:
#Bonus = 5% of Salary (Encouragement for Juniors)
#ELSE:
#Bonus = 0
#Result: A new derived column projected_bonus is generated for every row in the dataset.
#display(merge_df5)
#1. Import the udf function and the return type
from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType
#2. Create a plain Python function - on its own it runs only on the driver, not on the Spark executors
def calculate_bonus(role, age):
    #Returns the bonus rate; role was lower-cased earlier, and NULL inputs get no bonus
    if role is None or age is None:
        return 0.0
    if role == 'driver' and age > 50:
        return 0.15
    elif role == 'driver' and age < 30:
        return 0.05
    else:
        return 0.0
#3. Register the Python function as a Spark UDF (without a returnType, udf() returns strings)
bonus_udf = udf(calculate_bonus, DoubleType())
#4. Apply the UDF to the DataFrame
staff_bonus_df = merge_df5.withColumn("projected_bonus", bonus_udf(col("role"), col("age")))
display(staff_bonus_df)

####UDF2: PII Masking (Privacy Compliance)

In [0]:
#Scenario: For the analytics dashboard, we must hide the full identity of the staff to comply with privacy laws (GDPR/DPDP), while keeping names recognizable for internal managers.

#Business Rule: Show the first 2 letters, mask the middle characters with ****, and show the last letter.

#Action: Create a UDF mask_identity(name).

#Example:

#Input: "Rajesh"
#Output: "Ra****h"
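#**Note: Convert the above UDF logic to a built-in function based transformation to improve performance.**

from pyspark.sql.functions import udf, col
def mask_identity(name):
    #First 2 letters + "****" + last letter, e.g. "Rajesh" -> "Ra****h"; NULL and empty names pass through
    if not name:
        return name
    return name[:2] + '****' + name[-1]
mask_identity_udf = udf(mask_identity)
#Mask both name columns so the full identity is hidden
udf_df=merge_df5.withColumn("staff_first_name",mask_identity_udf(col("staff_first_name"))).withColumn("staff_last_name",mask_identity_udf(col("staff_last_name")))
display(udf_df)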

## 4. Data Core Curation & Processing (Pre-Wrangling)
*Applying business logic to focus, filter, and summarize data before final analysis.*


#####1. Select (Projection)
Source Files: DF of logistics_source1 and logistics_source2

Scenario: The Driver App team only needs location data, not sensitive HR info.
Action: Select only first_name, role, and hub_location.

In [0]:
#1. Select (Projection)
#Source Files: DF of logistics_source1 and logistics_source2

#Scenario: The Driver App team only needs location data, not sensitive HR info.
#Action: Select only first_name, role, and hub_location.
display(merge_df4)
predf=merge_df4.select("first_name","role","hub_location")
display(predf)



In [0]:
#2. Filter (Selection)
#Source File: DF of json

#Scenario: We need a report on active operational problems.
#Action: Filter rows where shipment_status is 'DELAYED' or 'RETURNED'.
#display(jsondf11)

predfjson=jsondf11.filter((col("shipment_status") == "DELAYED") | (col("shipment_status") == "RETURNED"))
#display(predfjson)

#Scenario: Insurance audit for senior staff.
#Action: Filter rows where age > 50.
predf1=merge_df4.filter(col("age") > 50)
#display(predf1)

#3. Derive Flags & Columns (Business Logic)
#Source File: DF of json

#Scenario: Identify high-value shipments for security tracking.
#Action: Create flag is_high_value = True if shipment_cost > 50,000.

predfjson1=predfjson.withColumn("is_high_value",when(col("shipment_cost") > 50000,True).otherwise(False))
#display(predfjson1)

#Scenario: Flag weekend operations for overtime calculation.
#Action: Create flag is_weekend = True if day is Saturday or Sunday (dayofweek: 1 = Sunday, 7 = Saturday).
predfjson1=predfjson1.withColumn("is_weekend",when(dayofweek(col("shipment_date")).isin(1,7),True).otherwise(False))
display(predfjson1)


###4. Format (Standardization)
Source File: DF of json

Scenario: Finance requires readable currency formats.
Action: Format shipment_cost to string like "₹30,695.80".
Scenario: Standardize city names for reporting.
Action: Format source_city to Uppercase (e.g., "chennai" → "CHENNAI").
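Both formatting rules can be sketched in plain Python. Like Spark's `format_number`, Python's `,` format groups digits in threes, so an amount of one lakh prints as "₹100,000.00" rather than in the Indian "1,00,000" style; the helper names are illustrative.

```python
# Plain-Python sketch of the two formatting rules; helper names are illustrative.
def format_inr(amount: float) -> str:
    # Thousands separators and 2 decimals, e.g. 30695.8 -> "₹30,695.80"
    return f"₹{amount:,.2f}"


def standardize_city(city: str) -> str:
    return city.upper()


print(format_inr(30695.8), standardize_city("chennai"))
```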

In [0]:
#4. Format (Standardization)
#Source File: DF of json

#Scenario: Finance requires readable currency formats.
#Action: Format shipment_cost to string like "₹30,695.80".

display(predfjson1)
predfjson1.printSchema()
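#Keep shipment_cost numeric (it is sorted and compared later) and put the "₹" display string in a new column
predfjson1=predfjson1.withColumn("shipment_cost_formatted",concat(lit("₹"),format_number(col("shipment_cost"),2)))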

display(predfjson1)



In [0]:
#Scenario: Standardize city names for reporting.
#Action: Format source_city to Uppercase (e.g., "chennai" → "CHENNAI").

predfjson2=predfjson1.withColumn("source_city",upper(col("source_city")))
display(predfjson2)

###5. Group & Aggregate (Summarization)
Source Files: DF of logistics_source1 and logistics_source2 (staffing analysis); DF of json (fleet capacity analysis)

Scenario: Regional staffing analysis.
Action: Group by hub_location and Count the number of staff.
Scenario: Fleet capacity analysis.
Action: Group by vehicle_type and Sum the shipment_weight_kg.
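Both aggregations reduce to "group by a key, then count or sum". The plain-Python sketch below uses hypothetical staff and shipment tuples; note that Spark's `sum` skips NULL weights, which this sketch does not model.

```python
# Plain-Python sketch of groupBy(...).count() and groupBy(...).agg(sum(...)).
# The staff and shipment tuples are hypothetical.
from collections import Counter, defaultdict

staff = [("Chennai", "Rajesh"), ("Chennai", "Anita"), ("Pune", "Vikram")]
shipments = [("Truck", 1200.5), ("Bike", 15.0), ("Truck", 800.0)]

# Regional staffing analysis: count of staff per hub_location
staff_per_hub = Counter(hub for hub, _ in staff)

# Fleet capacity analysis: total shipment_weight_kg per vehicle_type
weight_per_vehicle = defaultdict(float)
for vehicle, weight_kg in shipments:
    weight_per_vehicle[vehicle] += weight_kg

print(dict(staff_per_hub), dict(weight_per_vehicle))
```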

In [0]:
#Scenario: Regional staffing analysis.
#Action: Group by hub_location and Count the number of staff.

merge_df_grp_agg=merge_df4.groupBy("hub_location").count()
display(merge_df_grp_agg)

#Scenario: Fleet capacity analysis.
#Action: Group by vehicle_type and Sum the shipment_weight_kg.

predfjson2_group_agg=predfjson2.groupBy("vehicle_type").agg(sum("shipment_weight_kg").alias("total_shipment_weight_kg"))
display(predfjson2_group_agg)

####6. Sorting (Ordering)
Source File: DF of json



In [0]:
#Scenario: Prioritize the most expensive shipments.
#Action: Sort by shipment_cost in Descending order.
predfjson2_order=predfjson2.orderBy(col("shipment_cost").desc())
display(predfjson2_order)

#Scenario: Organize daily dispatch schedule.
#Action: Sort by shipment_date (Ascending) then priority_flag (Descending); the code below applies only the shipment_date sort.
predfjson2_order_sort=predfjson2_order.orderBy(col("shipment_date").asc())
display(predfjson2_order_sort)

#####7. Limit (Top-N Analysis)
Source File: DF of json

In [0]:
#Scenario: Dashboard snapshot of critical delays.
#Action: Filter for 'DELAYED', Sort by Cost, and Limit to top 10 rows.
predfjson2_order_sort_limit=predfjson2_order_sort.filter(col("shipment_status")=='DELAYED').orderBy(col("shipment_cost").desc()).limit(10)
display(predfjson2_order_sort_limit)

## 5. Data Wrangling - Transformation & Analytics
*Combining, modeling, and analyzing data to answer complex business questions.*

####1. Joins
Source Files:<br>
Left Side (staff_df): DF of logistics_source1 & logistics_source2<br>
Right Side (shipments_df): DF of logistics_shipment_detail_3000.json
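The join types used in this section can be sketched in plain Python with dictionaries keyed on shipment_id. The data is hypothetical, and the sketch assumes shipment_id is unique on each side (with duplicates a real join multiplies rows). One detail carries over to Spark: with `join(..., on='shipment_id', how='left')` the single shipment_id column in the result comes from the left side, so the idle-staff check must look at a value that only the right side provides.

```python
# Plain-Python sketch of the join types, keyed on shipment_id.
# Hypothetical data: staff 500001 and 500002; shipments 500001 and 500003.
staff = {"500001": "Rajesh", "500002": "Anita"}
shipments = {"500001": "DELIVERED", "500003": "DELAYED"}

inner = {k: (staff[k], shipments[k]) for k in staff.keys() & shipments.keys()}
left = {k: (staff[k], shipments.get(k)) for k in staff}
right = {k: (staff.get(k), shipments[k]) for k in shipments}
full_outer = {k: (staff.get(k), shipments.get(k)) for k in staff.keys() | shipments.keys()}
left_semi = {k: staff[k] for k in staff if k in shipments}      # left columns only
left_anti = {k: staff[k] for k in staff if k not in shipments}  # idle staff

# Idle-staff check via the left join: test the right-side value, not the key
idle_via_left = [k for k, (_, status) in left.items() if status is None]

print(inner, left_anti, idle_via_left)
```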

In [0]:
#display(merge_df4)
staff_df=merge_df4
staff_df=staff_df.filter(col("shipment_id") !="ten")
display(staff_df)
shipments_df=jsondf11
display(jsondf11)

In [0]:
#1 Frequently Used Simple Joins (Inner, Left)
#Inner Join (Performance Analysis):
#Scenario: We only want to analyze completed work. Connect Staff to the Shipments they handled.
#Action: Join staff_df and shipments_df on shipment_id.
#Result: Returns only rows where a staff member is assigned to a valid shipment.
innerdf=staff_df.join(shipments_df,on='shipment_id',how='inner')
display(innerdf)
#Left Join (Idle Resource check):
#Scenario: Find out which staff members are currently idle (not assigned to any shipment).
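#Action: Join staff_df (Left) with shipments_df (Right) on shipment_id and keep rows where the right-side shipment_id is NULL.
#With on='shipment_id' the single key column comes from the left side and is never NULL here, so both sides are aliased and the right key is checked explicitly
leftdf=staff_df.alias("s").join(shipments_df.alias("j"),col("s.shipment_id")==col("j.shipment_id"),how='left')
leftdf=leftdf.filter(col("j.shipment_id").isNull()).select("s.*")
display(leftdf)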

#### **1.2 Infrequent Simple Joins (Self, Right, Full, Cartesian)**

In [0]:
#Self Join (Peer Finding):
#Scenario: Find all pairs of employees working in the same hub_location.
#Action: Join staff_df to itself on hub_location, keeping pairs of different staff records (shipment_id serves as the staff identifier, compared as staff_id_A != staff_id_B).
selfdf=staff_df.alias("staff_id_A").join(staff_df.alias("staff_id_B"),on='hub_location',how='inner').filter(col("staff_id_A.shipment_id") != col("staff_id_B.shipment_id"))
#display(selfdf)
#Right Join (Orphan Data Check):
#Scenario: Identify shipments in the system that have no valid driver assigned (Data Integrity Issue).
#Action: Join staff_df (Left) with shipments_df (Right). Focus on NULLs on the left side.

rightdf=staff_df.join(shipments_df,on='shipment_id',how='right')
rightdf=rightdf.filter(col("first_name").isNull())

#display(rightdf)
#2 Frequently Used Complex Joins (Outer, Cross)
#Full Outer Join (Reconciliation):
#Scenario: A complete audit to find both idle drivers AND unassigned shipments in one view.
#Action: Perform a Full Outer Join on shipment_id.

fulldf=staff_df.join(shipments_df,on='shipment_id',how='outer')
display(fulldf)

#Cartesian/Cross Join (Capacity Planning):
#Scenario: Generate a schedule of every possible driver assignment to every pending shipment to run an optimization algorithm.
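#Action: Cross Join drivers_df and pending_shipments_df. This cell uses staff_df and the full shipments_df, so the output has (staff rows x shipment rows) rows; filter shipments_df down to pending shipments first for a real schedule.

crossdf=staff_df.crossJoin(shipments_df)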
display(crossdf)
#3 Frequently Used Joins with Conditions (Left Semi)
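The full outer join audit above boils down to three buckets of keys, and the cross join size is simply the product of the row counts. A plain-Python sketch, assuming made-up `shipment_id` values and keys that are unique on each side:

```python
from itertools import product

# Hypothetical shipment_id values on each side of the join (assumed unique per side).
staff_ids = {"1", "2", "99"}
shipment_ids = {"1", "2", "3"}


def reconcile(left_keys, right_keys):
    """Bucket keys the way a full outer join on shipment_id exposes them."""
    return {
        "matched": sorted(left_keys & right_keys),
        "idle_staff": sorted(left_keys - right_keys),            # shipment side is NULL
        "unassigned_shipments": sorted(right_keys - left_keys),  # staff side is NULL
    }


report = reconcile(staff_ids, shipment_ids)
print(report)

# A cross join pairs every row with every row: 3 staff x 3 shipments = 9 candidate assignments.
candidates = list(product(sorted(staff_ids), sorted(shipment_ids)))
print(len(candidates))  # 9
```

The quadratic growth of `candidates` is the reason to filter both sides before a cross join on real data.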

###1.3 Advanced Joins (Semi and Anti)

In [0]:
#Left Semi Join (Existence Check):
#Scenario: "Show me the details of Drivers who have at least one shipment." (Standard filtering).
#Action: staff_df.join(shipments_df, "shipment_id", "left_semi").

leftsemidf=staff_df.join(shipments_df,on='shipment_id',how='left_semi')
display(leftsemidf)

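#Benefit: returns only staff_df columns and each staff row at most once, even when it matches several shipments, so Spark does not have to build every matching pair the way an inner join does (and no dropDuplicates is needed afterwards).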
#Left Anti Join (Negation Check):
#Scenario: "Show me the details of Drivers who have never touched a shipment."
#Action: staff_df.join(shipments_df, "shipment_id", "left_anti").

leftantidf=staff_df.join(shipments_df,on='shipment_id',how='left_anti')
display(leftantidf)
#4 Frequently Used Joins with Conditions (Left Anti)
#Left Anti Join (Negation Check)
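The difference between inner, left_semi and left_anti shows up as soon as a key matches more than once on the right. A plain-Python sketch of the three behaviours, using made-up rows where `shipment_id` "1" appears twice on the shipments side:

```python
# Hypothetical rows: shipment_id "1" appears twice on the right, "3" not at all.
staff = [
    {"shipment_id": "1", "name": "Asha"},
    {"shipment_id": "2", "name": "Ravi"},
    {"shipment_id": "3", "name": "Meena"},
]
shipments = [
    {"shipment_id": "1", "leg": "A"},
    {"shipment_id": "1", "leg": "B"},
    {"shipment_id": "2", "leg": "A"},
]

right_keys = {s["shipment_id"] for s in shipments}

# inner: one output row per matching pair, so Asha appears twice
inner_rows = [(p, s) for p in staff for s in shipments if p["shipment_id"] == s["shipment_id"]]
# left_semi: left columns only, each left row at most once
semi_rows = [p for p in staff if p["shipment_id"] in right_keys]
# left_anti: left rows with no match at all
anti_rows = [p for p in staff if p["shipment_id"] not in right_keys]

print(len(inner_rows), [p["name"] for p in semi_rows], [p["name"] for p in anti_rows])
# 3 ['Asha', 'Ravi'] ['Meena']
```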

In [0]:
#Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF)

#Scenario: Validation. Check if the hub_location in the staff file exists in the corporate Master_City_List.
#Action: Compare values against a reference list.
master_cities = [
    ("Bangalore",),
    ("Chennai",),
    ("Mumbai",),
    ("Delhi",),
    ("Pune",),
    ("Hyderabad",),
    ("Jaipur",),
    ("Kochi",),
    ("Indore",)
]

master_city_df = spark.createDataFrame(master_cities, ["hub_location"])
display(master_city_df)

In [0]:
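#left_anti keeps staff rows whose hub_location has no match in master_city_df (the invalid hubs);
#left_semi would return the valid ones instead.
invalid_hubs_df = staff_df.join(
    master_city_df,
    on="hub_location",
    how="left_anti"
)

display(invalid_hubs_df)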

In [0]:
master_df=spark.read.csv("/Volumes/we47catalog1/we47db1/we47volume1/Master_City_List.csv",header=True,inferSchema=True)
display(master_df)

In [0]:
#Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF) and Master_City_List.csv

#Scenario: Validation. Check if the hub_location in the staff file exists in the dataframe of corporate Master_City_List.csv.
#Action: Compare values against this Master_City_List list.
lookdf_semi=staff_df.join(master_df,on=col('hub_location')==col('city_name'),how='left_semi')
display(lookdf_semi)

##Scenario: Validation. Check whether the hub_location in the staff file does not exist in the corporate Master_City_List.csv dataframe (left_anti returns the unmatched staff rows).

lookdf_anti=staff_df.join(master_df,on=col('hub_location')==col('city_name'),how='left_anti')
display(lookdf_anti)
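Semi and anti joins compare strings exactly, so if hub_location values carry stray spaces or inconsistent casing (an assumption about the raw files, not something checked here), valid hubs would land in the anti result. A plain-Python sketch of normalizing before comparing; in Spark the analogous step would be `trim()` and `initcap()` from `pyspark.sql.functions` on hub_location before the join. The raw hub values are hypothetical:

```python
# City list copied from the master_cities cell above.
master_cities = {"Bangalore", "Chennai", "Mumbai", "Delhi", "Pune",
                 "Hyderabad", "Jaipur", "Kochi", "Indore"}
# Hypothetical raw hub_location values with inconsistent formatting.
staff_hubs = ["Pune", " pune", "MUMBAI", "Nagpur", None]


def normalize(city):
    """Trim and title-case a city name; None stays None (like NULL in Spark)."""
    return city.strip().title() if city is not None else None


valid = [h for h in staff_hubs if normalize(h) in master_cities]        # left_semi
invalid = [h for h in staff_hubs if normalize(h) not in master_cities]  # left_anti
print(valid)    # ['Pune', ' pune', 'MUMBAI']
print(invalid)  # ['Nagpur', None]
```

A NULL hub never matches anything, so it always ends up on the anti side, which is usually what a validation report wants.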

### **3. Lookup & Enrichment**<br>
Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF) and Master_City_List.csv 

In [0]:
#Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF) and Master_City_List.csv dataframe

#Scenario: Geo-Tagging.
#Action: Look up hub_location (e.g. "Pune") in the Master_City_List.csv latitude/longitude dataframe and enrich the merged logistics_source dataframe with lat and long columns for map plotting.


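#Left join: every staff row is kept; hubs missing from the master list get NULL lat/long.
staff_df_look=staff_df.join(master_df,on=col("hub_location") == col("city_name"),how='left')
display(staff_df_look)

#Inner join: only staff rows whose hub is found in the master list are kept.
staff_df_look_enrich=staff_df.join(master_df,on=col("hub_location") == col("city_name"),how='inner')
display(staff_df_look_enrich)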
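The two enrichment variants above differ only in what happens to an unmatched hub. A plain-Python sketch, assuming a hypothetical lookup dict with approximate coordinates and the column names `lat`/`long` (the real master_df column names may differ):

```python
# Hypothetical lookup: approximate coordinates, keyed by city name.
city_master = {"Pune": (18.52, 73.86), "Chennai": (13.08, 80.27)}
staff = [
    {"name": "Asha", "hub_location": "Pune"},
    {"name": "Ravi", "hub_location": "Nagpur"},  # not in the master list
]


def enrich(rows, lookup, keep_unmatched):
    """Add lat/long from the lookup; keep_unmatched=True mimics a left join, False an inner join."""
    out = []
    for row in rows:
        coords = lookup.get(row["hub_location"])
        if coords is None:
            if not keep_unmatched:
                continue  # inner join drops the row
            coords = (None, None)  # left join keeps it with NULL lat/long
        out.append({**row, "lat": coords[0], "long": coords[1]})
    return out


left_result = enrich(staff, city_master, keep_unmatched=True)
inner_result = enrich(staff, city_master, keep_unmatched=False)
print(len(left_result), len(inner_result))  # 2 1
```

For map plotting the inner result is convenient, while the left result keeps the unmatched hubs visible for data-quality follow-up.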

### **4. Schema Modeling (Denormalization)**<br>
Source Files: DF of All 3 Files (logistics_source1, logistics_source2, logistics_shipment_detail_3000.json)

In [0]:
#Scenario: Creating a "Gold Layer" Table for PowerBI/Tableau.
#Action: Flatten the Star Schema. Join Staff, Shipments, and Vehicle_Master into one wide table (wide_shipment_history) so analysts don't have to perform joins during reporting. This cell uses the Master_City_List dataframe (master_df) as the dimension table.

#Left outer join keeps every staff row, even those without a matching shipment
star_df=staff_df.join(shipments_df,on='shipment_id',how='left')
#display(star_df)

#Left join again so staff rows whose hub is missing from the city master are not dropped
star_df1=star_df.join(master_df, star_df.hub_location == master_df.city_name, how='left')
display(star_df1)
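When flattening into a wide table, a dimension with duplicate keys silently multiplies fact rows. A plain-Python sketch of the check, using hypothetical rows where "Pune" was loaded twice; in Spark the same check could be `master_df.groupBy("city_name").count().filter(col("count") > 1)` before the join:

```python
from collections import Counter

# Hypothetical master rows where "Pune" was loaded twice.
master = [{"city_name": "Pune"}, {"city_name": "Pune"}, {"city_name": "Delhi"}]
staff = [
    {"shipment_id": "1", "hub_location": "Pune"},
    {"shipment_id": "2", "hub_location": "Delhi"},
]


def duplicate_keys(rows, key):
    """Return key values that occur more than once in a lookup table."""
    counts = Counter(row[key] for row in rows)
    return sorted(k for k, n in counts.items() if n > 1)


wide = [(s, m) for s in staff for m in master if s["hub_location"] == m["city_name"]]
print(duplicate_keys(master, "city_name"), len(staff), len(wide))  # ['Pune'] 2 3
```

Comparing row counts before and after each join in the chain is a cheap guard against this kind of fan-out.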





###5. Windowing (Ranking & Trends)

In [0]:
#Source Files:
#DF of logistics_source2: Provides hub_location (Partition Key).
#logistics_shipment_detail_3000.json: Provides shipment_cost (Ordering Key)
from pyspark.sql.functions import row_number, dense_rank, col
from pyspark.sql.window import Window
window_df1=spark.read.csv("/Volumes/we47catalog1/we47db1/we47volume1/logistics_source2",header=True,inferSchema=True)
window_df1=window_df1.filter(col("shipment_id") !="ten")
#display(window_df1)
window_df1.printSchema()
window_df2=spark.read.json("/Volumes/we47catalog1/we47db1/we47volume1/logistics_shipment_detail_3000.json",multiLine=True)
#display(window_df2)
#window_df2.printSchema()

window_df3=window_df1.join(window_df2,on='shipment_id',how='inner')
#display(window_df3)

#Top 3 most expensive shipments per hub: one window spec shared by both rankings
hub_cost_window=Window.partitionBy(col("hub_location")).orderBy(col("shipment_cost").desc())

###row_number(): exactly 3 rows per hub; tied costs still get distinct (arbitrary) numbers
row_df4=window_df3.withColumn("row_number_column",row_number().over(hub_cost_window))
row_df4=row_df4.filter(col("row_number_column")<=3)
display(row_df4)

####dense_rank(): tied costs share a rank, so a hub can return more than 3 rows
dense_df5=window_df3.withColumn("dense_rank_column", dense_rank().over(hub_cost_window))
dense_df5=dense_df5.filter(col("dense_rank_column")<=3)
display(dense_df5)
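The two rankings only diverge when costs tie. A plain-Python sketch of `row_number` vs `dense_rank` over a partition ordered by cost descending, with hypothetical rows where Pune has two shipments at 500:

```python
from itertools import groupby

# Hypothetical (hub_location, shipment_cost) rows; Pune has a tie at 500.
rows = [
    ("Pune", 500), ("Pune", 500), ("Pune", 300), ("Pune", 200), ("Pune", 100),
    ("Delhi", 400), ("Delhi", 250),
]


def top_n(rows, n, dense):
    """Keep rows ranked <= n per hub, ordered by cost descending."""
    result = []
    ordered = sorted(rows, key=lambda r: (r[0], -r[1]))  # partitionBy hub, orderBy cost desc
    for hub, group in groupby(ordered, key=lambda r: r[0]):
        rank, prev_cost = 0, None
        for position, (_, cost) in enumerate(group, start=1):
            if dense:
                if cost != prev_cost:  # dense_rank: rank only grows when the cost changes
                    rank += 1
                    prev_cost = cost
            else:
                rank = position  # row_number: every row gets the next number, ties included
            if rank <= n:
                result.append((hub, cost, rank))
    return result


by_row_number = top_n(rows, 3, dense=False)
by_dense_rank = top_n(rows, 3, dense=True)
print(sum(1 for r in by_row_number if r[0] == "Pune"))  # 3
print(sum(1 for r in by_dense_rank if r[0] == "Pune"))  # 4
```

Use row_number when the report must show exactly N rows per hub, and dense_rank when the top N distinct cost levels matter more than the row count.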


###6. Analytical Functions (Lead/Lag)

In [0]:
from pyspark.sql.functions import datediff, lag, col, try_to_date
from pyspark.sql.window import Window

window_df = spark.read.json("/Volumes/we47catalog1/we47db1/we47volume1/logistics_shipment_detail_3000.json", multiLine=True)
display(window_df)

# Use try_to_date to convert shipment_date to DATE
window_df = window_df.withColumn("shipment_date", try_to_date(col("shipment_date"), "yy-MM-dd"))

lagdf = window_df.withColumn(
    "prev_shipment_date",
    lag("shipment_date", 1).over(Window.partitionBy(col("vehicle_type")).orderBy(col("shipment_date")))
)
lagdf = lagdf.withColumn(
    "days_elapsed",
    datediff(col("shipment_date"), col("prev_shipment_date"))
)
lagdf = lagdf.drop("prev_shipment_date")
display(lagdf)
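The lag/datediff combination above computes, per vehicle_type, the number of days since the previous shipment. A plain-Python sketch of the same calculation on hypothetical dates, with `None` standing in for the NULL that `lag()` returns on the first row of each partition:

```python
from datetime import date
from itertools import groupby

# Hypothetical (vehicle_type, shipment_date) rows.
shipments = [
    ("Truck", date(2024, 1, 1)), ("Truck", date(2024, 1, 4)), ("Van", date(2024, 1, 2)),
    ("Truck", date(2024, 1, 10)), ("Van", date(2024, 1, 3)),
]


def days_elapsed(rows):
    """Per vehicle_type, days since the previous shipment (None for the first one)."""
    out = []
    ordered = sorted(rows)  # partitionBy vehicle_type, then orderBy shipment_date
    for vehicle_type, group in groupby(ordered, key=lambda r: r[0]):
        prev_date = None  # lag() yields NULL on the first row of each partition
        for _, shipment_date in group:
            gap = (shipment_date - prev_date).days if prev_date is not None else None
            out.append((vehicle_type, shipment_date, gap))
            prev_date = shipment_date
    return out


for row in days_elapsed(shipments):
    print(row)
```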


### **7. Set Operations**<br>
Source Files: DF of logistics_source1 and logistics_source2

In [0]:
#Union: Combining Source1 (Legacy) and Source2 (Modern) into one dataset (Already done in Active Munging).
from pyspark.sql.functions import lit,initcap

struct1="shipment_id string ,first_name string ,last_name string ,age string ,role string"
#inferSchema is not needed because an explicit schema is supplied
rawdf1=spark.read.schema(struct1).csv("/Volumes/we47catalog1/we47db1/we47volume1/logistics_source1",header=True).withColumn("sourcesystem1",lit("system1"))
rawdf1_u1=rawdf1.select("shipment_id","first_name","last_name","age","role")
#display(rawdf1)
rawdf2=spark.read.csv("/Volumes/we47catalog1/we47db1/we47volume1/logistics_source2",header=True,inferSchema=True).withColumn("sourcesystem2",lit("system2"))
rawdf2_u2=rawdf2.select("shipment_id","first_name","last_name","age","role")
#display(rawdf2)

#union() matches columns by position and keeps duplicates (same as unionAll)
uniondf=rawdf1_u1.union(rawdf2_u2)
display(uniondf)

#union followed by distinct() gives SQL UNION semantics (duplicates removed)
uniondf1=rawdf1_u1.unionAll(rawdf2_u2).distinct()
display(uniondf1)

#Intersect: Identifying Staff IDs that appear in both Source 1 and Source 2 (Duplicate/Migration Check).
#intersect() compares whole rows (all five columns) and already returns distinct rows, so no extra distinct() is needed.
common_df=rawdf1_u1.intersect(rawdf2_u2)
display(common_df)

#Except (Difference): Identifying Staff IDs present in Source 2 but missing from Source 1 (New Hires).
#df_a.subtract(df_b) returns the distinct rows of df_a that are absent from df_b, so Source 2 goes first.
uniondf2=rawdf2_u2.subtract(rawdf1_u1)
display(uniondf2)
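The set operations above follow multiset rules that are easy to mix up, especially the direction of `subtract`. A plain-Python sketch on hypothetical rows, with the matching PySpark method named in each comment (`exceptAll` is the duplicate-preserving variant of `subtract`):

```python
from collections import Counter

source1 = [("1", "Asha"), ("2", "Ravi"), ("2", "Ravi")]  # hypothetical rows
source2 = [("2", "Ravi"), ("3", "Meena")]

union_all = source1 + source2                                  # union / unionAll: keeps duplicates
union_distinct = sorted(set(union_all))                        # union(...).distinct(): SQL UNION
intersect = sorted(set(source1) & set(source2))                # intersect: distinct rows in both
new_in_source2 = sorted(set(source2) - set(source1))           # source2.subtract(source1)
missing_from_source2 = sorted(set(source1) - set(source2))     # source1.subtract(source2)
except_all = sorted((Counter(source1) - Counter(source2)).elements())  # source1.exceptAll(source2)

print(len(union_all), union_distinct)
print(intersect, new_in_source2, missing_from_source2, except_all)
```

Swapping the operands of subtract answers the opposite question, which is why the New Hires check puts Source 2 first.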




###8. Grouping & Aggregations (Advanced)

In [0]:
#Source Files:
#DF of logistics_source2: Provides hub_location and vehicle_type (Grouping Dimensions).
#DF of logistics_shipment_detail_3000.json: Provides shipment_cost (Aggregation Metric).
#Scenario: The CFO wants a subtotal report at multiple levels:
#Total Cost by Hub.
#Total Cost by Hub AND Vehicle Type.
#Grand Total.

display(jsondf4)
rawdf2=rawdf2.filter(col("shipment_id") !="ten")
display(rawdf2)

mergedf=rawdf2.alias("raw").join(jsondf4.alias("json"),on=col("raw.shipment_id")==col("json.shipment_id"), how="inner")
display(mergedf)

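#rollup(hub_location, vehicle_type) produces exactly the three levels requested: hub + vehicle type, hub subtotal (vehicle_type NULL) and grand total (both NULL).
#cube() would also add a total per vehicle_type across all hubs, which the CFO did not ask for.
#F.sum is Spark's aggregate function; Python's builtin sum() cannot aggregate a Column.
from pyspark.sql import functions as F
agg_sum=mergedf.rollup(mergedf["raw.hub_location"], mergedf["raw.vehicle_type"]).agg(F.sum(mergedf["json.shipment_cost"]).alias("total_cost"))
display(agg_sum)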
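The difference between rollup and cube is just which grouping sets get summed. A plain-Python sketch on hypothetical rows, where a dimension left out of a grouping set becomes `None` (NULL in Spark):

```python
from collections import defaultdict

rows = [  # hypothetical (hub_location, vehicle_type, shipment_cost)
    ("Pune", "Truck", 100.0), ("Pune", "Van", 50.0), ("Delhi", "Truck", 70.0),
]


def grouping_sets(rows, sets):
    """Sum cost for each grouping set; a dimension left out of a set becomes None (NULL)."""
    totals = defaultdict(float)
    for hub, vtype, cost in rows:
        for keep_hub, keep_vtype in sets:
            key = (hub if keep_hub else None, vtype if keep_vtype else None)
            totals[key] += cost
    return dict(totals)


ROLLUP = [(True, True), (True, False), (False, False)]  # rollup(hub, vehicle_type)
CUBE = ROLLUP + [(False, True)]                          # cube also adds vehicle_type-only totals

rollup_result = grouping_sets(rows, ROLLUP)
cube_result = grouping_sets(rows, CUBE)
print(len(rollup_result), len(cube_result))  # 6 8
print(rollup_result[(None, None)])           # 220.0
```

One caveat carries over to Spark: if hub_location or vehicle_type is itself NULL in the data, those rows look like subtotal rows; `pyspark.sql.functions.grouping_id()` in the `agg` distinguishes the two.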









##6. Data Persistence (LOAD)-> Data Publishing & Consumption<br>

Store the inner joined, lookup and enrichment, Schema Modeling, windowing, analytical functions, set operations, grouping and aggregation data into the delta tables.

In [0]:
##Schema Modeling data stored as a Delta table using the Spark DataFrame writer

merge_df6.write.mode("overwrite").format("delta").saveAsTable("we47catalog1.we47db1.schema_model_table")
##Also save the same data as Parquet files in the Unity Catalog volume, in its own subfolder so it does not share a directory with the other datasets written below:
merge_df6.write.mode("overwrite").format("parquet").save("/Volumes/we47catalog1/we47db1/we47volume1/logistic_data_model/schema_model")

##Lookup data stored as a Delta table using the Spark DataFrame writer

staff_df_look.write.format("delta").mode("overwrite").saveAsTable("we47catalog1.we47db1.staff_lookup")
##Also save the same data as Parquet files in the volume:
staff_df_look.write.format("parquet").mode("overwrite").save("/Volumes/we47catalog1/we47db1/we47volume1/logistic_data_model/staff_lookup")

##Enrich data stored into delta lake using spark dataframe
staff_df_look_enrich.write.mode("overwrite").format("delta").saveAsTable("we47catalog1.we47db1.staff_lookup_enrich")
staff_df_look_enrich.write.mode("overwrite").format("parquet").save("/Volumes/we47catalog1/we47db1/we47volume1/logistic_data_model/staff_lookup_enrich")

#grouping and aggregation data:
agg_sum.write.format("delta").mode("overwrite").saveAsTable("we47catalog1.we47db1.agg_sum_table")
agg_sum.write.format("parquet").mode("overwrite").save("/Volumes/we47catalog1/we47db1/we47volume1/logistic_data_model/group_agg")

#set operations
uniondf2.write.mode("overwrite").format("delta").saveAsTable("we47catalog1.we47db1.uniondf2_table")
uniondf2.write.mode("overwrite").format("parquet").save("/Volumes/we47catalog1/we47db1/we47volume1/logistic_data_model/uniondf2_table_data")

common_df.write.mode("overwrite").format("delta").saveAsTable("we47catalog1.we47db1.common_df_table")
#Parquet copy in the volume, consistent with the other exports above
common_df.write.mode("overwrite").format("parquet").save("/Volumes/we47catalog1/we47db1/we47volume1/logistic_data_model/common_df_table_data")



