In [0]:
from decimal import Decimal
from pyspark.sql.types import StructType, StructField, StringType, LongType, DecimalType

# Seed rows for the orders table: five "wireless mouse" orders numbered 1..5,
# each with SKU "sku-100<n>", differing only in quantity and price.
# Prices are built from strings so the Decimal values keep their exact scale.
data = [
    (order_no, f"sku-{1000 + order_no}", "wireless mouse", "Electronics", qty, Decimal(unit_price))
    for order_no, (qty, unit_price) in enumerate(
        [(2, "799.00"), (20, "899.00"), (21, "999.00"), (22, "1099.00"), (23, "1199.00")],
        start=1,
    )
]


# Explicit schema for the orders DataFrame. order_id is the only non-nullable
# column; price is a fixed-point decimal with precision 10 and scale 2.
orders_schema = (
    StructType()
    .add("order_id", LongType(), False)
    .add("product_id", StringType(), True)
    .add("product_name", StringType(), True)
    .add("category", StringType(), True)
    .add("quantity", LongType(), True)
    .add("price", DecimalType(10, 2), True)
)

# Build the Spark DataFrame from the seed rows using the explicit schema, then render it.
orders_df = spark.createDataFrame(data=data, schema=orders_schema)
display(orders_df)

In [0]:
# Show what is currently in the volume, then create the source_data directory
# inside it; the result returned by mkdirs is displayed as well.
volume_root = "/Volumes/workspace/default/orders_volume"
display(dbutils.fs.ls(volume_root))
display(dbutils.fs.mkdirs(f"{volume_root}/source_data"))

In [0]:
# Location of the path-based Delta table inside the volume.
vol_path = "/Volumes/workspace/default/orders_volume/source_data"

# Write the seed orders as a Delta table, replacing whatever data is already
# at that path, and list the files that the write produced.
(
    orders_df.write
    .format("delta")
    .mode("overwrite")
    .save(vol_path)
)
display(dbutils.fs.ls(vol_path))

In [0]:
%sql
-- Show the commit history of the path-based Delta table.
DESCRIBE HISTORY delta.`/Volumes/workspace/default/orders_volume/source_data`

In [0]:
%sql
-- Inline batch of three order rows exposed as a temp view; it is the source
-- of the MERGE that follows. Column names mirror the orders table.
CREATE OR REPLACE TEMP VIEW incoming_batch AS
SELECT *
FROM VALUES
  (2, "sku-1002", "wireless headphones-updated", "Electronics", 20, 899.00),
  (5, "sku-1003", "TV- changed", "Electronics", 21, 999.00),
  (6, "sku-1006", "wireless mouse", "Electronics", 23, 2099.00)
  AS t(order_id, product_id, product_name, category, quantity, price)


In [0]:
%sql
-- Upsert incoming_batch into the Delta table, keyed on order_id:
-- rows whose order_id already exists get every non-key column overwritten,
-- rows with an unseen order_id are inserted as-is.
MERGE INTO delta.`/Volumes/workspace/default/orders_volume/source_data` AS tgt
USING incoming_batch AS src
  ON tgt.order_id = src.order_id
WHEN MATCHED THEN UPDATE SET
  tgt.product_id   = src.product_id,
  tgt.product_name = src.product_name,
  tgt.category     = src.category,
  tgt.quantity     = src.quantity,
  tgt.price        = src.price
WHEN NOT MATCHED THEN INSERT *

In [0]:
%sql
-- Current contents of the Delta table.
SELECT *
FROM delta.`/Volumes/workspace/default/orders_volume/source_data`

In [0]:
%sql
-- Commit history of the Delta table, one row per table version.
DESCRIBE HISTORY delta.`/Volumes/workspace/default/orders_volume/source_data`

In [0]:
# List the files in the table's _delta_log transaction-log directory.
delta_log_dir = "/Volumes/workspace/default/orders_volume/source_data/_delta_log"
display(dbutils.fs.ls(delta_log_dir))

In [0]:
%sql
-- Read one transaction-log commit file directly as JSON.
-- NOTE(review): the file name is hard-coded to commit 00000000000000000002; which
-- operation that is (or whether it exists yet) depends on how many commits the
-- table already has when this cell runs -- confirm against DESCRIBE HISTORY.
SELECT *
FROM json.`dbfs:/Volumes/workspace/default/orders_volume/source_data/_delta_log/00000000000000000002.json`

In [0]:
%sql
-- Distinct data files that back the rows of the current table version.
SELECT DISTINCT _metadata.file_path
FROM delta.`/Volumes/workspace/default/orders_volume/source_data`
    


In [0]:
%sql
-- Time travel: read the table as it was at version 0.
SELECT *
FROM delta.`/Volumes/workspace/default/orders_volume/source_data` VERSION AS OF 0


In [0]:
%sql
-- Inline batch of two order rows carrying an extra boolean is_deleted column,
-- exposed as a temp view; it is the source of the MERGE that follows.
CREATE OR REPLACE TEMP VIEW incoming_flag AS
SELECT *
FROM VALUES
  (2, "sku-100222", "Apple laptop-updated", "Electronics", 200, 800.00, false),
  (3, "sku-1003", "wireless mouse", "Electronics", 21, 999.00, true)
  AS t(order_id, product_id, product_name, category, quantity, price, is_deleted)



In [0]:
%sql
-- Apply incoming_flag to the Delta table, keyed on order_id:
--   * matched and is_deleted = true   -> delete the target row
--   * matched and is_deleted = false  -> overwrite every non-key column
--   * unmatched and is_deleted = false -> insert the row
-- The insert is guarded on is_deleted = false so that a delete marker for an
-- order that is not in the table is skipped instead of being inserted as a
-- live row. The insert lists the target columns explicitly because the source
-- carries an extra is_deleted column that must not reach the table.
MERGE INTO DELTA.`/Volumes/workspace/default/orders_volume/source_data` AS target
USING incoming_flag AS source
  ON target.order_id = source.order_id
WHEN MATCHED AND source.is_deleted = true THEN DELETE
WHEN MATCHED AND source.is_deleted = false THEN UPDATE SET
  target.product_id   = source.product_id,
  target.product_name = source.product_name,
  target.category     = source.category,
  target.quantity     = source.quantity,
  target.price        = source.price
WHEN NOT MATCHED AND source.is_deleted = false THEN
  INSERT (order_id, product_id, product_name, category, quantity, price)
  VALUES (source.order_id, source.product_id, source.product_name,
          source.category, source.quantity, source.price)

In [0]:
%sql
-- Current contents of the Delta table.
SELECT *
FROM delta.`/Volumes/workspace/default/orders_volume/source_data`