##### Demonstrate ACID transactions in Delta Lake by creating a Delta table and performing operations like Insert, Update, Delete, and Merge.

##### Delta Lake ensures:

##### Atomicity

##### Consistency

##### Isolation

##### Durability

In [0]:
### Create sample DataFrame

In [0]:
data = [
    (1,'sonu', 24),
    (2,'rohit',25),
    (3,'soni',21)
]
schema =('id','name','age')
df = spark.createDataFrame(data = data , schema=schema)
df.display()

id,name,age
1,sonu,24
2,rohit,25
3,soni,21


In [0]:
### dataframe to delta table

In [0]:
df.write.format('delta')\
    .mode("overwrite")\
        .save('/FileStore/tables/acid_demo')

In [0]:
### read as delta table and display

In [0]:
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark,'/FileStore/tables/acid_demo')
delta_table.toDF().display()

id,name,age
2,rohit,25
1,sonu,24
3,soni,21


### ACID Transactions

In [0]:
### update a record

In [0]:
delta_table.update(
    condition = "id = 1",
    set = { "age": "32" }
)


In [0]:
delta_table.toDF().display()

id,name,age
2,rohit,25
3,soni,21
1,sonu,32


In [0]:
### Delete a record

In [0]:
delta_table.delete("id = 2")

In [0]:
delta_table.toDF().display()

id,name,age
3,soni,21
1,sonu,32


In [0]:
### insert new data
from pyspark.sql import Row


In [0]:
new_Data = spark.createDataFrame([Row(id=4,name='sony', age=26)])

In [0]:
new_Data.write.format('delta').mode('append').save('/FileStore/tables/acid_demo')

In [0]:
delta_table.toDF().display()

id,name,age
3,soni,21
1,sonu,32
4,sony,26


In [0]:
### merge (upsert operation)

In [0]:
incoming_data = spark.createDataFrame(
    [
        Row(id=3,name='soni_gi',age=26),
        # existing -> update
        Row(id =5, name='evil',age =29)
        # new-> insert
    ]
)

In [0]:
delta_table.alias("target").merge(
    source=incoming_data.alias("source"),
    condition="target.id = source.id"
).whenMatchedUpdateAll()\
    .whenNotMatchedInsertAll()\
        .execute()

In [0]:
delta_table.toDF().display()

id,name,age
3,soni_gi,26
1,sonu,32
5,evil,29
4,sony,26


In [0]:
#verify ACID with time travel

In [0]:
spark.read.format('delta').option('versionAsOf',0).load('/FileStore/tables/acid_demo').display()

id,name,age
2,rohit,25
1,sonu,24
3,soni,21


### Z-Ordering

##### Z-Ordering in Delta Lake co-locates related data in the same set of files based on the values of specified columns. This helps minimize data scanned during queries and improves performance.



In [0]:
dbutils.fs.ls('/FileStore/tables')

Out[32]: [FileInfo(path='dbfs:/FileStore/tables/BigMart_Sales-1.csv', name='BigMart_Sales-1.csv', size=869537, modificationTime=1732766283000),
 FileInfo(path='dbfs:/FileStore/tables/BigMart_Sales-2.csv', name='BigMart_Sales-2.csv', size=869537, modificationTime=1738049024000),
 FileInfo(path='dbfs:/FileStore/tables/BigMart_Sales-3.csv', name='BigMart_Sales-3.csv', size=869537, modificationTime=1739937933000),
 FileInfo(path='dbfs:/FileStore/tables/BigMart_Sales.csv', name='BigMart_Sales.csv', size=869537, modificationTime=1732677345000),
 FileInfo(path='dbfs:/FileStore/tables/CSV/', name='CSV/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/Calendar-1.csv', name='Calendar-1.csv', size=9952, modificationTime=1731775279000),
 FileInfo(path='dbfs:/FileStore/tables/Calendar.csv', name='Calendar.csv', size=9952, modificationTime=1731775248000),
 FileInfo(path='dbfs:/FileStore/tables/Products.csv', name='Products.csv', size=58122, modificationTime=1732765779000),
 File

In [0]:
df1 = spark.read.format('csv').option('header',True).option('inferSchema',True)\
    .load('dbfs:/FileStore/tables/BigMart_Sales.csv')

In [0]:
df1.write.format('delta')\
    .mode('overwrite')\
        .save('dbfs:/FileStore/tables/bigmart_delta_sd')

In [0]:
spark.sql("""
          OPTIMIZE delta. `dbfs:/FileStore/tables/bigmart_delta_sd`
          ZORDER BY (Item_MRP)
          
          """)

Out[35]: DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,totalClusterParallelism:bigint,totalScheduledTasks:bigint,autoCompactParallelismStats:struct<maxClusterActiveParallelism:bigint,minClusterActiveParallelism:bigint,maxSessionActiveParallelism:bigint,minSessionActiveParallelism:b

In [0]:
df = spark.read.format("delta").load("dbfs:/FileStore/tables/bigmart_delta_sd")

# Filter query that benefits from Z-Ordering
df.filter("Item_MRP > 200").groupBy("Item_Type").count().show()


+--------------------+-----+
|           Item_Type|count|
+--------------------+-----+
|       Starchy Foods|   38|
|        Baking Goods|   64|
|              Breads|   53|
|Fruits and Vegeta...|  217|
|                Meat|   56|
|         Hard Drinks|   25|
|         Soft Drinks|   39|
|           Household|  177|
|           Breakfast|   15|
|               Dairy|  181|
|         Snack Foods|  216|
|              Others|   19|
|             Seafood|    4|
|              Canned|  111|
|        Frozen Foods|  166|
|  Health and Hygiene|   59|
+--------------------+-----+



In [0]:
%sql
set spark.databricks.delta.retentionDurationCheck.enabled = false;

key,value
spark.databricks.delta.retentionDurationCheck.enabled,False


In [0]:
spark.sql("VACUUM delta.`dbfs:/FileStore/tables/bigmart_delta_sd` RETAIN 0 HOURS")
### Note: Vacuum = no time trave
### you don't need to access older versions (time travel)

Out[44]: DataFrame[path: string]