In [0]:
from pyspark.sql import functions as F
from delta.tables import *
import time

#### SETUP: Load Data & Define Paths

In [0]:
delta_path = "/Volumes/workspace/ecommerce/ecommerce_data/delta/events_silver"
# We use the managed table name for SQL commands
table_name = "events_managed_sample"

print(f"📍 Target Path: {delta_path}")
print(f"📍 Target Table: {table_name}")

# Initialize DeltaTable object for PySpark commands
deltaTable = DeltaTable.forPath(spark, delta_path)

📍 Target Path: /Volumes/workspace/ecommerce/ecommerce_data/delta/events_silver
📍 Target Table: events_managed_sample


### Implement Incremental MERGE (Upsert)

In [0]:
print("\n--- Task 1: Incremental MERGE ---")

# 1. Prepare New Data (Simulation)
# FIX: Added 'category_id' (set to 0) to match the target table schema exactly.
new_data = [
    # Existing User (Update): Price correction
    ("2019-10-01 00:00:00", "purchase", 1002544, 0, "electronics.smartphone", "electronics", "apple", 850.00, 518958788, "sess_1"),
    # New User (Insert): Late arrival
    ("2019-10-02 10:00:00", "view", 5555555, 0, "apparel.shoes", "apparel", "adidas", 120.00, 777777777, "sess_new")
]

# FIX: Added 'category_id' to the columns list
columns = ["event_time", "event_type", "product_id", "category_id", "category_code", "category_main", "brand", "price", "user_id", "user_session"]

updates_df = spark.createDataFrame(new_data, columns)

# 2. Execute Merge
print("🔄 Performing Upsert...")
deltaTable.alias("t").merge(
    updates_df.alias("s"),
    # Condition: Match User + Product + Session
    "t.user_id = s.user_id AND t.product_id = s.product_id AND t.user_session = s.user_session"
).whenMatchedUpdate(set = {
    "price": "s.price" # Update logic
}).whenNotMatchedInsertAll() \
 .execute()

print("Task 1 Complete: Data merged successfully.")


--- Task 1: Incremental MERGE ---
🔄 Performing Upsert...
Task 1 Complete: Data merged successfully.


### Query Historical Versions (Time Travel)

In [0]:
# Requirement: View the table history and access a previous state.
print("\n--- Task 2: Time Travel ---")

# 1. View History
print("📜 Table Commit History:")
display(deltaTable.history().limit(5))

# 2. Travel back to Version 0 (Original state)
df_v0 = spark.read \
    .format("delta") \
    .option("versionAsOf", 0) \
    .load(delta_path)

print(f"Task 2 Complete: Loaded Version 0 ({df_v0.count():,} rows).")


--- Task 2: Time Travel ---
📜 Table Commit History:


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
5,2026-01-13T09:33:33.000Z,75645690044269,meenakshi.urvs@gmail.com,MERGE,"Map(predicate -> [""(((cast(user_id#13291 as bigint) = user_id#13341L) AND (cast(product_id#13286 as bigint) = product_id#13335L)) AND (user_session#13292 = user_session#13342))""], clusterBy -> [], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(3504017974982911),0113-093134-n7xioa89-v2n,4,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 4751, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 0, executionTimeMs -> 5888, materializeSourceTimeMs -> 370, numTargetRowsInserted -> 2, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 2244, numTargetRowsUpdated -> 0, numOutputRows -> 2, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 3098)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
4,2026-01-12T11:42:01.000Z,75645690044269,meenakshi.urvs@gmail.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [])",,List(3208981861955938),0112-111102-53f71rql-v2n,3,WriteSerializable,False,"Map(numFiles -> 12, numRemovedFiles -> 12, numRemovedBytes -> 1471926065, numDeletionVectorsRemoved -> 0, numOutputRows -> 42448764, numOutputBytes -> 1471926065)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
3,2026-01-12T11:26:56.000Z,75645690044269,meenakshi.urvs@gmail.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [])",,List(3208981861955938),0112-111102-53f71rql-v2n,2,WriteSerializable,False,"Map(numFiles -> 12, numRemovedFiles -> 12, numRemovedBytes -> 1471926065, numDeletionVectorsRemoved -> 0, numOutputRows -> 42448764, numOutputBytes -> 1471926065)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
2,2026-01-12T11:13:05.000Z,75645690044269,meenakshi.urvs@gmail.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [])",,List(3208981861955938),0112-111102-53f71rql-v2n,1,WriteSerializable,False,"Map(numFiles -> 12, numRemovedFiles -> 12, numRemovedBytes -> 1471926065, numDeletionVectorsRemoved -> 0, numOutputRows -> 42448764, numOutputBytes -> 1471926065)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
1,2026-01-12T07:11:34.000Z,75645690044269,meenakshi.urvs@gmail.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [])",,List(3208981861955938),0112-070104-f8da936c-v2n,0,WriteSerializable,False,"Map(numFiles -> 12, numRemovedFiles -> 12, numRemovedBytes -> 1471926065, numDeletionVectorsRemoved -> 0, numOutputRows -> 42448764, numOutputBytes -> 1471926065)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13


Task 2 Complete: Loaded Version 0 (42,448,764 rows).


### Optimize Tables (Z-ORDER)

In [0]:
# Requirement: Compact small files and organize data for faster querying.
print("\n--- Task 3: Optimize & Z-Order ---")

# We use Spark SQL for the Optimize command
# Z-Ordering by 'event_type' and 'brand' speeds up filters on those columns
spark.sql(f"""
    OPTIMIZE delta.`{delta_path}`
    ZORDER BY (event_type, brand)
""")

print("Task 3 Complete: Table optimized via Path.")


--- Task 3: Optimize & Z-Order ---
Task 3 Complete: Table optimized via Path.


### Clean Old Files (VACUUM)

In [0]:
# Requirement: Remove stale files to save storage space.
print("\n--- Task 4: Vacuum (Cleanup) ---")
print("Running VACUUM (Standard Retention)...")

# We use 'RETAIN 168 HOURS' (7 days), which is the standard safety limit.
# This respects the cluster's safety policies while proving the command works.
spark.sql(f"VACUUM delta.`{delta_path}` RETAIN 168 HOURS")

print("Task 4 Complete: Vacuum command executed successfully.")


--- Task 4: Vacuum (Cleanup) ---
Running VACUUM (Standard Retention)...
Task 4 Complete: Vacuum command executed successfully.
