In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Run-mode switch read by the cells below: 1 selects the full-load branches,
# 0 the incremental branches. Some cells test `== 0` while others use a plain
# `else`, so values other than 0 or 1 are not handled consistently.
full_run = 0  # 1 for full run and 0 for incremental run

In [0]:
# Qualified name (database.table) of the orders table this notebook reads.
order_details_instance = "sales.orders_details" 

In [0]:
# Incremental run: load only the rows still marked 'In Progress'. Per the
# original note, these are the only rows that get updated (to Completed or
# Cancelled).
if full_run == 0:
    in_progress_filter = "order_status = 'In Progress'"
    delta_reader = spark.read.format("delta")
    results_df = delta_reader.table(order_details_instance).filter(in_progress_filter)

In [0]:
# Highest row_id / order_id already allocated; new records are numbered after
# these values. Both maxima are fetched in a single aggregation.
max_id_exprs = ("MAX(row_id) AS max_row_id", "MAX(order_id) AS max_order_id")

if full_run == 1:
    # Full run: ids come from the in-memory orders_df.
    # NOTE(review): orders_df is not defined in any cell visible here; it must
    # be created before this cell runs.
    max_row_id, max_order_id = orders_df.selectExpr(*max_id_exprs).first()
else:
    # Incremental run: ids come from the persisted table named in
    # order_details_instance.
    max_row_id, max_order_id = spark.table(order_details_instance).selectExpr(*max_id_exprs).first()

# MAX() over an empty source yields None, which would make the later
# `max_row_id + row_number()` arithmetic fail; start numbering from 0 instead.
max_row_id = max_row_id or 0
max_order_id = max_order_id or 0

print("max row id : ",max_row_id)
print("max order id : ",max_order_id)

max row id :  8412
max order id :  59983


In [0]:
# Column layout of the incoming batch; every field is nullable. The two date
# columns are declared as strings here and parsed into dates further down.
order_fields = [
    ("row_id", IntegerType()),
    ("customer_id", StringType()),
    ("order_id", IntegerType()),
    ("order_date", StringType()),
    ("ship_date", StringType()),
    ("order_priority", StringType()),
    ("order_quantity", IntegerType()),
    ("order_status", StringType()),
]
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in order_fields
])

# Hard-coded sample batch: four rows, the last two for the same customer and order.
new_orders_data = [
    (8409, "07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd7", 59981, '2024-05-03', None, "Low", 20, None),
    (8410, "07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd8", 59982, '2024-05-03', None, "Low", 20, None),
    (8411, "07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd9", 59983, '2024-05-03', '2024-05-04', "Low", 20, None),
    (8412, "07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd9", 59983, '2024-05-03', '2024-05-04', "Low", 20, None),
]

# Build the DataFrame, then convert the string dates to real date columns.
new_orders_df = spark.createDataFrame(new_orders_data, schema)
for date_column in ("order_date", "ship_date"):
    new_orders_df = new_orders_df.withColumn(date_column, to_date(date_column))

In [0]:
# Preview the incoming batch; truncate=False keeps the long customer_id values intact.
new_orders_df.show(truncate=False)

+------+----------------------------------------------------------------+--------+----------+----------+--------------+--------------+------------+
|row_id|customer_id                                                     |order_id|order_date|ship_date |order_priority|order_quantity|order_status|
+------+----------------------------------------------------------------+--------+----------+----------+--------------+--------------+------------+
|8409  |07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd7|59981   |2024-05-03|null      |Low           |20            |null        |
|8410  |07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd8|59982   |2024-05-03|null      |Low           |20            |null        |
|8411  |07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd9|59983   |2024-05-03|2024-05-04|Low           |20            |null        |
|8412  |07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd9|59983   |2024-05-03|2024-05-04|Low     

In [0]:
# New records are the incoming rows whose row_id is absent from the existing data.
if full_run == 1:
    # NOTE(review): orders_df is not defined in any cell visible here; it must
    # be created before this cell runs.
    insert_records = new_orders_df.join(orders_df, ['row_id'], 'left_anti')
