
#Objective:
Demonstrate how Delta Lake supports incremental pipelines, auditability, performance tuning, and cleanup using production-style patterns.
Dataset: /Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv

##Configuration & Imports 

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit


##Read Source CSV (Raw Ingestion Layer)

In [0]:
source_path = "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv"

events_raw = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(source_path)
)

events_raw.printSchema()


##Write Initial Delta Table

In [0]:
delta_path = "/Volumes/workspace/ecommerce/ecommerce_data/delta/events_day5"

(
    events_raw
    .write
    .format("delta")
    .mode("overwrite")
    .save(delta_path)
)

📌 Why this matters
This single write enables:

ACID transactions

Versioning

MERGE

OPTIMIZE

VACUUM

##Create Managed Delta Table

In [0]:
%sql
CREATE TABLE IF NOT EXISTS events_managed_day5
USING DELTA
AS SELECT * FROM delta.`/Volumes/workspace/ecommerce/ecommerce_data/delta/events_day5`;


Production intent

Allows BI / SQL users to query data

Decouples storage from access

##Verify Table State

In [0]:
%sql

select * from events_managed_day5 limit 3


##Time Travel – Version History
What to explain

Each write = new version

Delta log tracks all changes

In [0]:
%sql
DESCRIBE HISTORY events_managed_day5;


#Query Historical Version
Use case

Auditing

Debugging

Rollbacks

In [0]:
initial_version = (
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .load(delta_path)
)

initial_version.count()


#Simulate Incremental Incoming Data
Instead of generating fake data, we reuse rows from the same dataset as a realistic incremental batch.

Why this is realistic

Same schema

Same keys

Modified values → update scenario

In [0]:
incremental_updates = (
    events_raw
    .limit(1000)
    .withColumn("event_type", lit("purchase"))
)


#Incremental MERGE (Upsert Pattern)
Production concepts demonstrated

Idempotent writes

No duplicate explosion

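Atomic transaction

The next cell first recreates the managed table so that its version history restarts at 0 before the MERGE runs.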

In [0]:
%sql
DROP TABLE IF EXISTS events_managed_day5;

CREATE TABLE events_managed_day5
USING DELTA
AS
SELECT * FROM delta.`/Volumes/workspace/ecommerce/ecommerce_data/delta/events_day5`;


In [0]:
%sql
DESCRIBE HISTORY events_managed_day5;

##Run MERGE Operation


In [0]:
delta_table = DeltaTable.forName(spark, "events_managed_day5")

(
    delta_table.alias("target")
    .merge(
        incremental_updates.alias("source"),
        """
        target.user_session = source.user_session
        AND target.event_time = source.event_time
        """
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
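One caveat with this pattern: when a matched clause updates rows, Delta raises an error if more than one source row matches the same target row, so the source should hold at most one row per merge key. The sketch below illustrates that check in plain Python on a few in-memory rows. `find_duplicate_keys` and the sample rows are hypothetical; on the real DataFrame the equivalent would be a `groupBy` on the key columns followed by a count filter.

```python
from collections import Counter

# Merge key used in the MERGE condition above.
MERGE_KEYS = ("user_session", "event_time")


def find_duplicate_keys(rows, keys=MERGE_KEYS):
    """Return merge-key tuples that occur more than once in `rows` (a list of dicts)."""
    counts = Counter(tuple(row[k] for k in keys) for row in rows)
    return sorted(key for key, n in counts.items() if n > 1)


# Hypothetical sample rows, not taken from the dataset.
sample_source = [
    {"user_session": "s1", "event_time": "2019-11-01 00:00:00", "event_type": "purchase"},
    {"user_session": "s1", "event_time": "2019-11-01 00:00:00", "event_type": "purchase"},
    {"user_session": "s2", "event_time": "2019-11-01 00:00:05", "event_type": "purchase"},
]

duplicates = find_duplicate_keys(sample_source)
print(duplicates)
```

An empty result means each key appears once in the source, which is the precondition for an unambiguous upsert.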


#Verify Delta Version Increment (Post-MERGE Validation)
What you should observe

Version 0 → Initial table creation

Version 1 → MERGE operation

This confirms ACID commit and transaction logging.


In [0]:
%sql
DESCRIBE HISTORY events_managed_day5;


#Time Travel – Compare Before vs After MERGE
Time travel proves:

Data changes are auditable

Rollback is possible

Delta ≠ plain Parquet

Read Version 0 (Before MERGE)

In [0]:
before_merge = (
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .table("events_managed_day5")
)

before_merge.count()


#Read Latest Version (After MERGE)

In [0]:
after_merge = spark.read.table("events_managed_day5")

after_merge.count()


#Clear All Data (Keep Table Structure)

In [0]:
%sql
TRUNCATE TABLE events_managed_day5;


In [0]:
%sql
DESCRIBE HISTORY events_managed_day5;

#Reload Base Data from File

In [0]:
base_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv")
)

base_df.write.format("delta").mode("append").saveAsTable("events_managed_day5")


In [0]:
%sql
DESCRIBE HISTORY events_managed_day5;

#Create Incremental Dataset (Updates + Inserts)
🔍 What we simulate

| Scenario | Rows |
| --- | --- |
| Updates (matched keys) | 500 |
| Inserts (new keys) | 500 |

In [0]:
from pyspark.sql.functions import lit, concat, col

current_df = spark.read.table("events_managed_day5")

# Rows to UPDATE (same key)
updates_df = (
    current_df.limit(500)
    .withColumn("event_type", lit("purchase"))
)

# Rows to INSERT (new key)
inserts_df = (
    current_df.limit(500)
    .withColumn("user_session", concat(lit("NEW_"), col("user_session")))
)

incremental_df = updates_df.unionByName(inserts_df)

incremental_df.count()


#Count BEFORE MERGE

In [0]:
before_count = spark.read.table("events_managed_day5").count()
before_count


#MERGE (UPSERT) - Both Updates and Inserts

In [0]:
from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, "events_managed_day5")

(
    deltaTable.alias("t")
    .merge(
        incremental_df.alias("s"),
        "t.user_session = s.user_session AND t.event_time = s.event_time"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)


#Count AFTER MERGE
Expected record count = baseline count 67501979 (inclusive of the 500 updated rows) + 500 (inserts) = 67502479
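The count arithmetic can be checked on a toy scale. The following is a minimal in-memory model of upsert semantics, not Delta itself: `upsert` and the ten sample rows are hypothetical, and the model assumes the merge key is unique in the target.

```python
def upsert(target, source, keys=("user_session", "event_time")):
    """Toy MERGE: overwrite rows whose key exists in target, insert the rest."""
    merged = {tuple(r[k] for k in keys): dict(r) for r in target}
    updated = inserted = 0
    for row in source:
        key = tuple(row[k] for k in keys)
        if key in merged:
            updated += 1   # matched: row is replaced, count unchanged
        else:
            inserted += 1  # not matched: row is added, count grows by one
        merged[key] = dict(row)
    return list(merged.values()), updated, inserted


# Hypothetical miniature of the notebook scenario: 10 base rows, 3 updates, 3 inserts.
target = [{"user_session": f"s{i}", "event_time": "t0", "event_type": "view"} for i in range(10)]
updates = [{**r, "event_type": "purchase"} for r in target[:3]]
inserts = [{**r, "user_session": "NEW_" + r["user_session"]} for r in target[:3]]

result, n_updated, n_inserted = upsert(target, updates + inserts)
print(len(target), n_updated, n_inserted, len(result))

# Re-applying the same batch changes nothing further (idempotent upsert).
result_again, _, n_inserted_again = upsert(result, updates + inserts)
```

Updates leave the row count unchanged and only inserts add to it, which is why the expected total is the baseline plus the number of inserted rows.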

In [0]:
after_count = spark.read.table("events_managed_day5").count()
after_count


#Verify INSERTS

In [0]:
%sql
SELECT COUNT(*) 
FROM events_managed_day5
WHERE user_session LIKE 'NEW_%';


#Delta History (Audit Proof)

In [0]:
%sql
DESCRIBE HISTORY events_managed_day5;


#Business Validation Using Time Travel (Strong, Optional)
Why this cell matters

This cell shows a real-world debugging scenario:

Business notices a sudden spike/drop in certain events

Data engineer verifies what changed using Delta time travel

#Current Version – Business Metrics

In [0]:
%sql
SELECT event_type, COUNT(*) AS cnt
FROM events_managed_day5
GROUP BY event_type
ORDER BY cnt DESC;


What this tells you

Current state of the business events

Reflects results after MERGE (updates + inserts)

#Compare with Version 0 (Before Any MERGE)

In [0]:
%sql
SELECT event_type, COUNT(*) AS cnt
FROM events_managed_day5 VERSION AS OF 0
GROUP BY event_type
ORDER BY cnt DESC;


Key Takeaway

Time travel allows side-by-side comparison

No backups

No data restore

Versioning driven by the transaction log, with older data files retained until VACUUM

This is exactly how production issues are debugged.
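To make the side-by-side comparison concrete, the two result sets can be reduced to per-event-type deltas. This is a small sketch: `diff_counts` is a hypothetical helper and the counts are made-up placeholders, standing in for the rows returned by the two queries above.

```python
def diff_counts(before, after):
    """Return {event_type: after - before} for every event type whose count changed."""
    deltas = {}
    for event_type in sorted(set(before) | set(after)):
        change = after.get(event_type, 0) - before.get(event_type, 0)
        if change != 0:
            deltas[event_type] = change
    return deltas


# Hypothetical counts, not real query output.
version_0 = {"view": 900, "cart": 70, "purchase": 30}
current = {"view": 880, "cart": 65, "purchase": 60}

deltas = diff_counts(version_0, current)
print(deltas)
```

A positive delta for `purchase` paired with negative deltas elsewhere is the signature of rows being rewritten to `purchase`, as the update batch in this notebook does.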

#Why OPTIMIZE Is Needed (Concept Cell – No Execution)
What happens after MERGE operations?

After multiple MERGE runs:

Delta creates many small files

Each MERGE writes new data files

Old files are retained for history

❌ Problems Without OPTIMIZE

Slower reads

Higher task-scheduling and file-listing overhead

File fragmentation

Inefficient I/O

✅ What OPTIMIZE Does

Compacts small files into larger ones

Improves scan efficiency

Reduces task overhead

Keeps logical data unchanged
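The effect of compaction on file count can be pictured with a toy model. This sketch is a simplified greedy packing of file sizes, not the algorithm Delta uses; `compact`, the file sizes, and the target size are all hypothetical, and it ignores the small size changes that come from re-encoding Parquet files.

```python
def compact(file_sizes, target_size):
    """Greedily merge consecutive files into outputs of at most `target_size`."""
    compacted = []
    current = 0
    for size in file_sizes:
        # Start a new output file when adding this one would exceed the target.
        if current and current + size > target_size:
            compacted.append(current)
            current = 0
        current += size
    if current:
        compacted.append(current)
    return compacted


# Hypothetical small-file sizes in MB, e.g. left behind by repeated MERGE runs.
small_files = [8, 12, 5, 20, 9, 16, 7, 11, 14, 6]
compacted = compact(small_files, target_size=64)

print(len(small_files), "files ->", len(compacted), "files")
print(sum(small_files), "MB before,", sum(compacted), "MB after")
```

Fewer files with the same total size is the pattern to look for in `numFiles` and `sizeInBytes` after running OPTIMIZE.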



In [0]:
%sql
OPTIMIZE events_managed_day5
ZORDER BY (event_type, user_id);


What Happens Internally

Rewrites Delta files

Clusters similar values together

Improves performance for:

WHERE event_type = 'purchase'

Queries filtered by user_id

📌 ZORDER is a read optimization that enables data skipping, not an index.

#Observe Table After OPTIMIZE
numFiles → should be lower

sizeInBytes → roughly the same

format → delta

In [0]:
%sql
DESCRIBE DETAIL events_managed_day5;


Key Insight

Same data, same results, better physical storage

#Time Travel Still Works After OPTIMIZE


Many engineers fear that OPTIMIZE breaks history. It doesn't.
You Should Now See

Version from initial load

MERGE version(s)

OPTIMIZE version

In [0]:
%sql
DESCRIBE HISTORY events_managed_day5;


#VACUUM – Storage Cleanup
Deletes data files that are no longer referenced by the table and are older than the retention threshold

Keeps metadata + transaction history

Frees storage space

Delta defaults to a 7-day retention threshold to protect time travel.

In [0]:
%sql
VACUUM events_managed_day5 RETAIN 168 HOURS;


#Why This Is Safe

Keeps 7 days of rollback capability

Never removes files that the current version or any version from the last 7 days still needs

Matches Delta Lake's default retention threshold
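The retention rule can be sketched as simple date arithmetic. This is a simplified model under stated assumptions: `is_vacuum_candidate` is a hypothetical helper, the timestamps are invented, and it only captures the age check for a file that the table no longer references.

```python
from datetime import datetime, timedelta

RETAIN_HOURS = 168  # same value as in the VACUUM command above


def is_vacuum_candidate(removed_at, now, retain_hours=RETAIN_HOURS):
    """True if an unreferenced file removed at `removed_at` is past the retention window."""
    return removed_at < now - timedelta(hours=retain_hours)


# Hypothetical clock and removal times.
now = datetime(2024, 1, 15, 12, 0)
retention_days = timedelta(hours=RETAIN_HOURS).days
old_file = is_vacuum_candidate(datetime(2024, 1, 1), now)
recent_file = is_vacuum_candidate(datetime(2024, 1, 14), now)

print(retention_days, old_file, recent_file)
```

Files rewritten by a MERGE or OPTIMIZE a few minutes earlier fall inside the window, so a VACUUM run right after them would be expected to delete little or nothing.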

#Final Verification After VACUUM

In [0]:
%sql
DESCRIBE HISTORY events_managed_day5;


#Key Observations

History still present

OPTIMIZE & MERGE entries remain

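Only unreferenced data files older than the retention window are removed; time travel to versions that depended on them no longer works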

In [0]:
%sql
select count(*) from events_managed_day5

#Truncate Table


In [0]:
%sql

TRUNCATE TABLE events_managed_day5;

In [0]:
%sql
DESCRIBE HISTORY events_managed_day5;