In [14]:
# Script to create date for Sales Fact Staging
# Import required libraries
import sys
from lib.spark_session import get_spark_session
from lib.utils import date_data, get_string_cols, get_rundate
from lib.job_control import insert_log, get_max_timestamp
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import current_timestamp, expr, to_date, lit,to_timestamp,lit,coalesce,split,from_json,explode
from datetime import datetime
from delta import DeltaTable

In [7]:
# Job Parameters
rundate = get_rundate()
schema_name = 'edw_stg'
table_name = 'fact_sales_stg'
table_full_name = f"{schema_name}.{table_name}"
landing_table_full_name = "edw_ld.dim_sales_ld"
print("SPARK_APP: JOB triggered for rundate - " + rundate)

SPARK_APP: JOB triggered for rundate - 20220101


In [3]:
# creating Spark Session
spark : SparkSession = get_spark_session(f"staging_load: {table_full_name}")
print("SPARK_APP: Spark UI - " + spark.sparkContext.uiWebUrl)

SPARK_APP: Spark UI - http://03205cdd01e3:4040


In [4]:
# Configuring Spark Job
spark.conf.set("spark.sql.shuffle.partitions",8)
spark.conf.set("spark.sql.parquet.mergeSchema",True)

In [5]:
# Get the max_timestamp for data load in staging
max_timestamp = get_max_timestamp(spark, schema_name, table_name)
print("SPARK_APP: Max timestamp for staging data load - " + str(max_timestamp))

SPARK_APP: Max timestamp for staging data load - 1900-01-01 00:00:00.000000


In [8]:
df_ld = spark \
    .read \
    .table(landing_table_full_name) \
    .where(f"insert_dt > to_timestamp('{max_timestamp}')")

print("SPARK_APP: Landing Data Count - " + str(df_ld.count()))
print("SPARK_APP: Printing Landing Schema --")
df_ld.printSchema()

SPARK_APP: Landing Data Count - 2
SPARK_APP: Printing Landing Schema --
root
 |-- value: string (nullable = true)
 |-- insert_dt: timestamp (nullable = true)
 |-- rundate: string (nullable = true)



In [9]:
# Determine the schema of the JSON payload from the column
json_schema_df = spark.read.json(df_ld.rdd.map(lambda row: row.value))
json_schema = json_schema_df.schema
print("SPARK_APP: Printing Landing Parsed JSON Schema - " + str(json_schema))

SPARK_APP: Printing Landing Parsed JSON Schema - StructType([StructField('orders', ArrayType(StructType([StructField('cust_id', StringType(), True), StructField('invoice_num', StringType(), True), StructField('order_date', StringType(), True), StructField('order_id', StringType(), True), StructField('order_lines', ArrayType(StructType([StructField('currency_code', StringType(), True), StructField('discount', LongType(), True), StructField('discount_type', StringType(), True), StructField('prod_id', StringType(), True), StructField('qty', LongType(), True), StructField('tax', LongType(), True), StructField('tax_type', StringType(), True)]), True), True), StructField('store_id', StringType(), True), StructField('system_date', StringType(), True)]), True), True)])


In [15]:
# Assign schema dynamically to read the JSON column
df_stg_temp = df_ld.withColumn("sales_data", from_json(df_ld["value"], json_schema))

# Explode the column orders
df_exploded_temp = df_stg_temp.withColumn("orders", explode("sales_data.orders")).select("*", "orders.*")

# Explode the order line items
df_exploded = df_exploded_temp.withColumn("order_line_items", explode("order_lines")).select("*", "order_line_items.*").drop("value", "sales_data", "orders", "order_lines", "order_line_items")
print("SPARK_APP: Printing Exploded & Flattened JSON Schema --")
df_exploded.printSchema()
print("SPARK_APP: Total Exploded records - " + str(df_exploded.count()))

SPARK_APP: Printing Exploded & Flattened JSON Schema --
root
 |-- insert_dt: timestamp (nullable = true)
 |-- rundate: string (nullable = true)
 |-- cust_id: string (nullable = true)
 |-- invoice_num: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- system_date: string (nullable = true)
 |-- currency_code: string (nullable = true)
 |-- discount: long (nullable = true)
 |-- discount_type: string (nullable = true)
 |-- prod_id: string (nullable = true)
 |-- qty: long (nullable = true)
 |-- tax: long (nullable = true)
 |-- tax_type: string (nullable = true)

SPARK_APP: Total Exploded records - 163


In [17]:
# Desired calculations for TAX, DISCOUNT and LINE ITEM TOTAL
# LINE_ITEM_TOTAL = (QTY X PRICE) + TAX - DISCOUNT

# Get Product Dim to get price
df_dim_prod = spark.read.table("edw.dim_product").where("active_flg = 1").select("row_wid", "product_id", "price")

# Join with product and calculate the columns
df_stg_temp = df_exploded.join(df_dim_prod, how="left_outer", on=df_exploded.prod_id == df_dim_prod.product_id) \
    .withColumn("sub_total", expr("qty * price")) \
    .withColumn("tax", expr("case when tax_type = 'INR' then tax else (tax/100)*sub_total end")) \
    .withColumn("discount", expr("case when discount_type = 'INR' then discount else (discount/100)*sub_total end")) \
    .withColumn("line_total", expr("sub_total + tax - discount"))

print("SPARK_APP: Printing Temp STG schema after calculations --")
df_stg_temp.printSchema()

