In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from typing import List
from pyspark.sql import DataFrame
from delta.tables import DeltaTable

class transformation:
    """Reusable bronze -> silver helpers used by the per-table cells of this notebook.

    Provides latest-record de-duplication, a load-time audit column and a
    Delta Lake MERGE-based upsert into the silver schema.
    """

    # Scratch column used only inside ``dedup``. A generic name such as
    # ``row_number`` would overwrite (and then drop) a real source column with
    # the same name, so a deliberately unusual name is used instead.
    _RANK_COL = "_dedup_row_number"

    def dedup(self, df: DataFrame, dedup_key: List[str], cdc: str) -> DataFrame:
        """Keep exactly one row per ``dedup_key`` combination.

        The surviving row is the one with the largest ``cdc`` value. ``desc()``
        sorts nulls last, so a row with a non-null ``cdc`` beats one with a null
        ``cdc``. If several rows tie on the largest ``cdc``, which one survives
        is not specified.

        Args:
            df: Input DataFrame.
            dedup_key: Column names identifying one logical record.
            cdc: Change-tracking column; higher values are treated as newer.

        Returns:
            ``df`` without duplicates and without any helper column.
        """
        window_spec = Window.partitionBy(*dedup_key).orderBy(col(cdc).desc())
        return (
            df.withColumn(self._RANK_COL, row_number().over(window_spec))
            .filter(col(self._RANK_COL) == 1)
            .drop(self._RANK_COL)
        )

    def process_date(self, df: DataFrame) -> DataFrame:
        """Return ``df`` with a ``process_date`` column set to ``current_timestamp()``.

        Despite its name the column holds a timestamp, not a date. An existing
        ``process_date`` column is replaced (``withColumn`` semantics).
        """
        return df.withColumn('process_date', current_timestamp())

    @staticmethod
    def _quote_ident(name: str) -> str:
        """Backtick-quote a column name for safe use inside a Spark SQL expression."""
        return '`' + name.replace('`', '``') + '`'

    def upsert(self, df: DataFrame, key_cols: List[str], table: str, cdc: str,
               target_schema: str = 'Project2.silver') -> None:
        """MERGE ``df`` into the existing Delta table ``{target_schema}.{table}``.

        A matched target row has all its columns replaced only when the incoming
        ``cdc`` value is strictly greater, so replaying older data never
        overwrites newer data. Unmatched source rows are inserted.

        Args:
            df: Source rows. These should already be unique on ``key_cols`` (see
                ``dedup``): Delta MERGE fails when several source rows match the
                same target row.
            key_cols: Columns compared with ``=`` to match source and target
                rows. A null key therefore never matches and is always inserted.
            table: Target table name inside ``target_schema``.
            cdc: Change-tracking column compared to decide whether to update.
            target_schema: Catalog-qualified schema that holds ``table``.

        Raises:
            ValueError: If ``key_cols`` is empty (the MERGE would have no condition).
        """
        if not key_cols:
            raise ValueError("upsert() needs at least one key column")
        q = self._quote_ident
        merge_condition = ' AND '.join(f'trg.{q(k)} = src.{q(k)}' for k in key_cols)
        # Use the DataFrame's own session instead of relying on a notebook-global `spark`.
        target = DeltaTable.forName(df.sparkSession, f'{target_schema}.{table}')
        (
            target.alias('trg')
            .merge(df.alias('src'), merge_condition)
            .whenMatchedUpdateAll(condition=f'src.{q(cdc)} > trg.{q(cdc)}')
            .whenNotMatchedInsertAll()
            .execute()
        )
        

### Customer Table

In [0]:
# Raw customer rows from the bronze layer; duplicates are resolved by the next cell.
df_cust = (
    spark.read
    .table('Project2.bronze.customer')
)

In [0]:
# Customer cleanup:
#   - keep the newest row per customer_id;
#   - merge first_name/last_name into customer_name (concat_ws skips nulls);
#   - strip every non-digit from phone_number;
#   - stamp the load time.
# This cell reassigns df_cust and drops first_name/last_name, so re-run the load
# cell before executing it a second time.
obj_cust = transformation()
df_cust = obj_cust.process_date(
    obj_cust.dedup(df_cust, ['customer_id'], 'last_updated_timestamp')
    .withColumn('customer_name', concat_ws(' ', col('first_name'), col('last_name')))
    .withColumn('phone_number', regexp_replace(col('phone_number'), r'[^0-9]', ''))
    .drop('first_name', 'last_name')
)

In [0]:
# Land the cleaned customers in silver. MERGE into the table when it already
# exists; otherwise create it from this batch (append mode creates a missing
# table). The short name is defined once, so the existence check and the
# initial write target the same table that `upsert` merges into (it prefixes
# 'Project2.silver.').
customer_table = 'customer'
customer_table_fqn = f'Project2.silver.{customer_table}'
if spark.catalog.tableExists(customer_table_fqn):
    obj_cust.upsert(df_cust, ['customer_id'], customer_table, 'last_updated_timestamp')
else:
    df_cust.write.format('delta').mode('append').saveAsTable(customer_table_fqn)

### Driver Table

In [0]:
# Raw driver rows from the bronze layer; duplicates are resolved by the driver transform cell.
df_driver = (
    spark.read
    .table('Project2.bronze.driver')
)

