In [0]:

# Load source 1 with an inferred schema and force the five canonical column names.
logistics_df = (
    spark.read
    .csv(
        "/Volumes/workspace/wd36schema2/ingestion_volume/target/logistics_source1",
        header=True,
        inferSchema=True,
    )
    .toDF("shipment_id", "first_name", "last_name", "age", "role")
)
# Print every row without truncation, then the resulting schema.
logistics_df.show(n=logistics_df.count(), truncate=False)
logistics_df.printSchema()



In [0]:
# Raw row count.
total_cnt = logistics_df.count()
print("Total count of records in the dataframe is {}".format(total_cnt))

# Distinct count over the five columns of the frame.
distinct_records = (
    logistics_df
    .select("shipment_id", "first_name", "last_name", "age", "role")
    .distinct()
    .count()
)
print("Distinct count of records in the dataframe is {}".format(distinct_records))

# dropDuplicates() with no arguments compares whole rows.
print("de-duplicated record (all columns) count", logistics_df.dropDuplicates().count())
# Restricting the comparison to shipment_id keeps one row per shipment_id value.
print("de-duplicated given shipment_id column count", logistics_df.dropDuplicates(subset=["shipment_id"]).count())

# Basic statistics (describe) and the extended summary.
display(logistics_df.describe())
display(logistics_df.summary())

a. Passive Data Munging - (File: logistics_source1 and logistics_source2)
Without modifying the data, identify:
Shipment IDs that appear in both master_v1 and master_v2
Records where:

shipment_id is non-numeric
age is not an integer
Count rows having: 3. fewer columns than expected 4. more columns than expected

In [0]:
# Re-read both sources with inferred schemas and explicit column names.
master_v1 = (
    spark.read
    .csv(
        "/Volumes/workspace/wd36schema2/ingestion_volume/target/logistics_source1",
        header=True,
        inferSchema=True,
    )
    .toDF("shipment_id", "first_name", "last_name", "age", "role")
)
master_v2 = (
    spark.read
    .csv(
        "/Volumes/workspace/wd36schema2/ingestion_volume/target/logistics_source2",
        header=True,
        inferSchema=True,
    )
    .toDF("shipment_id", "first_name", "last_name", "age", "role", "hub_location", "vehicle_type")
)
master_v2.printSchema()

# Shipment IDs present in both sources (inner join on the key); only the
# source-1 attributes are projected.
master_id = (
    master_v1
    .join(master_v2, on=["shipment_id"], how="inner")
    .select(
        master_v1["shipment_id"],
        master_v1["first_name"],
        master_v1["last_name"],
        master_v1["age"],
        master_v1["role"],
    )
)
master_id.show(master_id.count(), truncate=False)

# Stack both sources by column name; columns missing from one side become NULL.
master_df = master_v1.unionByName(master_v2, allowMissingColumns=True)
master_df.show(master_df.count(), truncate=False)
display(master_df)
master_df.printSchema()


In [0]:
from pyspark.sql import functions as F
# --- shipment_id is non-numeric: any character outside 0-9 flags the row ---
non_numeric_S_ids = master_df.filter(master_df['shipment_id'].rlike("[^0-9]"))

print(f"Found {non_numeric_S_ids.count()} records with non-numeric shipment IDs.")
non_numeric_S_ids.show(truncate=False)

# --- age is not an integer ---
# Method 1: the same "contains a non-digit character" test applied to age.
non_numeric_age = master_df.filter(master_df['age'].rlike("[^0-9]"))

# Message corrected: this check is about age, not shipment IDs.
print(f"Found {non_numeric_age.count()} records with non-integer age.")
non_numeric_age.show(truncate=False)

# Method 2: non-null ages that are not made up solely of digits.
# Unlike method 1 this also flags empty strings.
non_integer_age = master_df.filter(
    ~F.col("age").rlike("^\\d+$") & F.col("age").isNotNull()
)

print(f"Found {non_integer_age.count()} records where age is not a clean integer string.")
non_integer_age.show(truncate=False)

# --- rows with fewer / more columns than expected ---
# na.drop(how='any') KEEPS only the rows where every expected column is populated,
# so the rows with something missing are the complement of that count.
expected_cols = ["shipment_id", "first_name", "last_name", "age", "role"]
complete_rows_cnt = master_df.na.drop(how='any', subset=expected_cols).count()
missing_columns_df = master_df.count() - complete_rows_cnt
print(f"Found {missing_columns_df} records with missing columns.")

# Rows where both additional (source-2 only) columns are populated.
extra_columns_df = master_df.na.drop(how='any', subset=["hub_location", "vehicle_type"]).count()
print(f"Found {extra_columns_df} records with extra columns.")
display(extra_columns_df)


Read both files without enforcing schema
Align them into a single canonical schema: shipment_id, first_name, last_name, age, role, hub_location, vehicle_type, data_source
Add data_source column with values as: system1, system2 in the respective dataframes

In [0]:
from pyspark.sql.functions import lit,initcap,col
# Source 1: read without schema inference, keep the five core columns, tag the origin.
master_v1_df = spark.read.csv(
    "/Volumes/workspace/wd36schema2/ingestion_volume/target/logistics_source1",
    header=True,
)
add_ds_mv1 = (
    master_v1_df
    .select("shipment_id", "first_name", "last_name", "age", "role")
    .withColumn("datasource", lit("system1"))
)
display(add_ds_mv1)

