In [0]:
%load_ext autoreload
%autoreload 2
# Autoreload mode 2 reloads imported modules before each cell executes, so edits to local
# modules are picked up without restarting the kernel. Learn more at
# https://docs.databricks.com/en/files/workspace-modules.html#autoreload-for-python-modules
# To disable autoreload, run %autoreload 0

Dynamic transformations

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from typing import List
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from delta.tables import DeltaTable

In [0]:
import os
import sys

In [0]:
# Put the notebook's working directory on the import path so local modules can be imported.
current_dir = os.getcwd()

# Guard the append: without it every re-run of this cell adds another duplicate entry.
if current_dir not in sys.path:
    sys.path.append(current_dir)


#### **Customers**

In [0]:
# Load the customers table from the bronze schema.
df_cust = spark.read.table("pyspark_dbt_project.bronze.customers")


In [0]:
# Report the row count of the bronze customers data and preview the first three rows.
print(f"Number of records in customers table: {df_cust.count()}")
display(df_cust.limit(3))

In [0]:
# Add a "domain" column: element 1 of the e-mail split on "@", i.e. the text after the first "@".
df_cust = df_cust.withColumn(
    "domain",
    split(col("email"), "@").getItem(1),
)
display(df_cust.limit(3))

In [0]:
# Normalise phone numbers by removing every character that is not a digit.
df_cust = df_cust.withColumn(
    "phone_number",
    regexp_replace("phone_number", r"[^0-9]", ""),
)
display(df_cust.limit(3))

In [0]:
# Join first and last name into one space-separated "full_name" column, then drop the two parts.
df_cust = (
    df_cust
    .withColumn("full_name", concat_ws(" ", "first_name", "last_name"))
    .drop("first_name", "last_name")
)
display(df_cust.limit(3))
                             

In [0]:
from utils.custom_utils import transformations

In [0]:
# Helper object from utils.custom_utils; this notebook calls its dedup, process_timestamp
# and upsert methods.
cust_obj = transformations()

# Deduplicate on customer_id, passing last_updated_timestamp as the third argument.
# NOTE(review): presumably the most recent row per key is kept — confirm in utils.custom_utils.
cust_df_trans = cust_obj.dedup(df_cust, ['customer_id'], 'last_updated_timestamp')

print(f"Count after dedup: {cust_df_trans.count()}")
display(cust_df_trans.limit(4))



In [0]:
# Add timestamp column
# NOTE(review): process_timestamp lives in utils.custom_utils; presumably it appends a
# processing-time column to the frame — confirm there.

df_with_ts = cust_obj.process_timestamp(cust_df_trans)
display(df_with_ts.limit(3))

Upsert

In [0]:
# Persist the cleaned customers to the silver layer.
# The frame written is `df_with_ts` (deduplicated, then passed through process_timestamp),
# not the intermediate `df_cust`: writing `df_cust` would discard both of those steps and
# send possibly duplicated customer_id rows into the merge.
# NOTE(review): if silver.customers was created earlier from `df_cust`, its schema may lack
# the column added by process_timestamp — confirm before running the upsert branch.
if not spark.catalog.tableExists("pyspark_dbt_project.silver.customers"):
    # First load: create the table from the current batch.
    (
        df_with_ts.write.format("delta")
        .mode("append")
        .saveAsTable("pyspark_dbt_project.silver.customers")
    )
else:
    # Later loads: upsert keyed on customer_id, with last_updated_timestamp as the last argument.
    cust_obj.upsert(df_with_ts, ['customer_id'], 'customers', 'last_updated_timestamp')

In [0]:
%sql
-- Row count of the silver customers table, as a check after the load above.
SELECT 
  COUNT(*)
FROM 
  pyspark_dbt_project.silver.customers

#### Drivers table

In [0]:
# Load the drivers table from the bronze schema.
df_driver = spark.read.table("pyspark_dbt_project.bronze.drivers")

In [0]:
# Preview the first two driver rows.
display(df_driver.limit(2))

In [0]:
# Normalise phone numbers by removing every character that is not a digit.
df_driver = df_driver.withColumn(
    "phone_number",
    regexp_replace("phone_number", r"[^0-9]", ""),
)
display(df_driver.limit(3))

