In [0]:
# Notebook parameter "env": selects the target environment; falls back to "dev"
# when the caller (job or interactive user) does not supply a value.
dbutils.widgets.text("env", "dev")

In [0]:
# --- Run configuration -------------------------------------------------------
# The "env" widget value is used twice: as the environment flag that later
# selects the write strategy, and as the schema name inside the catalog.
catalog = "my_catalog"
env = dbutils.widgets.get("env")
schema = env

print(
    f"env    : {env}",
    f"catalog: {catalog}",
    f"schema : {schema}",
    sep="\n",
)

# Fully qualified (catalog.schema.table) names of the inputs and the output.
table_prefix = f"{catalog}.{schema}"
products_tbl = f"{table_prefix}.products"
orders_tbl = f"{table_prefix}.orders"
final_tbl = f"{table_prefix}.orders_refined"

In [0]:
# Load both source tables as DataFrames via the session's reader.
products_df = spark.read.table(products_tbl)
orders_df = spark.read.table(orders_tbl)

In [0]:
import pyspark.sql.functions as F
from src.helpers.utils import validate_dates_count, write_to_table


Cast the `order_date` column to the DATE type and check the original DataFrame for any invalid dates.

In [0]:
# Replace order_date with its DATE-typed version. Note that orders_df is
# rebound here, so every later cell sees the cast column.
order_date_as_date = F.col("order_date").cast("date")
orders_df = orders_df.withColumn("order_date", order_date_as_date)

# validate_dates_count is a project helper (src.helpers.utils); presumably it
# returns the number of rows whose order_date is invalid -- TODO confirm.
# Any positive count aborts the notebook before data is transformed or written.
invalid_count = validate_dates_count(orders_df, "order_date")
if invalid_count > 0:
    raise Exception(f"Invalid date count: {invalid_count}")


In [0]:
# Columns of the refined output, in their final order.
refined_columns = [
    "order_id",
    "customer_id",
    "product_id",
    "product_name",
    "category",
    "quantity",
    "price",
    "line_total",
    "order_date",
    "processed_at",
]

# Line value per order row: quantity * price, rounded to 2 decimal places.
line_total_expr = F.round(F.col("quantity") * F.col("price"), 2)

# 1. Drop fully identical order rows.
deduped_orders = orders_df.dropDuplicates()

# 2. Attach product attributes; the inner join discards orders whose
#    product_id has no match in products_df.
orders_with_products = deduped_orders.join(
    products_df, on="product_id", how="inner"
)

# 3. Add the derived columns and project down to the output schema.
orders_refined = (
    orders_with_products
    .withColumn("line_total", line_total_expr)
    .withColumn("processed_at", F.current_timestamp())
    .select(*refined_columns)
)

In [0]:
# Build the write statement. For "stage" and "prod" the refined rows are
# upserted into the target table with a SQL MERGE keyed on order_id; for every
# other environment no statement is built and an empty string is passed on.
if env in ("stage", "prod"):
    # The MERGE statement reads the DataFrame through this temp view name.
    orders_refined.createOrReplaceTempView("orders_refined_vw")
    write_query = f"""
            MERGE INTO 
                {final_tbl} AS t
            USING 
                orders_refined_vw AS s
                ON t.order_id = s.order_id
            WHEN MATCHED THEN
                UPDATE SET *
            WHEN NOT MATCHED THEN
                INSERT *
            """
else:
    write_query = ""

# write_to_table is a project helper (src.helpers.utils); how it treats an
# empty write_query is not visible from this notebook -- see the helper.
output = write_to_table(orders_refined, final_tbl, env, write_query)
print(output)

# TODO:
# Change the write mode and add options to experiment with:
#   1. mode    - overwrite, append
#   2. options - mergeSchema, allowSchemaEvolution, overwriteSchema
# How does the MERGE query handle schema changes - new columns, data type changes, renamed columns?