# Source 2: keep every column, append the origin tag, then apply canonical names.
master_v2_df = spark.read.csv(
    "/Volumes/workspace/wd36schema2/ingestion_volume/target/logistics_source2",
    header=True,
)
add_ds_mv2 = (
    master_v2_df
    .selectExpr("*", "'system2' as datasource")
    .toDF("shipment_id", "first_name", "last_name", "age", "role", "hub_location", "vehicle_type", "datasource")
)
display(add_ds_mv2)

# Align both frames by name; columns absent from source 1 are filled with NULL.
master_dfe = add_ds_mv1.unionByName(add_ds_mv2, allowMissingColumns=True)
master_dfe.show(master_dfe.count(), truncate=False)
master_dfe.printSchema()

# Fix the final column order of the merged frame.
select_cols = (
    master_dfe
    .select("shipment_id", "first_name", "last_name", "age", "role", "hub_location", "vehicle_type", "datasource")
    .toDF("shipment_id", "first_name", "last_name", "age", "role", "hub_location", "vehicle_type", "datasource")
)
display(select_cols)



Cleansing (removal of unwanted datasets)

Mandatory Column Check - Drop any record where any of the following columns is NULL:shipment_id, role
Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name
Join Readiness Rule - Drop records where the join key is null: shipment_id


In [0]:
# Mandatory column check: drop a row when shipment_id OR role is NULL.
man_column_check = select_cols.na.drop(how="any", subset=["shipment_id", "role"])
man_column_check.show(man_column_check.count(), truncate=False)

# Name completeness: drop a row only when first_name AND last_name are both NULL.
compl_rule = man_column_check.na.drop(how="all", subset=["first_name", "last_name"])
display(compl_rule)

# Join readiness: the join key must not be NULL.
join_readiness_rule = compl_rule.na.drop(subset=["shipment_id"])
display(join_readiness_rule)

Scrubbing (convert raw to tidy)
4. Age Defaulting Rule - Fill NULL values in the age column with: -1
5. Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN
6. Invalid Age Replacement - Replace the following values in age: "ten" to -1 "" to -1
7. Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV bike to TwoWheeler

In [0]:
# Age defaulting rule: NULL age -> "-1" (age is still a string column at this point).
age_def_rule = join_readiness_rule.na.fill("-1", subset=["age"])
display(age_def_rule)

# Vehicle type default rule: NULL vehicle_type -> UNKNOWN (upper case, as the rule states).
vel_typ_rule = age_def_rule.na.fill("UNKNOWN", subset=["vehicle_type"])
display(vel_typ_rule)

# Invalid age replacement: "ten" and the empty string become "-1".
find_replace_values_dict = {'ten': '-1', '': '-1'}
scrubbeddf2 = vel_typ_rule.na.replace(find_replace_values_dict, subset=["age"])
display(scrubbeddf2)

# Vehicle type normalization: truck -> LMV, bike -> TwoWheeler.
# na.replace matches values exactly (case-sensitive), so both the capitalised
# and the lower-case spellings are listed.
find_replace_values_dict2 = {
    'Truck': 'LMV',
    'truck': 'LMV',
    'Bike': 'TwoWheeler',
    'bike': 'TwoWheeler',
}
scrubbeddf3 = scrubbeddf2.na.replace(find_replace_values_dict2, subset=["vehicle_type"])
display(scrubbeddf3)




3. Standardization, De-Duplication and Replacement / Deletion of Data to make it in a usable format
Creating shipments Details data Dataframe creation

Create a DF by Reading Data from logistics_shipment_detail.json
As this data is a clean json data, it doesn't require any cleansing or scrubbing.
Standardizations:

Add a column
Source File: DF of logistics_shipment_detail_3000.json
: domain as 'Logistics', current timestamp 'ingestion_timestamp' and 'False' as 'is_expedited'
Column Uniformity: role - Convert to lowercase
Source File: DF of merged(logistics_source1 & logistics_source2)
vehicle_type - Convert values to UPPERCASE
Source Files: DF of logistics_shipment_detail_3000.json hub_location - Convert values to initcap case
Source Files: DF of merged(logistics_source1 & logistics_source2)
Format Standardization:
Source Files: DF of logistics_shipment_detail_3000.json
Convert shipment_date to yyyy-MM-dd
Ensure shipment_cost has 2 decimal precision
Data Type Standardization
Standardizing column data types to fix schema drift and enable mathematical operations.
Source File: DF of merged(logistics_source1 & logistics_source2)
age: Cast String to Integer
Source File: DF of logistics_shipment_detail_3000.json
shipment_weight_kg: Cast to Double
Source File: DF of logistics_shipment_detail_3000.json
is_expedited: Cast to Boolean
Naming Standardization
Source File: DF of merged(logistics_source1 & logistics_source2)
Rename: first_name to staff_first_name
Rename: last_name to staff_last_name
Rename: hub_location to origin_hub_city
Reordering columns logically in a better standard format:
Source File: DF of Data from all 3 files
shipment_id (Identifier), staff_first_name (Dimension), staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)
Deduplication:

Apply Record Level De-Duplication
Apply Column Level De-Duplication (Primary Key Enforcement)

Creating shipments Details data Dataframe creation

Create a DF by Reading Data from logistics_shipment_detail.json
As this data is a clean json data, it doesn't require any cleansing or scrubbing.

In [0]:

# Read the multi-line JSON shipment detail file.
df = (
    spark.read
    .option("multiline", "true")
    .json("/Volumes/workspace/wd36schema2/ingestion_volume/logistics_shipment_detail_3000.json")
)

