In [0]:
# QUESTION-1 :
# Load imports functions
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import to_timestamp, to_date, col

# 1. Define the raw ecommerce sample data ( JSON/CSV records)
raw_inputs = [
    ("O1", "2025-01-01 12:05:00", "C100", "US", 120.5, "USD", "CREATED"),
    ("O2", "2025-01-01 12:15:00", "C101", "IN", 999.0, "INR", "PAID"),
    ("O3", "2025-01-02 08:30:00", "C102", "US", 55.0, "USD", "CANCELLED"),
    ("O4", "2025-01-02 09:45:00", "C103", "UK", 300.0, "GBP", "PAID"),
    ("O5", "2025-01-03 14:20:00", "C104", "CA", 150.75, "CAD", "SHIPPED"),
    ("O6", "2025-01-03 16:10:00", "C105", "US", 89.99, "USD", "PAID"),
    ("O7", "2025-01-04 10:00:00", "C106", "IN", 2500.0, "INR", "PENDING"),
    ("O8", "2025-01-04 11:30:00", "C107", "UK", 45.50, "GBP", "RETURNED"),
    ("O9", "2025-01-05 09:15:00", "C108", "DE", 200.0, "EUR", "CREATED"),
    ("O10", "2025-01-05 13:45:00", "C109", "FR", 75.25, "EUR", "PAID"),
    ("O11", "2025-01-06 08:00:00", "C110", "US", 500.0, "USD", "PROCESSING"),
    ("O12", "2025-01-06 15:20:00", "C111", "JP", 12000.0, "JPY", "SHIPPED"),
    ("O13", "2025-01-07 10:45:00", "C112", "AU", 110.0, "AUD", "PAID"),
    ("O14", "2025-01-07 12:30:00", "C113", "US", 45.0, "USD", "CANCELLED"),
    ("O15", "2025-01-08 14:00:00", "C114", "IN", 650.0, "INR", "CREATED")
]

# 2. Define the schema to enforce data types
order_schema = StructType([
    StructField("order_id", StringType()),
    StructField("order_timestamp", StringType()),
    StructField("customer_id", StringType()),
    StructField("country", StringType()),
    StructField("amount", DoubleType()),
    StructField("currency", StringType()),
    StructField("status", StringType())
])

# 3. Create the DataFrame and cast the timestamp string to an actual Timestamp type
orders_df = spark.createDataFrame(raw_inputs, order_schema) \
    .withColumn("order_timestamp", to_timestamp("order_timestamp"))

# Display the initial dataframe
orders_df.show()

+--------+-------------------+-----------+-------+-------+--------+----------+
|order_id|    order_timestamp|customer_id|country| amount|currency|    status|
+--------+-------------------+-----------+-------+-------+--------+----------+
|      O1|2025-01-01 12:05:00|       C100|     US|  120.5|     USD|   CREATED|
|      O2|2025-01-01 12:15:00|       C101|     IN|  999.0|     INR|      PAID|
|      O3|2025-01-02 08:30:00|       C102|     US|   55.0|     USD| CANCELLED|
|      O4|2025-01-02 09:45:00|       C103|     UK|  300.0|     GBP|      PAID|
|      O5|2025-01-03 14:20:00|       C104|     CA| 150.75|     CAD|   SHIPPED|
|      O6|2025-01-03 16:10:00|       C105|     US|  89.99|     USD|      PAID|
|      O7|2025-01-04 10:00:00|       C106|     IN| 2500.0|     INR|   PENDING|
|      O8|2025-01-04 11:30:00|       C107|     UK|   45.5|     GBP|  RETURNED|
|      O9|2025-01-05 09:15:00|       C108|     DE|  200.0|     EUR|   CREATED|
|     O10|2025-01-05 13:45:00|       C109|     FR|  

In [0]:
# QUESTION-2:
# Create a new column 'order_date' derived from the timestamp.

# We extract just the date portion to use it for partitioning later
orders_df = orders_df.withColumn("order_date", to_date("order_timestamp"))

# Verify the new column exists
orders_df.show()

+--------+-------------------+-----------+-------+-------+--------+----------+----------+
|order_id|    order_timestamp|customer_id|country| amount|currency|    status|order_date|
+--------+-------------------+-----------+-------+-------+--------+----------+----------+
|      O1|2025-01-01 12:05:00|       C100|     US|  120.5|     USD|   CREATED|2025-01-01|
|      O2|2025-01-01 12:15:00|       C101|     IN|  999.0|     INR|      PAID|2025-01-01|
|      O3|2025-01-02 08:30:00|       C102|     US|   55.0|     USD| CANCELLED|2025-01-02|
|      O4|2025-01-02 09:45:00|       C103|     UK|  300.0|     GBP|      PAID|2025-01-02|
|      O5|2025-01-03 14:20:00|       C104|     CA| 150.75|     CAD|   SHIPPED|2025-01-03|
|      O6|2025-01-03 16:10:00|       C105|     US|  89.99|     USD|      PAID|2025-01-03|
|      O7|2025-01-04 10:00:00|       C106|     IN| 2500.0|     INR|   PENDING|2025-01-04|
|      O8|2025-01-04 11:30:00|       C107|     UK|   45.5|     GBP|  RETURNED|2025-01-04|
|      O9|

