# **PHASE 2: DATA ENGINEERING (Days 5-8)**

## **DAY 5 (13/01/26) - Delta Lake Advanced**



### **Section 1 - Learn**:

### **_1. Time travel (version history)_**

Delta Lake **Time Travel** is a powerful feature that allows you to query historical versions of your data. Because every transaction (Insert, Update, Delete) is recorded in a versioned **Transaction Log**, you can "go back in time" to see exactly what your table looked like at any specific point.

Here are the key points in bullet form:

#### 1. How to Access History

* **The "DVR" for Data:** Every write operation creates a new **Version Number** (starting at 0) and a **Timestamp**.
* **Version Query:** You can query a specific version using `VERSION AS OF X`.
* **Timestamp Query:** You can query by a date or time string using `TIMESTAMP AS OF 'yyyy-MM-dd'`.

#### 2. Core Use Cases

* **Audit & Compliance:** Easily prove what data looked like during a specific month-end or financial close without maintaining manual backups.
* **Error Recovery:** If a bug in your code accidentally deletes or corrupts rows, you can instantly see the "good" data from the previous version.
* **ML Reproducibility:** Data scientists can re-run models on the exact same dataset version used months ago, ensuring consistent results for experiments.
* **Snapshot Isolation:** You can "pin" a set of queries to a single version of a fast-changing table so that your multi-step report doesn't change halfway through execution.

#### 3. Key Commands (PySpark & SQL)

* **View Table History:** Use `DESCRIBE HISTORY table_name` to see a list of versions, who made the change, and what operation was performed (e.g., MERGE, UPDATE).
* **Rollback (Restore):** You can permanently revert a table to a previous state using the `RESTORE` command:
  * *SQL:* `RESTORE TABLE my_table TO VERSION AS OF 5`
  * *Python:* `deltaTable.restoreToVersion(5)`



#### 4. Important Constraints

* **The "Vacuum" Limit:** You can only time travel to versions for which the underlying data files still exist. If you run the **`VACUUM`** command (which deletes old files to save space), you lose the ability to go back past that cleanup point.
* **Default Retention:** By default, Databricks keeps 30 days of transaction logs and 7 days of actual data files for time travel. You can increase these durations in the table properties:
  * `delta.logRetentionDuration` (History log)
  * `delta.deletedFileRetentionDuration` (Physical data files)

---

### **_2. MERGE operations (upserts)_**

In PySpark, the **`MERGE`** operation (often called an **Upsert**) is a single, powerful command that allows you to simultaneously insert new records, update existing ones, and even delete data based on a matching condition.

In a traditional Data Lake (Parquet), you would have to rewrite the entire partition to change a single row. Delta Lake makes this efficient by only rewriting the specific files that contain the "touched" data.

#### **The Three Core Clauses**

* **`whenMatchedUpdate(...)`**: If the `source.id` matches a `target.id`, update the existing row in the target table with new values.
* **`whenNotMatchedInsert(...)`**: If a record exists in the source but **not** in the target, insert it as a brand-new row.
* **`whenNotMatchedBySourceDelete()`**: (Optional) If a record exists in the target but is **missing** from the source, delete it from the target. This is useful for synchronizing a mirror image of a source database.

#### **Key Industry Use Cases**

* **Change Data Capture (CDC):** Syncing your Data Lake with a production SQL database by applying a daily stream of "Inserts" and "Updates."
* **GDPR Compliance:** Efficiently finding and deleting specific user records across massive datasets without a full table rewrite.
* **SCD Type 1 & 2:** Maintaining "Slowly Changing Dimensions"—either overwriting old values (Type 1) or keeping a history of changes (Type 2).
* **Deduplication:** Merging a new batch of data while ensuring you don't create duplicate records for users already in the system.

#### **PySpark Code Example**

```python
from delta.tables import DeltaTable

# 1. Load the existing target Delta table
target_table = DeltaTable.forPath(spark, "/mnt/delta/customers")

# 2. Perform the Merge
target_table.alias("t").merge(
    source = df_updates.alias("s"),
    condition = "t.customer_id = s.customer_id"
).whenMatchedUpdate(set = {
    "email": "s.email",
    "last_updated": "current_timestamp()"
}).whenNotMatchedInsert(values = {
    "customer_id": "s.customer_id",
    "name": "s.name",
    "email": "s.email",
    "last_updated": "current_timestamp()"
}).execute()

```


#### **Performance Note: "Shuffle" Costs**

A `MERGE` operation is essentially a **Join** followed by a **Write**. To keep it fast:

* **Z-Order your join keys:** If you frequently merge on `customer_id`, Z-Ordering the target table by that column will help Spark find the relevant files much faster.
* **Broadcast Small Sources:** If your "updates" DataFrame is small, Spark can broadcast it to all nodes, avoiding a massive shuffle of the multi-terabyte target table.


---

### **_3. OPTIMIZE & ZORDER_**

In Delta Lake, `OPTIMIZE` and `ZORDER` are the two primary tools used to physically reorganize data on disk to improve query performance and reduce storage overhead.