# Print every row without truncation, then render the frame.
df.show(n=df.count(), truncate=False)
display(df)



Add a column
Source File: DF of logistics_shipment_detail_3000.json
: domain as 'Logistics', current timestamp 'ingestion_timestamp' and 'False' as 'is_expedited'

In [0]:
from pyspark.sql.functions import lit, current_timestamp
# Add the three constant / audit columns to the shipment detail frame in one pass.
add_col1_df = df.withColumns(
    {
        "domain": lit("logistics"),
        "ingestion_timestamp": current_timestamp(),
        "is_expedited": lit("False"),  # string literal here; cast to boolean later
    }
)
add_col1_df.show(n=add_col1_df.count(), truncate=False)
display(add_col1_df)

Column Uniformity: role - Convert to lowercase
Source File: DF of merged(logistics_source1 & logistics_source2)
vehicle_type - Convert values to UPPERCASE
Source Files: DF of logistics_shipment_detail_3000.json hub_location - Convert values to initcap case
Source Files: DF of merged(logistics_source1 & logistics_source2)

In [0]:
from pyspark.sql.functions import col, lower, upper, initcap

# Merged staff frame: hub_location -> Initcap, vehicle_type -> UPPER,
# every other column -> lower case.
scrubbeddf3_lw = scrubbeddf3.withColumns({
    "shipment_id": lower(col("shipment_id")),
    "first_name": lower(col("first_name")),
    "last_name": lower(col("last_name")),
    "age": lower(col("age")),
    "role": lower(col("role")),
    "hub_location": initcap(col("hub_location")),
    "vehicle_type": upper(col("vehicle_type")),
    "datasource": lower(col("datasource")),
})
display(scrubbeddf3_lw)

# Shipment detail frame: lower-case the text columns only.
# shipment_cost, shipment_weight_kg and ingestion_timestamp are deliberately left
# out: lower() returns a string, so applying it would silently turn those
# metric / timestamp columns into strings.
json_text_columns = [
    "cargo_type",
    "destination_city",
    "order_id",
    "payment_mode",
    "shipment_date",
    "shipment_id",
    "shipment_status",
    "source_city",
    "vehicle_type",
    "domain",
    "is_expedited",
]
case_conv_json_df = add_col1_df.withColumns(
    {name: lower(col(name)) for name in json_text_columns}
)

In [0]:
# Staff frame: age string -> integer.
age_cast = scrubbeddf3_lw.withColumn("age", scrubbeddf3_lw["age"].cast("int"))
display(age_cast)

# Staff frame: apply the standard column names.
rename = age_cast.withColumnsRenamed(
    {
        "first_name": "staff_first_name",
        "last_name": "staff_last_name",
        "hub_location": "origin_hub_city",
    }
)
display(rename)

from pyspark.sql.functions import col

# Shipment detail frame: weight -> double, is_expedited -> boolean.
shipment_weight_cast = case_conv_json_df.withColumns(
    {
        "shipment_weight_kg": case_conv_json_df["shipment_weight_kg"].cast("Double"),
        "is_expedited": case_conv_json_df["is_expedited"].cast("Boolean"),
    }
)
shipment_weight_cast.printSchema()
display(shipment_weight_cast)


Format Standardization:
Source Files: DF of logistics_shipment_detail_3000.json
Convert shipment_date to yyyy-MM-dd
Ensure shipment_cost has 2 decimal precision

In [0]:
from pyspark.sql.functions import to_date, col, date_format
# Import Spark's round under an alias so Python's builtin round() is not
# shadowed for every later cell of the notebook.
from pyspark.sql.functions import round as spark_round

# shipment_date: parse with the dd-MM-yy pattern and re-emit as a yyyy-MM-dd string.
# shipment_cost: round to two decimal places.
formatteddf = shipment_weight_cast.withColumns({
    "shipment_date": date_format(to_date(col("shipment_date"), "dd-MM-yy"), "yyyy-MM-dd"),
    "shipment_cost": spark_round(col("shipment_cost"), 2),
})
formatteddf.printSchema()
display(formatteddf)

Data Type Standardization
Standardizing column data types to fix schema drift and enable mathematical operations.
Source File: DF of merged(logistics_source1 & logistics_source2)
age: Cast String to Integer
Source File: DF of logistics_shipment_detail_3000.json
shipment_weight_kg: Cast to Double
Source File: DF of logistics_shipment_detail_3000.json
is_expedited: Cast to Boolean
Naming Standardization
Source File: DF of merged(logistics_source1 & logistics_source2)
Rename: first_name to staff_first_name
Rename: last_name to staff_last_name
Rename: hub_location to origin_hub_city
Reordering columns logically in a better standard format:
Source File: DF of Data from all 3 files
shipment_id (Identifier), staff_first_name (Dimension)staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)

In [0]:
# Reorder columns into the standard layout across the data from all three files:
# shipment_id (Identifier), staff_first_name / staff_last_name (Dimension),
# role (Dimension), origin_hub_city (Location), shipment_cost (Metric),
# ingestion_timestamp (Audit).
#
# The union uses formatteddf (date formatted, cost rounded to 2 decimals) rather
# than shipment_weight_cast, so the format-standardization step is not bypassed.
# NOTE(review): this is a row-wise union, so staff rows carry NULL shipment
# metrics and shipment rows carry NULL staff attributes.
joined_df = rename.unionByName(formatteddf, allowMissingColumns=True)
display(joined_df)

