### Reading data from bronze layer

In [0]:
# Load the raw customers table from the bronze layer and preview a few rows.
df_cust = spark.table("transportation_service_company.bronze.customers")
display(df_cust.limit(5))

customer_id,first_name,last_name,email,phone_number,city,signup_date,last_updated_timestamp
1,Daniel,Reed,azimmerman@ramirez-nelson.com,811-724-1080,Tiffanyview,2023-12-25,9/15/2025 15:21
2,Jonathan,Hansen,williammiller@serrano-jones.com,632-190-4027x0198,Baldwinburgh,2021-01-23,9/19/2025 20:19
3,Samuel,Rodriguez,troy07@carrillo-webb.info,676.094.0716,New Ashley,2025-05-31,9/13/2025 3:45
4,Victor,Sanchez,jonthompson@thomas.biz,960-929-2694,Stephanieton,2024-08-31,8/23/2025 0:38
5,Sabrina,Black,patricknewman@williams.biz,+1-621-699-9458x79470,South Christopherport,2023-05-18,8/20/2025 16:08


In [0]:
# Peek at a single customer row.
display(df_cust.limit(1))

customer_id,first_name,last_name,email,phone_number,city,signup_date,last_updated_timestamp
1,Daniel,Reed,azimmerman@ramirez-nelson.com,811-724-1080,Tiffanyview,2023-12-25,9/15/2025 15:21


In [0]:
# Single-row preview of df_cust; what it shows depends on which transformation
# cells have already been executed in this session.
display(df_cust.limit(1))

customer_id,email,phone_number,city,signup_date,last_updated_timestamp,domain,full_name,current_timestamp
1,azimmerman@ramirez-nelson.com,8117241080,Tiffanyview,2023-12-25,9/15/2025 15:21,ramirez-nelson.com,Daniel Reed,2026-01-28T16:53:53.659Z


In [0]:
from typing import List
from pyspark.sql.types import *
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from delta.tables import DeltaTable

### This class holds the generic transformation logic that we apply to all of our tables

