### This notebook incrementally loads data from the stage table into all dimension and fact tables

In [0]:
# Importing the required libraries and functions.
from pyspark.sql.functions import (
    col,
    current_timestamp,
    to_timestamp,
    concat,
    substring,
    max,  # NOTE: shadows Python's builtin max() for the rest of the notebook
    row_number,
)
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window

In [0]:
# Reading the stage table
df = spark.read.table("stg_store.stg_orders")

# High-water mark: the latest TransactionTime already loaded into fact_orders.
# It is None when the fact table is empty (first run).
df_fact_orders_max_transaction_time = spark.read.table("store.fact_orders")
max_transaction_time = df_fact_orders_max_transaction_time.agg(
    max(col("TransactionTime")).alias("max_transaction_time")
).collect()[0][0]


# Re-assemble the raw TransactionTime string as "MMM dd yyyy HH:mm:ss" and parse it.
# The 1-based substring offsets appear to assume a raw layout such as
# "Sat Feb 02 12:50:00 IST 2019". NOTE(review): confirm against the stage data.
# Each piece carries its adjacent separator, so the concatenation already
# contains the spaces the pattern expects.
df_temp = df.withColumn(
    "TransactionTime",
    to_timestamp(
        concat(
            substring(col("TransactionTime"), 5, 4),  # month + space, e.g. "Feb "
            substring(col("TransactionTime"), 9, 3),  # day + space, e.g. "02 "
            substring(col("TransactionTime"), -4, 4),  # year, e.g. "2019"
            substring(col("TransactionTime"), 11, 9),  # space + time, e.g. " 12:50:00"
        ),
        "MMM dd yyyy HH:mm:ss",
    ),
).filter(
    # Drop rows with placeholder (-1) user/item ids. NULL != -1 evaluates to
    # NULL, so rows with a NULL UserID or ItemCode are dropped as well.
    (col("UserID") != -1)
    & (col("ItemCode") != -1)
)

if max_transaction_time is None:
    # First run: nothing has been loaded yet, so take every staged record
    df_incremental = df_temp
else:
    # Keep only rows strictly newer than the high-water mark. Rows whose
    # TransactionTime is NULL (e.g. failed to parse) are excluded by this
    # comparison. Late-arriving rows with an equal timestamp are skipped.
    df_incremental = df_temp.filter(col("TransactionTime") > max_transaction_time)

In [0]:
# Loading the dim_users table with only new users
dim_users_df = spark.table("store.dim_users")

# Distinct users in this increment that are not yet in the dimension
# (left_anti keeps only rows with no match on UserId).
new_records_dim_users_df = (
    df_incremental.join(dim_users_df, on="UserId", how="left_anti")
    .select("UserId")
    .distinct()
)

# Continue the surrogate-key sequence from the highest key already present
# (0 when the dimension is empty). monotonically_increasing_id() is unsuitable:
# - it restarts from 0 on every run, colliding with keys from earlier loads;
# - it puts the partition id in its upper bits, so truncating it to int can
#   also produce duplicate keys across partitions within a single run.
max_user_skey = dim_users_df.agg(max(col("user_skey"))).collect()[0][0] or 0

# Adding surrogate key and load date columns. The un-partitioned window moves
# the new rows into a single partition; presumably fine for the number of new
# users per increment.
new_records_dim_users_with_skey_df = new_records_dim_users_df.withColumn(
    "user_skey",
    (row_number().over(Window.orderBy("UserId")) + max_user_skey).cast("int"),
).withColumn("load_date", current_timestamp())

# Rearranging the columns as per table
df_dim_users_rearranged = new_records_dim_users_with_skey_df.select(
    "user_skey", "userid", "load_date"
)

# Appending the data to dim users table
df_dim_users_rearranged.write.format("delta").mode("append").saveAsTable(
    "store.dim_users"
)

In [0]:
# Loading the dim_items table with only new items
dim_items_df = spark.table("store.dim_items")

# Items in this increment that are not yet in the dimension
# (left_anti keeps only rows with no match on ItemCode).
new_records_dim_items_df = (
    df_incremental.join(dim_items_df, on="ItemCode", how="left_anti")
    .select("ItemCode", "ItemDescription", "CostPerItem")
    # Exactly one row per ItemCode. A plain distinct() keeps several rows for an
    # item whose description or cost varies within the increment. Each extra
    # dimension row would then duplicate the matching fact rows in the fact
    # lookup join. Which variant survives is arbitrary.
    .dropDuplicates(["ItemCode"])
)