reorder_joined_df = joined_df.select(
    "shipment_id",
    "staff_first_name",
    "staff_last_name",
    "role",
    "origin_hub_city",
    "shipment_cost",
    "ingestion_timestamp",
)
display(reorder_joined_df)


#Reordering columns logically in a better standard format: Source File: DF of Data from all 3 files shipment_id (Identifier)

Deduplication:

Apply Record Level De-Duplication
Apply Column Level De-Duplication (Primary Key Enforcement)



In [0]:
# Record-level de-duplication: remove rows that are identical in every column.
row_lvl_dedup = reorder_joined_df.dropDuplicates()
display(row_lvl_dedup)

# Column-level de-duplication: keep a single row per shipment_id.
col_lvl_dedup = row_lvl_dedup.dropDuplicates(["shipment_id"])
display(col_lvl_dedup)


Data Enrichment - Detailing of data
Makes your data rich and detailed

Adding of Columns (Data Enrichment)
Creating new derived attributes to enhance traceability and analytical capability.

1. Add Audit Timestamp (load_dt) Source File: DF of logistics_source1 and logistics_source2

Scenario: We need to track exactly when this record was ingested into our Data Lakehouse for auditing purposes.
Action: Add a column load_dt using the function current_timestamp().

In [0]:
from pyspark.sql.functions import col, current_timestamp, concat_ws
# Stamp every staff row with the load time.
add_tmsp_df = rename.withColumn("load_dt", current_timestamp())
display(add_tmsp_df)

# Build Full_name from first and last name (space separated) and keep only the
# remaining descriptive columns; the individual name columns are not selected.
ad_full_n_column = add_tmsp_df.select(
    col("shipment_id"),
    concat_ws(" ", col("staff_first_name"), col("staff_last_name")).alias("Full_name"),
    "role",
    "origin_hub_city",
    "vehicle_type",
    "datasource",
    "load_dt",
)
display(ad_full_n_column)


Define Route Segment (route_segment) Source File: DF of logistics_shipment_detail_3000.json

Scenario: The logistics team wants to analyze performance based on specific transport lanes (Source to Destination).
Action: Combine source_city and destination_city with a hyphen.
Result: "Chennai" + "-" + "Pune" -> "Chennai-Pune"

Scenario: We need a unique tracking code that immediately tells us the vehicle type and the shipment ID. Action: Combine vehicle_type and shipment_id to create a composite key. Result: "Truck" + "_" + "500001" -> "Truck_500001"


In [0]:
from pyspark.sql.functions import col, concat, lit
# route_segment      = source_city + "-" + destination_city
# vehicle_identifier = vehicle_type + "_" + shipment_id
stand_src_df = formatteddf.select(
    col("shipment_id"),
    "order_id",
    concat(col("source_city"), lit("-"), col("destination_city")).alias("route_segment"),
    "cargo_type",
    "payment_mode",
    "shipment_status",
    "shipment_cost",
    "shipment_weight_kg",
    "shipment_date",
    "vehicle_type",
    concat(col("vehicle_type"), lit("_"), col("shipment_id")).alias("vehicle_identifier"),
    "domain",
    "ingestion_timestamp",
    "is_expedited",
)
display(stand_src_df)

Deriving of Columns (Time Intelligence)
Extracting temporal features from dates to enable period-based analysis and reporting.
Source File: logistics_shipment_detail_3000.json
1. Derive Shipment Year (shipment_year)

Scenario: Management needs an annual performance report to compare growth year-over-year.
Action: Extract the year component from shipment_date.
Result: "2024-04-23" -> 2024
2. Derive Shipment Month (shipment_month)

Scenario: Analysts want to identify seasonal peaks (e.g., increased volume in December).
Action: Extract the month component from shipment_date.
Result: "2024-04-23" -> 4 (April)
3. Flag Weekend Operations (is_weekend)

Scenario: The Operations team needs to track shipments handled during weekends to calculate overtime pay or analyze non-business day capacity.
Action: Flag as 'True' if the shipment_date falls on a Saturday or Sunday.
4. Flag shipment status (is_expedited)

Scenario: The Operations team needs to track shipments whose status is IN_TRANSIT or DELIVERED.
Action: Flag as 'True' if the shipment_status IN_TRANSIT or DELIVERED.

Derive Shipment Year (shipment_year)
Scenario: Management needs an annual performance report to compare growth year-over-year. Action: Extract the year component from shipment_date. Result: "2024-04-23" -> 2024 2. Derive Shipment Month (shipment_month)

In [0]:
from pyspark.sql.functions import col, current_timestamp, month,year
# Extract the year component of shipment_date (cast to date first).
shipment_year_df = stand_src_df.withColumn(
    "shipment_year",
    year(col("shipment_date").cast("date")),
)
display(shipment_year_df)

# Extract the month component the same way.
shipment_month_df = shipment_year_df.withColumn(
    "shipment_month",
    month(col("shipment_date").cast("date")),
)
display(shipment_month_df)


Flag Weekend Operations (is_weekend)

Scenario: The Operations team needs to track shipments handled during weekends to calculate overtime pay or analyze non-business day capacity.
Action: Flag as 'True' if the shipment_date falls on a Saturday or Sunday.
4. Flag shipment status (is_expedited)



In [0]:
from pyspark.sql.functions import col, dayofweek, when
# dayofweek() numbers Sunday as 1 and Saturday as 7.
Weekend_flag = shipment_month_df.withColumn(
    "is_weekend",
    when(
        dayofweek(col("shipment_date").cast("date")).isin(1, 7),
        True,
    ).otherwise(False),
)
display(Weekend_flag)

