In [0]:
# For job runs during deployment

# Job parameters (read from notebook widgets)

# catalog        = dbutils.widgets.get("p_catalog")
# bronze_schema  = dbutils.widgets.get("p_bronze_schema")
# silver_schema  = dbutils.widgets.get("p_silver_schema")

# Example: build fully qualified table names with the code below

# full_bronze_table = f"{catalog}.{bronze_schema}.trips"
# full_silver_table = f"{catalog}.{silver_schema}.trips"

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from typing import List
from pyspark.sql import DataFrame
from pyspark.sql import Window
from delta.tables import DeltaTable
# import os
# import sys
# project_path=os.getcwd()
# sys.path.append(project_path)

# from utilities import *

### Python user-defined classes
### 1) Deduplication, process timestamp and upsert helpers

In [0]:
class transformations:
    """Reusable silver-layer helpers: deduplication, processing timestamps and Delta upserts."""

    def dedup(self, df: DataFrame, dedup_cols: List, cdc: str) -> DataFrame:
        """Keep one row per key: the row with the greatest value of ``cdc``.

        Args:
            df: input DataFrame.
            dedup_cols: names of the columns that together identify a record.
            cdc: change-tracking column; the row with the highest value is kept.

        Returns:
            DataFrame with at most one row per distinct combination of ``dedup_cols``.
            If several rows tie on ``cdc`` for the same key, which one survives is
            not specified (row_number over a non-unique ordering).

        Raises:
            ValueError: if ``dedup_cols`` is empty.
        """
        if not dedup_cols:
            raise ValueError("dedup_cols must contain at least one column")
        # Partition on the key columns themselves instead of a concatenated string:
        # concat() returns NULL as soon as any key is NULL (lumping unrelated rows into
        # one partition), and plain concatenation can collide, e.g. ("a","bc") vs ("ab","c").
        window = Window.partitionBy(*dedup_cols).orderBy(desc(cdc))
        return (
            df.withColumn("dedupCounts", row_number().over(window))
            .filter(col("dedupCounts") == 1)
            .drop("dedupCounts")
        )

    def process_timestamp(self, df: DataFrame) -> DataFrame:
        """Return ``df`` with a ``process_timestamp`` column set to ``current_timestamp()``."""
        return df.withColumn("process_timestamp", current_timestamp())

    def upsert(self, df: DataFrame, key_cols: List, table: str, cdc: str,
               catalog: str = "uber_cata", schema: str = "silver") -> int:
        """MERGE ``df`` into the Delta table ``{catalog}.{schema}.{table}``.

        Rows matching on all ``key_cols`` are updated only when the incoming ``cdc``
        value is greater than or equal to the target's; unmatched rows are inserted.

        Args:
            df: source rows to merge.
            key_cols: columns used for the equality join between source and target.
            table: target table name inside ``catalog.schema``.
            cdc: change-tracking column guarding updates.
            catalog: target catalog (defaults to the original hard-coded value).
            schema: target schema (defaults to the original hard-coded value).

        Returns:
            1 once the merge has executed (kept for backward compatibility).

        Raises:
            ValueError: if ``key_cols`` is empty (the MERGE condition would be blank).
        """
        if not key_cols:
            raise ValueError("key_cols must contain at least one column")
        # Backticks keep column names containing spaces or reserved words valid SQL.
        merge_condition = " AND ".join(f"src.`{c}` = trg.`{c}`" for c in key_cols)
        # Use the DataFrame's own session rather than relying on a notebook-global `spark`.
        target = DeltaTable.forName(df.sparkSession, f"{catalog}.{schema}.{table}")

        (
            target.alias("trg")
            .merge(df.alias("src"), merge_condition)
            .whenMatchedUpdateAll(condition=f"src.`{cdc}` >= trg.`{cdc}`")
            .whenNotMatchedInsertAll()
            .execute()
        )

        return 1


### Customers

In [0]:
# Load the raw customers table from the bronze layer.
df_cust = spark.table("uber_cata.bronze.customers")


In [0]:
# Preview the raw customers DataFrame in the notebook output.
df_cust.display()

In [0]:
# Email domain = the element after "@" when the address is split on "@".
email_parts = split(col("email"), "@")
df_cust = df_cust.withColumn("domain", email_parts.getItem(1))


In [0]:
# Strip every non-digit character from phone numbers.
df_cust = df_cust.withColumn(
    "phone_number",
    regexp_replace(col("phone_number"), r"[^0-9]", ""),
)

In [0]:
# Build full_name from first/last name, then drop the parts.
# concat() returns NULL if either part is NULL, and because the source columns are
# dropped right after, the other name would be lost. concat_ws() skips NULL parts;
# full_name stays NULL only when both first_name and last_name are NULL.
df_cust = df_cust.withColumn(
    "full_name",
    when(
        col("first_name").isNotNull() | col("last_name").isNotNull(),
        concat_ws(" ", col("first_name"), col("last_name")),
    ),
)
df_cust = df_cust.drop("first_name", "last_name")

In [0]:
# Keep one row per customer_id: the one with the greatest last_updated_timestamp.
cust_obj = transformations()
df_cust = cust_obj.dedup(
    df=df_cust,
    dedup_cols=['customer_id'],
    cdc='last_updated_timestamp',
)
df_cust.display()

In [0]:
# Add the process_timestamp column (current timestamp) and preview the result.
df_cust = cust_obj.process_timestamp(df=df_cust)
df_cust.display()


