In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from delta.tables import DeltaTable

In [0]:
# Load the raw (bronze) customer records; later cells transform this frame in place.
df = spark.table("uber.bronze.customers")

In [0]:
# Preview only: this frame carries customer names and emails, so avoid rendering the
# whole table into the notebook output.
display(df.limit(20))

In [0]:
# Derive the email domain: the text after the LAST "@".
# Emails without an "@" (and NULL emails) get a NULL domain. The previous
# split(...)[1] lookup was out of range for such rows, which raises when ANSI mode is on.
df = df.withColumn(
    "domain",
    when(col("email").contains("@"), substring_index(col("email"), "@", -1)),
)

display(df.limit(20))

In [0]:
# Combine first and last name into `full_name` and drop the parts.
# concat_ws skips NULL inputs, so a missing first or last name yields just the other part.
# Guarded so the cell can be re-run: after the first run `first_name` no longer exists on df.
# If only `last_name` is missing, the cell still fails loudly.
if "first_name" in df.columns:
    df = (
        df.withColumn("full_name", concat_ws(" ", col("first_name"), col("last_name")))
        .drop("first_name", "last_name")
    )

display(df.limit(20))

In [0]:
 class Transformations:
     """Reusable silver-layer helpers: deduplicate rows, stamp a timestamp, and upsert into Delta."""

     @staticmethod
     def _build_merge_condition(keycols):
         """Build the MERGE predicate ``t.<k> = s.<k> AND ...`` for the given key column(s).

         Args:
             keycols: a single column name or an iterable of column names.

         Returns:
             The SQL predicate string joining target alias ``t`` to source alias ``s``.

         Raises:
             ValueError: if no key column is given (an empty predicate is not a valid condition).
         """
         # A bare string would otherwise be iterated character by character.
         keys = [keycols] if isinstance(keycols, str) else list(keycols)
         if not keys:
             raise ValueError("upsert requires at least one key column")
         return " AND ".join(f"t.{key} = s.{key}" for key in keys)

     @classmethod
     def dedup(cls, df, key, order_by_col):
         """Keep one row per ``key``: the row with the greatest ``order_by_col`` value.

         Args:
             df: input DataFrame.
             key: partition column name (or list of names) identifying a logical record.
             order_by_col: column ranked in descending order; NULLs rank last.

         Returns:
             DataFrame with the same columns as ``df`` and one row per key. When several
             rows tie on ``order_by_col``, row_number() keeps one of them, and which one
             is not guaranteed.
         """
         window = Window.partitionBy(key).orderBy(col(order_by_col).desc_nulls_last())
         return (
             df.withColumn("row_num", row_number().over(window))
             .filter(col("row_num") == 1)
             .drop("row_num")
         )

     @classmethod
     def add_last_process_date(cls, df, col_name="last_updated_timestamp"):
         """Return ``df`` with ``col_name`` set to the current timestamp.

         withColumn replaces an existing column of the same name. With the default name,
         any ``last_updated_timestamp`` values already in ``df`` are therefore overwritten.
         ``col_name`` defaults to the original column so existing callers are unaffected.
         """
         return df.withColumn(col_name, current_timestamp())

     @classmethod
     def upsert(cls, df, table_name, keycols, cdc):
         """MERGE ``df`` into the existing Delta table ``table_name``.

         Target rows that match a source row on every key column are fully updated
         from the source. Unmatched source rows are inserted.

         Args:
             df: source DataFrame. It should hold at most one row per key (e.g. after
                 ``dedup``), because MERGE fails when several source rows match the
                 same target row.
             table_name: name of an existing Delta table.
             keycols: key column name or list of names used to match rows.
             cdc: currently unused; kept so existing call sites keep working.

         Returns:
             1 (unchanged from the original contract).
         """
         # Previously the key columns were ignored and the join hard-coded to customer_id.
         condition = cls._build_merge_condition(keycols)
         # Use the session that owns df rather than a notebook-global ``spark``.
         delta_table = DeltaTable.forName(df.sparkSession, table_name)
         (
             delta_table.alias("t")
             .merge(df.alias("s"), condition)
             .whenMatchedUpdateAll()
             .whenNotMatchedInsertAll()
             .execute()
         )
         return 1

In [0]:
# Keep the latest record per customer (ordered by last_updated_timestamp), then stamp
# the processing time.
# NOTE(review): add_last_process_date writes to "last_updated_timestamp" by default,
# replacing the source timestamp that dedup just ordered by. Confirm this is intended.
transform_df = Transformations.add_last_process_date(
    Transformations.dedup(df, "customer_id", "last_updated_timestamp")
)
display(transform_df.limit(20))

In [0]:
# The first run creates the silver table; later runs merge changed/new customers into it.
SILVER_TABLE = "uber.silver.customers"

if spark.catalog.tableExists(SILVER_TABLE):
    Transformations.upsert(transform_df, SILVER_TABLE, ["customer_id"], "update")
else:
    (
        transform_df.write
        .format("delta")
        .mode("append")
        .saveAsTable(SILVER_TABLE)
    )