Flag shipment status (is_expedited)
Scenario: The Operations team needs to track shipments is IN_TRANSIT or DELIVERED.
Action: Flag as 'True' if the shipment_status IN_TRANSIT or DELIVERED.


In [0]:
from pyspark.sql.functions import when, col, upper

# is_expedited is True when the shipment is IN_TRANSIT or DELIVERED.
# shipment_status was lower-cased during the column-uniformity step, so a direct
# comparison against the upper-case literals would never match; comparing on
# upper(shipment_status) works whatever the stored case is.
shipment_flag_df = Weekend_flag.withColumn(
    "is_expedited",
    when(upper(col("shipment_status")).isin("IN_TRANSIT", "DELIVERED"), True).otherwise(False)
)
display(shipment_flag_df)


In [0]:
from pyspark.sql.functions import col, datediff, current_date, try_divide
# cost_per_kg: shipment_cost / shipment_weight_kg via try_divide.
df_calc_cost = shipment_flag_df.withColumn(
    "cost_per_kg",
    try_divide(col("shipment_cost"), col("shipment_weight_kg")),
)

# days_since_shipment: days between today and shipment_date.
df_calc_date_diff = df_calc_cost.withColumn(
    "days_since_shipment",
    datediff(current_date(), col("shipment_date").cast("date")),
)

# tax_amount: 18% of shipment_cost.
df_calc = df_calc_date_diff.withColumn(
    "tax_amount",
    col("shipment_cost") * 0.18,
)
display(df_calc)


Remove/Eliminate (drop, select, selectExpr)
Excluding unnecessary or redundant columns to optimize storage and privacy.
Source File: DF of logistics_source1 and logistics_source2

1. Remove Redundant Name Columns

Scenario: Since we have already created the full_name column in the Enrichment step, the individual name columns are now redundant and clutter the dataset.
Action: Drop the first_name and last_name columns.
Logic: df.drop("first_name", "last_name")

In [0]:
# ad_full_n_column was built by selecting Full_name instead of the individual
# staff name columns, so no explicit drop() is performed here.
display(ad_full_n_column)

Splitting & Merging/Melting of Columns
Reshaping columns to extract hidden values or combine fields for better analysis.
Source File: DF of logistics_shipment_detail_3000.json
1. Splitting (Extraction) Breaking one column into multiple to isolate key information.

Split Order Code:
Action: Split order_id ("ORD100000") into two new columns:
order_prefix ("ORD")
order_sequence ("100000")
Split Date:
Action: Split shipment_date into three separate columns for partitioning:
ship_year (2024)
ship_month (4)

In [0]:
from pyspark.sql.functions import substring,length,col,day

# order_prefix: the first three characters of order_id (Spark positions are 1-based).
split_df1 = df_calc.withColumn("order_prefix", substring(col("order_id"), 1, 3))

# order_sequence: everything from the 4th character onwards.
# Built on split_df1 (not df_calc) so that order_prefix is carried forward
# instead of being dropped from the final frame.
split_df2 = split_df1.withColumn("order_sequence", substring(col("order_id"), 4, length(col("order_id"))))

# shipment_day: the day-of-month component of shipment_date.
split_df3 = split_df2.withColumn("shipment_day", day(col("shipment_date")))
display(split_df3)


display(ad_full_n_column)
3. Data Customization & Processing - Application of Tailored Business Specific Rules
UDF1: Complex Incentive Calculation
Scenario: The Logistics Head wants to calculate a "Performance Bonus" for drivers based on tenure and role complexity.

Action: Create a Python function calculate_bonus(role, age) and register it as a Spark UDF.

Logic:

IF Role == 'Driver' AND Age > 50:
Bonus = 15% of Salary (Reward for Seniority)
IF Role == 'Driver' AND Age < 30:
Bonus = 5% of Salary (Encouragement for Juniors)
ELSE:
Bonus = 0
Result: A new derived column projected_bonus is generated for every row in the dataset.

In [0]:
def bonus_calc(role, age, sr_salary, jr_salary):
    """Return the projected bonus for one staff member as a float.

    Rules:
      * role == "driver" and age > 50      -> 15% of ``sr_salary``
      * role == "driver" and 0 <= age < 30 -> 5% of ``jr_salary``
      * anything else                      -> 0.0

    Args:
        role: staff role; only the exact value "driver" earns a bonus.
        age: age in years, or None / a negative sentinel when unknown.
        sr_salary: salary base used for the senior bonus.
        jr_salary: salary base used for the junior bonus.

    Returns:
        float: always a float, because the function is registered as a Spark UDF
        with DoubleType and a Python int result would come back as NULL.
    """
    # Unknown age (None) cannot be compared with an int; treat it as "no bonus".
    if role != "driver" or age is None:
        return 0.0
    if age > 50:
        return float(sr_salary * 0.15)
    # Negative ages are the "-1" placeholder written by the scrubbing step for
    # missing/invalid ages; they must not qualify as a junior driver.
    if 0 <= age < 30:
        return float(jr_salary * 0.05)
    return 0.0

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Wrap bonus_calc as a Spark UDF whose declared return type is double.
bonus_udf = udf(f=bonus_calc, returnType=DoubleType())


In [0]:
# Apply the bonus UDF row by row; the two salary bases are passed as constant columns.
senior_salary = lit(20000)
junior_salary = lit(10000)
df_with_bonus = rename.withColumn(
    "projected_bonus",
    bonus_udf(col("role"), col("age"), senior_salary, junior_salary),
)
display(df_with_bonus)

