In [0]:
# Load the staged booking and customer tables; later cells merge these
# DataFrames into the SCD1 target tables.
booking_query = "select * from dev_databricks_projects.zoom_data.staging_booking_data"
customer_query = "select * from dev_databricks_projects.zoom_data.staging_customers_data"

stg_booking_data = spark.sql(booking_query)
stg_customer_data = spark.sql(customer_query)

# Print each staging schema (bookings first, then customers) so the column
# names and types can be compared against the target table definitions.
for staged_frame in (stg_booking_data, stg_customer_data):
    staged_frame.printSchema()

root
 |-- end_time: timestamp (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- booking_date: string (nullable = true)
 |-- booking_id: string (nullable = true)
 |-- car_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- status: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- valid_status: string (nullable = true)

root
 |-- customer_id: string (nullable = true)
 |-- email: string (nullable = true)
 |-- name: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- signup_date: string (nullable = true)
 |-- status: string (nullable = true)
 |-- is_email_valid: string (nullable = true)
 |-- def_status: string (nullable = true)



In [0]:
%sql

-- SCD type 1 target for customers. `id` is a surrogate key generated by Delta;
-- it is declared NOT NULL because primary-key columns must be non-nullable.
-- `signup_date` is a DATE here, while the staging table carries it as a string.
create table if not exists dev_databricks_projects.zoom_data.customer_data_scd1
(
id bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
customer_id  string ,
email string ,
name string , 
phone_number string, 
signup_date date,
status string ,
is_email_valid string, 
def_status string ,
primary key (id)
);

-- SCD type 1 target for bookings. Same surrogate-key convention as above;
-- the remaining columns mirror the staging booking schema one-to-one.
create table if not exists dev_databricks_projects.zoom_data.booking_data_scd1
(
id bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
end_time timestamp,
start_time timestamp,
booking_date string,
booking_id string,
car_id string,
customer_id string,
status string,
total_amount double,
valid_status string,
primary key (id)
);

In [0]:
from delta.tables import *
from pyspark.sql.functions import *
# Apply the SCD type 1 merge to the bookings dataset.
#
# Rules applied to every staged booking, keyed on booking_id:
#   * status == 'cancelled' and already in the target -> delete it
#   * status == 'cancelled' and not in the target     -> skip it
#   * any other booking already in the target         -> overwrite in place
#   * any other booking not in the target             -> insert
# All of this runs as ONE atomic MERGE, so cancelled bookings are never
# written to the target and then removed by a second transaction.

booking_target_table = "dev_databricks_projects.zoom_data.booking_data_scd1"
customer_target_table = "dev_databricks_projects.zoom_data.customer_data_scd1"

# Business columns copied from staging to the target. The target's `id` column
# is GENERATED ALWAYS AS IDENTITY and therefore must not be assigned here.
booking_columns = [
    "end_time",
    "start_time",
    "booking_date",
    "booking_id",
    "car_id",
    "customer_id",
    "status",
    "total_amount",
    "valid_status",
]
booking_assignments = {name: f"source.{name}" for name in booking_columns}

booking_merge_condition = "source.booking_id = target.booking_id"
booking_is_cancelled = "source.status = 'cancelled'"
# Spelled out null-safely: a bare `<> 'cancelled'` evaluates to NULL for rows
# whose status is NULL and would silently drop them from the insert.
booking_is_not_cancelled = "source.status IS NULL OR source.status <> 'cancelled'"

# First check whether the target table exists. If not, load the staged data
# directly, since this is the first load.
if not spark.catalog.tableExists(booking_target_table):
    # Keep the first load consistent with the merge path: cancelled bookings
    # are not persisted (eqNullSafe keeps rows whose status is NULL).
    (
        stg_booking_data
        .filter(~col("status").eqNullSafe("cancelled"))
        .write.format("delta")
        .saveAsTable(booking_target_table)
    )
else:
    booking_delta_table = DeltaTable.forName(spark, booking_target_table)

    # Matched clauses are evaluated in order, so the conditional delete must
    # come before the unconditional update.
    # NOTE(review): assumes booking_id is unique within a staging batch -
    # confirm upstream de-duplication.
    (
        booking_delta_table.alias("target")
        .merge(stg_booking_data.alias("source"), booking_merge_condition)
        .whenMatchedDelete(condition=booking_is_cancelled)
        .whenMatchedUpdate(set=booking_assignments)
        .whenNotMatchedInsert(
            condition=booking_is_not_cancelled,
            values=booking_assignments,
        )
        .execute()
    )
    
  


In [0]:
# Customer dataset: SCD type 1 upsert keyed on customer_id.
# Existing customers are overwritten in place; unseen customers are inserted.

customer_target_table = "dev_databricks_projects.zoom_data.customer_data_scd1"

# Attributes refreshed on every match. The target's `id` column is
# GENERATED ALWAYS AS IDENTITY and must not be assigned.
customer_attribute_columns = [
    "email",
    "name",
    "phone_number",
    "signup_date",
    "status",
    "is_email_valid",
    "def_status",
]
customer_update_assignments = {
    name: f"source.{name}" for name in customer_attribute_columns
}
# Inserts must also carry the business key. Without it new rows land with a
# NULL customer_id, never match the merge condition on later runs, and get
# re-inserted as duplicates every time the notebook executes.
customer_insert_assignments = {
    "customer_id": "source.customer_id",
    **customer_update_assignments,
}

# Check whether the customer table exists
if not spark.catalog.tableExists(customer_target_table):
    # First load: persist the staged data as-is.
    # NOTE(review): this creates the table from the staging schema
    # (signup_date as string, no identity `id`), which differs from the DDL
    # for customer_data_scd1 - confirm this branch is still wanted.
    stg_customer_data.write.format("delta").saveAsTable(customer_target_table)
else:
    customer_delta_table = DeltaTable.forName(spark, customer_target_table)
    customer_merge_condition = "source.customer_id = target.customer_id"

    # NOTE(review): staging signup_date is a string while the target column is
    # DATE; this presumably relies on the merge casting the value - verify.
    (
        customer_delta_table.alias("target")
        .merge(stg_customer_data.alias("source"), customer_merge_condition)
        .whenMatchedUpdate(set=customer_update_assignments)
        .whenNotMatchedInsert(values=customer_insert_assignments)
        .execute()
    )