##### **1. OPTIMIZE (File Compaction)**

* **The "Small File Problem":** Frequent streaming or small batch writes often create thousands of tiny Parquet files. This slows down queries because the engine spends more time opening/closing files and reading metadata than actually processing data.
* **Bin-Packing:** The `OPTIMIZE` command performs "compaction." It takes those small files and merges them into larger, right-sized files (defaulting to **1GB**).
* **Metadata Efficiency:** By reducing the number of files, Spark has much less work to do when scanning the transaction log and listing files in cloud storage.

##### **2. ZORDER (Data Clustering)**

* **Multi-Dimensional Sorting:** While `OPTIMIZE` just makes files bigger, adding `ZORDER BY` physically **rearranges the rows** within those files based on specific columns.
* **Data Skipping:** It co-locates related information. If you Z-Order by `customer_id`, rows for the same customer will be grouped into the same file. When you filter for that ID, Delta Lake skips the files that don't contain it by checking the min/max statistics in the log.
* **High Cardinality Columns:** Z-Ordering is most effective on columns that have many unique values (like `ID`, `email`, or `phone_number`) and are frequently used in `WHERE` clauses or as `JOIN` keys.
* **The 4-Column Limit:** Effectiveness drops if you Z-Order by too many columns (usually stay under 1–4). It is a "trade-off" tool—the more columns you Z-Order, the less "tight" the clustering becomes for each one.

##### **Summary Comparison**

| Feature | OPTIMIZE | ZORDER |
| --- | --- | --- |
| **Primary Goal** | Solve the "Small File Problem" | Enable advanced "Data Skipping" |
| **What it does** | Merges small files into ~1GB files | Clusters related row values together |
| **When to run** | After many small writes/updates | On large tables with frequent filters |
| **Resource Cost** | Low to Moderate (I/O heavy) | High (Compute intensive sorting) |

##### **PySpark & SQL Usage**

```sql
-- SQL Syntax
OPTIMIZE my_table 
ZORDER BY (customer_id, event_type);

```

```python
# PySpark Syntax
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, "/path/to/table")
deltaTable.optimize().zorderBy("customer_id").executeZOrderBy()

```

---

### **_4. VACUUM for cleanup_**

In Delta Lake, the `VACUUM` command is the primary maintenance tool used to delete data files that are no longer needed by the current version of a table. It is essential for managing storage costs and maintaining data privacy (e.g., GDPR compliance).

##### **1. Why do we need VACUUM?**

* **Managing Deleted Files:** When you `UPDATE` or `DELETE` data in Delta Lake, the old files aren't physically erased immediately. They are simply "marked as removed" in the Transaction Log so that **Time Travel** is still possible.
* **Storage Optimization:** Over time, these obsolete files accumulate, leading to "orphaned" data that takes up space and increases cloud storage costs. `VACUUM` permanently removes these files from the underlying storage (S3/ADLS/GCS).

##### **2. The Safety Threshold (Retention Period)**

* **Default Protection:** By default, `VACUUM` only deletes files that were deleted/overwritten **more than 7 days ago**.
* **Preventing Corruption:** This 7-day buffer is a safety mechanism. If you vacuum files that are currently being read by a long-running Spark job, that job will fail because its source files suddenly vanished.
* **Overriding Safety:** You can change this period (e.g., `VACUUM table RETAIN 24 HOURS`), but setting it to 0 is generally blocked unless you change a specific Spark configuration, as it risks corrupting active sessions.

##### **3. Impact on Time Travel**

* **Breaking the "History":** Once you run `VACUUM`, you lose the ability to **Time Travel** back to any version that relied on those deleted files.
* **Metadata vs. Data:** The Transaction Log (the "history list") stays intact, but if you try to query an old version, you will get a `FileNotFoundException` because the physical data is gone.

##### **4. Best Practices**

* **Run Regularly:** Incorporate `VACUUM` into your weekly maintenance pipelines for tables with high update/delete frequency.
* **Run AFTER Optimize:** It is common to run `OPTIMIZE` (which creates new large files) followed by `VACUUM` (which cleans up the old small files left behind).
* **Dry Run First:** Always use the `DRY RUN` clause first to see exactly how many files and how much data will be deleted before actually committing to the cleanup.


##### **Syntax Examples**

**SQL:**

```sql
-- Preview what will be deleted
VACUUM customers_table DRY RUN;

-- Delete files older than the default 7 days
VACUUM customers_table;

-- Delete files older than 100 hours
VACUUM customers_table RETAIN 100 HOURS;

```

**PySpark:**

```python
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/mnt/delta/customers")
deltaTable.vacuum(168) # 168 hours = 7 days

```

---

### **Practice**

In [0]:
from pyspark.sql import functions as F

In [0]:
def load_ecommerce_dataset(Month_name):
    df = spark.read.csv(f"/Volumes/workspace/ecommerce/ecommerce_data/2019-{Month_name}.csv", header=True, inferSchema=True)
    return df

In [0]:
# df_n = load_ecommerce_dataset("Nov")
df_o = load_ecommerce_dataset("Oct")