In [0]:

# Render the renamed staff frame (the input of the bonus calculation) for inspection.
display(rename)

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def mask_identity(name):
    """Mask a name, keeping its first two characters and its last character.

    Values that are None or shorter than three characters are returned unchanged.
    Example: "alice" -> "al****e".
    """
    too_short_to_mask = name is None or len(name) < 3
    if not too_short_to_mask:
        head = name[:2]
        tail = name[-1]
        return head + "****" + tail
    return name

# Register the masking function as a string-returning UDF.
mask_udf = udf(f=mask_identity, returnType=StringType())

# masked_name holds the masked form of the staff first name.
masked_first_name = mask_udf(col("staff_first_name"))
df_masked = df_with_bonus.withColumn("masked_name", masked_first_name)
display(df_masked)

Data Core Curation & Processing (Pre-Wrangling)
Applying business logic to focus, filter, and summarize data before final analysis.

1. Select (Projection)
Source Files: DF of logistics_source1 and logistics_source2

Scenario: The Driver App team only needs location data, not sensitive HR info.
Action: Select only first_name, role, and hub_location.
2. Filter (Selection)
Source File: DF of json

Scenario: We need a report on active operational problems.
Action: Filter rows where shipment_status is 'DELAYED' or 'RETURNED'.
Scenario: Insurance audit for senior staff.
Action: Filter rows where age > 50.
3. Derive Flags & Columns (Business Logic)
Source File: DF of json

Scenario: Identify high-value shipments for security tracking.
Action: Create flag is_high_value = True if shipment_cost > 50,000.
Scenario: Flag weekend operations for overtime calculation.
Action: Create flag is_weekend = True if day is Saturday or Sunday.
4. Format (Standardization)
Source File: DF of json

Scenario: Finance requires readable currency formats.
Action: Format shipment_cost to string like "₹30,695.80".
Scenario: Standardize city names for reporting.
Action: Format source_city to Uppercase (e.g., "chennai" → "CHENNAI").
5. Group & Aggregate (Summarization)
Source Files: DF of logistics_source1 and logistics_source2

Scenario: Regional staffing analysis.
Action: Group by hub_location and Count the number of staff.
Scenario: Fleet capacity analysis.
Action: Group by vehicle_type and Sum the shipment_weight_kg.
6. Sorting (Ordering)
Source File: DF of json

Scenario: Prioritize the most expensive shipments.
Action: Sort by shipment_cost in Descending order.
Scenario: Organize daily dispatch schedule.
Action: Sort by shipment_date (Ascending).
7. Limit (Top-N Analysis)
Source File: DF of json

Scenario: Dashboard snapshot of critical delays.
Action: Filter for 'DELAYED', Sort by Cost, and Limit to top 10 rows.

In [0]:
# Projection: only the three columns selected for the Driver App view.
location_df = df_masked.select(
    "staff_first_name",
    "role",
    "origin_hub_city",
)

# Selection: staff older than 50, all columns kept.
insurance_df = df_masked.filter(col("age") > 50)

display(location_df)
display(insurance_df)


2. Filter (Selection)
Source File: DF of json

Scenario: We need a report on active operational problems.
Action: Filter rows where shipment_status is 'DELAYED' or 'RETURNED'.
Scenario: Insurance audit for senior staff.
Action: Filter rows where age > 50.

In [0]:
# Keep the shipments whose status is 'delayed' or 'returned'
# (lower-case literals, as written in the comparison).
filter_df1 = split_df3.filter(col("shipment_status").isin("delayed", "returned"))
display(filter_df1)


3. Derive Flags & Columns (Business Logic)
Source File: DF of json

Scenario: Identify high-value shipments for security tracking.
Action: Create flag is_high_value = True if shipment_cost > 50,000.
Scenario: Flag weekend operations for overtime calculation.
Action: Create flag is_weekend = True if day is Saturday or Sunday.

In [0]:
from pyspark.sql.functions import col,when
# is_high_value: True when shipment_cost exceeds 50,000, otherwise False.
high_value_flag = when(col("shipment_cost") > 50000, True).otherwise(False)
derive_col = filter_df1.withColumn("is_high_value", high_value_flag)
display(derive_col)

Scenario: Finance requires readable currency formats.
Action: Format shipment_cost to string like "₹30,695.80".
Scenario: Standardize city names for reporting.
Action: Format source_city to Uppercase (e.g., "chennai" → "CHENNAI").

In [0]:
from pyspark.sql.functions import format_number, concat, lit, col 
# shipment_cost_fmt: rupee sign followed by the cost formatted with 2 decimals.
df_currency = derive_col.withColumn(
    "shipment_cost_fmt",
    concat(lit("₹"), format_number(col("shipment_cost"), 2)),
)
display(df_currency)

# Most expensive shipments first.
sort_shipment_cost = df_currency.orderBy(col("shipment_cost").desc())
display(sort_shipment_cost)

# Dispatch schedule: earliest shipment_date first.
daily_dispatch = sort_shipment_cost.orderBy(col("shipment_date").asc())
display(daily_dispatch)

In [0]:
# Scenario: dashboard snapshot of critical delays.
# Action: keep 'delayed' shipments, sort by cost (highest first), take the top 10.
dashboard_df = (
    daily_dispatch
    .filter(col("shipment_status") == "delayed")
    .orderBy(col("shipment_cost").desc())
    .limit(10)
)
display(dashboard_df)

 Group & Aggregate (Summarization)
