In [1]:
# Script to load the customer dimension (SCD Type 2) from staging

In [17]:
# Import required libraries
import sys
from lib.spark_session import get_spark_session
from lib.utils import date_data, get_string_cols, get_rundate
from lib.job_control import insert_log, get_max_timestamp
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import current_timestamp, expr, to_date, date_format, udf, lit
from pyspark.sql.types import StringType
from datetime import datetime
from delta import DeltaTable
import uuid

In [3]:
# JOB Parameters: run date, target dimension table and its staging source
rundate = get_rundate()
schema_name, table_name = "edw", "dim_customer"
table_full_name = ".".join((schema_name, table_name))
staging_table_full_name = "edw_stg.dim_customer_stg"
print("SPARK_APP: JOB triggered for rundate - " + rundate)

In [4]:
# Generate Spark Session, named after the target table so the job is easy to identify
spark: SparkSession = get_spark_session(f"Dimension load - {table_full_name}")
# Recent PySpark versions return None for uiWebUrl when the Spark UI is disabled
# (spark.ui.enabled=false); format with an f-string so this log line cannot raise
# a TypeError from str + None and abort the load.
print(f"SPARK_APP: Spark UI - {spark.sparkContext.uiWebUrl}")

SPARK_APP: Spark UI - http://46346aae9d54:4040


In [5]:
# Spark Configs: number of partitions Spark uses when shuffling data (joins, aggregations)
shuffle_partitions = 8
spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions)

In [6]:
# Read data from Staging and log its row count and schema
df_stg = spark.read.table(staging_table_full_name)

print("SPARK_APP: Staging Data Count - {}".format(df_stg.count()))
print("SPARK_APP: Printing Staging Schema --")
df_stg.printSchema()

