## CUSTOMERS

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from typing import List
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from delta.tables import DeltaTable
from pyspark.sql.functions import expr

In [0]:
class Transformations:
    """Reusable DataFrame helpers for loading bronze data into silver tables."""

    def dedup(self, df: DataFrame, dedup_cols: List[str], cdc: str) -> DataFrame:
        """Keep one row per distinct key, choosing the highest ``cdc`` value.

        Args:
            df: Input DataFrame that may contain several rows per key.
            dedup_cols: Names of the columns that together identify a record.
            cdc: Name of the column used to rank rows inside each key group;
                the row with the greatest value is the one kept.

        Returns:
            A DataFrame with the same columns as ``df`` and a single row for
            each distinct combination of ``dedup_cols``.

        Raises:
            ValueError: If ``dedup_cols`` is empty, because the window would
                then span the whole DataFrame and only one row would survive.
        """
        if not dedup_cols:
            raise ValueError("dedup_cols must contain at least one column name")

        # Partition on the key columns themselves rather than on a concatenated
        # string: concat() yields NULL as soon as one key column is NULL (which
        # lumps unrelated rows into one group), and without a separator the
        # keys ("1", "23") and ("12", "3") would both collapse to "123".
        newest_first = Window.partitionBy(
            *[col(name) for name in dedup_cols]
        ).orderBy(col(cdc).desc())

        # Helper column holding the 1-based rank of each row within its group;
        # it is removed again before the result is returned.
        rank_col = "__dedup_rank"
        ranked = df.withColumn(rank_col, row_number().over(newest_first))

        # Rank 1 is the row with the greatest cdc value in its group.
        return ranked.filter(col(rank_col) == 1).drop(rank_col)

    def process_timestamp(self, df: DataFrame) -> DataFrame:
        """Return ``df`` with a ``process_timestamp`` column set to the current timestamp."""
        return df.withColumn("process_timestamp", current_timestamp())

    def upsert(self, df: DataFrame, key_cols: List[str], table: str, cdc: str) -> int:
        """Merge ``df`` into the Delta table ``pysparkdbt.silver.<table>``.

        Rows that match on every column in ``key_cols`` are updated with all
        source columns, but only when the source ``cdc`` value is greater than
        or equal to the target's. Rows without a match are inserted.

        Args:
            df: Source DataFrame to merge into the target table.
            key_cols: Column names that must be equal for a source row and a
                target row to be considered the same record.
            table: Table name inside ``pysparkdbt.silver``.
            cdc: Column compared between source and target to decide whether
                a matched row is overwritten.

        Returns:
            Always ``1`` once the merge has been executed.
        """
        # NOTE(review): `spark` is not defined in this class; it is presumably
        # the session object injected by the notebook runtime - confirm.
        merge_condition = " AND ".join(
            f"src.{key} = trg.{key}" for key in key_cols
        )
        target = DeltaTable.forName(spark, f"pysparkdbt.silver.{table}")
        (
            target.alias("trg")
            .merge(df.alias("src"), merge_condition)
            # Never overwrite a target row with an older source row.
            .whenMatchedUpdateAll(condition=expr(f"src.{cdc} >= trg.{cdc}"))
            .whenNotMatchedInsertAll()
            .execute()
        )
        return 1

In [0]:
df_customers = spark.read.table("pysparkdbt.bronze.customers")

In [0]:
df_customers = df_customers.withColumn("domain", split(col('email'), '@')[1])

In [0]:
df_customers = df_customers.withColumn("phone_number", regexp_replace("phone_number",r"[^0-9]",""))

In [0]:
df_customers = df_customers.withColumn("full_nume", concat_ws(" ",col("first_name"),col("last_name")))

In [0]:
df_customers = df_customers.drop("first_name", "last_name")

In [0]:
cust_obj = Transformations()

cust_df_trns = cust_obj.dedup(df_customers, ['customer_id'], 'last_updated_timestamp')