In [0]:
%sql
-- NOTE(review): ad-hoc row count of project2.silver.trips placed inside the driver section; nothing in the visible cells uses this result. Consider moving it to a validation section or removing it.
select count(*) from project2.silver.trips


In [0]:
# Driver cleanup:
#   - keep the newest row per driver_id;
#   - merge first_name/last_name into driver_name (concat_ws skips nulls);
#   - strip every non-digit from phone_number;
#   - stamp the load time.
# This cell reassigns df_driver and drops first_name/last_name, so re-run the
# load cell before executing it a second time.
obj_driver = transformation()
df_driver = obj_driver.process_date(
    obj_driver.dedup(df_driver, ['driver_id'], 'last_updated_timestamp')
    .withColumn('driver_name', concat_ws(' ', col('first_name'), col('last_name')))
    .withColumn('phone_number', regexp_replace(col('phone_number'), r'[^0-9]', ''))
    .drop('first_name', 'last_name')
)

In [0]:
# Land the cleaned drivers in silver. MERGE into the table when it already
# exists; otherwise create it from this batch (append mode creates a missing
# table). The short name is defined once, so the existence check and the
# initial write target the same table that `upsert` merges into (it prefixes
# 'Project2.silver.').
driver_table = 'driver'
driver_table_fqn = f'Project2.silver.{driver_table}'
if spark.catalog.tableExists(driver_table_fqn):
    obj_driver.upsert(df_driver, ['driver_id'], driver_table, 'last_updated_timestamp')
else:
    df_driver.write.format('delta').mode('append').saveAsTable(driver_table_fqn)

### Location Table


In [0]:
# Raw location rows from the bronze layer; duplicates are resolved by the next cell.
df_location = (
    spark.read
    .table('Project2.bronze.location')
)

In [0]:
# Location cleanup: keep the newest row per location_id, then stamp the load time.
obj_loc = transformation()
df_location = obj_loc.process_date(
    obj_loc.dedup(df_location, ['location_id'], 'last_updated_timestamp')
)

In [0]:
# Land the cleaned locations in silver. MERGE into the table when it already
# exists; otherwise create it from this batch (append mode creates a missing
# table). The short name is defined once, so the existence check and the
# initial write target the same table that `upsert` merges into (it prefixes
# 'Project2.silver.').
location_table = 'location'
location_table_fqn = f'Project2.silver.{location_table}'
if spark.catalog.tableExists(location_table_fqn):
    obj_loc.upsert(df_location, ['location_id'], location_table, 'last_updated_timestamp')
else:
    df_location.write.format('delta').mode('append').saveAsTable(location_table_fqn)

### Payment Table

In [0]:
# Raw payment rows from the bronze layer; duplicates are resolved by the next cell.
df_payment = (
    spark.read
    .table('Project2.bronze.payment')
)

In [0]:
# Payment cleanup: keep the newest row per payment_id, then stamp the load time.
obj_payment = transformation()
df_payment = obj_payment.process_date(
    obj_payment.dedup(df_payment, ['payment_id'], 'last_updated_timestamp')
)

In [0]:
# Land the cleaned payments in silver. MERGE into the table when it already
# exists; otherwise create it from this batch (append mode creates a missing
# table). The short name is defined once, so the existence check and the
# initial write target the same table that `upsert` merges into (it prefixes
# 'Project2.silver.').
payment_table = 'payment'
payment_table_fqn = f'Project2.silver.{payment_table}'
if spark.catalog.tableExists(payment_table_fqn):
    obj_payment.upsert(df_payment, ['payment_id'], payment_table, 'last_updated_timestamp')
else:
    df_payment.write.format('delta').mode('append').saveAsTable(payment_table_fqn)

### Vehicle Table

In [0]:
# Raw vehicle rows from the bronze layer; duplicates are resolved by the next cell.
df_vehicle = (
    spark.read
    .table('Project2.bronze.vehicle')
)

In [0]:
# Vehicle cleanup: keep the newest row per vehicle_id, parse `year` with the
# 'yyyy' pattern, upper-case `model`, then stamp the load time.
# NOTE(review): to_date turns `year` into a DATE value (presumably January 1 of
# that year) rather than an integer year; confirm downstream consumers expect that.
obj_vehicle = transformation()
df_vehicle = obj_vehicle.process_date(
    obj_vehicle.dedup(df_vehicle, ['vehicle_id'], 'last_updated_timestamp')
    .withColumn('year', to_date(col('year'), 'yyyy'))
    .withColumn('model', upper(col('model')))
)

In [0]:
# Land the cleaned vehicles in silver. MERGE into the table when it already
# exists; otherwise create it from this batch (append mode creates a missing
# table). The short name is defined once, so the existence check and the
# initial write target the same table that `upsert` merges into (it prefixes
# 'Project2.silver.').
vehicle_table = 'vehicle'
vehicle_table_fqn = f'Project2.silver.{vehicle_table}'
if spark.catalog.tableExists(vehicle_table_fqn):
    obj_vehicle.upsert(df_vehicle, ['vehicle_id'], vehicle_table, 'last_updated_timestamp')
else:
    df_vehicle.write.format('delta').mode('append').saveAsTable(vehicle_table_fqn)