In [0]:
from delta.tables import DeltaTable



### # MERGE for incremental updates

### # Reference an existing Delta Lake table from Unity Catalog using its fully qualified name.
# This allows us to perform transactional operations such as MERGE, UPDATE, and DELETE.

In [0]:

# Handle to the existing Delta table, looked up by its three-part
# catalog.schema.table name; later cells run MERGE through this object.
deltaTable = DeltaTable.forName(
    spark,
    "workspace.ecommerce.ecommerce_data",
)

### # Read the November ecommerce CSV file from Unity Catalog volume into a Spark DataFrame


In [0]:
# Load the November extract: first row is the header, column types are inferred.
updates = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv")
)


In [0]:
# Quick visual check of the loaded rows (20 is the default row count for show()).
updates.show(20)

### # Deduplicate source data so that exactly one record remains per (user_session, event_time) key, using a window function


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Columns that identify one logical event; the MERGE cell joins on the same pair.
dedup_keys = ["user_session", "event_time"]

# Ordering by event_time inside a partition that is already keyed on
# event_time cannot rank anything (every row ties), so the surviving
# duplicate would be arbitrary and could change from run to run.
# Rank on the remaining columns instead so the same row wins every time.
tie_breakers = [
    col(name).desc() for name in updates.columns if name not in dedup_keys
]
if not tie_breakers:
    # Only the key columns exist: rows in a partition are identical, so any
    # ordering is fine, but row_number() still needs an ORDER BY clause.
    tie_breakers = [col(name).desc() for name in dedup_keys]

window_spec = Window.partitionBy(*dedup_keys).orderBy(*tie_breakers)

# Keep the first-ranked row of every key group and drop the helper column.
updates_dedup = (
    updates
    .withColumn("rn", row_number().over(window_spec))
    .filter("rn = 1")
    .drop("rn")
)


### # Perform an upsert into the Delta table by updating matched records and inserting new records based on user_session and event_time


In [0]:
# Target ("t") and source ("s") rows are paired on (user_session, event_time).
# NOTE(review): `=` never matches when either side is NULL, so source rows
# with a NULL key always take the insert branch — confirm keys are non-null.
merge_condition = "t.user_session = s.user_session AND t.event_time = s.event_time"

upsert = (
    deltaTable.alias("t")
    .merge(updates_dedup.alias("s"), merge_condition)
    .whenMatchedUpdateAll()     # key already in target: overwrite all columns
    .whenNotMatchedInsertAll()  # key not in target: insert the source row
)
upsert.execute()


### -- Query the initial snapshot (version 0) of the Delta table using Delta Lake time travel



In [0]:
%sql
-- Delta time travel: read the table as it was at commit version 0.
SELECT *
  FROM workspace.ecommerce.ecommerce_data VERSION AS OF 0;


### # Optimize the table layout (Z-order) and vacuum old files


In [0]:


# Compact the table's files and co-locate rows by the two Z-order columns.
spark.sql(
    "OPTIMIZE workspace.ecommerce.ecommerce_data ZORDER BY (event_type, user_id)"
)

# Clean up old files, retaining 168 hours (7 days) of history.
spark.sql(
    "VACUUM workspace.ecommerce.ecommerce_data RETAIN 168 HOURS"
)
