## # DELTA LAKE 

1. Stores data efficiently on cloud storage.
2. Uses partitioning (partitionBy) to speed up queries.
3. Leverages Delta Lake features like:
- ACID transactions
- Time Travel
- Schema evolution
- Updates &amp; deletes
- Optimize / Z-Ordering

**Question -1 :
Ingest sample order data into a Spark DataFrame**

In [0]:
# Ingest sample order data into a Spark DataFrame.
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Defining schema to ensure correct data types
schema = StructType([
    StructField("order_id", StringType()),
    StructField("order_timestamp", StringType()),
    StructField("customer_id", StringType()),
    StructField("country", StringType()),
    StructField("amount", DoubleType()),
    StructField("currency", StringType()),
    StructField("status", StringType())
])

# Creating sample e-commerce order records
# These represent daily incoming data 
data = [
    ("O1","2025-01-01 12:05:00","C100","US",120.5,"USD","CREATED"),
    ("O2","2025-01-01 12:15:00","C101","IN",999.0,"INR","PAID"),
    ("O3","2025-01-02 08:30:00","C102","US",55.0,"USD","CANCELLED"),
    ("O4","2025-01-02 09:45:00","C103","UK",300.0,"GBP","PAID"),
    ("O5","2025-01-02 10:10:00","C104","FR",450.0,"EUR","PAID"),
    ("O6","2025-01-02 11:20:00","C105","DE",210.0,"EUR","CREATED"),
    ("O7","2025-01-02 12:40:00","C106","IN",850.0,"INR","PAID"),
    ("O8","2025-01-02 14:05:00","C107","US",129.0,"USD","CREATED"),
    ("O9","2025-01-03 09:00:00","C108","AU",300.0,"AUD","PAID"),
    ("O10","2025-01-03 09:30:00","C109","BR",175.0,"BRL","PAID"),
    ("O11","2025-01-03 10:10:00","C110","IN",1499.0,"INR","PAID"),
    ("O12","2025-01-03 11:55:00","C111","US",310.0,"USD","CREATED"),
    ("O13","2025-01-03 13:15:00","C112","UK",95.0,"GBP","CANCELLED"),
    ("O14","2025-01-03 14:50:00","C113","FR",210.0,"EUR","PAID"),
    ("O15","2025-01-03 15:25:00","C114","DE",330.0,"EUR","PAID"),
    ("O16","2025-01-03 16:40:00","C115","IN",999.0,"INR","CREATED"),
    ("O17","2025-01-04 08:05:00","C116","US",450.0,"USD","PAID"),
    ("O18","2025-01-04 09:30:00","C117","AU",255.0,"AUD","CREATED"),
    ("O19","2025-01-04 10:10:00","C118","BR",320.0,"BRL","PAID"),
    ("O20","2025-01-04 11:45:00","C119","SG",410.0,"SGD","PAID"),
    ("O21","2025-01-04 13:20:00","C120","IN",120.0,"INR","CANCELLED"),
    ("O22","2025-01-04 14:55:00","C121","US",620.0,"USD","PAID"),
    ("O23","2025-01-04 15:40:00","C122","UK",275.0,"GBP","CREATED"),
    ("O24","2025-01-04 17:10:00","C123","FR",340.0,"EUR","PAID"),
    ("O25","2025-01-05 08:45:00","C124","DE",510.0,"EUR","PAID"),
    ("O26","2025-01-05 09:50:00","C125","IN",249.0,"INR","CREATED"),
    ("O27","2025-01-05 11:22:00","C126","US",175.0,"USD","PAID"),
    ("O28","2025-01-05 12:40:00","C127","AU",245.0,"AUD","CANCELLED"),
    ("O29","2025-01-05 13:50:00","C128","BR",340.0,"BRL","PAID"),
    ("O30","2025-01-05 14:20:00","C129","SG",305.0,"SGD","PAID"),
    ("O31","2025-01-05 16:05:00","C130","IN",450.0,"INR","PAID"),
    ("O32","2025-01-05 17:30:00","C131","US",510.0,"USD","CREATED"),
    ("O33","2025-01-06 08:00:00","C132","UK",275.0,"GBP","PAID"),
    ("O34","2025-01-06 09:18:00","C133","FR",330.0,"EUR","PAID"),
    ("O35","2025-01-06 11:40:00","C134","DE",120.0,"EUR","CREATED"),
    ("O36","2025-01-06 12:55:00","C135","IN",850.0,"INR","PAID"),
    ("O37","2025-01-06 14:10:00","C136","US",390.0,"USD","PAID"),
    ("O38","2025-01-06 15:45:00","C137","AU",210.0,"AUD","CREATED"),
    ("O39","2025-01-06 17:00:00","C138","BR",470.0,"BRL","PAID"),
    ("O40","2025-01-06 18:15:00","C139","SG",520.0,"SGD","PAID")
]