else:
    # Anti-join against every row_id in the table, not only the 'In Progress'
    # subset held in results_df. Otherwise an incoming row whose row_id already
    # exists as Completed/Cancelled is misclassified as new and re-inserted
    # under a fresh row_id each time the notebook is re-run.
    existing_row_ids = spark.read.format("delta").table(order_details_instance).select("row_id")
    insert_records = new_orders_df.join(existing_row_ids, ['row_id'], 'left_anti')

In [0]:
# Inspect the incoming rows classified as new inserts (no matching row_id).
insert_records.show(truncate=False)

+------+-----------+--------+----------+---------+--------------+--------------+------------+
|row_id|customer_id|order_id|order_date|ship_date|order_priority|order_quantity|order_status|
+------+-----------+--------+----------+---------+--------------+--------------+------------+
+------+-----------+--------+----------+---------+--------------+--------------+------------+



In [0]:
# Window with a constant sort key: it imposes no real ordering, so the row
# sequence seen by row_number()/lag() over it is arbitrary.
# NOTE(review): neither window has a partitionBy — presumably Spark evaluates
# them over a single partition; fine for small batches, confirm for large ones.
windowSpec = Window.orderBy(lit(1))
# Window ordered by customer_id; used when assigning order ids.
windowSpec1 = Window.orderBy("customer_id")

In [0]:
# Stamp every new record with a fresh row_id and default field values.
insert_records = (
    insert_records
    # Continue numbering after the current maximum row_id. windowSpec sorts on
    # a constant, so which new row gets which number is arbitrary.
    .withColumn("row_id", max_row_id + row_number().over(windowSpec))
    # Previous row's customer (in windowSpec order); kept because the next
    # cell reads this column.
    .withColumn("customer_id_prev", lag("customer_id").over(windowSpec))
    # New orders are dated yesterday; date_sub makes the date arithmetic explicit.
    .withColumn("order_date", date_sub(current_date(), 1))
    # A bare lit(None) yields an untyped NullType column; cast it so ship_date
    # keeps the DATE type used by the rest of the pipeline.
    .withColumn("ship_date", lit(None).cast("date"))
    .withColumn("order_status", lit("In Progress"))
)

In [0]:
# Give each distinct customer in the batch one new order_id, numbered
# consecutively after max_order_id; all rows of a customer share that id.
#
# The previous version flagged "customer changed" with lag() over an arbitrary
# row order and then summed the flags in customer_id order. When a customer's
# rows were not adjacent in that arbitrary order the flag fired more than once,
# so order ids skipped values non-deterministically. dense_rank() over the
# customer_id-ordered window yields exactly one consecutive rank per customer.
insert_records = insert_records.withColumn(
    "order_id", max_order_id + dense_rank().over(windowSpec1)
)

# Keep only the table's columns, in table order (drops helper columns such as
# customer_id_prev).
insert_records = insert_records.select(
    'row_id',
    'customer_id',
    'order_id',
    'order_date',
    'ship_date',
    'order_priority',
    'order_quantity',
    'order_status',
)

In [0]:
# Inspect the insert candidates after row_id, order_id and status have been assigned.
insert_records.show(truncate=False)

+------+-----------+--------+----------+---------+--------------+--------------+------------+
|row_id|customer_id|order_id|order_date|ship_date|order_priority|order_quantity|order_status|
+------+-----------+--------+----------+---------+--------------+--------------+------------+
+------+-----------+--------+----------+---------+--------------+--------------+------------+



In [0]:
# Incoming rows that match an existing row_id become update candidates.
# Dates come from the incoming batch; every other attribute comes from the
# existing record.
# NOTE(review): orders_df (full-run source) is not defined in any cell visible here.
existing_df = orders_df if full_run == 1 else results_df

matched = new_orders_df.join(existing_df, ['row_id'], 'inner')
if full_run == 1:
    # The full-run source holds every status, so restrict to 'In Progress' here;
    # the incremental source was already filtered when it was loaded.
    matched = matched.filter(existing_df["order_status"] == "In Progress")

update_records = matched.select(
    new_orders_df["row_id"],
    existing_df["customer_id"],
    existing_df["order_id"],
    new_orders_df["order_date"],
    new_orders_df["ship_date"],
    existing_df["order_priority"],
    existing_df["order_quantity"],
    existing_df["order_status"],
)

In [0]:
# Inspect the update candidates before their status is recalculated.
update_records.show(truncate=False)

