### Reading Delta Tables

In [0]:
df_transactions = spark.read.table("sales_db.transactions")
df_calendar = spark.read.table("sales_db.calendar")

In [0]:
print(df_transactions.count())
print(df_calendar.count())

4503108
1820


In [0]:
df_transactions.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- line_id: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- dt: timestamp (nullable = true)
 |-- pos_site_id: string (nullable = true)
 |-- sku_id: string (nullable = true)
 |-- fscldt_id: integer (nullable = true)
 |-- price_substate_id: string (nullable = true)
 |-- sales_units: integer (nullable = true)
 |-- sales_dollars: double (nullable = true)
 |-- discount_dollars: double (nullable = true)
 |-- original_order_id: long (nullable = true)
 |-- original_line_id: integer (nullable = true)



In [0]:
display(df_transactions.limit(2))

order_id,line_id,type,dt,pos_site_id,sku_id,fscldt_id,price_substate_id,sales_units,sales_dollars,discount_dollars,original_order_id,original_line_id
155291291,3,Cancel,2016-02-01T17:43:01.000+0000,CATMAIN,7208420100,20160201,FP,1,49.95,0.0,155291291,0
155026106,1,Cancel,2016-02-02T13:02:35.000+0000,CATMAIN,2800380501,20160202,FP,1,119.95,0.0,155026106,0


In [0]:
df_calendar.printSchema()

root
 |-- fscldt_id: integer (nullable = true)
 |-- fscldt_label: string (nullable = true)
 |-- fsclwk_id: integer (nullable = true)
 |-- fsclwk_label: string (nullable = true)
 |-- fsclmth_id: integer (nullable = true)
 |-- fsclmth_label: string (nullable = true)
 |-- fsclqrtr_id: integer (nullable = true)
 |-- fsclqrtr_label: string (nullable = true)
 |-- fsclyr_id: integer (nullable = true)
 |-- fsclyr_label: integer (nullable = true)
 |-- ssn_id: string (nullable = true)
 |-- ssn_label: string (nullable = true)
 |-- ly_fscldt_id: integer (nullable = true)
 |-- lly_fscldt_id: integer (nullable = true)
 |-- fscldow: integer (nullable = true)
 |-- fscldom: integer (nullable = true)
 |-- fscldoq: integer (nullable = true)
 |-- fscldoy: integer (nullable = true)
 |-- fsclwoy: integer (nullable = true)
 |-- fsclmoy: integer (nullable = true)
 |-- fsclqoy: integer (nullable = true)
 |-- date: date (nullable = true)



In [0]:
display(df_calendar.limit(2))

fscldt_id,fscldt_label,fsclwk_id,fsclwk_label,fsclmth_id,fsclmth_label,fsclqrtr_id,fsclqrtr_label,fsclyr_id,fsclyr_label,ssn_id,ssn_label,ly_fscldt_id,lly_fscldt_id,fscldow,fscldom,fscldoq,fscldoy,fsclwoy,fsclmoy,fsclqoy,date
20180204,"Feb 4, 2018",201801,"WK 01, 2018",201801,"Feb, 2018",20181,"Q1, 2018",2018,2018,SPRG2018,Spring 2018,20170205,20160207,1,1,1,1,1,1,1,2018-02-04
20180205,"Feb 5, 2018",201801,"WK 01, 2018",201801,"Feb, 2018",20181,"Q1, 2018",2018,2018,SPRG2018,Spring 2018,20170206,20160208,2,2,2,2,1,1,1,2018-02-05


### Weekly Sales Table

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as ty
from datetime import datetime

processed_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
df_mview_weekly_sales = (
    df_transactions.alias("t")
    .join(
        df_calendar.alias("c"),
        df_transactions.fscldt_id == df_calendar.fscldt_id,
        "inner",
    )
    .groupby(
        "t.pos_site_id",
        "t.sku_id",
        "c.fsclwk_id",
        "t.price_substate_id",
        "t.type",
    )
    .agg(
        F.sum("t.sales_units").alias("total_sales_units"),
        F.sum("t.sales_dollars").alias("sales_dollars"),
        F.sum("t.discount_dollars").alias("discount_dollars"),
    )
    .withColumn("processed_date", F.lit(processed_date).cast(ty.TimestampType())) 
)

In [0]:
display(df_mview_weekly_sales.limit(2))

pos_site_id,sku_id,fsclwk_id,price_substate_id,type,total_sales_units,sales_dollars,discount_dollars,processed_date
CATMAIN,2785140701,201801,FP,Sale,16,1108.7100000000005,10.49,2025-03-09T06:03:30.000+0000
177,1AV5420000,201801,MD2,Sale,9,49.890000000000015,39.93000000000001,2025-03-09T06:03:30.000+0000


### Weekly Sales Table to Delta

In [0]:
df_mview_weekly_sales.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("sales_db.weekly_sales")

In [0]:
df_mview_weekly_sales.count()

Out[28]: 887849

### Incremental Data - New Rows