# Creating Spark DataFrame and converting timestamp column into actual timestamp
df = spark.createDataFrame(data, schema) \
            .withColumn("order_timestamp", to_timestamp("order_timestamp"))
# Display initial data
df.show()

+--------+-------------------+-----------+-------+------+--------+---------+
|order_id|    order_timestamp|customer_id|country|amount|currency|   status|
+--------+-------------------+-----------+-------+------+--------+---------+
|      O1|2025-01-01 12:05:00|       C100|     US| 120.5|     USD|  CREATED|
|      O2|2025-01-01 12:15:00|       C101|     IN| 999.0|     INR|     PAID|
|      O3|2025-01-02 08:30:00|       C102|     US|  55.0|     USD|CANCELLED|
|      O4|2025-01-02 09:45:00|       C103|     UK| 300.0|     GBP|     PAID|
|      O5|2025-01-02 10:10:00|       C104|     FR| 450.0|     EUR|     PAID|
|      O6|2025-01-02 11:20:00|       C105|     DE| 210.0|     EUR|  CREATED|
|      O7|2025-01-02 12:40:00|       C106|     IN| 850.0|     INR|     PAID|
|      O8|2025-01-02 14:05:00|       C107|     US| 129.0|     USD|  CREATED|
|      O9|2025-01-03 09:00:00|       C108|     AU| 300.0|     AUD|     PAID|
|     O10|2025-01-03 09:30:00|       C109|     BR| 175.0|     BRL|     PAID|

**Question-2 : Add a derived column 'order_date' from 'order_timestamp'.**

In [0]:
# Add a derived column 'order_date' from 'order_timestamp'.
# Extracting only the date part from timestamp
df = df.withColumn("order_date", to_date("order_timestamp"))

# Show updated DataFrame with new column
df.show()


+--------+-------------------+-----------+-------+------+--------+---------+----------+
|order_id|    order_timestamp|customer_id|country|amount|currency|   status|order_date|
+--------+-------------------+-----------+-------+------+--------+---------+----------+
|      O1|2025-01-01 12:05:00|       C100|     US| 120.5|     USD|  CREATED|2025-01-01|
|      O2|2025-01-01 12:15:00|       C101|     IN| 999.0|     INR|     PAID|2025-01-01|
|      O3|2025-01-02 08:30:00|       C102|     US|  55.0|     USD|CANCELLED|2025-01-02|
|      O4|2025-01-02 09:45:00|       C103|     UK| 300.0|     GBP|     PAID|2025-01-02|
|      O5|2025-01-02 10:10:00|       C104|     FR| 450.0|     EUR|     PAID|2025-01-02|
|      O6|2025-01-02 11:20:00|       C105|     DE| 210.0|     EUR|  CREATED|2025-01-02|
|      O7|2025-01-02 12:40:00|       C106|     IN| 850.0|     INR|     PAID|2025-01-02|
|      O8|2025-01-02 14:05:00|       C107|     US| 129.0|     USD|  CREATED|2025-01-02|
|      O9|2025-01-03 09:00:00|  

**Question-3 : Write DataFrame as Delta table partitioned by country, order_date**

In [0]:

# Write the DataFrame as a Delta table partitioned by country and order_date

path = "/mnt/delta/order"

# Saving as a Delta table with partitioning for faster read/query
df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("country", "order_date") \
    .save(path)


**Question-4 : Verify partition structure**

In [0]:
#Verifying and Listing the Partition folders created by delta lake
# first level
display(dbutils.fs.ls(path))  
# inside: order_date=YYYY-MM-DD          
display(dbutils.fs.ls(path + '/country=US'))   

