# **Scenario 1.**

A retail company receives daily updates to its product catalog, including new items, price changes, and discontinued products. Instead of replacing the entire catalog or simply appending new records, the company needs to upsert the incoming data: update existing products with the latest information and insert new ones, so the catalog stays accurate and up to date after every load.
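To make the difference between the three load strategies concrete, here is a minimal plain-Python sketch that runs without Spark. The sample rows and the helper names `overwrite`, `append`, and `upsert` are hypothetical and exist only for illustration; the notebook itself performs the upsert with a Delta merge.

```python
# Hypothetical sample data: the current catalog and one daily batch.
catalog = [
    {"id": 1, "name": "Kettle", "price": 30.0},
    {"id": 2, "name": "Toaster", "price": 25.0},
]
batch = [
    {"id": 2, "name": "Toaster", "price": 22.5},  # price change
    {"id": 3, "name": "Blender", "price": 40.0},  # new item
]


def overwrite(current, incoming):
    """Replace the catalog: products missing from the batch are lost."""
    return list(incoming)


def append(current, incoming):
    """Add the batch as-is: changed products now appear twice."""
    return current + incoming


def upsert(current, incoming, key="id"):
    """Update rows whose key already exists and insert the rest."""
    merged = {row[key]: row for row in current}
    for row in incoming:
        merged[row[key]] = row
    return list(merged.values())


replaced = overwrite(catalog, batch)
appended = append(catalog, batch)
upserted = upsert(catalog, batch)

print("overwrite ids:", [row["id"] for row in replaced])
print("append ids:   ", [row["id"] for row in appended])
print("upsert ids:   ", [row["id"] for row in upserted])
```

Only the upsert keeps product 1, applies the new price to product 2, and adds product 3 exactly once.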

### **Querying Source**


In [0]:
%sql
SELECT * FROM pyspark_catalog.source.products


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, desc, row_number


In [0]:
# Option 1: read the table directly
# df = spark.read.table("pyspark_catalog.source.products")

# Option 2 (real-world use): run a SQL query
df = spark.sql("SELECT * FROM pyspark_catalog.source.products")

# Deduplicate: when an id appears more than once, keep only the latest row

df = df.withColumn("dedup", row_number().over(Window.partitionBy("id").orderBy(desc('updatedDate'))))

df = df.filter(col("dedup") == 1).drop('dedup')
display(df)
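The deduplication above ranks the rows of each `id` by `updatedDate`, newest first, and keeps only rank 1. A rough plain-Python equivalent is sketched below; the sample rows and the helper `latest_per_id` are hypothetical, and `updatedDate` is assumed to be a date value.

```python
from datetime import date

# Hypothetical sample rows: id 1 arrives twice with different dates.
rows = [
    {"id": 1, "price": 30.0, "updatedDate": date(2024, 1, 1)},
    {"id": 1, "price": 28.0, "updatedDate": date(2024, 1, 3)},
    {"id": 2, "price": 25.0, "updatedDate": date(2024, 1, 2)},
]


def latest_per_id(rows):
    """Keep, for each id, the row with the greatest updatedDate."""
    latest = {}
    for row in rows:
        kept = latest.get(row["id"])
        # Strict '>' means the first row seen wins when dates are equal.
        if kept is None or row["updatedDate"] > kept["updatedDate"]:
            latest[row["id"]] = row
    return list(latest.values())


deduped = latest_per_id(rows)
print(deduped)
```

One difference worth noting: when two rows of the same `id` share the same `updatedDate`, this sketch keeps the first one it sees, whereas `row_number()` in Spark may pick either row unless the window ordering includes a tie-breaking column.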

## **UPSERTS**

In [0]:
# Create the Delta table object and upsert into the sink

from delta.tables import DeltaTable

sink_path = "/Volumes/pyspark_catalog/source/db_volume/products_sink/"

# Check for an existing Delta table rather than any non-empty directory;
# dbutils.fs.ls would raise an error if the path did not exist yet
if DeltaTable.isDeltaTable(spark, sink_path):
    # Incremental load: merge the deduplicated batch into the sink
    print("Upserting")
    dlt_obj = DeltaTable.forPath(spark, sink_path)

    (
        dlt_obj.alias("trg")
        .merge(df.alias("src"), "src.id = trg.id")
        # Update only when the incoming row is at least as recent as the target
        .whenMatchedUpdateAll(condition="src.updatedDate >= trg.updatedDate")
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Initial load: no Delta table at the sink yet, so write the batch as-is
    print("Inserting")
    df.write.format("delta").mode("overwrite").save(sink_path)
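The merge has three possible outcomes per incoming row: insert when the `id` is new, update when the `id` matches and the incoming `updatedDate` is at least as recent, and no change when the incoming row is older than the target. The plain-Python sketch below mimics that logic without Spark; the helper `merge_with_guard` and the sample data are hypothetical, and `updatedDate` is assumed to be a date value.

```python
from datetime import date

# Hypothetical target (sink) and source (incoming batch) rows.
target = [
    {"id": 1, "price": 30.0, "updatedDate": date(2024, 1, 5)},
    {"id": 2, "price": 25.0, "updatedDate": date(2024, 1, 1)},
]
source = [
    {"id": 1, "price": 27.0, "updatedDate": date(2024, 1, 3)},  # stale
    {"id": 2, "price": 22.5, "updatedDate": date(2024, 1, 4)},  # newer
    {"id": 3, "price": 40.0, "updatedDate": date(2024, 1, 4)},  # new id
]


def merge_with_guard(target, source):
    """Mimic the merge: match on id, update only if the source is not older."""
    result = {row["id"]: row for row in target}
    for src in source:
        trg = result.get(src["id"])
        if trg is None:
            # Not matched: insert the whole row.
            result[src["id"]] = src
        elif src["updatedDate"] >= trg["updatedDate"]:
            # Matched and not older: replace the whole row.
            result[src["id"]] = src
        # Matched but older: leave the target row untouched.
    return list(result.values())


merged = merge_with_guard(target, source)
for row in merged:
    print(row)
```

Without the date condition, the stale row for product 1 would overwrite a more recent price that is already in the sink.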

In [0]:
%sql
SELECT * FROM delta.`/Volumes/pyspark_catalog/source/db_volume/products_sink/`