In [0]:
# Join first and last name into one space-separated "full_name" column, then drop the two parts.
df_driver = (
    df_driver
    .withColumn("full_name", concat_ws(" ", "first_name", "last_name"))
    .drop("first_name", "last_name")
)
display(df_driver.limit(3))

In [0]:
# Helper object from utils.custom_utils used for the drivers dedup and timestamp steps.
driver_obj = transformations()

In [0]:
# Deduplicate on driver_id, passing last_updated_timestamp as the third argument.
# NOTE(review): presumably the most recent row per key is kept — confirm in utils.custom_utils.
driver_df_trans = driver_obj.dedup(df_driver, ['driver_id'], 'last_updated_timestamp')

print(f"Count after dedup: {driver_df_trans.count()}")
display(driver_df_trans.limit(4))

In [0]:
# Pass the deduplicated drivers through process_timestamp and preview three rows.
# NOTE(review): presumably this appends a processing-time column — confirm in utils.custom_utils.
df_driver_with_ts = driver_obj.process_timestamp(driver_df_trans)
display(df_driver_with_ts.limit(3))

In [0]:
# Persist the cleaned drivers to the silver layer.
# The frame written is `df_driver_with_ts` (deduplicated, then passed through
# process_timestamp), not the intermediate `df_driver`, and the upsert goes through
# `driver_obj` so this cell does not depend on the customers helper object.
# NOTE(review): if silver.drivers was created earlier from `df_driver`, its schema may lack
# the column added by process_timestamp — confirm before running the upsert branch.
if not spark.catalog.tableExists("pyspark_dbt_project.silver.drivers"):
    # First load: create the table from the current batch.
    (
        df_driver_with_ts.write.format("delta")
        .mode("append")
        .saveAsTable("pyspark_dbt_project.silver.drivers")
    )
else:
    # Later loads: upsert keyed on driver_id, with last_updated_timestamp as the last argument.
    driver_obj.upsert(df_driver_with_ts, ['driver_id'], 'drivers', 'last_updated_timestamp')

In [0]:
%sql
-- Row count of the silver drivers table, as a check after the load above.
SELECT 
  COUNT(*)
FROM 
  pyspark_dbt_project.silver.drivers

#### Locations

In [0]:
# Load the locations table from the bronze schema.
df_loc = spark.read.table("pyspark_dbt_project.bronze.locations")


# NOTE(review): the commented-out steps below mirror the phone_number / full_name cleaning
# applied to customers and drivers. They are disabled here; presumably locations has no such
# columns — confirm, then delete this dead code.
# df_loc = df_loc.withColumn("phone_number", regexp_replace(col("phone_number"), r"[^0-9]",""))
# display(df_loc.limit(3))

# df_loc = df_loc.withColumn("full_name",concat_ws(" ", col("first_name"), col("last_name")))\
#     .drop("first_name","last_name")
# display(df_loc.limit(3))

In [0]:
# Helper object from utils.custom_utils used for every locations step in this cell.
loc_obj = transformations()

# Deduplicate on location_id, passing last_updated_timestamp as the third argument.
loc_df_trans = loc_obj.dedup(df_loc, ['location_id'], 'last_updated_timestamp')

# Pass the deduplicated rows through process_timestamp and preview three of them.
df_loc_with_ts = loc_obj.process_timestamp(loc_df_trans)
display(df_loc_with_ts.limit(3))

if not spark.catalog.tableExists("pyspark_dbt_project.silver.locations"):
    # First load: create the table from the current batch.
    (
        df_loc_with_ts.write.format("delta")
        .mode("append")
        .saveAsTable("pyspark_dbt_project.silver.locations")
    )
else:
    # Upsert through this cell's own helper (`loc_obj`), not `cust_obj`, so the cell does
    # not rely on the customers cell having been run first.
    loc_obj.upsert(df_loc_with_ts, ['location_id'], 'locations', 'last_updated_timestamp')

In [0]:
%sql
-- Row count of the silver locations table, as a check after the load above.
SELECT 
  COUNT(*)
FROM 
  pyspark_dbt_project.silver.locations

#### Payments

In [0]:
# Load the payments table from the bronze schema.
df_payments = spark.read.table("pyspark_dbt_project.bronze.payments")

In [0]:
from pyspark.sql.functions import when, col

# Label card payments by their outcome; rows matching none of the three branches
# (any other method or status) are tagged "offline".
paid_by_card = col("payment_method") == "Card"
payment_outcome = col("payment_status")