#### **1. Implement incremental MERGE** 

In [0]:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, TimestampType
import datetime

In [0]:
schema = StructType([
    StructField("event_time", TimestampType(), True),
    StructField("event_type", StringType(), True),
    StructField("product_id", LongType(), True),
    StructField("category_id", LongType(), True),
    StructField("category_code", StringType(), True),
    StructField("brand", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("user_id", LongType(), True),
    StructField("user_session", StringType(), True)
])

# Create sample rows
new_rows = [
    # Row A: An update to an existing record (e.g., price change for a specific event)
    Row(datetime.datetime(2019, 10, 1, 0, 0, 1), "view", 1004856, 2053013555631882655, "electronics.smartphone", "samsung", 130.25, 541312840, "72e0966e-bc31-40ef-9c63-907d2d5bcc89"),
    
    # Row B: A brand new event
    Row(datetime.datetime(2026, 1, 13, 10, 0, 0), "purchase", 5000123, 2053013555631882655, "electronics.audio", "apple", 199.99, 999999999, "new-session-uuid-123")
]

# 3. Create the DataFrame
updates_df = spark.createDataFrame(new_rows, schema=schema)

display(updates_df.head())

Row(event_time=datetime.datetime(2019, 10, 1, 0, 0, 1), event_type='view', product_id=1004856, category_id=2053013555631882655, category_code='electronics.smartphone', brand='samsung', price=130.25, user_id=541312840, user_session='72e0966e-bc31-40ef-9c63-907d2d5bcc89')

In [0]:
from delta.tables import DeltaTable

target_table = DeltaTable.forName(spark, "df_o_table")

target_table.alias("target").merge(
    updates_df.alias("source"),
    """
    target.event_time = source.event_time AND 
    target.user_id = source.user_id AND 
    target.product_id = source.product_id AND 
    target.event_type = source.event_type
    """
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()

print("Merge operation completed successfully.")

Merge operation completed successfully.


#### **2. Query historical versions** 

In [0]:
from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, "df_o_table")
display(deltaTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2026-01-13T10:10:18.000Z,4346170070925815,shivamdubey2310711@gmail.com,MERGE,"Map(predicate -> [""(((event_time#13286 = event_time#13257) AND (cast(user_id#13293 as bigint) = user_id#13264L)) AND ((cast(product_id#13288 as bigint) = product_id#13259L) AND (event_type#13287 = event_type#13258)))""], clusterBy -> [], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> true, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(2870774749036939),0113-095357-ntupxiav-v2n,2.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 4987, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 0, executionTimeMs -> 5224, materializeSourceTimeMs -> 215, numTargetRowsInserted -> 2, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 3007, numTargetRowsUpdated -> 0, numOutputRows -> 2, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 1866)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
2,2026-01-12T08:33:57.000Z,4346170070925815,shivamdubey2310711@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(3926879644577736),0112-081808-3dvwp4c-v2n,1.0,WriteSerializable,False,"Map(numFiles -> 43, numRemovedFiles -> 43, numRemovedBytes -> 1405244735, numDeletionVectorsRemoved -> 0, numOutputRows -> 42448764, numOutputBytes -> 1405244735)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
1,2026-01-12T08:32:35.000Z,4346170070925815,shivamdubey2310711@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(3926879644577736),0112-081808-3dvwp4c-v2n,0.0,WriteSerializable,False,"Map(numFiles -> 43, numRemovedFiles -> 43, numRemovedBytes -> 1405244735, numDeletionVectorsRemoved -> 0, numOutputRows -> 42448764, numOutputBytes -> 1405244735)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
0,2026-01-12T08:27:06.000Z,4346170070925815,shivamdubey2310711@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(3926879644577736),0112-081808-3dvwp4c-v2n,,WriteSerializable,False,"Map(numFiles -> 43, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 42448764, numOutputBytes -> 1405244735)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13


In [0]:
# Load the table as it existed at version 0
df_v0 = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .table("df_o_table")

print(f"Version 0 Count: {df_v0.count()}")
display(df_v0.head())

Version 0 Count: 42448764


Row(event_time=datetime.datetime(2019, 10, 19, 6, 15, 6), event_type='view', product_id=49300013, category_id=2125931803410694331, category_code=None, brand='weekend', price=875.18, user_id=515837919, user_session='0df7d269-da26-4087-b593-faee853ab0b5')

#### **3. Optimize**

In [0]:
spark.sql("OPTIMIZE df_o_table ZORDER BY (user_id, event_time)")

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

---

#### __4. Clean Up__

In [0]:
# Preview files older than 7 days (the default)
spark.sql("VACUUM df_o_table RETAIN 168 HOURS DRY RUN")

DataFrame[path: string]

In [0]:
# Physically delete the old files
spark.sql("VACUUM df_o_table")

DataFrame[path: string]

---

### **Resources**
- [OPTIMZE and VACCUM](https://docs.databricks.com/aws/en/delta/vacuum/)
- [Tutorials](https://docs.databricks.com/delta/tutorial.html)

----