In [0]:
from pyspark.sql.types import *
schema = StructType([
    StructField("pos_site_id", StringType(), True),
    StructField("sku_id", StringType(), True),
    StructField("fsclwk_id", IntegerType(), True),
    StructField("price_substate_id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("total_sales_units", LongType(), True),
    StructField("sales_dollars", DoubleType(), True),
    StructField("discount_dollars", DoubleType(), True),
    StructField("processed_date", TimestampType(), True)
])
data = [
    ("CATMAIN_NEW", "2785140701", 201801, "FP", "Sale", 16, 1108.71, 10.49,datetime.now()),
    ("177_NEW", "1AV5420000", 201801, "MD2", "Sale", 9, 49.89, 39.93,datetime.now())
]
df_inc_data = spark.createDataFrame(data, schema=schema)
display(df_inc_data)

pos_site_id,sku_id,fsclwk_id,price_substate_id,type,total_sales_units,sales_dollars,discount_dollars,processed_date
CATMAIN_NEW,2785140701,201801,FP,Sale,16,1108.71,10.49,2025-03-09T06:09:42.924+0000
177_NEW,1AV5420000,201801,MD2,Sale,9,49.89,39.93,2025-03-09T06:09:42.924+0000


In [0]:
df_inc_data.count()

Out[25]: 2

### Incremental Data - Updated Rows

In [0]:
df_inc_mview_weekly_sales = (
    df_transactions.limit(10000).alias("t")
    .join(
        df_calendar.limit(1000).alias("c"),
        df_transactions.fscldt_id == df_calendar.fscldt_id,
        "inner",
    )
    .groupby(
        "t.pos_site_id",
        "t.sku_id",
        "c.fsclwk_id",
        "t.price_substate_id",
        "t.type",
    )
    .agg(
        F.sum("t.sales_units").alias("total_sales_units"),
        F.sum("t.sales_dollars").alias("sales_dollars"),
        F.sum("t.discount_dollars").alias("discount_dollars"),
    )
    .withColumn("processed_date", F.lit(processed_date).cast(ty.TimestampType())) 
)
display(df_inc_mview_weekly_sales.limit(2))

pos_site_id,sku_id,fsclwk_id,price_substate_id,type,total_sales_units,sales_dollars,discount_dollars,processed_date
CATMAIN,2870720701,201801,FP,Cancel,2,129.9,0.0,2025-03-09T06:03:30.000+0000
CATMAIN,2870720601,201801,FP,Cancel,3,194.85,0.0,2025-03-09T06:03:30.000+0000


In [0]:
df_inc_mview_weekly_sales.count()

Out[14]: 1001

### Merge Logic for Incremental Data

In [0]:
from delta.tables import DeltaTable
delta_table = DeltaTable.forName(spark, "sales_db.weekly_sales")

In [0]:
delta_table.alias("target").merge(
    df_inc_data.alias("source"),
    "target.pos_site_id = source.pos_site_id AND target.sku_id = source.sku_id AND target.fsclwk_id = source.fsclwk_id"
).whenMatchedUpdate(set={
    "total_sales_units": "source.total_sales_units",
    "sales_dollars": "source.sales_dollars",
    "discount_dollars": "source.discount_dollars",
    "processed_date": "source.processed_date"
}).whenNotMatchedInsert(values={
    "pos_site_id": "source.pos_site_id",
    "sku_id": "source.sku_id",
    "fsclwk_id": "source.fsclwk_id",
    "price_substate_id": "source.price_substate_id",
    "type": "source.type",
    "total_sales_units": "source.total_sales_units",
    "sales_dollars": "source.sales_dollars",
    "discount_dollars": "source.discount_dollars",
    "processed_date": "source.processed_date"
}).execute()

In [0]:
delta_table.alias("target").merge(
    df_inc_mview_weekly_sales.alias("source"),
    "target.pos_site_id = source.pos_site_id AND target.sku_id = source.sku_id AND target.fsclwk_id = source.fsclwk_id"
).whenMatchedUpdate(set={
    "total_sales_units": "source.total_sales_units",
    "sales_dollars": "source.sales_dollars",
    "discount_dollars": "source.discount_dollars",
    "processed_date": "source.processed_date"
}).whenNotMatchedInsert(values={
    "pos_site_id": "source.pos_site_id",
    "sku_id": "source.sku_id",
    "fsclwk_id": "source.fsclwk_id",
    "price_substate_id": "source.price_substate_id",
    "type": "source.type",
    "total_sales_units": "source.total_sales_units",
    "sales_dollars": "source.sales_dollars",
    "discount_dollars": "source.discount_dollars",
    "processed_date": "source.processed_date"
}).execute()

### Check Incremetal Data Load in Delta Table

In [0]:
df_test = spark.read.format("delta").table("sales_db.weekly_sales")
df_test.count()
# Before Incremental Data
#: 887849
# After Incremental Data
#: 887851

Out[34]: 887851

### Dump

In [0]:
# df_transactions.select('pos_site_id').distinct().show()
# from pyspark.sql.types import *
# schema_string = """
#     pos_site_id STRING,
#     sku_id STRING,
#     fsclwk_id INTEGER,
#     price_substate_id STRING,
#     type STRING,
#     total_sales_units LONG,
#     sales_dollars DOUBLE,
#     discount_dollars DOUBLE
# """
# schema = StructType.fromDDL(schema_string)
# print(schema)