online_payment_status = (
    when(paid_by_card & (payment_outcome == "Success"), "online-success")
    .when(paid_by_card & (payment_outcome == "Failed"), "online-failed")
    .when(paid_by_card & (payment_outcome == "Pending"), "online-pending")
    .otherwise("offline")
)

df_payments = df_payments.withColumn("online_payment_status", online_payment_status)

In [0]:
# Preview fifteen rows to check the derived online_payment_status column.
display(df_payments.limit(15))

In [0]:
# Helper object from utils.custom_utils used for every payments step in this cell.
pay_obj = transformations()

# Deduplicate on payment_id, then pass the result through process_timestamp.
pay_df_trans = pay_obj.dedup(df_payments, ['payment_id'], 'last_updated_timestamp')
df_payments_with_ts = pay_obj.process_timestamp(pay_df_trans)
display(df_payments_with_ts.limit(3))

if spark.catalog.tableExists("pyspark_dbt_project.silver.payments"):
    # Table already present: upsert keyed on payment_id.
    pay_obj.upsert(df_payments_with_ts, ['payment_id'], 'payments', 'last_updated_timestamp')
else:
    # First load: create the silver table from the current batch.
    (
        df_payments_with_ts.write
        .format("delta")
        .mode("append")
        .saveAsTable("pyspark_dbt_project.silver.payments")
    )

In [0]:
%sql
-- Row count of the silver payments table, as a check after the load above.
SELECT 
  COUNT(*)
FROM 
  pyspark_dbt_project.silver.payments

#### Vehicles

In [0]:
# Load the vehicles table from the bronze schema.
df_vehicles = spark.read.table("pyspark_dbt_project.bronze.vehicles")

In [0]:
# Upper-case the vehicle make; the result goes into a new frame, leaving df_vehicles untouched.
df_veh = df_vehicles.withColumn(
    "make",
    upper("make"),
)
display(df_veh.limit(2))

In [0]:
# Helper object from utils.custom_utils used for every vehicles step in this cell.
veh_obj = transformations()

# Deduplicate on vehicle_id and report how many rows remain.
veh_df_trans = veh_obj.dedup(df_veh, ['vehicle_id'], 'last_updated_timestamp')
print(f"Count after dedup: {veh_df_trans.count()}")

# Pass the deduplicated rows through process_timestamp and preview three of them.
df_veh_with_ts = veh_obj.process_timestamp(veh_df_trans)
display(df_veh_with_ts.limit(3))

if spark.catalog.tableExists("pyspark_dbt_project.silver.vehicles"):
    # Table already present: upsert keyed on vehicle_id.
    veh_obj.upsert(df_veh_with_ts, ['vehicle_id'], 'vehicles', 'last_updated_timestamp')
else:
    # First load: create the silver table from the current batch.
    (
        df_veh_with_ts.write
        .format("delta")
        .mode("append")
        .saveAsTable("pyspark_dbt_project.silver.vehicles")
    )

In [0]:
%sql
-- Row count of the silver vehicles table, as a check after the load above.
SELECT 
  COUNT(*)
FROM 
  pyspark_dbt_project.silver.vehicles

#### Trips

In [0]:
# df_trips = spark.read.table("pyspark_dbt_project.bronze.trips")

In [0]:
# TODO: the trips load is disabled; the draft below is kept commented out until it is enabled.
# NOTE: the draft originally wrote `df_driver` to silver.trips and upserted the raw `df_trips`
# (copy-paste slips); both now use `df_trips_with_ts`, matching the other tables.
# trip_obj = transformations()

# trip_df_trans = trip_obj.dedup(df_trips, ['trip_id'], 'last_updated_timestamp')

# # print(f"Count after dedup: {trip_df_trans.count()}")
# # display(trip_df_trans.limit(4))

# df_trips_with_ts = trip_obj.process_timestamp(trip_df_trans)
# display(df_trips_with_ts.limit(3))

# if not spark.catalog.tableExists("pyspark_dbt_project.silver.trips"):

#     df_trips_with_ts.write.format("delta")\
#             .mode("append")\
#             .saveAsTable("pyspark_dbt_project.silver.trips")
# else:
#     trip_obj.upsert(df_trips_with_ts, ['trip_id'], 'trips', 'last_updated_timestamp')