+------+----------------------------------------------------------------+--------+----------+----------+--------------+--------------+------------+
|row_id|customer_id                                                     |order_id|order_date|ship_date |order_priority|order_quantity|order_status|
+------+----------------------------------------------------------------+--------+----------+----------+--------------+--------------+------------+
|8409  |07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd7|59981   |2024-05-03|null      |Low           |20            |In Progress |
|8410  |07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd8|59982   |2024-05-03|null      |Low           |20            |In Progress |
|8411  |07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd9|59983   |2024-05-03|2024-05-04|Low           |20            |In Progress |
|8412  |07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd9|59983   |2024-05-03|2024-05-04|Low     

In [0]:
# Recalculate order_status for the update candidates:
#   - shipped (ship_date present)                 -> Completed
#   - unshipped and ordered more than 4 days ago  -> Cancelled
#   - unshipped otherwise                         -> In Progress
#
# Changes from the previous expression:
#   * the `order_date == current_date()` clause was redundant (already covered
#     by `order_date >= current_date() - 4`) and is gone;
#   * an unshipped row with a NULL order_date used to fall through to
#     "Completed"; it now stays "In Progress", since nothing was shipped.
has_shipped = update_records["ship_date"].isNotNull()
cancel_cutoff = date_sub(current_date(), 4)

update_records = update_records.withColumn(
    "order_status",
    when(has_shipped, "Completed")
    .when(update_records["order_date"] < cancel_cutoff, "Cancelled")
    .otherwise("In Progress"),
)

In [0]:
# Inspect the update candidates after the status recalculation.
update_records.show(truncate=False)

+------+----------------------------------------------------------------+--------+----------+----------+--------------+--------------+------------+
|row_id|customer_id                                                     |order_id|order_date|ship_date |order_priority|order_quantity|order_status|
+------+----------------------------------------------------------------+--------+----------+----------+--------------+--------------+------------+
|8409  |07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd7|59981   |2024-05-03|null      |Low           |20            |In Progress |
|8410  |07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd8|59982   |2024-05-03|null      |Low           |20            |In Progress |
|8411  |07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd9|59983   |2024-05-03|2024-05-04|Low           |20            |Completed   |
|8412  |07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd9|59983   |2024-05-03|2024-05-04|Low     

In [0]:
# Incremental run: assemble the rows to merge back into the table.
if full_run == 0:
    # In-progress rows that received no update in this batch.
    untouched_records = results_df.join(update_records, on="row_id", how="left_anti")

    # Untouched rows + brand-new rows + rows with recalculated status.
    results_df = untouched_records.unionByName(insert_records).unionByName(update_records)

    # The count was previously computed and thrown away (a bare expression
    # inside an `if` is not displayed); print it so the Spark job is useful.
    print("rows staged for merge : ", results_df.count())

    # The MERGE statement reads its source rows from this view.
    results_df.createOrReplaceTempView("results_temp_vw")

In [0]:
if full_run == 1:
    # Full run: fold the inserts and recalculated updates into the in-memory
    # orders_df (rows being updated are removed first, then re-added).
    # NOTE(review): orders_df is not defined in any cell visible here, and
    # because it is rebound to itself, re-running this cell appends
    # insert_records again.
    orders_df = orders_df.join(update_records, on="row_id", how="left_anti") \
        .unionByName(insert_records) \
        .unionByName(update_records)
    # The count was previously computed and thrown away; print it instead.
    print("rows after full-run merge : ", orders_df.count())