Source Files: DF of logistics_source1 and logistics_source2

Scenario: Regional staffing analysis.
Action: Group by hub_location and Count the number of staff.
Scenario: Fleet capacity analysis.
Action: Group by vehicle_type and Sum the shipment_weight_kg.

In [0]:
from pyspark.sql.functions import countDistinct, col
# Regional staffing: distinct shipment_id values per origin hub city.
Reg_analysis = (
    df_masked
    .groupBy("origin_hub_city")
    .agg(countDistinct("shipment_id").alias("Total staff"))
)
display(Reg_analysis)

In [0]:
# Join inputs: the staff frame and the shipment frame.
staff_df = df_masked.select("*")
shipment_df = daily_dispatch.select("*")

# Inner join: only shipment_id values present on both sides survive.
inner_join_df = staff_df.join(shipment_df, on="shipment_id", how="inner")
display(inner_join_df)

In [0]:
# Left join, then keep only the staff rows for which the shipment side has no match.
left_join_df = (
    staff_df
    .join(shipment_df, on="shipment_id", how="left")
    .where(shipment_df["shipment_id"].isNull())
)
display(left_join_df)

# Right join: every shipment row, with staff attributes where available.
right_join_df = staff_df.join(shipment_df, on="shipment_id", how="right")
display(right_join_df)

In [0]:
# Self join: pair staff rows that share a hub city but have different shipment_id.
staff_a = staff_df.alias("a")
staff_b = staff_df.alias("b")

same_hub = col("a.origin_hub_city") == col("b.origin_hub_city")
different_staff = col("a.shipment_id") != col("b.shipment_id")

peer_pairs = staff_a.join(staff_b, same_hub & different_staff, "inner").select(
    col("a.shipment_id").alias("staff_id_A"),
    col("b.shipment_id").alias("staff_id_B"),
    col("a.origin_hub_city"),
)

display(peer_pairs)


In [0]:
# Full outer join (reconciliation).
# Scenario: a complete audit to find both idle drivers AND unassigned shipments in one view.
# Action: full outer join on shipment_id, keeping unmatched rows from both sides.
full_outer_join_df = staff_df.join(
    shipment_df,
    on="shipment_id",
    how="full",
)
display(full_outer_join_df)


In [0]:
# Staff whose role is 'driver'.
driver_df = staff_df.select("*").where(col("role") == "driver")

# Attach driver attributes to the dashboard shipments.
# The join is keyed on shipment_id: a join() without a condition is a cartesian
# product and would also leave two shipment_id columns in the result, which makes
# the later joins on "shipment_id" against this frame ambiguous.
# NOTE: this rebinds shipment_df; later cells use this driver-enriched version.
shipment_df = dashboard_df.join(driver_df, on="shipment_id", how="inner")
display(shipment_df)

Left Semi Join (Existence Check):
Scenario: "Show me the details of Drivers who have at least one shipment." (Standard filtering).
Action: staff_df.join(shipments_df, "shipment_id", "left_semi").
Benefit: Performance optimization; it stops scanning the right table once a match is found.
Left Anti Join (Negation Check):
Scenario: "Show me the details of Drivers who have never touched a shipment."
Action: staff_df.join(shipments_df, "shipment_id", "left_anti").

In [0]:
# Left semi join: drivers whose shipment_id exists in shipment_df (left columns only).
semi_lft_df = driver_df.join(shipment_df, on="shipment_id", how="left_semi")
display(semi_lft_df)

# Left anti join: staff whose shipment_id does NOT exist in shipment_df.
anti_lft_df = staff_df.join(shipment_df, on="shipment_id", how="left_anti")
display(anti_lft_df)


Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF) and Master_City_List.csv

Scenario: Validation. Check if the hub_location in the staff file exists in the dataframe of corporate Master_City_List.csv.
Action: Compare values against this Master_City_List list.


In [0]:

# Corporate master city list, read with header and inferred types.
Master_city_df = spark.read.csv(
    "/Volumes/workspace/wd36schema2/ingestion_volume/Master_City_List.csv",
    inferSchema=True,
    header=True,
)
# Rename city_name so it can serve as the join key against the staff frame.
Master_city_df_rn = Master_city_df.withColumnRenamed("city_name", "origin_hub_city")
display(Master_city_df_rn)

In [0]:
# Validation: keep the staff rows whose origin_hub_city appears in the master list.
semi_drvr_df = staff_df.join(
    Master_city_df_rn,
    on="origin_hub_city",
    how="left_semi",
)
display(semi_drvr_df)

In [0]:
# Enrich every staff row with the coordinates of its hub city (left join keeps
# staff rows that have no match in the master list).
s = staff_df.alias("s")
m = Master_city_df_rn.alias("m")
geo_enriched_df = (
    s.join(m, on="origin_hub_city", how="left")
    .select(
        col("s.*"),
        col("m.Latitude").alias("Lat"),
        col("m.Longitude").alias("Long"),
    )
)
display(geo_enriched_df)

Scenario: Creating a "Gold Layer" Table for PowerBI/Tableau.
Action: Flatten the Star Schema. Join Staff, Shipments, and Vehicle_Master into one wide table (wide_shipment_history) so analysts don't have to perform joins during reporting.

In [0]:
from pyspark.sql.functions import col, concat, lit
# vehicle_identifier = vehicle_type + "_" + shipment_id on the staff frame.
Vehicle_df = staff_df.withColumn(
    "vehicle_identifier",
    concat(col("vehicle_type"), lit("_"), col("shipment_id")),
)
display(Vehicle_df)

