In [0]:
# Rows of bronze.agent that have not yet been merged into silver (merge_flag still FALSE).
# NOTE(review): `df` is not referenced by any later cell visible in this notebook;
# the branch-filter cell queries bronze.agent again on its own.
unmerged_agents_query = "SELECT * FROM bronze.agent WHERE merge_flag = FALSE"
df = spark.sql(unmerged_agents_query)


## Transformations on Agent data

1. Remove all agent rows whose `branch_id` does not exist in the **branch table** (`bronze.branch`).

In [0]:
# Branch reference table.
# NOTE(review): `df_branch` is not referenced by any later cell; the filter
# below reads bronze.branch directly in SQL.
df_branch = spark.sql("SELECT * FROM bronze.branch")

# Unmerged agents whose branch_id exists in bronze.branch.
# LEFT SEMI JOIN only tests for existence. Each agent row is kept at most once,
# and no branch columns are added, so the output columns are exactly agent.*.
# An INNER JOIN would emit one copy of an agent per matching branch row, so a
# repeated branch_id in bronze.branch would duplicate agents. Duplicated source
# rows can make the later MERGE into silver.agent fail ("multiple source rows
# matched"). Agents with a NULL branch_id never satisfy the ON condition and
# are dropped, the same as with an inner join.
df_result = spark.sql("""
    SELECT agent.*
    FROM bronze.agent AS agent
    LEFT SEMI JOIN bronze.branch AS branch
        ON agent.branch_id = branch.branch_id
    WHERE agent.merge_flag = FALSE
""")


2. Validate phone numbers and drop agents whose `agent_phone` is not a valid 10-digit number.

In [0]:
from pyspark.sql.functions import length, col  # `col` is also used by later cells

# Keep only agents whose phone number is exactly ten ASCII digits.
# A length check alone would accept values such as "98765-4321" or "abcdefghij".
# The regex enforces digits only. The length check is kept because Java's `$`
# can also match just before a trailing newline, so "1234567890\n" would
# otherwise pass the regex.
# A NULL phone makes the whole predicate NULL, and filter() drops the row.
# NOTE(review): assumes phones are stored without country code or separators — confirm.
df_phone = df_result.filter(
    (length(col("agent_phone")) == 10)
    & col("agent_phone").rlike("^[0-9]{10}$")
)


3. Replace null or blank values in the `agent_email` column with the default admin email (`default@admin.com`).

In [0]:
from pyspark.sql.functions import trim, when

# Address substituted for agents that have no usable email.
DEFAULT_AGENT_EMAIL = "default@admin.com"

# An email counts as missing when it is NULL or contains only whitespace.
# Non-blank emails are passed through unchanged (they are not trimmed).
agent_email_col = col("agent_email")
agent_email_missing = agent_email_col.isNull() | (trim(agent_email_col) == "")

df_emails = df_phone.withColumn(
    "agent_email",
    when(agent_email_missing, DEFAULT_AGENT_EMAIL).otherwise(agent_email_col),
)


4. Merge the transformed rows into the silver layer (`silver.agent`), setting `merged_timestamp` to `current_timestamp()` on both updated and inserted rows.

In [0]:
# Register the cleaned agent rows under the name the MERGE statement reads from.
df_emails.createOrReplaceTempView("transformed_agent")

# Upsert into silver.agent keyed on agent_id:
#   - matched rows have every column overwritten from the source (including
#     create_timestamp), and merged_timestamp is set to now;
#   - unmatched rows are inserted, with merged_timestamp set to now.
# NOTE(review): if bronze.agent holds more than one unmerged row for the same
# agent_id, the MERGE may fail with "multiple source rows matched" — confirm
# whether deduplication is needed upstream.
merge_agent_sql = (
    "MERGE INTO silver.agent t "
    "USING transformed_agent s "
    "ON t.agent_id = s.agent_id "
    "WHEN MATCHED THEN UPDATE SET "
    "t.agent_name = s.agent_name, "
    "t.agent_email = s.agent_email, "
    "t.agent_phone = s.agent_phone,"
    "t.branch_id = s.branch_id, "
    "t.create_timestamp = s.create_timestamp, "
    "t.merged_timestamp = current_timestamp() "
    "WHEN NOT MATCHED THEN INSERT "
    "(agent_id, agent_name, agent_email, agent_phone, branch_id, create_timestamp, merged_timestamp) "
    "VALUES "
    "(s.agent_id, s.agent_name, s.agent_email, s.agent_phone, s.branch_id, s.create_timestamp, current_timestamp())"
)
spark.sql(merge_agent_sql)


In [0]:
# Flag every bronze agent row that is still unmerged as processed. This
# includes rows that the validation steps dropped, so they are not retried.
# NOTE(review): this re-reads bronze.agent rather than the rows the MERGE
# consumed. A row appended between the MERGE cell and this cell would be
# flagged without ever reaching silver.agent — confirm ingestion cannot
# overlap this notebook.
mark_merged_sql = "update bronze.agent set merge_flag = true where merge_flag = false"
spark.sql(mark_merged_sql)