In [0]:
import sys
import os
# Put the sibling ../utils directory on the module search path so that
# modules stored there can be imported by the cells below.
# The membership check keeps this cell idempotent: re-running it does not
# append the same directory to sys.path a second time.
_utils_dir = os.path.abspath('../utils/')
if _utils_dir not in sys.path:
    sys.path.append(_utils_dir)

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType
import json
from dwh_utils import *
from transformation_utils import *

In [0]:
# Starting point of the pipeline: read the bronze-layer orders table.
BRONZE_ORDERS_TABLE = "acme_ecommerce.bronze.orders"
df = spark.read.table(BRONZE_ORDERS_TABLE)

In [0]:
# Clean and standardise the bronze orders before they are enriched.
# The helper functions come from the wildcard imports at the top of the
# notebook; their implementations are not visible here.

# NOTE(review): presumably keeps one row per (order_id, row_id), chosen by
# ingest_timestamp -- confirm the exact rule in the helper.
df = deduplicate_data_by_time(
    spark=spark,
    df=df,
    partition_cols=["order_id", "row_id"],
    order_col="ingest_timestamp",
)
df = round_value(spark=spark, df=df, column="profit", round_to=2)

# Both date columns are passed through to_date_col with the same
# day/month/year pattern.
df = to_date_col(spark=spark, df=df, col_name="order_date", fmt="d/M/yyyy")
df = to_date_col(spark=spark, df=df, col_name="ship_date", fmt="d/M/yyyy")

# unit_price = price / quantity.
# The division is guarded so that a zero quantity produces NULL instead of
# raising a DIVIDE_BY_ZERO error when Spark runs with ANSI mode enabled.
# With ANSI mode off the result is unchanged (division by zero and a NULL
# quantity both already evaluate to NULL).
df = df.withColumn(
    "unit_price",
    F.when(F.col("quantity") != 0, F.col("price") / F.col("quantity")),
)
df = round_value(spark=spark, df=df, column="unit_price", round_to=2)

In [0]:
# Give each join input a short alias ("o", "p", "c") so the select that
# follows the joins can qualify every column with its source table.
order_alias = df.alias("o")
products_alias = spark.read.table("acme_ecommerce.silver.products").alias("p")
customers_alias = spark.read.table("acme_ecommerce.silver.customers").alias("c")

In [0]:
# Enrich every order line with product and customer attributes.
# Both joins are LEFT joins, so an order row is kept even when no matching
# product or customer exists; the enrichment columns are then NULL.
orders_with_products = order_alias.join(
    products_alias, on="product_id", how="left"
)
orders_enriched = orders_with_products.join(
    customers_alias, on="customer_id", how="left"
)

# Columns of the resulting orders DataFrame, in output order. The prefix
# names the aliased input each column is taken from (o = orders,
# p = products, c = customers).
silver_order_columns = [
    # order line identity and measures
    "o.order_id",
    "o.row_id",
    "o.order_date",
    "o.price",
    "o.quantity",
    "o.unit_price",
    "o.discount",
    "o.profit",
    # product attributes
    "o.product_id",
    "p.category",
    "p.sub-category",
    # shipping
    "o.ship_date",
    "o.ship_mode",
    # customer attributes
    "o.customer_id",
    "c.customer_name",
    "c.country",
    # ingestion lineage
    "o.ingest_timestamp",
    "o.file_path",
]
df = orders_enriched.select(*silver_order_columns)

In [0]:
# Persist the enriched orders to acme_ecommerce.silver.orders.
# NOTE(review): the helper is imported via a wildcard import and its body is
# not visible here; presumably it merges on merge_keys because
# append_only=False -- confirm in its definition.
write_to_delta_with_cdc_by_name(
    spark=spark,
    df=df,
    catalog="acme_ecommerce",
    schema="silver",
    table="orders",
    merge_keys=["order_id", "row_id"],
    append_only=False,
)