In [0]:
# Load data - Explore the smaller October dataset
df_oct = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv", header=True, inferSchema=True)

In [0]:
df_oct.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/Volumes/workspace/ecommerce/ecommerce_data/df_oct_delta_day_5")


In [0]:
df_oct.write.format("delta").saveAsTable("df_oct_table_day_5_table")

#### MERGE = Upsert: update existing rows, insert new ones

In [0]:
# Inspect columns in the main table

spark.table("df_oct_table_day_5_table").printSchema()


root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



In [0]:
from delta.tables import DeltaTable

# Load the main Delta table
delta_table = DeltaTable.forName(spark, "df_oct_table_day_5_table")

# Sample data for updates/inserts
updates = [
    ("2025-01-01 10:00:00", "purchase", 999999, 8888888888888, "cat.test", "brandZ", 99.99, 111111, "session111"),
    ("2025-01-01 11:00:00", "cart",     555555, 7777777777777, "cat.new",  "brandY", 49.49, 222222, "session222")
]

columns = ["event_time","event_type","product_id","category_id",
           "category_code","brand","price","user_id","user_session"]

df_updates = spark.createDataFrame(updates, schema=columns)

# Perform the merge using user_id as the match key
delta_table.alias("tgt").merge(
    df_updates.alias("src"),
    "tgt.user_id = src.user_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

Check the different versions of our Delta table

In [0]:
%sql
DESCRIBE HISTORY df_oct_table_day_5_table;


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2026-01-14T15:27:46.000Z,74403994742972,maisondemaitre+databricks@hotmail.com,MERGE,"Map(predicate -> [""(cast(user_id#13810 as bigint) = user_id#13855L)""], clusterBy -> [], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> true, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(2719560714653473),0114-151445-n91pfkwy-v2n,0.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 4816, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 0, executionTimeMs -> 3186, materializeSourceTimeMs -> 175, numTargetRowsInserted -> 2, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 1508, numTargetRowsUpdated -> 0, numOutputRows -> 2, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 1357)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
0,2026-01-14T15:16:32.000Z,74403994742972,maisondemaitre+databricks@hotmail.com,CREATE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(2719560714653473),0114-151445-n91pfkwy-v2n,,WriteSerializable,True,"Map(numFiles -> 43, numOutputRows -> 42448764, numOutputBytes -> 727560588)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13


Display the first 5 rows from version 1 (new version after MERGE) of the table.

In [0]:
%sql
SELECT *
FROM df_oct_table_day_5_table
VERSION AS OF 1
LIMIT 5;


event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
2019-10-19T06:15:06.000Z,view,49300013,2125931803410694331,,weekend,875.18,515837919,0df7d269-da26-4087-b593-faee853ab0b5
2019-10-19T06:15:06.000Z,view,12706258,2053013553559896355,,nokian,60.23,537960539,472c6bd5-6e1f-437f-aa84-a112885fefe2
2019-10-19T06:15:06.000Z,view,4804056,2053013554658804075,electronics.audio.headphone,apple,160.62,561794333,12a4b25d-21e7-4530-92f5-9f8d633b130b
2019-10-19T06:15:06.000Z,view,1004903,2053013555631882655,electronics.smartphone,huawei,111.7,553521751,9ad39586-648d-4b2d-932e-ec25acaed778
2019-10-19T06:15:06.000Z,view,1004587,2053013555631882655,electronics.smartphone,inoi,61.49,553442516,bad9e161-d7fe-4136-be2d-ccfba436b721


Compare record counts in historical as well as new version

In [0]:
%sql

SELECT COUNT(*) AS historical_count
FROM df_oct_table_day_5_table
VERSION AS OF 0;


historical_count
42448764


In [0]:
%sql
SELECT COUNT(*) AS current_count
FROM df_oct_table_day_5_table;


current_count
42448766


Run this to OPTIMIZE the whole table

In [0]:
%sql
OPTIMIZE df_oct_table_day_5_table

path,metrics
,"List(14, 45, List(50909022, 55969823, 5.205239414285714E7, 14, 728733518), List(2395, 17695971, 1.616812008888889E7, 45, 727565404), 0, null, null, 0, 1, 45, 0, true, 0, 0, 1768405045891, 1768405054955, 8, 14, null, List(0, 0), null, 9, 9, 38870, 0, null)"


VACCUM in action - Uses default retention period of 7 days. Deletes anything older.

In [0]:
%sql
VACUUM df_oct_table_day_5_table RETAIN 168 HOURS;

path