path,name,size,modificationTime
dbfs:/mnt/delta/order/_delta_log/,_delta_log/,0,1764603412000
dbfs:/mnt/delta/order/country=AU/,country=AU/,0,1764603602000
dbfs:/mnt/delta/order/country=BR/,country=BR/,0,1764603602000
dbfs:/mnt/delta/order/country=DE/,country=DE/,0,1764603602000
dbfs:/mnt/delta/order/country=FR/,country=FR/,0,1764603603000
dbfs:/mnt/delta/order/country=IN/,country=IN/,0,1764603413000
dbfs:/mnt/delta/order/country=SG/,country=SG/,0,1764603602000
dbfs:/mnt/delta/order/country=UK/,country=UK/,0,1764603413000
dbfs:/mnt/delta/order/country=US/,country=US/,0,1764603413000


path,name,size,modificationTime
dbfs:/mnt/delta/order/country=US/order_date=2025-01-01/,order_date=2025-01-01/,0,1764603413000
dbfs:/mnt/delta/order/country=US/order_date=2025-01-02/,order_date=2025-01-02/,0,1764603413000
dbfs:/mnt/delta/order/country=US/order_date=2025-01-03/,order_date=2025-01-03/,0,1764603490000
dbfs:/mnt/delta/order/country=US/order_date=2025-01-04/,order_date=2025-01-04/,0,1764603602000
dbfs:/mnt/delta/order/country=US/order_date=2025-01-05/,order_date=2025-01-05/,0,1764603602000
dbfs:/mnt/delta/order/country=US/order_date=2025-01-06/,order_date=2025-01-06/,0,1764603602000


**Question-5 : Demonstrate Partition Pruning**

In [0]:
# Run queries that demonstrate partition pruning.
#Querying only by country(partition column)
df_country = (
    spark.read.format("delta")
    .load(path)
    .where("country = 'IN'")     
)
df_country.show()
#Querying by country and order date
df_country_date = (
    spark.read.format("delta")
    .load(path)
    .where("country = 'IN' AND order_date = '2025-11-30'")
)

df_country_date.show()

+--------+-------------------+-----------+-------+------+--------+---------+----------+--------------+-----------+
|order_id|    order_timestamp|customer_id|country|amount|currency|   status|order_date|payment_method|coupon_code|
+--------+-------------------+-----------+-------+------+--------+---------+----------+--------------+-----------+
|     O11|2025-01-03 10:10:00|       C110|     IN|1499.0|     INR|     PAID|2025-01-03|          NULL|       NULL|
|     O16|2025-01-03 16:40:00|       C115|     IN| 999.0|     INR|  CREATED|2025-01-03|          NULL|       NULL|
|     O26|2025-01-05 09:50:00|       C125|     IN| 249.0|     INR|  CREATED|2025-01-05|          NULL|       NULL|
|     O31|2025-01-05 16:05:00|       C130|     IN| 450.0|     INR|     PAID|2025-01-05|          NULL|       NULL|
|     O21|2025-01-04 13:20:00|       C120|     IN| 120.0|     INR|CANCELLED|2025-01-04|          NULL|       NULL|
|     O36|2025-01-06 12:55:00|       C135|     IN| 850.0|     INR|     PAID|2025

In [0]:
from delta.tables import DeltaTable
dtable = DeltaTable.forPath(spark, path)
dtable.history().show()