# Continue the surrogate-key sequence from the highest key already present
# (0 when the dimension is empty). monotonically_increasing_id() is unsuitable:
# - it restarts from 0 on every run, colliding with keys from earlier loads;
# - it puts the partition id in its upper bits, so truncating it to int can
#   also produce duplicate keys across partitions within a single run.
max_item_skey = dim_items_df.agg(max(col("item_skey"))).collect()[0][0] or 0

# Adding surrogate key and load date columns. The un-partitioned window moves
# the new rows into a single partition; presumably fine for the number of new
# items per increment.
new_records_dim_items_with_skey_df = new_records_dim_items_df.withColumn(
    "item_skey",
    (row_number().over(Window.orderBy("ItemCode")) + max_item_skey).cast("int"),
).withColumn("load_date", current_timestamp())

# Rearranging the columns as per table
df_dim_items_rearranged = new_records_dim_items_with_skey_df.select(
    "item_skey", "ItemCode", "ItemDescription", "CostPerItem", "load_date"
)

# Appending the data to dim items table
df_dim_items_rearranged.write.format("delta").mode("append").saveAsTable(
    "store.dim_items"
)

In [0]:
# Loading the dim_country table with only new countries
dim_country_df = spark.table("store.dim_country")

# Distinct countries in this increment that are not yet in the dimension.
# NULL countries are skipped: a NULL never matches in the anti join, so it
# would be re-inserted on every run. The fact lookup can never match it either.
new_records_dim_country_df = (
    df_incremental.filter(col("country").isNotNull())
    .join(dim_country_df, on="country", how="left_anti")
    .select("country")
    .distinct()
)

# Continue the surrogate-key sequence from the highest key already present
# (0 when the dimension is empty). monotonically_increasing_id() is unsuitable:
# - it restarts from 0 on every run, colliding with keys from earlier loads;
# - it puts the partition id in its upper bits, so truncating it to int can
#   also produce duplicate keys across partitions within a single run.
max_country_skey = dim_country_df.agg(max(col("country_skey"))).collect()[0][0] or 0

# Adding surrogate key and load date columns
new_records_dim_country_with_skey_df = new_records_dim_country_df.withColumn(
    "country_skey",
    (row_number().over(Window.orderBy("country")) + max_country_skey).cast("int"),
).withColumn("load_date", current_timestamp())

# Rearranging the columns as per table
df_dim_country_rearranged = new_records_dim_country_with_skey_df.select(
    "country_skey", "country", "load_date"
)

# Appending the data to dim country table
df_dim_country_rearranged.write.format("delta").mode("append").saveAsTable(
    "store.dim_country"
)

In [0]:
# Reading the dimension tables fresh so that rows appended earlier in this
# run are visible to the lookups below
df_dim_users = spark.read.table("store.dim_users")
df_dim_items = spark.read.table("store.dim_items")
df_dim_country = spark.read.table("store.dim_country")

# Highest fact key already loaded (0 when the fact table is empty). New keys
# continue from here. monotonically_increasing_id() is unsuitable:
# - it restarts from 0 on every run;
# - it puts the partition id in its upper bits, so truncating it to int can
#   produce duplicate keys.
max_fact_key = (
    spark.read.table("store.fact_orders").agg(max(col("key"))).collect()[0][0] or 0
)

# Lookup on the dimension tables to assign the respective keys. Left joins keep
# every incremental row; its skey is NULL when no dimension row matches.
df_fact_orders = (
    df_incremental.join(
        df_dim_users, df_incremental["UserId"] == df_dim_users["userID"], "left"
    )
    .join(df_dim_items, df_incremental["ItemCode"] == df_dim_items["ItemCode"], "left")
    .join(
        df_dim_country, df_incremental["Country"] == df_dim_country["country"], "left"
    )
    .select(
        # Sequential, unique key. The window is un-partitioned, so numbering
        # happens in a single partition. Cast before aliasing so the output
        # column is explicitly named "key".
        (
            row_number().over(
                Window.orderBy(
                    df_incremental["TransactionTime"], df_incremental["TransactionId"]
                )
            )
            + max_fact_key
        )
        .cast("int")
        .alias("key"),
        df_incremental["TransactionId"],
        df_incremental["TransactionTime"],
        df_dim_users["user_skey"].alias("user_skey"),
        df_dim_items["item_skey"].alias("item_skey"),
        df_incremental["NumberOfItemsPurchased"],
        df_dim_country["country_skey"].alias("country_skey"),
        # NOTE(review): assumes stg_orders itself carries a load_date column
        df_incremental["load_date"],
    )
)

# Appending the data to fact_orders table
df_fact_orders.write.format("delta").mode("append").saveAsTable("store.fact_orders")