In [0]:
%load_ext autoreload
%autoreload 2
# Reload edited Python modules (e.g. the `utils.custom_utils` module imported later)
# automatically before code runs, so helper changes are picked up without a restart.
# Learn more: https://docs.databricks.com/en/files/workspace-modules.html#autoreload-for-python-modules
# To disable autoreload, run: %autoreload 0

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
import os
import sys

In [0]:
# Make modules that live next to this notebook (such as the `utils` package
# imported later) importable. Only append when missing, so re-running this
# cell does not keep adding duplicate entries to sys.path.
current_dir = os.getcwd()
if current_dir not in sys.path:
    sys.path.append(current_dir)

### Customers

In [0]:
# Load the raw customer records from the bronze layer and preview them.
df_cust = spark.table("pyspark_proj.bronze.customers")
display(df_cust)

In [0]:
# Derive the e-mail domain: element 1 of the "@"-split, i.e. the text after
# the "@" for a well-formed address.
df_cust = df_cust.withColumn("domain", split(col("email"), "@")[1])

display(df_cust)

In [0]:
# Normalise phone numbers by removing every character that is not a digit.
digits_only_phone = regexp_replace(col("phone_number"), r"[^0-9]", "")
df_cust = df_cust.withColumn("phone_number", digits_only_phone)

display(df_cust)

In [0]:
# Combine first and last name into one space-separated Full_Name column
# (concat_ws skips NULL parts), then drop the two source columns.
df_cust = (
    df_cust
    .withColumn("Full_Name", concat_ws(" ", col("first_name"), col("last_name")))
    .drop("first_name", "last_name")
)

display(df_cust)


In [0]:

from utils.custom_utils import transformations


In [0]:
# Shared helper from utils.custom_utils; also used for the timestamp step and the upsert.
cust_obj = transformations(spark)

# Deduplicate on customer_id using last_updated_timestamp
# (exact rule lives in transformations.dedup — presumably latest row wins; TODO confirm).
cust_df_trans = cust_obj.dedup(df_cust, ["customer_id"], 'last_updated_timestamp')

display(cust_df_trans)

In [0]:
# Run the shared process_timestamp step on the deduplicated customers.
df_cust = cust_obj.process_timestamp(cust_df_trans)
df_cust.display()

In [0]:
# Existing table: hand the batch to the shared upsert helper.
# First run: create the silver Delta table from this batch.
if spark.catalog.tableExists("pyspark_proj.silver.customers"):
    # upsert(source, key_cols, target, cdc_column)
    cust_obj.upsert(df_cust, ['customer_id'], 'customers', 'last_updated_timestamp')
else:
    (
        df_cust.write
        .format("delta")
        .mode("append")
        .saveAsTable("pyspark_proj.silver.customers")
    )

In [0]:
%sql
-- Inspect the silver customers table after the write/upsert.
SELECT *
FROM pyspark_proj.silver.customers

### Drivers

In [0]:
# Load the raw driver records from the bronze layer and preview them.
df_driver = spark.table("pyspark_proj.bronze.drivers")
display(df_driver)

In [0]:
# Same cleaning as for customers: digits-only phone numbers and a combined
# Full_Name column (source name columns are dropped).
df_driver = (
    df_driver
    .withColumn("phone_number", regexp_replace(col("phone_number"), r"[^0-9]", ""))
    .withColumn("Full_Name", concat_ws(" ", col("first_name"), col("last_name")))
    .drop("first_name", "last_name")
)

display(df_driver)

In [0]:
# Deduplicate on driver_id (by last_updated_timestamp), then run the shared
# process_timestamp step.
driver_obj = transformations(spark)
df_driver = driver_obj.process_timestamp(
    driver_obj.dedup(df_driver, ["driver_id"], 'last_updated_timestamp')
)
df_driver.display()

In [0]:
# Existing table: hand the batch to the shared upsert helper.
# First run: create the silver Delta table from this batch.
if spark.catalog.tableExists("pyspark_proj.silver.drivers"):
    driver_obj.upsert(df_driver, ['driver_id'], 'drivers', 'last_updated_timestamp')
else:
    (
        df_driver.write
        .format("delta")
        .mode("append")
        .saveAsTable("pyspark_proj.silver.drivers")
    )


In [0]:
%sql
-- Inspect the silver drivers table after the write/upsert.
SELECT *
FROM pyspark_proj.silver.drivers