In [0]:
# Existing silver table: MERGE this batch into it; otherwise create it from this batch.
if spark.catalog.tableExists("uber_cata.silver.customers"):
    cust_obj.upsert(df_cust, key_cols=['customer_id'], table='customers', cdc='last_updated_timestamp')
else:
    df_cust.write.format("delta").mode("append").saveAsTable("uber_cata.silver.customers")

### Drivers

In [0]:
# Load the raw drivers table from the bronze layer.
df_driver = spark.table("uber_cata.bronze.drivers")

In [0]:
# Keep only the digits of each phone number.
df_driver = df_driver.withColumn("phone_number", regexp_replace("phone_number", r"[^0-9]", ""))
# Build full_name with concat_ws so a NULL first or last name does not wipe out the
# whole value (concat() returns NULL if any input is NULL, and the parts are dropped
# below). full_name is NULL only when both first_name and last_name are NULL.
df_driver = df_driver.withColumn(
    "full_name",
    when(
        col("first_name").isNotNull() | col("last_name").isNotNull(),
        concat_ws(" ", col("first_name"), col("last_name")),
    ),
)
df_driver = df_driver.drop("first_name", "last_name")

df_driver.display()

In [0]:
# Keep one row per driver_id: the one with the greatest last_updated_timestamp.
driver_obj = transformations()
df_driver = driver_obj.dedup(
    df=df_driver,
    dedup_cols=['driver_id'],
    cdc='last_updated_timestamp',
)

In [0]:
# Add the process_timestamp column (current timestamp).
df_driver = driver_obj.process_timestamp(df=df_driver)

In [0]:
# Existing silver table: MERGE this batch into it; otherwise create it from this batch.
if spark.catalog.tableExists("uber_cata.silver.drivers"):
    driver_obj.upsert(df_driver, key_cols=['driver_id'], table='drivers', cdc='last_updated_timestamp')
else:
    df_driver.write.format("delta").mode("append").saveAsTable("uber_cata.silver.drivers")

### Locations

In [0]:
# Load the raw locations table from the bronze layer.
df_loc = spark.table("uber_cata.bronze.locations")

In [0]:
# Locations: dedup on location_id, add process_timestamp, then create or merge the silver table.
loc_obj = transformations()
df_loc = loc_obj.process_timestamp(
    df=loc_obj.dedup(df=df_loc, dedup_cols=['location_id'], cdc='last_updated_timestamp')
)
if spark.catalog.tableExists("uber_cata.silver.locations"):
    loc_obj.upsert(df_loc, key_cols=['location_id'], table='locations', cdc='last_updated_timestamp')
else:
    df_loc.write.format("delta").mode("append").saveAsTable("uber_cata.silver.locations")

### Payments

In [0]:
# Load the raw payments table from the bronze layer and preview it.
df_pay = spark.table("uber_cata.bronze.payments")
df_pay.display()

In [0]:
# Card payments with status Success / Failed / Pending map to Online-success /
# Online-failed / Online-pending; every other row (including other card statuses) is 'Offline'.
is_card_payment = col('payment_method') == 'Card'
online_status_expr = (
    when(is_card_payment & (col('payment_status') == 'Success'), 'Online-success')
    .when(is_card_payment & (col('payment_status') == 'Failed'), 'Online-failed')
    .when(is_card_payment & (col('payment_status') == 'Pending'), 'Online-pending')
    .otherwise('Offline')
)
df_pay = df_pay.withColumn("online_payment_status", online_status_expr)

In [0]:
# Keep the latest row per payment_id, then add the process_timestamp column.
pay_obj = transformations()
df_pay = pay_obj.process_timestamp(
    df=pay_obj.dedup(df=df_pay, dedup_cols=['payment_id'], cdc='last_updated_timestamp')
)

In [0]:
# Existing silver table: MERGE this batch into it; otherwise create it from this batch.
if spark.catalog.tableExists("uber_cata.silver.payments"):
    pay_obj.upsert(df_pay, key_cols=['payment_id'], table='payments', cdc='last_updated_timestamp')
else:
    df_pay.write.format("delta").mode("append").saveAsTable("uber_cata.silver.payments")

### Vehicles

In [0]:
# Load the raw vehicles table from the bronze layer.
df_veh = spark.table("uber_cata.bronze.vehicles")

In [0]:
# Upper-case the vehicle make.
df_veh = df_veh.withColumn("make", upper("make"))

In [0]:
# Keep the latest row per vehicle_id, then add the process_timestamp column.
veh_obj = transformations()
df_veh = veh_obj.process_timestamp(
    df=veh_obj.dedup(df=df_veh, dedup_cols=['vehicle_id'], cdc='last_updated_timestamp')
)

In [0]:
# Create silver.vehicles on the first run, otherwise MERGE this batch into it.
# Fixes: the existence check must target the same table that is written (it checked
# silver.trips, so vehicles was re-appended on every run), the upsert must use the
# vehicles DataFrame (df_trip is undefined here), and the merge key must match the
# dedup key used above (vehicle_id, not veh_id).
if not spark.catalog.tableExists("uber_cata.silver.vehicles"):
  df_veh.write.format("delta").mode("append").saveAsTable("uber_cata.silver.vehicles")
else:
  veh_obj.upsert(df_veh, ['vehicle_id'], 'vehicles', 'last_updated_timestamp')