SPARK_APP: Staging Data Count - 19
SPARK_APP: Printing Staging Schema --
root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- email: string (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- plan_type: string (nullable = true)
 |-- insert_dt: timestamp (nullable = true)
 |-- rundate: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- effective_start_dt: timestamp (nullable = true)
 |-- effective_end_dt: timestamp (nullable = true)
 |-- active_flg: integer (nullable = true)
 |-- update_dt: timestamp (nullable = true)



In [7]:
# UDF generating a random UUID4 string, used as the dimension's surrogate key.
# Spark treats UDFs as deterministic by default, so the optimizer may eliminate or
# re-invoke calls; marking it non-deterministic prevents the optimizer from treating
# different invocations as interchangeable.
uuidUDF = udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()

In [20]:
# Generate SURROGATE KEYs plus the values used to close out existing history rows:
#   row_wid                   - new surrogate key for the incoming record
#   hist_record_end_timestamp - one second before the incoming effective_start_dt
#   hist_record_active_flg    - constant 0 (inactive)
#   hist_record_update_dt     - timestamp of this load
df_dim_temp = (
    df_stg
    .withColumn("row_wid", uuidUDF())
    .withColumn("hist_record_end_timestamp", expr("cast(effective_start_dt as TIMESTAMP) - INTERVAL 1 seconds"))
    .withColumn("hist_record_active_flg", lit(0))
    .withColumn("hist_record_update_dt", current_timestamp())
)

print("SPARK_APP: Dim Temp Data Count - {}".format(df_dim_temp.count()))
print("SPARK_APP: Printing Dim Temp Schema --")
df_dim_temp.printSchema()

SPARK_APP: Dim Temp Data Count - 19
SPARK_APP: Printing Dim Temp Schema --
root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- email: string (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- plan_type: string (nullable = true)
 |-- insert_dt: timestamp (nullable = true)
 |-- rundate: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- effective_start_dt: timestamp (nullable = true)
 |-- effective_end_dt: timestamp (nullable = true)
 |-- active_flg: integer (nullable = true)
 |-- update_dt: timestamp (nullable = true)
 |-- row_wid: string (nullable = true)
 |-- hist_record_end_timestamp: timestamp (nullable = true)
 |-- hist_record_active_flg: integer (nullable = false)
 |-- hist_re

In [21]:
# Get the delta table for Upserts (SCD2)
dt_dim = DeltaTable.forName(spark, table_full_name)

# Full load check: a max timestamp equal to the 1900-01-01 sentinel triggers a reload
# from scratch. NOTE(review): presumably get_max_timestamp returns this sentinel when
# job control has no entry for the table yet — confirm in lib.job_control.
if get_max_timestamp(spark, schema_name, table_name) == "1900-01-01 00:00:00.000000":
    print("SPARK_APP: Table is set for full load")
    # Truncate the Dimension table
    dt_dim.delete("1=1")
    # Vacuum with 0 hours retention to remove all older snapshots. Delta rejects a
    # retention below its safety threshold unless the duration check is disabled, so
    # disable it only around this call and restore the previous setting afterwards.
    retention_check_conf = "spark.databricks.delta.retentionDurationCheck.enabled"
    retention_check_before = spark.conf.get(retention_check_conf, "true")
    spark.conf.set(retention_check_conf, False)
    try:
        # Must use dt_dim: the name `dt` is only bound in a later cell.
        dt_dim.vacuum(0)
    finally:
        spark.conf.set(retention_check_conf, retention_check_before)

# Close out the currently active row (active_flg = 1) of every customer present in
# staging: end-date it one second before the incoming record starts, mark it inactive
# and stamp the update time. The incoming records themselves are appended separately.
# NOTE(review): Delta MERGE raises an error when several source rows match the same
# target row, so staging is assumed to hold at most one row per customer_id.
(
    dt_dim.alias("dim_customer")
    .merge(
        df_dim_temp.alias("dim_temp"),
        "dim_customer.customer_id = dim_temp.customer_id and dim_customer.active_flg = 1",
    )
    .whenMatchedUpdate(
        set={
            "effective_end_dt": "dim_temp.hist_record_end_timestamp",
            "active_flg": "dim_temp.hist_record_active_flg",
            "update_dt": "dim_temp.hist_record_update_dt",
        }
    )
    .execute()
)
print("SPARK_APP: Updated History Records")

SPARK_APP: Updated History Records


In [22]:
# Get the logs from delta table version: operation metrics of the most recent commit
# (Delta returns history in reverse chronological order, so limit(1) is the latest)
(
    dt_dim.history()
    .limit(1)
    .select(
        "version",
        "operationMetrics.executionTimeMs",
        "operationMetrics.numTargetRowsInserted",
        "operationMetrics.numTargetRowsUpdated",
        "operationMetrics.numOutputRows",
    )
    .show(1, False)
)

+-------+---------------+---------------------+--------------------+-------------+
|version|executionTimeMs|numTargetRowsInserted|numTargetRowsUpdated|numOutputRows|
+-------+---------------+---------------------+--------------------+-------------+
|1      |4335           |0                    |0                   |0            |
+-------+---------------+---------------------+--------------------+-------------+



In [26]:
# Insert all staged records into the Delta table in APPEND mode. The hist_record_*
# columns only feed the history MERGE, and "name" is dropped as well (presumably the
# target keeps first_name/last_name instead — confirm against the table schema).
df_dim_temp \
    .drop("hist_record_end_timestamp", "hist_record_active_flg", "hist_record_update_dt", "name") \
    .write \
    .saveAsTable(table_full_name, format="delta", mode="append")
print("SPARK_APP: Active Records inserted into Dimesion Table")

SPARK_APP: Active Records inserted into Dimesion Table


In [27]:
# Get the logs from delta table version: metrics of the latest commit (the append)
dt_dim \
    .history() \
    .limit(1) \
    .select("version",
            "operationMetrics.executionTimeMs",
            "operationMetrics.numTargetRowsInserted",
            "operationMetrics.numTargetRowsUpdated",
            "operationMetrics.numOutputRows") \
    .show(n=1, truncate=False)

+-------+---------------+---------------------+--------------------+-------------+
|version|executionTimeMs|numTargetRowsInserted|numTargetRowsUpdated|numOutputRows|
+-------+---------------+---------------------+--------------------+-------------+
|1      |null           |null                 |null                |19           |
+-------+---------------+---------------------+--------------------+-------------+



In [28]:
# Add job details in JOB CONTROL: record this run for the target table with the
# current local timestamp and the job's rundate
job_logged_at = datetime.now()
insert_log(spark, schema_name, table_name, job_logged_at, rundate)
print("SPARK_APP: Update JOB Control Log")

SPARK_APP: Update JOB Control Log


In [29]:
# Show the most recent JOB CONTROL entry for this table
job_control_query = (
    "select * from edw.job_control "
    f"where table_name = '{table_name}' "
    "order by insert_dt desc limit 1"
)
spark.sql(job_control_query).show(truncate=False)

+-----------+------------+--------------------------+--------+--------------------------+
|schema_name|table_name  |max_timestamp             |rundate |insert_dt                 |
+-----------+------------+--------------------------+--------+--------------------------+
|edw        |dim_customer|2023-02-06 11:51:40.262862|20220101|2023-02-06 11:51:42.052823|
+-----------+------------+--------------------------+--------+--------------------------+



In [30]:
# Generate a symlink-format manifest for the Delta table so Athena can read it
dt = DeltaTable.forName(spark, table_full_name)
dt.generate(mode="symlink_format_manifest")
print("SPARK_APP: Symlink Manifest file generated")

SPARK_APP: Symlink Manifest file generated


In [31]:
# Stop the Spark session; no further Spark operations can run in this kernel afterwards
spark.stop()