In [0]:
from pyspark.sql import SparkSession, functions as F


# Reuse the active Spark session, or start one named for this job.
spark = SparkSession.builder.appName("GuestIngestion").getOrCreate()

# Bronze columns carried forward; every other column in the files is dropped.
required_columns = [
    'GUEST_NAME',
    'EMAIL_ADDRESS',
    'GUEST_NUMBER',
    'PHONE_NUM',
    'STREET_ADDRESS',
    'DATE',
    'STATE',
    'CreateDate_TS',
]

# Read every parquet file found anywhere under the bronze guests folder.
guest_bronze_df = (
    spark.read
    .option("recursiveFileLookup", "true")
    .parquet("/mnt/bronze/guests")
)

guest_df = (
    guest_bronze_df
    # Keep only the required columns, exposing each one under its lower-case name.
    .select(*[F.col(name).alias(name.lower()) for name in required_columns])
    # Date-typed copy of the "date" column; the original column is kept as well.
    .withColumn("guest_date", F.to_date(F.col("date")))
    # SHA-256 over guest_name and guest_number joined with "||".
    # NOTE(review): concat_ws skips NULL inputs, so rows where one of the two
    # parts is NULL can produce the same hash as a different guest - confirm
    # that both columns are always populated.
    .withColumn(
        "hash_code",
        F.sha2(F.concat_ws("||", "guest_name", "guest_number"), 256),
    )
)

# Expose the prepared frame to the %sql cells under the name "guests".
guest_df.createOrReplaceTempView("guests")


In [0]:
# Sanity check: number of rows prepared for the silver load.
guest_row_count = guest_df.count()
print(guest_row_count)

In [0]:
# Render the prepared rows in the notebook's table viewer.
# The output includes guest names, e-mail addresses and phone numbers -
# clear it before sharing or committing the notebook.
display(guest_df)

In [0]:
%sql
-- Make sure the silver schema and the guests table exist before loading.
CREATE SCHEMA IF NOT EXISTS silver;

CREATE TABLE IF NOT EXISTS silver.guests (
  guest_name     STRING,
  email          STRING,
  guest_number   STRING,
  phone_number   STRING,
  street_address STRING,
  guest_date     DATE,
  state          STRING,
  is_current     BOOLEAN,    -- true on insert, set to false by the expiring MERGE
  audit_ts       TIMESTAMP,  -- current_timestamp() of the last insert/expire
  hash_code      STRING      -- key the MERGE statements match on
);

-- Empty the table at the start of every run.
-- NOTE(review): because of this, the MERGE statements that follow always see an
-- empty target, so no row is ever expired and no history survives between runs.
-- Confirm whether this is a development leftover.
TRUNCATE TABLE silver.guests





In [0]:
%sql
-- Step 1 of the versioned load: close out the current version of any guest
-- whose tracked attributes differ from the incoming row.
-- Rows are paired on hash_code, and only against the current target version.
-- The comparisons use the null-safe operator <=>: a plain `a != b` evaluates to
-- NULL (not TRUE) when either side is NULL, so a change from or to NULL would
-- otherwise never be detected.
-- NOTE(review): state is loaded into the table but is not part of this change
-- check - confirm whether a change of state should also expire the row.
MERGE INTO silver.guests AS target
USING guests AS source
ON target.hash_code = source.hash_code AND target.is_current = true
WHEN MATCHED AND (
  NOT (target.phone_number <=> source.phone_num) OR
  NOT (target.email <=> source.email_address) OR
  NOT (target.guest_date <=> source.guest_date) OR
  NOT (target.street_address <=> source.street_address)
)
THEN UPDATE SET
  target.is_current = false,
  target.audit_ts = current_timestamp()





In [0]:
%sql
-- Step 2 of the versioned load: insert a current row for every incoming guest
-- that has no current version in the target.
-- The match is restricted to target.is_current = true. A guest whose earlier
-- version has been expired (is_current = false) still has that hash_code in the
-- table; matching on hash_code alone would treat the guest as present and the
-- replacement version would never be inserted.
MERGE INTO silver.guests AS target
USING guests AS source
ON target.hash_code = source.hash_code
  AND target.is_current = true
WHEN NOT MATCHED THEN INSERT (
  guest_name,
  email,
  guest_number,
  phone_number,
  street_address,
  guest_date,
  state,
  is_current,
  audit_ts,
  hash_code
)
VALUES (
  source.guest_name,
  source.email_address,
  source.guest_number,
  source.phone_num,
  source.street_address,
  source.guest_date,
  source.state,
  true,                 -- every inserted row starts as the current version
  current_timestamp(),
  source.hash_code      -- qualified like the other values
)
  

In [0]:
%sql
-- Inspect the loaded table (all columns, all versions).
SELECT *
FROM silver.guests

In [0]:
# Columns named as the guest business key.
# NOTE(review): not referenced by any cell above, and it lists PHONE_NUM while
# hash_code is built from guest_name and guest_number only - confirm which
# definition of the key is intended.
business_key = [
    "GUEST_NUMBER",
    "GUEST_NAME",
    "PHONE_NUM",
]