In [0]:
# QUESTION 3:
# Save the DataFrame as a partitioned Delta table.

main_table_path = "/mnt/delta/orders"

# Writing to Delta format. 
# We partition by 'country' and 'order_date' to optimize future queries.
orders_df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("country", "order_date") \
    .save(main_table_path)

In [0]:
# QUESTION 4:
# Check the physical folder structure to verify partitions were created.
main_table_path = "/mnt/delta/orders"
# This lists the directories created by the write operation above
display(dbutils.fs.ls(main_table_path))

path,name,size,modificationTime
dbfs:/mnt/delta/orders/_delta_log/,_delta_log/,0,1764597634000
dbfs:/mnt/delta/orders/country=AU/,country=AU/,0,1764599021000
dbfs:/mnt/delta/orders/country=CA/,country=CA/,0,1764599022000
dbfs:/mnt/delta/orders/country=DE/,country=DE/,0,1764599021000
dbfs:/mnt/delta/orders/country=FR/,country=FR/,0,1764599021000
dbfs:/mnt/delta/orders/country=IN/,country=IN/,0,1764597636000
dbfs:/mnt/delta/orders/country=JP/,country=JP/,0,1764599021000
dbfs:/mnt/delta/orders/country=UK/,country=UK/,0,1764597636000
dbfs:/mnt/delta/orders/country=US/,country=US/,0,1764597636000
dbfs:/mnt/delta/orders/deletion_vector_41bca8cd-6959-4b3e-891e-1c167d73dfa5.bin,deletion_vector_41bca8cd-6959-4b3e-891e-1c167d73dfa5.bin,43,1764597664000


In [0]:
# QUESTION 5:
# Demonstrate Partition Pruning (reading only necessary data).

# Load the data back from Delta Lake
read_df = spark.read.format("delta").load(main_table_path)

# Query 1: Filter by country (Spark will skip partitions for other countries)
print("Filtering by Country US:")
read_df.filter(col("country") == "US").show()

# Query 2: Filter by country AND date (skips even more files)
print("Filtering by Country US and specific Date:")
read_df.filter((col("country") == "US") & (col("order_date") == "2025-01-01")).show()

Filtering by Country US:
+--------+-------------------+-----------+-------+------+--------+----------+----------+--------------+-----------+
|order_id|    order_timestamp|customer_id|country|amount|currency|    status|order_date|payment_method|coupon_code|
+--------+-------------------+-----------+-------+------+--------+----------+----------+--------------+-----------+
|     O11|2025-01-06 08:00:00|       C110|     US| 500.0|     USD|PROCESSING|2025-01-06|          NULL|       NULL|
|     O14|2025-01-07 12:30:00|       C113|     US|  45.0|     USD| CANCELLED|2025-01-07|          NULL|       NULL|
|      O3|2025-01-02 08:30:00|       C102|     US|  55.0|     USD| CANCELLED|2025-01-02|          NULL|       NULL|
|      O1|2025-01-01 12:05:00|       C100|     US| 120.5|     USD|   CREATED|2025-01-01|          NULL|       NULL|
|      O6|2025-01-03 16:10:00|       C105|     US| 89.99|     USD|      PAID|2025-01-03|          NULL|       NULL|
+--------+-------------------+-----------+-----

In [0]:
# QUESTION 6:
# Show Delta Lake Time Travel capabilities.

from delta.tables import DeltaTable

# Initialize the DeltaTable object
delta_table = DeltaTable.forPath(spark, main_table_path)

# 1. View the version history
delta_table.history().show()

# 2. Update a record (this creates a new version in the transaction log)
delta_table.update(
    condition = "status = 'CREATED'",
    set = { "status": "'PAID'" }
)

# 3. Read the previous version (Version 0) before the update happened
historical_df = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load(main_table_path)

historical_df.show()