# Alias both frames so that columns can be addressed per side after the join.
a = staff_df.alias("a")
b = shipment_df.alias("b")
denormalized_tbl = (
    a.join(b, on="shipment_id", how="full")
    .select(
        col("a.*"),
        col("b.age").alias("shipment_age"),
    )
)
display(denormalized_tbl)

# Persist the wide table (overwriting any previous version) and read it back.
denormalized_tbl.write.saveAsTable(
    "workspace.wd36schema2.wide_shipment_history",
    mode="overwrite",
)
display(spark.sql("select * from workspace.wd36schema2.wide_shipment_history"))

Source Files:
DF of logistics_source2: Provides hub_location (Partition Key).
logistics_shipment_detail_3000.json: Provides shipment_cost (Ordering Key)

Scenario: "Who are the Top 3 Drivers by Cost in each Hub?"
Action:
Partition by hub_location.
Order by total_shipment_cost Descending.
Apply `dense_rank()` and `row_number()`.
Filter where rank or row_number <= 3.

In [0]:
from pyspark.sql.functions import row_number, desc, dense_rank, substring, length, col, regexp_replace
from pyspark.sql.window import Window

a = staff_df.alias("a")
b = shipment_df.alias("b")

# Staff joined to shipments on hub city. shipment_cost is re-derived as a double
# from the formatted currency string: drop the leading currency symbol, strip the
# thousands separators, then cast.
orderjoineddf = (
    a.join(b, on="origin_hub_city", how="inner")
     .select(
         col("a.*"),
         col("b.shipment_cost_fmt"),
         col("b.shipment_date"),
         regexp_replace(substring(col("b.shipment_cost_fmt"), 2, length(col("b.shipment_cost_fmt"))), ",", "").cast("double").alias("shipment_cost"),
     )
)
display(orderjoineddf)

# Rank inside each hub by the NUMERIC cost, highest first. Ordering by
# shipment_cost_fmt would compare the formatted strings character by character
# and rank, for example, "₹9,000.00" above "₹30,695.80".
hub_cost_window = Window.partitionBy("origin_hub_city").orderBy(desc("shipment_cost"))

sk_orderjoinedf = orderjoineddf.withColumn(
    "seqnum",
    row_number().over(hub_cost_window)
)
display(sk_orderjoinedf)

Rk_orderjoinedf = orderjoineddf.withColumn(
    "Rank",
    dense_rank().over(hub_cost_window)
)
display(Rk_orderjoinedf)

# Top 3 per hub by row_number (exactly three rows per hub at most).
first_3_seq_num = sk_orderjoinedf.filter(sk_orderjoinedf.seqnum <= 3)
display(first_3_seq_num)

# Top 3 per hub by dense_rank (ties share a rank, so more rows may qualify).
first_3_rank = Rk_orderjoinedf.filter(Rk_orderjoinedf.Rank <= 3)
display(first_3_rank)





In [0]:
from pyspark.sql.functions import lead, lag, asc, col, when

# Next / previous shipment cost for the same shipment_id, ordered by shipment_date.
# The numeric shipment_cost column is used instead of the formatted currency
# string, so the comparisons below are numeric rather than lexicographic.
# -1.0 marks "no neighbouring row".
txn_window = Window.partitionBy("shipment_id").orderBy(asc("shipment_date"))
sk_orderjoinedf1 = (
    orderjoineddf
    .withColumn("nexttransamt", lead("shipment_cost", 1, -1.0).over(txn_window))
    .withColumn("priortransamt", lag("shipment_cost", 1, -1.0).over(txn_window))
)
display(sk_orderjoinedf1)

# Classify each row against the previous one in its window.
pattern_df = sk_orderjoinedf1.withColumn(
    "transpattern",
    when(col("priortransamt") == -1.0, "first transaction")
    .when(col("shipment_cost") > col("priortransamt"), "increase")
    .when(col("shipment_cost") < col("priortransamt"), "decrease")
    .otherwise("no change")  # cost equal to the previous one
)


In [0]:
# Set operations on the shipment_id column of the two sources.
a = master_v1.select("shipment_id")
b = master_v2.select("shipment_id")

# IDs present in both sources.
intersectdf = a.intersect(b)
print("intersection", intersectdf.count())
intersectdf.show(100)

# IDs present in source 2 (b) but not in source 1 (a); the label now states
# the actual direction of the subtraction.
subtractdf = b.subtract(a)
print("df2 subtract df1", subtractdf.count())
subtractdf.show(100)

In [0]:
from pyspark.sql import functions as F

# Sum and average of the numeric shipment_cost.

# rollup: hierarchical subtotals (hub + vehicle type, hub only, grand total).
rollupdf = (
    orderjoineddf
    .rollup("origin_hub_city", "vehicle_type")
    .agg(
        F.sum("shipment_cost").alias("sumamt"),
        F.avg("shipment_cost").alias("avgamt"),
    )
    .orderBy("origin_hub_city", "sumamt")
)
display(rollupdf)

# cube: subtotals for every combination of the two grouping columns.
cubedf = (
    orderjoineddf
    .cube("origin_hub_city", "vehicle_type")
    .agg(
        F.sum("shipment_cost").alias("sumamt"),
        F.avg("shipment_cost").alias("avgamt"),
    )
    .orderBy("origin_hub_city", "sumamt")
)
display(cubedf)