In [0]:
df_customers = cust_obj.process_timestamp(cust_df_trns)

In [0]:
if not spark.catalog.tableExists("pysparkdbt.silver.customers"):
    df_customers.write.format("delta")\
        .mode("append")\
        .saveAsTable("pysparkdbt.silver.customers")
else:
    cust_obj.upsert(df_customers, ["customer_id"], "customers", "last_updated_timestamp")

## DRIVERS

In [0]:
df_drivers = spark.read.table("pysparkdbt.bronze.drivers")

In [0]:
df_drivers = df_drivers.withColumn("phone_number", regexp_replace("phone_number",r"[^0-9]",""))

In [0]:
df_drivers = df_drivers.withColumn("full_nume", concat_ws(" ",col("first_name"),col("last_name")))
df_drivers = df_drivers.drop("first_name", "last_name")

In [0]:
driver_obj = Transformations()

In [0]:
df_drivers = driver_obj.dedup(df_drivers, ['driver_id'], 'last_updated_timestamp')


In [0]:
df_drivers = driver_obj.process_timestamp(df_drivers)

In [0]:
if not spark.catalog.tableExists("pysparkdbt.silver.drivers"):
    df_drivers.write.format("delta")\
        .mode("append")\
        .saveAsTable("pysparkdbt.silver.drivers")

else:
    driver_obj.upsert(df_drivers, ["driver_id"], "drivers", "last_updated_timestamp")

## LOCATIONS

In [0]:
df_locations = spark.read.table("pysparkdbt.bronze.locations")

In [0]:
loc_obj = Transformations()


In [0]:
df_locations = loc_obj.dedup(df_locations, ['location_id'], 'last_updated_timestamp')
df_locations = loc_obj.process_timestamp(df_locations)

In [0]:
if not spark.catalog.tableExists("pysparkdbt.silver.locations"):
    df_locations.write.format("delta")\
        .mode("append")\
        .saveAsTable("pysparkdbt.silver.locations")

else:
    loc_obj.upsert(df_locations, ["location_id"], "locations", "last_updated_timestamp")  

## PAYMENTS

In [0]:
df_payments = spark.read.table("pysparkdbt.bronze.payments")

In [0]:
df_payments = df_payments.withColumn("online_payments_status", 
            when((col("payment_method") == 'Card') & (col("payment_status") == 'Success'), "online_success")
            .when((col("payment_method") == 'Card') & (col("payment_status") == 'Failed'), "online_failed")
            .when((col("payment_method") == 'Card') & (col("payment_status") == 'Pending'), "online_pending")
            .otherwise("offline"))

In [0]:
payment_obj = Transformations()
df_payments = payment_obj.dedup(df_payments, ['payment_id'], 'last_updated_timestamp')
df_payments = payment_obj.process_timestamp(df_payments)

In [0]:
if not spark.catalog.tableExists("pysparkdbt.silver.payments"):
    df_payments.write.format("delta")\
        .mode("append")\
        .saveAsTable("pysparkdbt.silver.payments")

else:
    payment_obj.upsert(df_payments, ["payment_id"], "payments", "last_updated_timestamp")

## VEHICLES

In [0]:
df_vehicles = spark.read.table("pysparkdbt.bronze.vehicles")

In [0]:
df_vehicles = df_vehicles.withColumn("make", upper(col("make")))

In [0]:
vehicles_obj = Transformations()
df_vehicles = vehicles_obj.dedup(df_vehicles, ['vehicle_id'], 'last_updated_timestamp')
df_vehicles = vehicles_obj.process_timestamp(df_vehicles)

In [0]:
if not spark.catalog.tableExists("pysparkdbt.silver.vehicles"):
    df_vehicles.write.format("delta")\
        .mode("append")\
        .saveAsTable("pysparkdbt.silver.vehicles")

else:
    vehicles_obj.upsert(df_vehicles, ["vehicle_id"], "vehicles", "last_updated_timestamp")