In [0]:
from datetime import datetime

# Take the clock reading once so year, month and day all come from the same instant.
current_date = datetime.now()
year, month, day = (current_date.strftime(part) for part in ("%Y", "%m", "%d"))

# Glob for today's CSV files, laid out under <year>/<month>/<day> (zero-padded).
policy_source_path = f"s3://dms-my-target-postgres-rds/public/policy/{year}/{month}/{day}/*.csv"

# Read every matching CSV, taking column names from each file's header row.
policy = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load(policy_source_path)
)

In [0]:
# Render the loaded policy DataFrame in the notebook for a visual check.
# NOTE(review): the output saved with this cell is an AnalysisException raised
# from this call; the traceback is truncated, so the cause is not visible here.
# Presumably the source path for the run date could not be resolved - confirm.
display(policy)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2042006140274714>, line 1[0m
[0;32m----> 1[0m display(policy)

File [0;32m/databricks/python_shell/lib/dbruntime/display.py:133[0m, in [0;36mDisplay.display[0;34m(self, input, *args, **kwargs)[0m
[1;32m    131[0m     [38;5;28;01mpass[39;00m
[1;32m    132[0m [38;5;28;01melif[39;00m [38;5;28mself[39m[38;5;241m.[39m_cf_helper [38;5;129;01mis[39;00m [38;5;129;01mnot[39;00m [38;5;28;01mNone[39;00m [38;5;129;01mand[39;00m [38;5;28misinstance[39m([38;5;28minput[39m, ConnectDataFrame):
[0;32m--> 133[0m     [38;5;28mself[39m[38;5;241m.[39mdisplay_connect_table([38;5;28minput[39m, [38;5;241m*[39m[38;5;241m*[39mkwargs)
[1;32m    134[0m [38;5;28;01melif[39;00m [38;5;28misinstance[39m([38;5;28minput[39m, ConnectDataFrame):
[1;32m    135[0m     [38;5;28;

In [0]:
# Columns that are removed before the data is persisted.
columns_to_discard = ("processed_time", "source")
policy_cleaned = policy.drop(*columns_to_discard)

# Persist the trimmed frame as Delta, replacing whatever the path held before.
(
    policy_cleaned.write
    .format("delta")
    .mode("overwrite")
    .save("s3://dms-my-target-postgres-rds/processed/policy/policy_raw")
)



In [0]:
from pyspark.sql.functions import to_timestamp, date_format, col, row_number
from pyspark.sql.window import Window

# Parse event_time_source as a timestamp and render it as a
# "dd/MM/yyyy HH:mm:ss" string in a separate column.
event_time_as_text = date_format(
    to_timestamp("event_time_source"), "dd/MM/yyyy HH:mm:ss"
)
policy_formatted = policy_cleaned.withColumn(
    "event_time_source_formatted", event_time_as_text
)

# Number the rows of each policy_id from the highest event_time_source downwards.
# NOTE(review): the ordering uses the raw column, not the parsed timestamp; the CSV
# is loaded without a schema, so this is presumably a string comparison - confirm
# the source format sorts chronologically.
newest_first = col("event_time_source").desc()
window_spec = Window.partitionBy("policy_id").orderBy(newest_first)

# Keep only the top-ranked row per policy_id, then discard the helper rank
# column and the raw event_time_source column.
ranked_policies = policy_formatted.withColumn("rn", row_number().over(window_spec))
policy_latest = ranked_policies.where(col("rn") == 1).drop("rn", "event_time_source")

display(policy_latest)



In [0]:
# Storage location of the gold Delta table for policies.
gold_table_path = "s3://dms-my-target-postgres-rds/gold/policy/policy_gold"

# Expose the formatted event timestamp under the name 'effective_date'.
policy_gold_df = policy_latest.withColumnRenamed(
    "event_time_source_formatted", "effective_date"
)

# Seed an empty Delta table (schema only, zero rows) at the gold path the first
# time this runs. The CREATE TABLE below is given no column list, so it can only
# succeed when Delta metadata already exists at the location. Save mode "ignore"
# leaves an existing table completely untouched, so re-running this cell is safe;
# rows are only ever added or changed by the MERGE cell.
policy_gold_df.limit(0).write.format("delta").mode("ignore").save(gold_table_path)

# Register the gold location as the SQL table policy_gold if it is not registered yet.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS policy_gold
    USING DELTA
    LOCATION '{gold_table_path}'
""")

# Register policy_gold_df as a temp view so the SQL MERGE can use it as its source.
policy_gold_df.createOrReplaceTempView("policy_latest_gold")



In [0]:
%sql
-- Apply the latest change record per policy_id (temp view policy_latest_gold)
-- to the policy_gold table, keyed on policy_id:
--   * Op = 'D' on an existing policy        -> delete it
--   * Op = 'I' or 'U' on an existing policy -> overwrite its attributes
--     ('I' is included so a policy that was deleted and re-inserted between
--     two runs is refreshed instead of silently keeping its old values)
--   * Op = 'I' or 'U' on an unknown policy  -> insert it
-- policy_id is not assigned in the UPDATE because the ON clause already
-- guarantees both sides hold the same value.
-- effective_date is written on both paths so it always reflects the change
-- record that produced the row.
-- NOTE(review): assumes policy_gold has an effective_date column - confirm.
MERGE INTO policy_gold
USING policy_latest_gold AS latest
ON policy_gold.policy_id = latest.policy_id
WHEN MATCHED AND latest.Op = 'D' THEN
  DELETE
WHEN MATCHED AND latest.Op IN ('I', 'U') THEN
  UPDATE SET
    policy_gold.policy_number = latest.policy_number,
    policy_gold.party_id = latest.party_id,
    policy_gold.start_date = latest.start_date,
    policy_gold.end_date = latest.end_date,
    policy_gold.status = latest.status,
    policy_gold.effective_date = latest.effective_date
WHEN NOT MATCHED AND latest.Op IN ('I', 'U') THEN
  INSERT (
    policy_id,
    policy_number,
    party_id,
    start_date,
    end_date,
    status,
    effective_date
  )
  VALUES (
    latest.policy_id,
    latest.policy_number,
    latest.party_id,
    latest.start_date,
    latest.end_date,
    latest.status,
    latest.effective_date
  )



In [0]:

# Query the full contents of policy_gold and render them in the notebook.
policy_gold_snapshot = spark.sql("SELECT * FROM policy_gold")
display(policy_gold_snapshot)



In [0]:
# Load the two gold tables that feed the joined output.
policy_gold_df = spark.table("workspace.default.policy_gold")
party_gold_df = spark.table("workspace.default.party_gold")

# Published column names, applied in this order: current name -> output name.
published_names = {
    "policy_number": "POLICY_NUMBER",
    "start_date": "START_DATE",
    "end_date": "END_DATE",
    "status": "STATUS",
    "name": "PARTY_NAME",
    "type": "PARTY_TYPE",
}

# Left join keeps every policy row even when no party row shares its party_id;
# the key and bookkeeping columns are then removed from the result.
policy_party_joined_df = policy_gold_df.join(
    party_gold_df, on="party_id", how="left"
).drop("effective_date", "Op", "party_id", "policy_id")

for current_name, published_name in published_names.items():
    policy_party_joined_df = policy_party_joined_df.withColumnRenamed(
        current_name, published_name
    )

# Replace the managed table (data and schema) with the freshly joined result.
(
    policy_party_joined_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("workspace.default.policy_party_joined")
)

# Read the table back to show what was stored.
policy_party_joined_snapshot = spark.sql(
    "SELECT * FROM workspace.default.policy_party_joined"
)
display(policy_party_joined_snapshot)