In [0]:
# QUESTION 1:
# Ingest sample order data into a Spark DataFrame.
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Creating sample e-commerce order records
# These represent daily incoming data in JSON/CSV format
data = [
    ("O1","2025-01-01 12:05:00","C100","US",120.5,"USD","CREATED"),
    ("O2","2025-01-01 12:15:00","C101","IN",999.0,"INR","PAID"),
    ("O3","2025-01-02 08:30:00","C102","US",55.0,"USD","CANCELLED"),
    ("O4","2025-01-02 09:45:00","C103","UK",300.0,"GBP","PAID")
]

# Defining schema to ensure correct data types
schema = StructType([
    StructField("order_id", StringType()),
    StructField("order_timestamp", StringType()),
    StructField("customer_id", StringType()),
    StructField("country", StringType()),
    StructField("amount", DoubleType()),
    StructField("currency", StringType()),
    StructField("status", StringType())
])

# Creating Spark DataFrame and converting timestamp column into actual timestamp
df = spark.createDataFrame(data, schema) \
            .withColumn("order_timestamp", to_timestamp("order_timestamp"))
# Display initial data
df.show()


+--------+-------------------+-----------+-------+------+--------+---------+
|order_id|    order_timestamp|customer_id|country|amount|currency|   status|
+--------+-------------------+-----------+-------+------+--------+---------+
|      O1|2025-01-01 12:05:00|       C100|     US| 120.5|     USD|  CREATED|
|      O2|2025-01-01 12:15:00|       C101|     IN| 999.0|     INR|     PAID|
|      O3|2025-01-02 08:30:00|       C102|     US|  55.0|     USD|CANCELLED|
|      O4|2025-01-02 09:45:00|       C103|     UK| 300.0|     GBP|     PAID|
+--------+-------------------+-----------+-------+------+--------+---------+



In [0]:
# QUESTION 2:
# Add a derived column 'order_date' from 'order_timestamp'.

# Extracting only the date part from timestamp
df = df.withColumn("order_date", to_date("order_timestamp"))

# Show updated DataFrame with new column
df.show()


+--------+-------------------+-----------+-------+------+--------+---------+----------+
|order_id|    order_timestamp|customer_id|country|amount|currency|   status|order_date|
+--------+-------------------+-----------+-------+------+--------+---------+----------+
|      O1|2025-01-01 12:05:00|       C100|     US| 120.5|     USD|  CREATED|2025-01-01|
|      O2|2025-01-01 12:15:00|       C101|     IN| 999.0|     INR|     PAID|2025-01-01|
|      O3|2025-01-02 08:30:00|       C102|     US|  55.0|     USD|CANCELLED|2025-01-02|
|      O4|2025-01-02 09:45:00|       C103|     UK| 300.0|     GBP|     PAID|2025-01-02|
+--------+-------------------+-----------+-------+------+--------+---------+----------+



In [0]:
# QUESTION 3:
# Write the DataFrame as a Delta table partitioned by country and order_date

delta_path = "/mnt/delta/orders"

# Saving as a Delta table with partitioning for faster read/query
df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("country", "order_date") \
    .save(delta_path)


In [0]:
# QUESTION 4:
# Verify the partition structure of the Delta table.

# Listing partition folders created by Delta Lake

display(dbutils.fs.ls(delta_path))


path,name,size,modificationTime
dbfs:/mnt/delta/orders/_delta_log/,_delta_log/,0,1764526981000
dbfs:/mnt/delta/orders/country=IN/,country=IN/,0,1764526985000
dbfs:/mnt/delta/orders/country=UK/,country=UK/,0,1764526985000
dbfs:/mnt/delta/orders/country=US/,country=US/,0,1764526985000


In [0]:
# QUESTION 5:
# Run queries that demonstrate partition pruning.

# Reading the Delta table
df_delta = spark.read.format("delta").load(delta_path)
# Query 1: Filter by country (uses partition pruning)
df_delta.filter("country = 'US'").show()


# Query 2: Filter by country and order_date (narrower pruning)
df_delta.filter("country='US' AND order_date='2025-01-01'").show()


+--------+-------------------+-----------+-------+------+--------+----------------+----------+--------------+-----------+
|order_id|    order_timestamp|customer_id|country|amount|currency|          status|order_date|payment_method|coupon_code|
+--------+-------------------+-----------+-------+------+--------+----------------+----------+--------------+-----------+
|      O6|2025-01-03 11:30:00|       C201|     US| 250.0|     USD|PENDING_APPROVAL|2025-01-03|          NULL|       NULL|
|      O1|2025-01-01 12:05:00|       C100|     US| 120.5|     USD|            PAID|2025-01-01|          NULL|       NULL|
+--------+-------------------+-----------+-------+------+--------+----------------+----------+--------------+-----------+

+--------+-------------------+-----------+-------+------+--------+------+----------+--------------+-----------+
|order_id|    order_timestamp|customer_id|country|amount|currency|status|order_date|payment_method|coupon_code|
+--------+-------------------+-----------+-