SPARK_APP: Printing Temp STG schema after calculations --
root
 |-- insert_dt: timestamp (nullable = true)
 |-- rundate: string (nullable = true)
 |-- cust_id: string (nullable = true)
 |-- invoice_num: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- system_date: string (nullable = true)
 |-- currency_code: string (nullable = true)
 |-- discount: double (nullable = true)
 |-- discount_type: string (nullable = true)
 |-- prod_id: string (nullable = true)
 |-- qty: long (nullable = true)
 |-- tax: double (nullable = true)
 |-- tax_type: string (nullable = true)
 |-- row_wid: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- price: double (nullable = true)
 |-- sub_total: double (nullable = true)
 |-- line_total: double (nullable = true)



In [27]:
# Format the landing data
df_stg = df_stg_temp.select("cust_id", "store_id", "order_date", "qty", "tax", "discount", "line_total", "order_id", "invoice_num", "prod_id", "row_wid") \
    .withColumn("product_wid", expr("row_wid")) \
    .withColumn("integration_key", expr("order_id||'~'||prod_id||'~'||store_id||'~'||cust_id||'~'||order_date")) \
    .withColumn("qty", expr("cast (qty as int)")) \
    .withColumn("rundate", lit(rundate)) \
    .withColumn("insert_dt", current_timestamp()) \
    .withColumn("update_dt", current_timestamp()) \
    .drop("row_wid")

print("SPARK_APP: Staging Data Count - " + str(df_stg.count()))
print("SPARK_APP: Printing Staging Schema --")
df_stg.printSchema()

SPARK_APP: Staging Data Count - 163
SPARK_APP: Printing Staging Schema --
root
 |-- cust_id: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- qty: integer (nullable = true)
 |-- tax: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- line_total: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- invoice_num: string (nullable = true)
 |-- prod_id: string (nullable = true)
 |-- product_wid: string (nullable = true)
 |-- integration_key: string (nullable = true)
 |-- rundate: string (nullable = false)
 |-- insert_dt: timestamp (nullable = false)
 |-- update_dt: timestamp (nullable = false)



In [28]:
df_stg.show(5,truncate = False)

+-------+--------+----------+---+----+--------+----------+----------------+----------------+-------+----------------------------------------------------------+------------------------------------------+--------+--------------------------+--------------------------+
|cust_id|store_id|order_date|qty|tax |discount|line_total|order_id        |invoice_num     |prod_id|product_wid                                               |integration_key                           |rundate |insert_dt                 |update_dt                 |
+-------+--------+----------+---+----+--------+----------+----------------+----------------+-------+----------------------------------------------------------+------------------------------------------+--------+--------------------------+--------------------------+
|C018   |S003    |2022-07-19|9  |11.0|5.4     |545.6     |ORD2022071900000|INV2022071900000|P009   |net.razorvine.pickle.objects.ClassDictConstructor@55c03255|ORD2022071900000~P009~S003~C018~2022-07-19|

In [29]:
# Write the data to Staging table in overwrite mode for truncate
df_stg.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable(table_full_name)

print("SPARK_APP: Data written to staging table")

SPARK_APP: Data written to staging table


In [30]:
# Add job details in JOB CONTROL
insert_log(spark, schema_name, table_name, datetime.now(), rundate)
print("SPARK_APP: Update JOB Control Log")

SPARK_APP: Update JOB Control Log


In [31]:
spark.sql(f"select * from edw.job_control where table_name = '{table_name}' order by insert_dt desc limit 1").show(truncate=False)

+-----------+--------------+--------------------------+--------+--------------------------+
|schema_name|table_name    |max_timestamp             |rundate |insert_dt                 |
+-----------+--------------+--------------------------+--------+--------------------------+
|edw_stg    |fact_sales_stg|2024-06-02 08:54:13.403334|20220101|2024-06-02 08:54:14.003559|
+-----------+--------------+--------------------------+--------+--------------------------+



In [32]:
# Get the logs from delta table version
dt = DeltaTable.forName(spark, table_full_name)
dt.history().limit(1).select("version","operationMetrics.executionTimeMs", 
                                 "operationMetrics.numTargetRowsInserted",
                                "operationMetrics.numTargetRowsUpdated",
                                "operationMetrics.numOutputRows").show(1, False)

+-------+---------------+---------------------+--------------------+-------------+
|version|executionTimeMs|numTargetRowsInserted|numTargetRowsUpdated|numOutputRows|
+-------+---------------+---------------------+--------------------+-------------+
|1      |null           |null                 |null                |163          |
+-------+---------------+---------------------+--------------------+-------------+



In [33]:
# Generate Symlink manifest for Athena Access
dt.generate("symlink_format_manifest")
print("SPARK_APP: Symlink Manifest file generated")

SPARK_APP: Symlink Manifest file generated


In [34]:
spark.sql("select * from edw_stg.fact_sales_stg limit 10").show()

+-------+--------+----------+---+------------------+--------+----------+----------------+----------------+-------+--------------------+--------------------+--------+--------------------+--------------------+
|cust_id|store_id|order_date|qty|               tax|discount|line_total|        order_id|     invoice_num|prod_id|         product_wid|     integration_key| rundate|           insert_dt|           update_dt|
+-------+--------+----------+---+------------------+--------+----------+----------------+----------------+-------+--------------------+--------------------+--------+--------------------+--------------------+
|   C018|    S003|2022-07-19|  9|              11.0|     5.4|     545.6|ORD2022071900000|INV2022071900000|   P009|net.razorvine.pic...|ORD2022071900000~...|20220101|2024-06-02 08:53:...|2024-06-02 08:53:...|
|   C018|    S003|2022-07-19|  5|              13.0|     8.0|     105.0|ORD2022071900000|INV2022071900000|   P013|net.razorvine.pic...|ORD2022071900000~...|20220101|202

In [35]:
spark.stop()