####2. Programmatically find a couple of data patterns by applying the EDA steps below (File: logistics_source1)
1. Apply inferSchema and toDF to create a DF and analyse the actual data.
2. Analyse the schema, datatypes, columns etc.,
3. Analyse the duplicate records count and summary of the dataframe.
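
As a rough plain-Python illustration of the duplicate check in step 3 (not the Spark implementation), the idea is to count how often each key occurs and keep the keys that occur more than once. The `ship_ids` values below are hypothetical and are not taken from logistics_source1.

```python
from collections import Counter

# Hypothetical Ship_id values; the real file may look different
ship_ids = ["1001", "1002", "1002", "1003", "1003", "1003"]

# Count occurrences of each key, then keep only keys seen more than once
counts = Counter(ship_ids)
duplicates = {ship_id: n for ship_id, n in counts.items() if n > 1}

# Number of rows that would disappear if one row per key were kept
extra_rows = sum(n - 1 for n in duplicates.values())

print(duplicates)
print(extra_rows)
```

The same grouping idea is what a `groupBy(...).count()` followed by a `count > 1` filter expresses on a DataFrame.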

In [0]:
# ============================================================
# PYSPARK EDA - Logistics Data Analysis
# ============================================================

# 1. Read CSV and create DataFrame with renamed columns
df = spark.read.csv(
    "/Volumes/lakehouse1/dbread/read_volume/logistics/logistics_source1.txt",
    header=True,           # First row is column names
    inferSchema=True       # Automatically detect data types
).toDF("Ship_id", "fname", "lname", "age", "role")

# ============================================================
# 2. Display the actual data (interactive table preview)
# ============================================================
display(df)

# ============================================================
# 3. View the Schema (column names and their data types)
# ============================================================
print("SCHEMA DETAILS:")
df.printSchema()

# ============================================================
# 4. Show Data Types of each column
# ============================================================
print("\nDATA TYPES:")
print(df.dtypes)

# ============================================================
# 5. Show all column names
# ============================================================
print("\nCOLUMN NAMES:")
print(df.columns)

# ============================================================
# 6. Check for Duplicate Records (by Ship_id)
# ============================================================
print("\nDUPLICATE CHECK - Ship_ids appearing more than once:")
df.groupBy("Ship_id").count().filter("count > 1").show()

# ============================================================
# 7. Show Summary Statistics (count, mean, stddev, min, max)
# ============================================================
print("\nDATAFRAME SUMMARY:")
df.summary().show()



# ============================================================
# BONUS: Additional useful checks
# ============================================================

# Alias Spark's sum so it does not shadow Python's built-in sum
from pyspark.sql.functions import sum as spark_sum, col

# Check total rows
print(f"Total Rows: {df.count()}")

# Check null values in each column
print("\nNull Value Count by Column:")
df.select([
    spark_sum(col(c).isNull().cast("int")).alias(c)
    for c in df.columns
]).show()

In [0]:
# ============================================================
# 1. LOAD BOTH FILES
# ============================================================

# Load logistics_source1 (master_v1)
master_v1 = spark.read.csv(
    "/Volumes/lakehouse1/dbread/read_volume/logistics/logistics_source1.txt",
    header=True,
    inferSchema=True
).toDF("Ship_id", "fname", "lname", "age", "role")

# Load logistics_source2 (master_v2)
master_v2 = spark.read.csv(
    "/Volumes/lakehouse1/dbread/read_volume/logistics/logistics_source2.txt",
    header=True,
    inferSchema=True
).toDF("Ship_id", "fname", "lname", "age", "role", "location", "vehicle_type")

print("="*60)
print("MASTER_V1 - First 5 rows:")
print("="*60)
master_v1.show(5)

print("\n" + "="*60)
print("MASTER_V2 - First 5 rows:")
print("="*60)
master_v2.show(5)


In [0]:

# ============================================================
# FIND COMMON SHIPMENT IDs BETWEEN TWO DATASETS
# ============================================================

# Extract unique Ship_ids from both datasets
v1_ids = master_v1.select("Ship_id").distinct()
v2_ids = master_v2.select("Ship_id").distinct()

# Find common Ship_ids (intersection - present in both)
common_ids = v1_ids.intersect(v2_ids)

# Display results
print(f"Common Shipment IDs: {common_ids.count()}")
common_ids.show()



In [0]:
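from pyspark.sql.functions import col

# Rows whose Ship_id is NOT made up entirely of digits.
# The pattern is anchored (^...$) and negated with ~; the cast to string
# covers the case where inferSchema typed the column as numeric.
non_numeric = master_v1.filter(~col("Ship_id").cast("string").rlike("^[0-9]+$"))
non_numeric.show()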

To get only the COUNT of non-numeric Ship_id values in each source (interview-friendly):

In [0]:
non_numeric_count = master_v1.filter(
    ~col("Ship_id").rlike("^[0-9]+$")
).count()

non_numeric_count2 = master_v2.filter(
    ~col("Ship_id").rlike("^[0-9]+$")
).count()


print("Non-numeric Ship_id count (source1):", non_numeric_count)
print("Non-numeric Ship_id count (source2):", non_numeric_count2)


Count the records where age is not an integer
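
Both the Ship_id and age checks rely on the anchored pattern `^[0-9]+$`. The sketch below uses plain Python's `re.search` as an approximation of how such a pattern behaves (Spark's `rlike` uses Java regular expressions, so this is an illustration rather than the Spark call itself). The sample values are hypothetical.

```python
import re

# Hypothetical sample values, not taken from the source files
samples = ["5000123", "50A0123", "ten", "", "42"]

def starts_with_digit(value):
    # No end anchor: only the first character has to be a digit
    return re.search(r"^[0-9]", value) is not None

def is_all_digits(value):
    # Anchored at both ends: every character has to be a digit
    return re.search(r"^[0-9]+$", value) is not None

# Values that a "not all digits" filter would flag
non_numeric = [v for v in samples if not is_all_digits(v)]
print(non_numeric)
```

A value such as `50A0123` passes the unanchored `^[0-9]` test even though it is not numeric, which is why the negated, fully anchored pattern is the one to use for data-quality counts.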

In [0]:
invalid_age_count = master_v1.filter(
    ~col("age").cast("string").rlike("^[0-9]+$")
).count()

print("Non-integer age count:", invalid_age_count)


In [0]:
invalid_age_count2 = master_v2.filter(
    ~col("age").cast("string").rlike("^[0-9]+$")
).count()

print("Non-integer age count:", invalid_age_count2)

###**b. Active Data Munging** File: logistics_source1 and logistics_source2

#####1.Combining Data + Schema Merging (Structuring)
1. Read both files without enforcing schema
2. Align them into a single canonical schema: shipment_id, first_name, last_name, age, role, hub_location, vehicle_type, data_source
3. Add data_source column with values as: system1, system2 in the respective dataframes
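
The alignment idea can be sketched in plain Python before doing it in Spark: every record is projected onto the canonical column list, missing columns become `None`, and a `data_source` tag is attached. The helper name `align_record` and the sample rows are hypothetical and only serve the illustration.

```python
CANONICAL_COLUMNS = [
    "shipment_id", "first_name", "last_name", "age",
    "role", "hub_location", "vehicle_type", "data_source",
]

def align_record(record, source_name):
    """Return the record with every canonical column present, in canonical order."""
    # Columns the source does not provide are filled with None
    aligned = {column: record.get(column) for column in CANONICAL_COLUMNS}
    aligned["data_source"] = source_name
    return aligned

# Hypothetical rows: the system1 row has no hub_location or vehicle_type
row_s1 = {"shipment_id": "1001", "first_name": "Rajesh", "last_name": "Kumar",
          "age": "34", "role": "Driver"}
row_s2 = {"shipment_id": "2001", "first_name": "Anita", "last_name": "Rao",
          "age": "29", "role": "Loader",
          "hub_location": "Chennai", "vehicle_type": "Truck"}

merged = [align_record(row_s1, "system1"), align_record(row_s2, "system2")]
for row in merged:
    print(row)
```

On DataFrames, a union by column name that tolerates missing columns plays the same role as the `record.get(column)` lookup here.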

In [0]:
# ============================================================
# DATA INTEGRATION - Combining Two Sources with Schema Alignment
# ============================================================

from pyspark.sql.functions import lit, col

# ============================================================
# STEP 1: Read both files without enforcing schema
# ============================================================

df_s1 = spark.read.csv(
    "/Volumes/lakehouse1/dbread/read_volume/logistics/logistics_source1.txt",
    header=True
)

df_s2 = spark.read.csv(
    "/Volumes/lakehouse1/dbread/read_volume/logistics/logistics_source2.txt",
    header=True
)

print("SOURCE 1 - Columns:")
print(df_s1.columns)
df_s1.show()

print("\nSOURCE 2 - Columns:")
print(df_s2.columns)
df_s2.show()

# ============================================================
# STEP 2: Align columns to canonical schema
# ============================================================
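# Both sources already have: shipment_id, first_name, last_name, age, role
# Source 1 lacks hub_location and vehicle_type, so add them as NULL string columns

df_s1_aligned = df_s1.select(
    col("shipment_id"),
    col("first_name"),
    col("last_name"),
    col("age"),
    col("role"),
    lit(None).cast("string").alias("hub_location"),
    lit(None).cast("string").alias("vehicle_type")
)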

df_s2_aligned = df_s2.select(
    col("shipment_id"),
    col("first_name"),
    col("last_name"),
    col("age"),
    col("role"),
    col("hub_location"),
    col("vehicle_type")
)

# ============================================================
# STEP 3: Add data_source column
# ============================================================

df_s1_with_source = df_s1_aligned.withColumn("data_source", lit("system1"))
df_s2_with_source = df_s2_aligned.withColumn("data_source", lit("system2"))

print("\nSOURCE 1 - With data_source:")
df_s1_with_source.show()

print("\nSOURCE 2 - With data_source:")
df_s2_with_source.show()

# ============================================================
# STEP 4: Merge both dataframes
# ============================================================

df_merged = df_s1_with_source.unionByName(
    df_s2_with_source,
    allowMissingColumns=True
)

print("\n" + "="*60)
print("MERGED DATA - Final Schema")
print("="*60)
display(df_merged)

print(f"\nTotal rows: {df_merged.count()}")

#####2. Cleansing, Scrubbing: 
Cleansing (removal of unwanted datasets)<br>
1. Mandatory Column Check - Drop any record where any of the following columns is NULL: shipment_id, role<br>
2. Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name<br>
3. Join Readiness Rule - Drop records where the join key is null: shipment_id<br>

Scrubbing (convert raw to tidy)<br>
4. Age Defaulting Rule - Fill NULL values in the age column with: -1<br>
5. Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN<br>
6. Invalid Age Replacement - Replace the following values in age: "ten" to -1, "" to -1<br>
7. Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV, bike to TwoWheeler
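
The seven rules above can be sketched as two small plain-Python functions, one that decides whether a record survives cleansing and one that scrubs the survivors. This is only an illustration of the rule logic under a few assumptions: ages are still strings at this stage, vehicle types are matched case-insensitively, and the helper names and sample records are hypothetical.

```python
AGE_FIXES = {"ten": "-1", "": "-1"}
VEHICLE_FIXES = {"truck": "LMV", "bike": "TwoWheeler"}

def cleanse(record):
    """Return False for records that rules 1-3 would drop."""
    # Rules 1 and 3: shipment_id (the join key) and role are mandatory
    if record.get("shipment_id") is None or record.get("role") is None:
        return False
    # Rule 2: drop only when BOTH names are missing
    if record.get("first_name") is None and record.get("last_name") is None:
        return False
    return True

def scrub(record):
    """Apply rules 4-7 and return a tidy copy of the record."""
    tidy = dict(record)
    age = tidy.get("age")
    if age is None:
        age = "-1"                       # Rule 4
    tidy["age"] = AGE_FIXES.get(age, age)  # Rule 6
    vehicle = tidy.get("vehicle_type")
    if vehicle is None:
        vehicle = "UNKNOWN"              # Rule 5
    # Rule 7 (case-insensitive matching is an assumption)
    tidy["vehicle_type"] = VEHICLE_FIXES.get(vehicle.lower(), vehicle)
    return tidy

# Hypothetical sample records
records = [
    {"shipment_id": "1001", "first_name": "Rajesh", "last_name": None,
     "age": "ten", "role": "Driver", "vehicle_type": "truck"},
    {"shipment_id": None, "first_name": "Meena", "last_name": "Iyer",
     "age": "30", "role": "Driver", "vehicle_type": "bike"},
    {"shipment_id": "1003", "first_name": None, "last_name": None,
     "age": "41", "role": "Loader", "vehicle_type": None},
    {"shipment_id": "1004", "first_name": "Anita", "last_name": "Rao",
     "age": None, "role": "Driver", "vehicle_type": None},
]

tidy_records = [scrub(r) for r in records if cleanse(r)]
print(tidy_records)
```

Running cleansing before scrubbing matters: once NULLs are replaced by defaults, the NULL-based drop rules can no longer see them.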

In [0]:
# Clean data by dropping NULL records
df_cleaned = df_merged.dropna(
    how="any",
    subset=["shipment_id", "role"]
).dropna(
    how="all",
    subset=["first_name", "last_name"]
)

print(f"Cleaned rows: {df_cleaned.count()}")
display(df_cleaned)

In [0]:
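from pyspark.sql.functions import col, lit, lower, when

# All scrubbing steps in one operation.
# age is still a string column here (the files were read without inferSchema),
# so string defaults are used to keep every branch the same type.
df_scrubbed = df_cleaned.fillna({
    "age": "-1",
    "vehicle_type": "UNKNOWN"
}).withColumn(
    "age",
    when((col("age") == "ten") | (col("age") == ""), lit("-1")).otherwise(col("age"))
).withColumn(
    "vehicle_type",
    # Compare case-insensitively so "truck", "Truck" and "TRUCK" are all normalized
    when(lower(col("vehicle_type")) == "truck", "LMV")
    .when(lower(col("vehicle_type")) == "bike", "TwoWheeler")
    .otherwise(col("vehicle_type"))
)

display(df_scrubbed)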

####3. Standardization, De-Duplication and Replacement / Deletion of Data to make it in a usable format

Creating the shipment details DataFrame<br>
1. Create a DF by reading data from logistics_shipment_detail_3000.json
2. As this is clean JSON data, it doesn't require any cleansing or scrubbing.

In [0]:
# ============================================================
# SHIPMENT DETAILS DATA - Create DataFrame from JSON
# ============================================================

# Read multi-line JSON file
df_shipment = spark.read.json(
    "/Volumes/lakehouse1/dbread/read_volume/logistics/logistics_shipment_detail_3000.json",
    multiLine=True
)

# Display data and schema
print("SHIPMENT DETAILS DATA:")
display(df_shipment)

print("\nSCHEMA:")
df_shipment.printSchema()

print(f"\nTotal records: {df_shipment.count()}")


In [0]:
# ============================================================
# SHIPMENT DETAILS ENRICHMENT - Add Metadata Columns
# ============================================================

from pyspark.sql.functions import lit, current_timestamp

# ============================================================
# Add enrichment columns:
# 1. domain: "Logistics"
# 2. ingestion_timestamp: Current timestamp when data was loaded
# 3. is_expedited: False (default value)
# ============================================================

df_shipment_enriched = df_shipment.withColumn(
    "domain",
    lit("Logistics")
).withColumn(
    "ingestion_timestamp",
    current_timestamp()
).withColumn(
    "is_expedited",
    lit(False)
)

print("ENRICHED SHIPMENT DATA:")
display(df_shipment_enriched)

# ============================================================
# Verify enriched columns
# ============================================================

print("\n" + "="*60)
print("SCHEMA - With Enrichment Columns")
print("="*60)

df_shipment_enriched.printSchema()

print(f"\nTotal records: {df_shipment_enriched.count()}")

In [0]:
# ============================================================
# COLUMN UNIFORMITY - Standardize Text Case Across Sources
# ============================================================

from pyspark.sql.functions import col, lower, upper, initcap

# ============================================================
# SOURCE 1: df_scrubbed (Merged logistics_source1 & source2)
# ============================================================

# Convert role to LOWERCASE
# Convert hub_location to INITCAP (First Letter Capitalized)

df_scrubbed_uniform = df_scrubbed.withColumn(
    "role",
    lower(col("role"))
).withColumn(
    "hub_location",
    initcap(col("hub_location"))
)

print("STANDARDIZED DATA - df_scrubbed:")
print("="*60)
df_scrubbed_uniform.show()

print("\nUnique ROLE values (lowercase):")
df_scrubbed_uniform.select("role").distinct().show()

print("\nUnique HUB_LOCATION values (initcap):")
df_scrubbed_uniform.select("hub_location").distinct().show()

# ============================================================
# SOURCE 2: df_shipment_enriched (JSON shipment details)
# ============================================================

# Convert vehicle_type to UPPERCASE

df_shipment_uniform = df_shipment_enriched.withColumn(
    "vehicle_type",
    upper(col("vehicle_type"))
)

print("\n" + "="*60)
print("STANDARDIZED DATA - df_shipment_enriched:")
print("="*60)
df_shipment_uniform.show()

print("\nUnique VEHICLE_TYPE values (uppercase):")
df_shipment_uniform.select("vehicle_type").distinct().show()


Format Standardization:<BR>
Source Files: DF of logistics_shipment_detail_3000.json<BR>
Convert shipment_date to yyyy-MM-dd<BR>
Ensure shipment_cost has 2 decimal precision<BR>
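
The two formatting rules can be illustrated in plain Python. This is a sketch rather than the Spark implementation: it assumes the raw dates arrive as day-month-two-digit-year text (for example `15-03-24`), returns `None` for values that cannot be parsed, and uses hypothetical helper names.

```python
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP

def to_iso_date(raw, input_format="%d-%m-%y"):
    """Parse a raw date string and return it as yyyy-MM-dd, or None if invalid."""
    try:
        return datetime.strptime(raw, input_format).date().isoformat()
    except (TypeError, ValueError):
        # Missing or malformed dates become None instead of raising
        return None

def to_two_decimals(raw_cost):
    """Return the cost as text with exactly two decimal places."""
    amount = Decimal(str(raw_cost)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
    return str(amount)

print(to_iso_date("15-03-24"))
print(to_iso_date("31-02-24"))
print(to_two_decimals(1250.5))
print(to_two_decimals("99.999"))
```

Note the difference between rounding and fixed precision: rounding 1250.5 to two places still displays as 1250.5 in a floating-point column, whereas a decimal type with scale 2 keeps the trailing zero.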

In [0]:
# ============================================================
# FORMAT STANDARDIZATION - Date & Currency Formatting
# ============================================================

from pyspark.sql.functions import col, try_to_date, round as spark_round

# ============================================================
# SOURCE: df_shipment_enriched (logistics_shipment_detail_3000.json)
# ============================================================

# ============================================================
# STEP 1: Convert shipment_date to yyyy-MM-dd format
# Use try_to_date() to safely handle invalid dates
# ============================================================

# ============================================================
# STEP 2: Ensure shipment_cost has 2 decimal precision
# Cast to double and round to 2 decimal places
# ============================================================

df_shipment_formatted = df_shipment_enriched.withColumn(
    "shipment_date",
    try_to_date(col("shipment_date"), "dd-MM-yy")
).withColumn(
    "shipment_cost",
    spark_round(col("shipment_cost").cast("double"), 2)
)

print("FORMATTED DATA - Date & Currency Standardized:")
print("="*60)
display(df_shipment_formatted)

# ============================================================
# VERIFICATION - Check formatting quality
# ============================================================

print("\n" + "="*60)
print("SHIPMENT DATE VALUES (yyyy-MM-dd format):")
print("="*60)

df_shipment_formatted.select("shipment_date").show(5)

print("\n" + "="*60)
print("SHIPMENT COST VALUES (2 decimal precision):")
print("="*60)

df_shipment_formatted.select("shipment_cost").show(5)

# ============================================================
# DATE PARSING QUALITY CHECK
# ============================================================

total_rows = df_shipment_formatted.count()
null_dates = df_shipment_formatted.filter(col("shipment_date").isNull()).count()
valid_dates = total_rows - null_dates

print("\n" + "="*60)
print("DATA QUALITY SUMMARY:")
print("="*60)

print(f"Total records: {total_rows}")
print(f"Valid dates: {valid_dates}")
print(f"Invalid dates (NULL): {null_dates}")

# ============================================================
# SCHEMA - Verify data types
# ============================================================

print("\n" + "="*60)
print("SCHEMA - Data Types After Formatting")
print("="*60)

df_shipment_formatted.printSchema()

In [0]:
# ============================================================
# COMPLETE TYPE CASTING - All conversions for merged data
# ============================================================

from pyspark.sql.functions import col

df_type_casted = df_scrubbed_uniform.withColumn(
    "age",
    col("age").cast("int")
)

print("TYPE CASTED DATA:")
display(df_type_casted)

print("\nDATATYPES:")
df_type_casted.printSchema()

In [0]:
# Rename columns to business-friendly names
# (chained withColumnRenamed also works on older Spark versions)
df_renamed = df_type_casted.withColumnRenamed(
    "first_name",
    "staff_first_name"
).withColumnRenamed(
    "last_name",
    "staff_last_name"
).withColumnRenamed(
    "hub_location",
    "origin_hub_city"
)

display(df_renamed)

In [0]:
# ============================================================
# COLUMN REORDERING - Logical & Standard Format
# ============================================================

from pyspark.sql.functions import col

# ============================================================
# COMBINE DATA FROM ALL SOURCES
# Join df_renamed (merged sources) with df_shipment_formatted (JSON)
# ============================================================

# Join on shipment_id
df_combined = df_renamed.join(
    df_shipment_formatted,
    on="shipment_id",
    how="inner"
)

# ============================================================
# REORDER COLUMNS - Logical Standard Format
# ============================================================

# Column Categories:
# 1. IDENTIFIER: shipment_id
# 2. DIMENSIONS: staff_first_name, staff_last_name, role
# 3. LOCATION: origin_hub_city
# 4. METRICS: shipment_cost
# 5. AUDIT: ingestion_timestamp

df_final = df_combined.select(
    col("shipment_id"),
    col("staff_first_name"),
    col("staff_last_name"),
    col("role"),
    col("origin_hub_city"),
    col("shipment_cost"),
    col("ingestion_timestamp")
)

print("FINAL DATA - Reordered Columns:")
print("="*60)
display(df_final)

# ============================================================
# VERIFICATION - Schema with logical order
# ============================================================

print("\n" + "="*60)
print("SCHEMA - Ordered by Category")
print("="*60)

df_final.printSchema()

print("\n" + "="*60)
print("COLUMN ORDER:")
print("="*60)

for i, col_name in enumerate(df_final.columns, 1):
    print(f"{i}. {col_name}")

# ============================================================
# DATA SUMMARY
# ============================================================

print("\n" + "="*60)
print("DATA SUMMARY:")
print("="*60)

print(f"Total records: {df_final.count()}")
print(f"Total columns: {len(df_final.columns)}")

print("\nFirst 5 records:")
df_final.show(5, truncate=False)

In [0]:
# ============================================================
# DE-DUPLICATION - Remove Duplicates at Record & Key Level
# ============================================================

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# ============================================================
# STEP 1: RECORD LEVEL DE-DUPLICATION
# Remove completely duplicate rows (all columns identical)
# ============================================================

df_record_dedup = df_final.dropDuplicates()

print("AFTER RECORD LEVEL DE-DUPLICATION:")
print("="*60)

original_count = df_final.count()
after_record_dedup = df_record_dedup.count()
duplicates_removed = original_count - after_record_dedup

print(f"Original records: {original_count}")
print(f"After removing duplicates: {after_record_dedup}")
print(f"Duplicate rows removed: {duplicates_removed}")

display(df_record_dedup)

# ============================================================
# STEP 2: COLUMN LEVEL DE-DUPLICATION (PRIMARY KEY)
# Keep only 1 record per shipment_id (Primary Key)
# If multiple records exist for same shipment_id, keep first one
# ============================================================

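# Define window - partition by shipment_id, order by ingestion_timestamp
# Note: ingestion_timestamp comes from current_timestamp(), so rows loaded in the
# same run share one value and the row kept among ties is arbitrary.
window_spec = Window.partitionBy("shipment_id").orderBy("ingestion_timestamp")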

# Add row number for each shipment_id
df_with_row_num = df_record_dedup.withColumn(
    "row_num",
    row_number().over(window_spec)
)

# Keep only row_num = 1 (first record per shipment_id)
df_pk_dedup = df_with_row_num.filter(col("row_num") == 1).drop("row_num")

print("\n" + "="*60)
print("AFTER PRIMARY KEY DE-DUPLICATION (shipment_id):")
print("="*60)

after_pk_dedup = df_pk_dedup.count()
pk_duplicates_removed = after_record_dedup - after_pk_dedup

print(f"Before PK de-duplication: {after_record_dedup}")
print(f"After PK de-duplication: {after_pk_dedup}")
print(f"Duplicate shipment_ids removed: {pk_duplicates_removed}")

display(df_pk_dedup)

# ============================================================
# VERIFICATION - Check for remaining duplicates
# ============================================================

print("\n" + "="*60)
print("FINAL DATA QUALITY CHECK:")
print("="*60)

# Check for duplicate shipment_ids
duplicates_check = df_pk_dedup.groupBy("shipment_id").count().filter("count > 1")

# Count once and reuse the result instead of triggering the job twice
remaining_duplicates = duplicates_check.count()
print(f"Duplicate shipment_ids remaining: {remaining_duplicates}")

if remaining_duplicates == 0:
    print("✓ No duplicate shipment_ids (Primary Key Enforced!)")
else:
    print("⚠️ Duplicate shipment_ids found:")
    duplicates_check.show()

# ============================================================
# SUMMARY
# ============================================================

print("\n" + "="*60)
print("DE-DUPLICATION SUMMARY:")
print("="*60)

print(f"""
Starting records: {original_count}
After record-level de-dup: {after_record_dedup} (removed {duplicates_removed})
After PK de-dup: {after_pk_dedup} (removed {pk_duplicates_removed})

Final clean dataset: {after_pk_dedup} records
""")

# Final schema
print("\nFINAL SCHEMA:")
df_pk_dedup.printSchema()

##2. Data Enrichment - Detailing of data
Makes your data rich and detailed <br>

###### Adding of Columns (Data Enrichment)
*Creating new derived attributes to enhance traceability and analytical capability.*

**1. Add Audit Timestamp (`load_dt`)**
Source File: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** We need to track exactly when this record was ingested into our Data Lakehouse for auditing purposes.
* **Action:** Add a column `load_dt` using the function `current_timestamp()`.

**2. Create Full Name (`full_name`)**
Source File: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** The reporting dashboard requires a single field for the driver's name instead of separate columns.
* **Action:** Create `full_name` by concatenating `first_name` and `last_name` with a space separator.
* **Result:** "Rajesh" + " " + "Kumar" -> **"Rajesh Kumar"**

**3. Define Route Segment (`route_segment`)**
Source File: DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** The logistics team wants to analyze performance based on specific transport lanes (Source to Destination).
* **Action:** Combine `source_city` and `destination_city` with a hyphen.
* **Result:** "Chennai" + "-" + "Pune" -> **"Chennai-Pune"**

**4. Generate Vehicle Identifier (`vehicle_identifier`)**
Source File: DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** We need a unique tracking code that immediately tells us the vehicle type and the shipment ID.
* **Action:** Combine `vehicle_type` and `shipment_id` to create a composite key.
* **Result:** "Truck" + "_" + "500001" -> **"Truck_500001"**
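
The three string-based enrichments above can be sketched as plain Python functions that reproduce the stated results. The function names are hypothetical, and skipping a missing name part in `full_name` is an assumption about how missing values should be handled.

```python
def full_name(first_name, last_name):
    # Join the available name parts with a single space; skip missing parts
    parts = [part for part in (first_name, last_name) if part]
    return " ".join(parts)

def route_segment(source_city, destination_city):
    # Transport lane written as Source-Destination
    return f"{source_city}-{destination_city}"

def vehicle_identifier(vehicle_type, shipment_id):
    # Composite key: vehicle type, underscore, shipment id
    return f"{vehicle_type}_{shipment_id}"

print(full_name("Rajesh", "Kumar"))
print(route_segment("Chennai", "Pune"))
print(vehicle_identifier("Truck", 500001))
```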

In [0]:
# ============================================================
# DATA ENRICHMENT - Add Audit Timestamp and Full Name Columns
# ============================================================

from pyspark.sql.functions import current_timestamp, concat_ws, col

# load_dt tracks when records were ingested.
# full_name uses concat_ws, which skips NULL parts; concat would return NULL
# whenever either name is missing.
df_source_enriched = df_renamed.withColumn(
    "load_dt",
    current_timestamp()
).withColumn(
    "full_name",
    concat_ws(" ", col("staff_first_name"), col("staff_last_name"))
)
# Display the data
display(df_source_enriched)

# Check the new columns
print(f"Total records: {df_source_enriched.count()}")
print(f"Total columns: {len(df_source_enriched.columns)}")
df_source_enriched.printSchema()

In [0]:
# ============================================================
# DATA ENRICHMENT - Time Intelligence, Business Calculations & Splitting
# ============================================================

from pyspark.sql.functions import concat, col, lit, year, month, day, dayofweek, round as spark_round, when, datediff, current_date, substring

# Extract temporal features, status flags, calculate metrics, and split columns
df_shipment_enriched = df_shipment_formatted.withColumn(
    "route_segment",
    concat(col("source_city"), lit("-"), col("destination_city"))
).withColumn(
    "vehicle_identifier",
    concat(col("vehicle_type"), lit("_"), col("shipment_id"))
).withColumn(
    "ship_year",
    year(col("shipment_date"))
).withColumn(
    "ship_month",
    month(col("shipment_date"))
).withColumn(
    "ship_day",
    day(col("shipment_date"))
).withColumn(
    "is_weekend",
    (dayofweek(col("shipment_date")) == 1) | (dayofweek(col("shipment_date")) == 7)
).withColumn(
    "shipment_status_flag",
    (col("shipment_status") == "IN_TRANSIT") | (col("shipment_status") == "DELIVERED")
).withColumn(
    "cost_per_kg",
    when(col("shipment_weight_kg") != 0, spark_round(col("shipment_cost") / col("shipment_weight_kg"), 2)).otherwise(None)
).withColumn(
    "days_since_shipment",
    datediff(current_date(), col("shipment_date"))
).withColumn(
    "tax_amount",
    spark_round(col("shipment_cost") * 0.18, 2)
).withColumn(
    "order_prefix",
    substring(col("order_id"), 1, 3)
).withColumn(
    "order_sequence",
    substring(col("order_id"), 4, 10)
)

# Display the data
display(df_shipment_enriched)

# Check the new columns
print(f"Total records: {df_shipment_enriched.count()}")
print(f"Total columns: {len(df_shipment_enriched.columns)}")
df_shipment_enriched.printSchema()

In [0]:
# ============================================================
# DATA OPTIMIZATION - Remove Redundant Columns
# ============================================================

# Drop individual name columns (redundant with full_name)
df_source_optimized = df_source_enriched.drop(
    "staff_first_name",
    "staff_last_name"
)

# Display the data
display(df_source_optimized)

# Check removed columns
print(f"Total records: {df_source_optimized.count()}")
print(f"Total columns: {len(df_source_optimized.columns)}")
df_source_optimized.printSchema()

## 3. Data Customization & Processing - Application of Tailored Business Specific Rules

### **UDF1: Complex Incentive Calculation**
**Scenario:** The Logistics Head wants to calculate a "Performance Bonus" for drivers based on tenure and role complexity.

**Action:** Create a Python function `calculate_bonus(role, age)` and register it as a Spark UDF.

**Logic:**
* **IF** `Role` == 'Driver' **AND** `Age` > 50:
  * `Bonus` = 15% of Salary (Reward for Seniority)
* **IF** `Role` == 'Driver' **AND** `Age` < 30:
  * `Bonus` = 5% of Salary (Encouragement for Juniors)
* **ELSE**:
  * `Bonus` = 0

**Result:** A new derived column `projected_bonus` is generated for every row in the dataset.

---

### **UDF2: PII Masking (Privacy Compliance)**
**Scenario:** For the analytics dashboard, we must hide the full identity of the staff to comply with privacy laws (GDPR/DPDP), while keeping names recognizable for internal managers.

**Business Rule:** Show the first 2 letters, mask the middle characters with `****`, and show the last letter.

**Action:** Create a UDF `mask_identity(name)`.

**Example:**
* **Input:** `"Rajesh"`
* **Output:** `"Ra****h"`
<br>
**Note: Convert the above UDF logic to a built-in function based transformation to improve performance.**
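
A minimal sketch of the masking rule as a plain Python function, which is the piece a UDF would wrap. Two behaviours are assumptions, since the business rule does not cover them: a missing name stays `None`, and names shorter than four characters are masked entirely so that the mask never reveals the whole name.

```python
def mask_identity(name):
    """Show the first 2 letters and the last letter, masking the middle with ****."""
    if name is None:
        return None
    # Assumption: very short names are fully masked to avoid exposing them
    if len(name) < 4:
        return "****"
    return f"{name[:2]}****{name[-1]}"

print(mask_identity("Rajesh"))
print(mask_identity("Raj"))
```

The same rule needs only slicing and concatenation, so it could plausibly be expressed with Spark built-ins such as `substring`, `concat` and `lit` instead of a Python UDF, which is the direction the note above asks for.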

In [0]:
# ============================================================
# CUSTOM BUSINESS LOGIC - Performance Bonus Calculation UDF
# ============================================================

from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType

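# Define Python function for bonus calculation.
# No salary column exists in this dataset, so the function returns the bonus
# rate (fraction of salary) rather than an amount.
def calculate_bonus(role, age):
    # Treat missing or defaulted (-1) ages as not eligible
    if role is None or age is None or age < 0:
        return 0.0
    if role.lower() == "driver":
        if age > 50:
            return 0.15  # 15% for senior drivers (age > 50)
        if age < 30:
            return 0.05  # 5% for junior drivers (age < 30)
    return 0.0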

# Register as Spark UDF
bonus_udf = udf(calculate_bonus, DoubleType())

# Apply UDF to create projected_bonus column
df_customized = df_source_optimized.withColumn(
    "projected_bonus",
    bonus_udf(col("role"), col("age"))
)

# Display the data
display(df_customized)

# Check the new column
print(f"Total records: {df_customized.count()}")
print(f"Total columns: {len(df_customized.columns)}")
df_customized.printSchema()