### Locations

In [0]:
# Load the raw location records from the bronze layer and preview them.
df_loc = spark.table("pyspark_proj.bronze.locations")
df_loc.display()


In [0]:
# Deduplicate on location_id (by last_updated_timestamp), then run the shared
# process_timestamp step.
loc_obj = transformations(spark)
df_loc = loc_obj.process_timestamp(
    loc_obj.dedup(df_loc, ["location_id"], 'last_updated_timestamp')
)
df_loc.display()


In [0]:
# Existing table: hand the batch to the shared upsert helper.
# First run: create the silver Delta table from this batch.
if spark.catalog.tableExists("pyspark_proj.silver.locations"):
    loc_obj.upsert(df_loc, ['location_id'], 'locations', 'last_updated_timestamp')
else:
    (
        df_loc.write
        .format("delta")
        .mode("append")
        .saveAsTable("pyspark_proj.silver.locations")
    )

In [0]:
%sql
-- Inspect the silver locations table after the write/upsert.
SELECT *
FROM pyspark_proj.silver.locations

### Payments

In [0]:
# Load the raw payment records from the bronze layer and preview them.
df_pay = spark.table("pyspark_proj.bronze.payments")
df_pay.display()


In [0]:
# Classify each payment by channel and outcome:
#   payment_method == 'Card'  -> 'online-success' / 'online-failed' / 'online-pending'
#                                by payment_status, or 'online-unknown' when the status
#                                is anything else (including NULL), so a card payment is
#                                never mislabelled as offline;
#   any other method (or NULL method) -> 'offline'.
is_card_payment = col('payment_method') == 'Card'
payment_status_col = col('payment_status')

df_pay = df_pay.withColumn(
    "online_payment_status",
    when(is_card_payment & (payment_status_col == 'Success'), 'online-success')
    .when(is_card_payment & (payment_status_col == 'Failed'), 'online-failed')
    .when(is_card_payment & (payment_status_col == 'Pending'), 'online-pending')
    .when(is_card_payment, 'online-unknown')
    .otherwise('offline'),
)

display(df_pay)

In [0]:
# Deduplicate on payment_id (by last_updated_timestamp) and run the shared
# process_timestamp step.
payment_obj = transformations(spark)
df_pay = payment_obj.process_timestamp(
    payment_obj.dedup(df_pay, ["payment_id"], 'last_updated_timestamp')
)

# Existing table: hand the batch to the shared upsert helper.
# First run: create the silver Delta table from this batch.
if spark.catalog.tableExists("pyspark_proj.silver.payments"):
    payment_obj.upsert(df_pay, ['payment_id'], 'payments', 'last_updated_timestamp')
else:
    (
        df_pay.write
        .format("delta")
        .mode("append")
        .saveAsTable("pyspark_proj.silver.payments")
    )


In [0]:
%sql
-- Inspect the silver payments table after the write/upsert.
SELECT *
FROM pyspark_proj.silver.payments

### Vehicles

In [0]:
# Load the raw vehicle records from the bronze layer and preview them.
df_vehicles = spark.table("pyspark_proj.bronze.vehicles")
df_vehicles.display()


In [0]:
# Upper-case the manufacturer name so the same make is spelled consistently.
upper_make = upper(col("make"))
df_vehicles = df_vehicles.withColumn("make", upper_make)
df_vehicles.display()

In [0]:
# Deduplicate on vehicle_id (by last_updated_timestamp) and run the shared
# process_timestamp step.
vehicle_obj = transformations(spark)
df_vehicles = vehicle_obj.process_timestamp(
    vehicle_obj.dedup(df_vehicles, ["vehicle_id"], 'last_updated_timestamp')
)

# Existing table: hand the batch to the shared upsert helper.
# First run: create the silver Delta table from this batch.
if spark.catalog.tableExists("pyspark_proj.silver.vehicles"):
    vehicle_obj.upsert(df_vehicles, ['vehicle_id'], 'vehicles', 'last_updated_timestamp')
else:
    (
        df_vehicles.write
        .format("delta")
        .mode("append")
        .saveAsTable("pyspark_proj.silver.vehicles")
    )

In [0]:
%sql
-- Inspect the silver vehicles table after the write/upsert.
SELECT *
FROM pyspark_proj.silver.vehicles