+-------+--------------------+---------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|         userId|            userName|operation| operationParameters| job|          notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+---------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|     18| 2025-12-01 14:24:16|147336542809330|sreejithreddy16@o...|    WRITE|{mode -> Overwrit...|NULL|{3665368013354046}|1201-140023-j3nbs...|         17|WriteSerializable|        false|{numFiles -> 15, ...|        NULL|Databricks-Runtim...|
|     17| 2025-12-01 14:23:5

In [0]:
# QUESTION 7:
# Prepare new data that has extra columns to demonstrate Schema Evolution.

# New records containing 'payment_method' and 'coupon_code'
incoming_data = [
    ("O5", "2025-01-03 10:00:00", "C200", "IN", 1500.0, "INR", "PAID", "UPI", "NEWYEAR"),
    ("O6", "2025-01-03 11:30:00", "C201", "US", 250.0, "USD", "CREATED", "CARD", None)
]

# Define the updated schema
evolved_schema = StructType([
    StructField("order_id", StringType()),
    StructField("order_timestamp", StringType()),
    StructField("customer_id", StringType()),
    StructField("country", StringType()),
    StructField("amount", DoubleType()),
    StructField("currency", StringType()),
    StructField("status", StringType()),
    StructField("payment_method", StringType()), # New Column
    StructField("coupon_code", StringType())     # New Column
])

# Create the DataFrame and process dates
new_orders_df = spark.createDataFrame(incoming_data, evolved_schema) \
    .withColumn("order_timestamp", to_timestamp("order_timestamp")) \
    .withColumn("order_date", to_date("order_timestamp"))

In [0]:
# Write the new data using 'mergeSchema' to update the table structure automatically
new_orders_df.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .partitionBy("country", "order_date") \
    .save(main_table_path)

In [0]:
# QUESTION 8:
# Perform a Delta Lake UPDATE operation.

# Change status to 'PENDING_APPROVAL' where it is currently 'CREATED'
delta_table.update(
    "status = 'CREATED'", 
    {"status": "'PENDING_APPROVAL'"}
)

DataFrame[num_affected_rows: bigint]

In [0]:
# QUESTION 9:
# Perform a Delta Lake DELETE operation.

# Remove any orders where the amount is less than 100
delta_table.delete("amount < 100")

DataFrame[num_affected_rows: bigint]

In [0]:
# Create a separate 'Raw' table to test optimization performance.

unoptimized_path = "/mnt/delta/orders_raw"

# Write the data without any specific optimization first
orders_df.write.format("delta") \
    .mode("overwrite") \
    .save(unoptimized_path)

import time

# Measure how long it takes to list files before optimization
t_start = time.time()
display(dbutils.fs.ls(unoptimized_path))
t_end = time.time()

print(f"Listing time before optimization: {t_end - t_start} seconds")

path,name,size,modificationTime
dbfs:/mnt/delta/orders_raw/_delta_log/,_delta_log/,0,1764597675000
dbfs:/mnt/delta/orders_raw/part-00000-1503c0aa-f7cf-4466-acec-e92aaf1bbfa1.c000.snappy.parquet,part-00000-1503c0aa-f7cf-4466-acec-e92aaf1bbfa1.c000.snappy.parquet,2702,1764599033000
dbfs:/mnt/delta/orders_raw/part-00000-873dd35a-13b7-4957-9b27-ca238d575597.c000.snappy.parquet,part-00000-873dd35a-13b7-4957-9b27-ca238d575597.c000.snappy.parquet,2300,1764597676000
dbfs:/mnt/delta/orders_raw/part-00000-88ecc10a-6b08-41c0-bb56-dd96cd7f1ac7.c000.snappy.parquet,part-00000-88ecc10a-6b08-41c0-bb56-dd96cd7f1ac7.c000.snappy.parquet,2702,1764598892000
dbfs:/mnt/delta/orders_raw/part-00000-ebf640cb-ba47-4b97-b24b-061529255ffe.c000.snappy.parquet,part-00000-ebf640cb-ba47-4b97-b24b-061529255ffe.c000.snappy.parquet,2702,1764599067000


Listing time before optimization: 0.39864301681518555 seconds


In [0]:
%sql
-- BONUS QUESTION:
-- Run OPTIMIZE and Z-ORDER to compact files and skip data faster.
OPTIMIZE delta.`/mnt/delta/orders_raw`
ZORDER BY (customer_id)

path,metrics
dbfs:/mnt/delta/orders_raw,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, List(minCubeSize(107374182400), List(0, 0), List(1, 2702), 0, List(0, 0), 0, null), null, 0, 0, 1, 1, false, 0, 0, 1764599068226, 1764599068492, 8, 0, null, List(0, 0), null, 8, 8, 0, 0, null)"


In [0]:
# Verify performance improvement after Optimization.

t_start_opt = time.time()
display(dbutils.fs.ls(unoptimized_path))
t_end_opt = time.time()

print(f"Listing time after optimization: {t_end_opt - t_start_opt} seconds")

path,name,size,modificationTime
dbfs:/mnt/delta/orders_raw/_delta_log/,_delta_log/,0,1764597675000
dbfs:/mnt/delta/orders_raw/part-00000-1503c0aa-f7cf-4466-acec-e92aaf1bbfa1.c000.snappy.parquet,part-00000-1503c0aa-f7cf-4466-acec-e92aaf1bbfa1.c000.snappy.parquet,2702,1764599033000
dbfs:/mnt/delta/orders_raw/part-00000-873dd35a-13b7-4957-9b27-ca238d575597.c000.snappy.parquet,part-00000-873dd35a-13b7-4957-9b27-ca238d575597.c000.snappy.parquet,2300,1764597676000
dbfs:/mnt/delta/orders_raw/part-00000-88ecc10a-6b08-41c0-bb56-dd96cd7f1ac7.c000.snappy.parquet,part-00000-88ecc10a-6b08-41c0-bb56-dd96cd7f1ac7.c000.snappy.parquet,2702,1764598892000
dbfs:/mnt/delta/orders_raw/part-00000-ebf640cb-ba47-4b97-b24b-061529255ffe.c000.snappy.parquet,part-00000-ebf640cb-ba47-4b97-b24b-061529255ffe.c000.snappy.parquet,2702,1764599067000


Listing time after optimization: 0.45209383964538574 seconds