In [0]:
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, delta_path)
dt.history().show()


+-------+-------------------+---------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|         userId|            userName|operation| operationParameters| job|          notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+---------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      0|2025-11-30 18:23:09|144900505017890|kurapativivek1684...|    WRITE|{mode -> Overwrit...|NULL|{1961890842333881}|1121-114032-6bcdi4lb|       NULL|WriteSerializable|        false|{numFiles -> 4, n...|        NULL|Databricks-Runtim...|
+-------+-------------------+---

In [0]:
# QUESTION 6:
# Demonstrate Delta Lake Time Travel.

from delta.tables import DeltaTable

# Load Delta table as DeltaTable object to perform operations
dt = DeltaTable.forPath(spark, delta_path)

# Show table history (versions)
dt.history().show()

# Update some rows (creates a new version)
dt.update(
    condition="status = 'CREATED'",
    set={"status": "'PAID'"}
)

# Reading an older version using versionAsOf
old_df = spark.read.format("delta") \
            .option("versionAsOf", 0) \
            .load(delta_path)

old_df.show()



+-------+-------------------+---------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|         userId|            userName|operation| operationParameters| job|          notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+---------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      4|2025-11-30 18:24:52|144900505017890|kurapativivek1684...|   DELETE|{predicate -> ["(...|NULL|{1961890842333881}|1121-114032-6bcdi4lb|          3|WriteSerializable|        false|{numRemovedFiles ...|        NULL|Databricks-Runtim...|
|      3|2025-11-30 18:24:44|144

In [0]:
# QUESTION 7:
# Demonstrate Schema Evolution by adding new columns.

# New data with additional fields
data_new = [
    ("O5","2025-01-03 10:00:00","C200","IN",1500.0,"INR","PAID","UPI","NEWYEAR"),
    ("O6","2025-01-03 11:30:00","C201","US",250.0,"USD","CREATED","CARD",None)
]

# Updated schema for new columns
schema_new = StructType([
    StructField("order_id", StringType()),
    StructField("order_timestamp", StringType()),
    StructField("customer_id", StringType()),
    StructField("country", StringType()),
    StructField("amount", DoubleType()),
    StructField("currency", StringType()),
    StructField("status", StringType()),
    StructField("payment_method", StringType()),
    StructField("coupon_code", StringType())
])

# Creating new DataFrame
df_new = spark.createDataFrame(data_new, schema_new) \
                .withColumn("order_timestamp", to_timestamp("order_timestamp")) \
                .withColumn("order_date", to_date("order_timestamp"))


In [0]:
# Write new data to Delta table with schema evolution
df_new.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .partitionBy("country", "order_date") \
    .save(delta_path)
df_new.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .partitionBy("country","order_date") \
    .save(delta_path)


In [0]:

# QUESTION 8:
# Demonstrate Delta Lake UPDATE.

dt.update("status = 'CREATED'", {"status": "'PENDING_APPROVAL'"})


In [0]:
# QUESTION 9:
# Demonstrate Delta Lake DELETE operation.

dt.delete("amount < 100")



In [0]:
# Create RAW (unoptimized) Delta table in DBFS

raw_path = "/mnt/delta/orders_raw"

# Write the same DataFrame df to raw_path WITHOUT OPTIMIZE
df.write.format("delta") \
    .mode("overwrite") \
    .save(raw_path)


import time
start_time = time.time()
display(dbutils.fs.ls(raw_path))
end_time = time.time()
print("Time taken for the operation before optimising: ", end_time - start_time)


path,name,size,modificationTime
dbfs:/mnt/delta/orders_raw/_delta_log/,_delta_log/,0,1764528468000
dbfs:/mnt/delta/orders_raw/part-00000-091904a8-df3f-42fe-8e89-676b0747a3d6.c000.snappy.parquet,part-00000-091904a8-df3f-42fe-8e89-676b0747a3d6.c000.snappy.parquet,2034,1764528835000
dbfs:/mnt/delta/orders_raw/part-00000-39653e2c-176c-4197-a085-2f43c24d326c.c000.snappy.parquet,part-00000-39653e2c-176c-4197-a085-2f43c24d326c.c000.snappy.parquet,2034,1764528470000
dbfs:/mnt/delta/orders_raw/part-00000-62d76268-9ab1-49ad-91f1-cc7fc235866b.c000.snappy.parquet,part-00000-62d76268-9ab1-49ad-91f1-cc7fc235866b.c000.snappy.parquet,2034,1764528647000
dbfs:/mnt/delta/orders_raw/part-00000-8a6a96a9-2abf-44ec-9952-9a64d0f29923.c000.snappy.parquet,part-00000-8a6a96a9-2abf-44ec-9952-9a64d0f29923.c000.snappy.parquet,2034,1764528735000
dbfs:/mnt/delta/orders_raw/part-00000-b1a97453-59e4-4920-a131-022b52814de3.c000.snappy.parquet,part-00000-b1a97453-59e4-4920-a131-022b52814de3.c000.snappy.parquet,2034,1764529058000
dbfs:/mnt/delta/orders_raw/part-00001-61954da0-2dcc-470e-adfb-d5ad18423ce7.c000.snappy.parquet,part-00001-61954da0-2dcc-470e-adfb-d5ad18423ce7.c000.snappy.parquet,2018,1764528647000
dbfs:/mnt/delta/orders_raw/part-00001-864e3354-0b76-41ad-9b2b-ad7dcd8d84e4.c000.snappy.parquet,part-00001-864e3354-0b76-41ad-9b2b-ad7dcd8d84e4.c000.snappy.parquet,2018,1764528470000
dbfs:/mnt/delta/orders_raw/part-00001-93cfc0d9-2cc6-4531-b504-284d98b26075.c000.snappy.parquet,part-00001-93cfc0d9-2cc6-4531-b504-284d98b26075.c000.snappy.parquet,2018,1764528835000
dbfs:/mnt/delta/orders_raw/part-00001-aca5d46e-733f-4a3b-afc5-f1859c189803.c000.snappy.parquet,part-00001-aca5d46e-733f-4a3b-afc5-f1859c189803.c000.snappy.parquet,2018,1764528735000


Time taken for the operation before optimising:  0.3507258892059326


In [0]:
%sql
-- BONUS QUESTION:
-- Optimize Delta table to reduce small files and improve performance.
OPTIMIZE delta.`/mnt/delta/orders_raw`
ZORDER BY (customer_id)


path,metrics
dbfs:/mnt/delta/orders_raw,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, List(minCubeSize(107374182400), List(1, 2292), List(0, 0), 1, List(0, 0), 0, null), null, 0, 0, 1, 1, false, 0, 0, 1764529140183, 1764529140668, 4, 0, null, List(0, 0), null, 8, 8, 0, 0, null)"


In [0]:


import time
start_time = time.time()
display(dbutils.fs.ls(raw_path))
end_time = time.time()
print("Time taken for the operation after optimising: ", end_time - start_time)



path,name,size,modificationTime
dbfs:/mnt/delta/orders_raw/_delta_log/,_delta_log/,0,1764528468000
dbfs:/mnt/delta/orders_raw/part-00000-091904a8-df3f-42fe-8e89-676b0747a3d6.c000.snappy.parquet,part-00000-091904a8-df3f-42fe-8e89-676b0747a3d6.c000.snappy.parquet,2034,1764528835000
dbfs:/mnt/delta/orders_raw/part-00000-39653e2c-176c-4197-a085-2f43c24d326c.c000.snappy.parquet,part-00000-39653e2c-176c-4197-a085-2f43c24d326c.c000.snappy.parquet,2034,1764528470000
dbfs:/mnt/delta/orders_raw/part-00000-62d76268-9ab1-49ad-91f1-cc7fc235866b.c000.snappy.parquet,part-00000-62d76268-9ab1-49ad-91f1-cc7fc235866b.c000.snappy.parquet,2034,1764528647000
dbfs:/mnt/delta/orders_raw/part-00000-6f9c2c80-2930-4c15-9b98-358d6d7fec1b.c000.snappy.parquet,part-00000-6f9c2c80-2930-4c15-9b98-358d6d7fec1b.c000.snappy.parquet,2292,1764529098000
dbfs:/mnt/delta/orders_raw/part-00000-8a6a96a9-2abf-44ec-9952-9a64d0f29923.c000.snappy.parquet,part-00000-8a6a96a9-2abf-44ec-9952-9a64d0f29923.c000.snappy.parquet,2034,1764528735000
dbfs:/mnt/delta/orders_raw/part-00000-b1a97453-59e4-4920-a131-022b52814de3.c000.snappy.parquet,part-00000-b1a97453-59e4-4920-a131-022b52814de3.c000.snappy.parquet,2034,1764529058000
dbfs:/mnt/delta/orders_raw/part-00001-61954da0-2dcc-470e-adfb-d5ad18423ce7.c000.snappy.parquet,part-00001-61954da0-2dcc-470e-adfb-d5ad18423ce7.c000.snappy.parquet,2018,1764528647000
dbfs:/mnt/delta/orders_raw/part-00001-864e3354-0b76-41ad-9b2b-ad7dcd8d84e4.c000.snappy.parquet,part-00001-864e3354-0b76-41ad-9b2b-ad7dcd8d84e4.c000.snappy.parquet,2018,1764528470000
dbfs:/mnt/delta/orders_raw/part-00001-93cfc0d9-2cc6-4531-b504-284d98b26075.c000.snappy.parquet,part-00001-93cfc0d9-2cc6-4531-b504-284d98b26075.c000.snappy.parquet,2018,1764528835000


Time taken for the operation after optimising:  0.41646504402160645