In [0]:
class transformations:
    """Generic silver-layer helpers reused for every table in this notebook.

    Provides de-duplication on a key (keeping the latest record), an ingestion
    timestamp column, and a Delta MERGE-based upsert into the silver schema.
    """

    def dedup(self, df: "DataFrame", dedup_cols: list, cdc: str) -> "DataFrame":
        """Remove duplicate rows, keeping the latest record per key.

        Args:
            df: Input DataFrame.
            dedup_cols: Column names that together identify a record.
            cdc: Name of the CDC/timestamp column; the row with the greatest
                value in this column is kept for each key.

        Returns:
            DataFrame with one row per distinct combination of ``dedup_cols``.

        Raises:
            ValueError: If ``dedup_cols`` is empty.
        """
        if not dedup_cols:
            raise ValueError("dedup_cols must contain at least one column name")

        # Partition on the key columns themselves instead of a concatenated
        # string: concat() returns NULL as soon as one part is NULL, and
        # different composite keys can collapse to the same text
        # ("1" + "23" == "12" + "3").
        latest_first = Window.partitionBy(*dedup_cols).orderBy(col(cdc).desc())

        rank_col = "__dedup_row_num"  # temporary helper, dropped before returning
        return (
            df.withColumn(rank_col, row_number().over(latest_first))
            .filter(col(rank_col) == 1)  # rank 1 == latest by CDC
            .drop(rank_col)
        )

    def process_timestamp(self, df):
        """Add a ``current_timestamp`` column holding the silver ingestion time.

        Args:
            df: Input DataFrame.

        Returns:
            The DataFrame with a ``current_timestamp`` column added (an
            existing column of that name is replaced).
        """
        return df.withColumn('current_timestamp', current_timestamp())

    @staticmethod
    def _merge_condition(key_cols) -> str:
        """Build the ``src.k=tgt.k AND ...`` join predicate for a MERGE."""
        return ' AND '.join(f"src.{k}=tgt.{k}" for k in key_cols)

    def upsert(self, df, key_cols, table, cdc):
        """Merge ``df`` into ``transportation_service_company.silver.<table>``.

        Rows matching on every column in ``key_cols`` are updated with all
        source columns; rows without a match are inserted.

        Args:
            df: Source DataFrame (aliased ``src`` in the merge).
            key_cols: Column names used to match source and target rows.
            table: Target table name inside the silver schema.
            cdc: Name of the CDC column. Accepted for interface compatibility;
                the merge does not use it, so matched rows are always
                overwritten by the source row.

        Returns:
            1 once the merge has executed.

        Raises:
            ValueError: If ``key_cols`` is empty (the merge would have no
                join condition).
        """
        if not key_cols:
            raise ValueError("key_cols must contain at least one column name")

        merge_condition = self._merge_condition(key_cols)

        # Target Delta table in the silver schema.
        dlt_obj = DeltaTable.forName(spark, f"transportation_service_company.silver.{table}")

        (
            dlt_obj.alias("tgt")
            .merge(df.alias("src"), merge_condition)
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

        return 1




### **customers**

%md
## First thing we want to do is get the domains of our customers.
## Why is it useful?
1. Marketing & Ads (Deliverability)
Ad Platform Targeting: You can create "Lookalike" audiences. If you notice your high-value customers use @company.com addresses versus @gmail.com, you can adjust your ad spend toward professional networks (like LinkedIn) versus social networks (like Instagram).


2. Business Analysis (B2B vs. B2C)
Customer Segmentation: You can easily separate Retail Customers (Gmail, Yahoo, iCloud) from Corporate/B2B Customers (company-specific domains).

3. Data Quality & Fraud
Spam Detection: A sudden spike in sign-ups from a strange, unknown domain (e.g., @xyz123.info) often indicates bot activity or a "disposable email" service used for fraud.

In [0]:
# Derive the e-mail domain: element 1 of the address split on '@'.
df_cust = df_cust.withColumn(
    'domain',
    split(col('email'), '@').getItem(1),
)


customer_id,first_name,last_name,email,phone_number,city,signup_date,last_updated_timestamp,domain
1,Daniel,Reed,azimmerman@ramirez-nelson.com,811-724-1080,Tiffanyview,2023-12-25,9/15/2025 15:21,ramirez-nelson.com
2,Jonathan,Hansen,williammiller@serrano-jones.com,632-190-4027x0198,Baldwinburgh,2021-01-23,9/19/2025 20:19,serrano-jones.com
3,Samuel,Rodriguez,troy07@carrillo-webb.info,676.094.0716,New Ashley,2025-05-31,9/13/2025 3:45,carrillo-webb.info
4,Victor,Sanchez,jonthompson@thomas.biz,960-929-2694,Stephanieton,2024-08-31,8/23/2025 0:38,thomas.biz
5,Sabrina,Black,patricknewman@williams.biz,+1-621-699-9458x79470,South Christopherport,2023-05-18,8/20/2025 16:08,williams.biz
6,Melissa,Blair,courtney29@gmail.com,(332)469-2812x91807,East Pamela,2025-09-17,8/24/2025 9:44,gmail.com
7,Jacqueline,Williams,robertmoore@gmail.com,7312603248,Fritzmouth,2023-05-29,8/28/2025 19:40,gmail.com
8,Christina,Arnold,parkerbridget@yahoo.com,(878)682-1357,North Sarah,2023-09-21,9/6/2025 20:36,yahoo.com
9,Lisa,Shelton,cody49@johnson.org,9349468560,South Kyleview,2022-12-13,9/8/2025 14:57,johnson.org
10,Megan,Dean,vritter@yahoo.com,001-967-061-8100x482,Port Jesse,2023-03-12,9/7/2025 23:05,yahoo.com


%md

### The phone number column contains unwanted characters; we want to remove everything except digits

In [0]:
# Strip every non-digit character from the customer phone numbers.
df_cust = df_cust.withColumn(
    'phone_number',
    regexp_replace(col('phone_number'), r'[^0-9]', ''),
)
display(df_cust.limit(1))

customer_id,first_name,last_name,email,phone_number,city,signup_date,last_updated_timestamp,domain
1,Daniel,Reed,azimmerman@ramirez-nelson.com,8117241080,Tiffanyview,2023-12-25,9/15/2025 15:21,ramirez-nelson.com


%md
### Concatenating first name and last name into a single full_name column

In [0]:
# Build full_name as "<first_name> <last_name>" and drop the two source columns.
df_cust = (
    df_cust
    .withColumn('full_name', concat_ws(' ', col('first_name'), col('last_name')))
    .drop('first_name', 'last_name')
)

display(df_cust.limit(10))

customer_id,email,phone_number,city,signup_date,last_updated_timestamp,domain,full_name
1,azimmerman@ramirez-nelson.com,8117241080,Tiffanyview,2023-12-25,9/15/2025 15:21,ramirez-nelson.com,Daniel Reed
2,williammiller@serrano-jones.com,63219040270198,Baldwinburgh,2021-01-23,9/19/2025 20:19,serrano-jones.com,Jonathan Hansen
3,troy07@carrillo-webb.info,6760940716,New Ashley,2025-05-31,9/13/2025 3:45,carrillo-webb.info,Samuel Rodriguez
4,jonthompson@thomas.biz,9609292694,Stephanieton,2024-08-31,8/23/2025 0:38,thomas.biz,Victor Sanchez
5,patricknewman@williams.biz,1621699945879470,South Christopherport,2023-05-18,8/20/2025 16:08,williams.biz,Sabrina Black
6,courtney29@gmail.com,332469281291807,East Pamela,2025-09-17,8/24/2025 9:44,gmail.com,Melissa Blair
7,robertmoore@gmail.com,7312603248,Fritzmouth,2023-05-29,8/28/2025 19:40,gmail.com,Jacqueline Williams
8,parkerbridget@yahoo.com,8786821357,North Sarah,2023-09-21,9/6/2025 20:36,yahoo.com,Christina Arnold
9,cody49@johnson.org,9349468560,South Kyleview,2022-12-13,9/8/2025 14:57,johnson.org,Lisa Shelton
10,vritter@yahoo.com,19670618100482,Port Jesse,2023-03-12,9/7/2025 23:05,yahoo.com,Megan Dean


In [0]:
cust_object = transformations()

# The customers feed shows last_updated_timestamp as 'M/d/yyyy H:mm' text
# (e.g. '9/6/2025 20:36'). Sorting that text descending ranks '9/6/...' above
# '9/19/...', so dedup could keep a stale row. When the column is a string,
# order on a parsed timestamp held in a temporary column instead; the helper
# column is dropped again so the schema written to silver is unchanged.
cust_cdc_col = 'last_updated_timestamp'
if dict(df_cust.dtypes)[cust_cdc_col] == 'string':
    df_cust = df_cust.withColumn(
        '_cdc_ts', to_timestamp(col(cust_cdc_col), 'M/d/yyyy H:mm')
    )
    df_cust = cust_object.dedup(df_cust, ['customer_id'], '_cdc_ts').drop('_cdc_ts')
else:
    df_cust = cust_object.dedup(df_cust, ['customer_id'], cust_cdc_col)

# Stamp the time this batch is ingested into the silver layer.
df_cust = cust_object.process_timestamp(df_cust)



In [0]:
# Merge into silver.customers on customer_id when the table already exists;
# otherwise the first run creates it from this DataFrame.
if spark.catalog.tableExists("transportation_service_company.silver.customers"):
    cust_object.upsert(df_cust, ['customer_id'], 'customers', 'last_updated_timestamp')
else:
    (
        df_cust.write
        .format("delta")
        .mode("append")
        .saveAsTable("transportation_service_company.silver.customers")
    )

### Drivers 

In [0]:
# Load the raw drivers table from the bronze layer and preview a few rows.
df_drivers = spark.table("transportation_service_company.bronze.drivers")
display(df_drivers.limit(5))

driver_id,first_name,last_name,phone_number,vehicle_id,driver_rating,city,last_updated_timestamp
1,Latasha,Lopez,262-924-2955x590,1,4.7,East Dorothy,2025-08-25T06:36:26.000Z
2,Alan,Wiley,0967969634,2,3.98,West Susan,2025-09-14T00:44:57.000Z
3,James,Taylor,424-614-1847,3,3.66,Mcintoshton,2025-08-26T22:28:17.000Z
4,Theresa,Benson,617-017-0101x91777,4,3.86,North Courtneychester,2025-09-01T11:40:55.000Z
5,Karen,Jensen,611-060-5683,5,4.87,Brownburgh,2025-09-04T16:35:04.000Z


In [0]:
# Strip every non-digit character from the driver phone numbers.
df_drivers = df_drivers.withColumn(
    'phone_number',
    regexp_replace(col('phone_number'), r'[^0-9]', ''),
)
display(df_drivers.limit(1))

driver_id,first_name,last_name,phone_number,vehicle_id,driver_rating,city,last_updated_timestamp
1,Latasha,Lopez,2629242955590,1,4.7,East Dorothy,2025-08-25T06:36:26.000Z


In [0]:
# Build full_name as "<first_name> <last_name>" and drop the two source columns.
df_drivers = (
    df_drivers
    .withColumn('full_name', concat_ws(' ', col('first_name'), col('last_name')))
    .drop('first_name', 'last_name')
)

display(df_drivers.limit(10))

driver_id,phone_number,vehicle_id,driver_rating,city,last_updated_timestamp,full_name
1,2629242955590,1,4.7,East Dorothy,2025-08-25T06:36:26.000Z,Latasha Lopez
2,967969634,2,3.98,West Susan,2025-09-14T00:44:57.000Z,Alan Wiley
3,4246141847,3,3.66,Mcintoshton,2025-08-26T22:28:17.000Z,James Taylor
4,617017010191777,4,3.86,North Courtneychester,2025-09-01T11:40:55.000Z,Theresa Benson
5,6110605683,5,4.87,Brownburgh,2025-09-04T16:35:04.000Z,Karen Jensen
6,5564809096439,6,4.26,Port Williamland,2025-08-31T14:31:37.000Z,Debra Smith
7,179856869529778,7,3.67,West Erinborough,2025-09-17T05:57:45.000Z,Justin Peters
8,706321839008097,8,4.9,Lake Stephen,2025-08-26T06:45:51.000Z,Todd Young
9,17279105045499,9,4.5,West Lindsey,2025-08-27T13:04:32.000Z,Mary Young
10,5096134480,10,4.04,Lauraland,2025-09-09T05:50:23.000Z,Jacob Mack


In [0]:
# Keep the newest row per driver_id, then stamp the silver ingestion time.
drivers_object = transformations()
df_drivers = drivers_object.process_timestamp(
    drivers_object.dedup(df_drivers, ['driver_id'], 'last_updated_timestamp')
)

In [0]:
# First run creates silver.drivers; later runs merge into it on driver_id.
if not spark.catalog.tableExists("transportation_service_company.silver.drivers"):
    df_drivers.write.mode("append").format("delta").saveAsTable("transportation_service_company.silver.drivers")
else:
    # Use this section's own helper instance (drivers_object) rather than
    # cust_object, so the drivers section does not depend on the customers
    # cells having been run first.
    drivers_object.upsert(df_drivers, ['driver_id'],'drivers', 'last_updated_timestamp')

### Locations

In [0]:
# Load the raw locations table from the bronze layer and preview a few rows.
df_locations = spark.table("transportation_service_company.bronze.locations")
display(df_locations.limit(5))

location_id,city,state,country,latitude,longitude,last_updated_timestamp
1,Lake Davidport,Hawaii,Dominica,34.5682,151.8097,2025-09-03T13:37:24.000Z
2,Mccarthybury,Minnesota,Dominican Republic,71.6928,-162.2836,2025-09-10T10:54:18.000Z
3,Bellhaven,Arkansas,Japan,-53.3075,69.1475,2025-09-17T06:52:51.000Z
4,Moorechester,New Hampshire,Togo,77.4281,57.0052,2025-08-27T19:04:27.000Z
5,Glennview,North Carolina,Gambia,41.1122,-162.4836,2025-08-20T17:31:06.000Z


In [0]:
# One row per location_id (latest by last_updated_timestamp), plus the
# silver ingestion timestamp.
locations_object = transformations()
df_locations = locations_object.process_timestamp(
    locations_object.dedup(df_locations, ['location_id'], 'last_updated_timestamp')
)

## UPSERT TO SILVER LAYER

In [0]:
# Merge into silver.locations on location_id when the table already exists;
# otherwise create it from this DataFrame.
if spark.catalog.tableExists("transportation_service_company.silver.locations"):
    locations_object.upsert(df_locations, ['location_id'], 'locations', 'last_updated_timestamp')
else:
    (
        df_locations.write
        .format("delta")
        .mode("append")
        .saveAsTable("transportation_service_company.silver.locations")
    )

### PAYMENTS

In [0]:
# Load the raw payments table from the bronze layer and preview a few rows.
df_payments = spark.table("transportation_service_company.bronze.payments")
display(df_payments.limit(5))

payment_id,trip_id,customer_id,payment_method,payment_status,amount,transaction_time,last_updated_timestamp
1,274,126,Cash,Success,38.15,2025-09-17T13:00:12.000Z,2025-08-30T13:40:53.000Z
2,676,131,Cash,Success,52.07,2025-08-14T13:00:12.000Z,2025-09-08T18:21:05.000Z
3,919,132,Card,Failed,55.5,2025-07-27T13:00:12.000Z,2025-08-21T20:24:08.000Z
4,247,34,Wallet,Pending,28.78,2025-07-27T13:00:12.000Z,2025-08-27T15:03:09.000Z
5,386,62,Card,Failed,55.02,2025-08-01T13:00:12.000Z,2025-09-08T23:15:06.000Z


We will apply some business-logic transformations,
mainly to answer these questions in the BI semantic layer:
   * What is our payment success rate by method?
   * How many payments need follow-up today?
   * What % of payments are digital vs. cash?
   * Which payment methods have the highest failure rates?


In [0]:
# Flag successful payments: 1 when payment_status is "Success", otherwise 0.
df_payments = df_payments.withColumn(
    'is_successful',
    when(col('payment_status') == lit("Success"), lit(1)).otherwise(lit(0)),
)
display(df_payments.limit(5))

payment_id,trip_id,customer_id,payment_method,payment_status,amount,transaction_time,last_updated_timestamp,is_successful
1,274,126,Cash,Success,38.15,2025-09-17T13:00:12.000Z,2025-08-30T13:40:53.000Z,1
2,676,131,Cash,Success,52.07,2025-08-14T13:00:12.000Z,2025-09-08T18:21:05.000Z,1
3,919,132,Card,Failed,55.5,2025-07-27T13:00:12.000Z,2025-08-21T20:24:08.000Z,0
4,247,34,Wallet,Pending,28.78,2025-07-27T13:00:12.000Z,2025-08-27T15:03:09.000Z,0
5,386,62,Card,Failed,55.02,2025-08-01T13:00:12.000Z,2025-09-08T23:15:06.000Z,0


In [0]:
# Bucket the payment method: Cash stays "Cash", Card/Wallet become "Digital",
# anything else is "Other".
df_payments = df_payments.withColumn(
    "payment_type",
    when(col("payment_method") == lit("Cash"), lit("Cash"))
    .when(col("payment_method").isin("Card", "Wallet"), lit("Digital"))
    .otherwise(lit("Other")),
)

In [0]:
# Failed or pending payments need follow-up ("Yes"); everything else is "No".
df_payments = df_payments.withColumn(
    "needs_followup",
    when(col("payment_status").isin("Failed", "Pending"), lit("Yes"))
    .otherwise(lit("No")),
)

In [0]:
# Preview the payments rows with the derived columns added so far.
display(df_payments.limit(5))

payment_id,trip_id,customer_id,payment_method,payment_status,amount,transaction_time,last_updated_timestamp,is_successful,payment_type,needs_followup
1,274,126,Cash,Success,38.15,2025-09-17T13:00:12.000Z,2025-08-30T13:40:53.000Z,1,Cash,No
2,676,131,Cash,Success,52.07,2025-08-14T13:00:12.000Z,2025-09-08T18:21:05.000Z,1,Cash,No
3,919,132,Card,Failed,55.5,2025-07-27T13:00:12.000Z,2025-08-21T20:24:08.000Z,0,Digital,Yes
4,247,34,Wallet,Pending,28.78,2025-07-27T13:00:12.000Z,2025-08-27T15:03:09.000Z,0,Digital,Yes
5,386,62,Card,Failed,55.02,2025-08-01T13:00:12.000Z,2025-09-08T23:15:06.000Z,0,Digital,Yes


In [0]:
# Classify digital (Card/Wallet) payments by outcome; every other row,
# including digital rows with any other status, is labelled 'Offline'.
_is_digital = col('payment_method').isin('Card', 'Wallet')
_status = col('payment_status')

df_payments = df_payments.withColumn(
    'online_payment_status',
    when(_is_digital & (_status == 'Success'), 'Online_Success')
    .when(_is_digital & (_status == 'Failed'), 'Online_Failure')
    .when(_is_digital & (_status == 'Pending'), 'Online_Pending')
    .otherwise('Offline'),
)
                                                        
                                

In [0]:
# Calendar date of the transaction, taken from transaction_time.
df_payments = df_payments.withColumn(
    "payment_date",
    to_date(to_timestamp(col("transaction_time"))),
)
display(df_payments.limit(5))




payment_id,trip_id,customer_id,payment_method,payment_status,amount,transaction_time,last_updated_timestamp,is_successful,payment_type,needs_followup,online_payment_status,payment_date
1,274,126,Cash,Success,38.15,2025-09-17T13:00:12.000Z,2025-08-30T13:40:53.000Z,1,Cash,No,Offline,2025-09-17
2,676,131,Cash,Success,52.07,2025-08-14T13:00:12.000Z,2025-09-08T18:21:05.000Z,1,Cash,No,Offline,2025-08-14
3,919,132,Card,Failed,55.5,2025-07-27T13:00:12.000Z,2025-08-21T20:24:08.000Z,0,Digital,Yes,Online_Failure,2025-07-27
4,247,34,Wallet,Pending,28.78,2025-07-27T13:00:12.000Z,2025-08-27T15:03:09.000Z,0,Digital,Yes,Online_Pending,2025-07-27
5,386,62,Card,Failed,55.02,2025-08-01T13:00:12.000Z,2025-09-08T23:15:06.000Z,0,Digital,Yes,Online_Failure,2025-08-01


In [0]:
# Print the column names, types and nullability of the payments DataFrame.
df_payments.printSchema()

root
 |-- payment_id: integer (nullable = true)
 |-- trip_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- payment_status: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- transaction_time: timestamp (nullable = true)
 |-- last_updated_timestamp: timestamp (nullable = true)
 |-- is_successful: integer (nullable = false)
 |-- payment_type: string (nullable = false)
 |-- needs_followup: string (nullable = false)
 |-- online_payment_status: string (nullable = false)
 |-- payment_date: date (nullable = true)
 |-- current_timestamp: timestamp (nullable = false)



In [0]:
# Keep the newest row per payment_id, then stamp the silver ingestion time.
payments_object = transformations()
df_payments = payments_object.process_timestamp(
    payments_object.dedup(df_payments, ['payment_id'], 'last_updated_timestamp')
)

### UPSERT TO SILVER LAYER

In [0]:
# Merge into silver.payments on payment_id when the table already exists;
# otherwise create it from this DataFrame.
if spark.catalog.tableExists("transportation_service_company.silver.payments"):
    payments_object.upsert(df_payments, ['payment_id'], 'payments', 'last_updated_timestamp')
else:
    (
        df_payments.write
        .format("delta")
        .mode("append")
        .saveAsTable("transportation_service_company.silver.payments")
    )

### Vehicles

In [0]:
# Load the raw vehicles table from the bronze layer and preview a few rows.
df_vehicles = spark.table("transportation_service_company.bronze.vehicles")
display(df_vehicles.limit(5))

vehicle_id,license_plate,model,make,year,vehicle_type,last_updated_timestamp
1,NXT-8646,Message,"Francis, Smith and Lee",2023,Hatchback,2025-09-02T06:05:20.000Z
2,03S R43,Region,Lawson Group,2017,Sedan,2025-08-31T05:55:09.000Z
3,SKO H06,Prepare,"Moreno, Ruiz and Barker",2023,Luxury,2025-08-30T05:07:29.000Z
4,5R235,Pattern,"Welch, Martinez and Hendricks",2019,Van,2025-09-09T05:52:10.000Z
5,925A,Process,Rivera-Anderson,2014,Hatchback,2025-08-27T11:42:16.000Z


In [0]:
# Upper-case the vehicle make.
df_vehicles = df_vehicles.withColumn(
    'make',
    upper('make'),
)


In [0]:
# One row per vehicle_id (latest by last_updated_timestamp), plus the
# silver ingestion timestamp.
vehicles_object = transformations()
df_vehicles = vehicles_object.process_timestamp(
    vehicles_object.dedup(df_vehicles, ['vehicle_id'], 'last_updated_timestamp')
)



In [0]:
# Merge into silver.vehicles on vehicle_id when the table already exists;
# otherwise create it from this DataFrame.
if spark.catalog.tableExists("transportation_service_company.silver.vehicles"):
    vehicles_object.upsert(df_vehicles, ['vehicle_id'], 'vehicles', 'last_updated_timestamp')
else:
    (
        df_vehicles.write
        .format("delta")
        .mode("append")
        .saveAsTable("transportation_service_company.silver.vehicles")
    )

### TRIPS

In [0]:
# Load the raw trips table from the bronze layer and preview a few rows.
df_trips = spark.table("transportation_service_company.bronze.trips")
display(df_trips.limit(5))

trip_id,driver_id,customer_id,vehicle_id,trip_start_time,trip_end_time,start_location,end_location,distance_km,fare_amount,payment_method,trip_status,last_updated_timestamp
1,29,185,29,2025-07-31T18:00:11.000Z,2025-07-31T18:07:11.000Z,New Victoriahaven,New Cynthiamouth,9.09,28.41,Card,Cancelled,2025-09-06T01:55:37.000Z
2,2,180,2,2025-08-29T04:00:11.000Z,2025-08-29T04:36:11.000Z,West Wanda,Donaldburgh,22.68,76.3,Card,Ongoing,2025-08-25T21:45:53.000Z
3,43,182,43,2025-08-25T09:00:11.000Z,2025-08-25T09:36:11.000Z,North Robert,West Donald,29.23,47.0,Cash,Completed,2025-09-07T12:33:38.000Z
4,4,152,4,2025-08-10T21:00:11.000Z,2025-08-10T21:16:11.000Z,Chadville,West Marcusmouth,25.07,35.69,Wallet,Completed,2025-09-04T10:41:23.000Z
5,23,132,23,2025-09-12T15:00:11.000Z,2025-09-12T15:37:11.000Z,Schneiderview,Ochoaton,33.2,49.46,Card,Ongoing,2025-09-20T04:33:10.000Z


In [0]:
# Keep the newest row per trip_id, then stamp the silver ingestion time.
trips_object = transformations()
df_trips = trips_object.process_timestamp(
    trips_object.dedup(df_trips, ['trip_id'], 'last_updated_timestamp')
)



### UPSERT TO SILVER

In [0]:
# Merge into silver.trips on trip_id when the table already exists;
# otherwise create it from this DataFrame.
if spark.catalog.tableExists("transportation_service_company.silver.trips"):
    trips_object.upsert(df_trips, ['trip_id'], 'trips', 'last_updated_timestamp')
else:
    (
        df_trips.write
        .format("delta")
        .mode("append")
        .saveAsTable("transportation_service_company.silver.trips")
    )