else:
    # Incremental run: upsert the staged rows into the Delta table by row_id.
    # Matching rows are overwritten column-for-column, new rows are inserted.
    spark.sql("""
        MERGE INTO sales.orders_details
        USING results_temp_vw
        ON sales.orders_details.row_id = results_temp_vw.row_id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

In [0]:
# Full run only: expose the in-memory orders as a temp view and prepare the DDL
# text. Nothing is executed against the catalog in this cell.
if full_run == 1:
    # View read by the later `INSERT INTO ... SELECT * FROM orders_temp_vw`.
    # NOTE(review): orders_df is not defined in any cell visible here.
    orders_df.createOrReplaceTempView('orders_temp_vw')

    # Creates the `sales` database when it does not exist yet.
    create_db_sql = """
        CREATE DATABASE IF NOT EXISTS sales
    """
    
    # CREATE OR REPLACE: replaces any existing table definition with this
    # eight-column Delta table stored at the fixed LOCATION below.
    create_table_sql = """
        CREATE OR REPLACE TABLE sales.orders_details
        (
            row_id INT,
            customer_id STRING,
            order_id INT,
            order_date DATE,
            ship_date DATE,
            order_priority STRING,
            order_quantity INT,
            order_status STRING
        )
        USING DELTA
        LOCATION '/FileStore/tables/delta-table-merge/orders_details'
    """

In [0]:
# Full run only: execute the prepared DDL — database first, then the table.
if full_run == 1:
    for ddl_statement in (create_db_sql, create_table_sql):
        spark.sql(ddl_statement)

In [0]:
# Row count of the orders table. The bare string literal that used to sit here
# (a leftover `%sql` cell) was a no-op expression and has been removed.
spark.sql('''
          select count(*) from sales.orders_details
          ''').show(truncate=False)

+--------+
|count(1)|
+--------+
|8412    |
+--------+



# Incremental run: update 'In Progress' records or insert new records

In [0]:
# Empty the Delta table ahead of the one-time load; any other mode leaves it untouched.
if full_run != 1:
    # Nothing to do unless full_run is 1
    print("Skipping truncation. full_run is not set to 1.")
else:
    # Full run: remove all existing rows
    spark.sql("TRUNCATE TABLE sales.orders_details")
    print("sales.orders_details table truncated.")

Skipping truncation. full_run is not set to 1.


In [0]:
# One-time full load: copy every row of the orders_temp_vw view into the table.
# SELECT * maps columns by position.
if full_run != 1:
    # Nothing to load unless full_run is 1
    print("Skipping one time load. full_run is not set to 1.")
else:
    full_load_sql = "INSERT INTO sales.orders_details SELECT * FROM orders_temp_vw"
    spark.sql(full_load_sql)
    print("one time load successful.")

Skipping one time load. full_run is not set to 1.


In [0]:
# Spot-check: show five rows from the table without truncating long values.
sample_query = '''
          select * from sales.orders_details limit 5 
          '''
sample_rows = spark.sql(sample_query)
sample_rows.show(truncate=False)

+------+----------------------------------------------------------------+--------+----------+----------+--------------+--------------+------------+
|row_id|customer_id                                                     |order_id|order_date|ship_date |order_priority|order_quantity|order_status|
+------+----------------------------------------------------------------+--------+----------+----------+--------------+--------------+------------+
|5409  |03de3b10f1d1797fa42d5372db545dadd5aa777b823885979bdd09fa20e9660d|38466   |2011-08-17|2011-08-18|Critical      |22            |Completed   |
|7747  |0fb77b87360fcb496a32cb2673d291c1a83fd88c48d968a4b79d55a169e7144b|55459   |2009-04-25|2009-04-27|Critical      |5             |Completed   |
|29    |136d2dcc58235d6f3fecb6222bf3ae4c9bb00c635db1c1370edf2e86a903924e|194     |2012-04-04|2012-04-06|Medium        |6             |Completed   |
|2927  |1f07e41c5a310fda2c158f4a6760f4b7732a0f24c20db3466a661802b9b07bf9|21220   |2012-12-22|2012-12-25|Not Spec

In [0]:

# Spot-check: show every row stored for one customer from the sample batch.
customer_query = '''
          select * from sales.orders_details where customer_id= '07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd9'
          '''
customer_rows = spark.sql(customer_query)
customer_rows.show(truncate=False)

+------+----------------------------------------------------------------+--------+----------+----------+--------------+--------------+------------+
|row_id|customer_id                                                     |order_id|order_date|ship_date |order_priority|order_quantity|order_status|
+------+----------------------------------------------------------------+--------+----------+----------+--------------+--------------+------------+
|8411  |07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd9|59983   |2024-05-03|2024-05-04|Low           |20            |Completed   |
|8412  |07de8c2693a047311aac3bb2e1a7bf301d9e290b04431926048d92a40e3bbfd9|59983   |2024-05-03|2024-05-04|Low           |20            |Completed   |
+------+----------------------------------------------------------------+--------+----------+----------+--------------+--------------+------------+