+-------+-------------------+---------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|         userId|            userName|operation| operationParameters| job|          notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+---------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      6|2025-12-01 15:49:33|144992283393007|spamz121924@gmail...|    WRITE|{mode -> Overwrit...|NULL|{1923865583601623}|1201-135708-b9lw1...|          5|WriteSerializable|        false|{numFiles -> 35, ...|        NULL|Databricks-Runtim...|
|      5|2025-12-01 15:40:10|144

**Question-6 : Demonstrate Delta Lake Time Travel**

In [0]:
#Write data, update some rows, then query older versions.
from delta.tables import DeltaTable

# Load Delta table as DeltaTable object to perform operations
dtable = DeltaTable.forPath(spark, path)
#Updating the status of all records with status as 'CREATED' to 'PAID'
dtable.update(
    condition="status = 'CREATED'",
    set={"status": "'PAID'"}
)
spark.read.format('delta').load(path).show(5)

# Read old version of the table
df_old = spark.read.format("delta").option("versionAsOf", 0).load(path)
df_old.show()


+--------+-------------------+-----------+-------+------+--------+------+----------+--------------+-----------+
|order_id|    order_timestamp|customer_id|country|amount|currency|status|order_date|payment_method|coupon_code|
+--------+-------------------+-----------+-------+------+--------+------+----------+--------------+-----------+
|     O18|2025-01-04 09:30:00|       C117|     AU| 255.0|     AUD|  PAID|2025-01-04|          NULL|       NULL|
|     O16|2025-01-03 16:40:00|       C115|     IN| 999.0|     INR|  PAID|2025-01-03|          NULL|       NULL|
|     O23|2025-01-04 15:40:00|       C122|     UK| 275.0|     GBP|  PAID|2025-01-04|          NULL|       NULL|
|     O17|2025-01-04 08:05:00|       C116|     US| 450.0|     USD|  PAID|2025-01-04|          NULL|       NULL|
|     O22|2025-01-04 14:55:00|       C121|     US| 620.0|     USD|  PAID|2025-01-04|          NULL|       NULL|
+--------+-------------------+-----------+-------+------+--------+------+----------+--------------+-----

**Question-7 : Demonstrate Schema Evolution**

In [0]:

# QUESTION 7:
# Demonstrate Schema Evolution by adding new columns.

# New data with additional fields
data_new = [
    ("O5","2025-01-03 10:00:00","C200","IN",1500.0,"INR","PAID","UPI","NEWYEAR"),
    ("O6","2025-01-03 11:30:00","C201","US",250.0,"USD","CREATED","CARD",None)
]

# Updated schema for new columns
schema_new = StructType([
    StructField("order_id", StringType()),
    StructField("order_timestamp", StringType()),
    StructField("customer_id", StringType()),
    StructField("country", StringType()),
    StructField("amount", DoubleType()),
    StructField("currency", StringType()),
    StructField("status", StringType()),
    StructField("payment_method", StringType()),
    StructField("coupon_code", StringType())
])

# Creating new DataFrame
df_new = spark.createDataFrame(data_new, schema_new) \
                .withColumn("order_timestamp", to_timestamp("order_timestamp")) \
                .withColumn("order_date", to_date("order_timestamp"))
# Write new data to Delta table with schema evolution
df_new.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .partitionBy("country", "order_date") \
    .save(path)
df_new.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .partitionBy("country","order_date") \
    .save(path)


**Question-8 : Demonstrate Delta Lake UPDATE & DELETE**

In [0]:
# Update: Mark orders above 1000 as CANCELLED
dtable.update("status = 'CREATED'", {"status": "'PENDING_APPROVAL'"})
# Delete: remove very small test orders (< 100)
dtable.delete("amount < 100")

DataFrame[num_affected_rows: bigint]

In [0]:
# Create RAW (unoptimized) Delta table in DBFS

raw_path = "/mnt/delta/orders_raw"

# Write the same DataFrame df to raw_path WITHOUT OPTIMIZE
df.write.format("delta") \
    .mode("overwrite") \
    .save(raw_path)


import time
start_time = time.time()
display(dbutils.fs.ls(raw_path))
end_time = time.time()
print("Time taken for the operation before optimising: ", end_time - start_time)


path,name,size,modificationTime
dbfs:/mnt/delta/orders_raw/_delta_log/,_delta_log/,0,1764603882000
dbfs:/mnt/delta/orders_raw/part-00000-2ce36925-6019-4c5c-a6eb-97e4aa8b644f.c000.snappy.parquet,part-00000-2ce36925-6019-4c5c-a6eb-97e4aa8b644f.c000.snappy.parquet,3156,1764603882000
dbfs:/mnt/delta/orders_raw/part-00000-8510fb14-3e61-4fc8-8dee-6ec81c866cb2.c000.snappy.parquet,part-00000-8510fb14-3e61-4fc8-8dee-6ec81c866cb2.c000.snappy.parquet,3156,1764604186000


Time taken for the operation before optimising:  0.3491339683532715


**Question -9 :Optimize the table**


In [0]:
%sql
OPTIMIZE delta.`/mnt/delta/orders_raw`
ZORDER BY (customer_id)

path,metrics
dbfs:/mnt/delta/orders_raw,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, List(minCubeSize(107374182400), List(0, 0), List(1, 3156), 0, List(0, 0), 0, null), null, 0, 0, 1, 1, false, 0, 0, 1764604187730, 1764604187985, 8, 0, null, List(0, 0), null, 8, 8, 0, 0, null)"


In [0]:
import time
start_time = time.time()
display(dbutils.fs.ls(raw_path))
end_time = time.time()
print("Time taken for the operation after optimising: ", end_time - start_time)

path,name,size,modificationTime
dbfs:/mnt/delta/orders_raw/_delta_log/,_delta_log/,0,1764603882000
dbfs:/mnt/delta/orders_raw/part-00000-2ce36925-6019-4c5c-a6eb-97e4aa8b644f.c000.snappy.parquet,part-00000-2ce36925-6019-4c5c-a6eb-97e4aa8b644f.c000.snappy.parquet,3156,1764603882000
dbfs:/mnt/delta/orders_raw/part-00000-8510fb14-3e61-4fc8-8dee-6ec81c866cb2.c000.snappy.parquet,part-00000-8510fb14-3e61-4fc8-8dee-6ec81c866cb2.c000.snappy.parquet,3156,1764604186000


Time taken for the operation after optimising:  0.36116647720336914


**Question-10 : SMALL FILE PROBLEM DEMO**

In [0]:

# Create many small files (for demonstration only)
(df.repartition(50)             
   .write
   .format("delta")
   .mode("overwrite")
   .save("/mnt/delta/small_files_example"))

# Check number of files
display(dbutils.fs.ls("/mnt/delta/small_files_example"))


path,name,size,modificationTime
dbfs:/mnt/delta/small_files_example/_delta_log/,_delta_log/,0,1764604012000
dbfs:/mnt/delta/small_files_example/part-00000-3e2e0181-8e04-4cb5-9f4d-039bb4404fb7.c000.snappy.parquet,part-00000-3e2e0181-8e04-4cb5-9f4d-039bb4404fb7.c000.snappy.parquet,2044,1764604012000
dbfs:/mnt/delta/small_files_example/part-00000-98773895-7294-47a6-8d58-425cb89b78b4.c000.snappy.parquet,part-00000-98773895-7294-47a6-8d58-425cb89b78b4.c000.snappy.parquet,2044,1764604189000
dbfs:/mnt/delta/small_files_example/part-00001-8591b786-751e-43f9-89d1-5e3d7699ee8c.c000.snappy.parquet,part-00001-8591b786-751e-43f9-89d1-5e3d7699ee8c.c000.snappy.parquet,2027,1764604012000
dbfs:/mnt/delta/small_files_example/part-00001-bf133418-a9da-4331-b907-66b33a0fcc10.c000.snappy.parquet,part-00001-bf133418-a9da-4331-b907-66b33a0fcc10.c000.snappy.parquet,2027,1764604189000
dbfs:/mnt/delta/small_files_example/part-00002-5a19b88b-203b-4581-bd69-fd1d4fc79192.c000.snappy.parquet,part-00002-5a19b88b-203b-4581-bd69-fd1d4fc79192.c000.snappy.parquet,2027,1764604012000
dbfs:/mnt/delta/small_files_example/part-00002-8882c2bd-1018-47c4-a3bf-72c2c7f2a27c.c000.snappy.parquet,part-00002-8882c2bd-1018-47c4-a3bf-72c2c7f2a27c.c000.snappy.parquet,2027,1764604189000
dbfs:/mnt/delta/small_files_example/part-00004-32ae77dd-68f2-4812-827b-5db4bf7d4512.c000.snappy.parquet,part-00004-32ae77dd-68f2-4812-827b-5db4bf7d4512.c000.snappy.parquet,2106,1764604012000
dbfs:/mnt/delta/small_files_example/part-00004-6118ace7-0d18-4ce1-aa78-f13816f0c70d.c000.snappy.parquet,part-00004-6118ace7-0d18-4ce1-aa78-f13816f0c70d.c000.snappy.parquet,2106,1764604189000
dbfs:/mnt/delta/small_files_example/part-00005-b0067291-59f2-4e3c-b3cf-5c0f794317ac.c000.snappy.parquet,part-00005-b0067291-59f2-4e3c-b3cf-5c0f794317ac.c000.snappy.parquet,2097,1764604189000
