In [1]:
from typing import List
import polars as pl
from deltalake import DeltaTable
from datetime import datetime
from deltalake.exceptions import TableNotFoundError

## Utility functions

In [2]:
from datetime import timezone

def try_merge_delta(df, table_path, predicate, content_predicate=None):
    """Upsert ``df`` into the Delta table at ``table_path``, creating it if missing.

    Rows of ``df`` (aliased ``source``) are matched against the existing rows
    (aliased ``target``) with ``predicate``. Matched rows are overwritten with
    the source values -- restricted by ``content_predicate`` when one is
    given -- and unmatched source rows are inserted.

    If the table does not exist, ``df`` is written as a new table whose
    configuration enables the Change Data Feed.

    Args:
        df: polars DataFrame holding the rows to upsert.
        table_path: location of the Delta table.
        predicate: merge condition written against the two aliases.
        content_predicate: optional extra condition for updating matched rows.

    Returns:
        Whatever the merge's ``execute()`` returns, or whatever ``write_delta``
        returns when the table had to be created.
    """
    merge_options = {
        'predicate': predicate,
        'source_alias': 'source',
        'target_alias': 'target',
        'large_dtypes': False,
    }
    # Table properties that are only applied when the table is created from scratch.
    new_table_options = {
        "configuration": {
            "delta.minWriterVersion": "7",
            "delta.minReaderVersion": "3",
            "delta.enableChangeDataFeed": "true",
        }
    }

    try:
        merger = df.write_delta(
            table_path,
            mode='merge',
            delta_merge_options=merge_options,
            delta_write_options={"schema_mode": "merge"},
        )
        merger = merger.when_matched_update_all(predicate=content_predicate)
        merger = merger.when_not_matched_insert_all()
        result = merger.execute()
    except TableNotFoundError:
        print(f"Table {table_path} does not exist, creating it.")
        # mode='error' makes the write fail rather than clobber a table that
        # appeared in the meantime.
        result = df.write_delta(
            table_path,
            mode='error',
            delta_write_options=new_table_options,
        )

    return result


def delta_changes_from_timestamp(
        path: str,
        key_columns: List[str],
        starting_timestamp: datetime = None,
        ending_timestamp: datetime = None,
        drop_cdf_columns: bool = True
) -> pl.DataFrame:
    """Return the latest state of every row inserted or updated after a timestamp.

    The table's Change Data Feed is loaded for the requested window, commits
    at or before ``starting_timestamp`` are discarded, and for each key only
    the records of its most recent commit are kept. Keys whose most recent
    change is neither an insert nor an update post-image are left out.

    Args:
        path: location of the Delta table.
        key_columns: columns that identify a logical row.
        starting_timestamp: exclusive lower bound; ``None`` places no lower
            bound on the read.
        ending_timestamp: upper bound handed to ``load_cdf``; ``None`` places
            no upper bound on the read.
        drop_cdf_columns: when true, strip ``_change_type``,
            ``_commit_version`` and ``_commit_timestamp`` from the result.

    Naive datetimes are interpreted as UTC. ``_commit_timestamp`` comes back
    as a naive UTC value, so a watermark read from a previous result can be
    passed straight back in regardless of the host's local time zone.

    Returns:
        A polars DataFrame with one row per changed key.
    """

    def _as_utc(moment: datetime) -> datetime:
        # ``astimezone`` on a naive datetime would assume the system's local
        # zone and shift the value; naive inputs here are already UTC.
        if moment.tzinfo is None:
            return moment.replace(tzinfo=timezone.utc)
        return moment.astimezone(timezone.utc)

    start_utc = _as_utc(starting_timestamp) if starting_timestamp is not None else None
    end_utc = _as_utc(ending_timestamp) if ending_timestamp is not None else None

    dt = DeltaTable(path)
    dttable = dt.load_cdf(
        starting_timestamp=start_utc.isoformat() if start_utc is not None else None,
        ending_timestamp=end_utc.isoformat() if end_utc is not None else None,
    ).read_all()
    pt = pl.from_arrow(dttable)

    if start_utc is not None:
        # The commit sitting exactly on the lower bound may still be returned by
        # load_cdf; filter strictly so a watermark is not re-processed. The
        # comparison uses the same instant as the read, expressed as naive UTC
        # at millisecond precision to match the _commit_timestamp column.
        lower_bound = pl.lit(start_utc.replace(tzinfo=None)).cast(pl.Datetime("ms"))
        pt = pt.filter(pl.col("_commit_timestamp") > lower_bound)

    # Get only the latest state for each row
    changed_rows = (
        pt
        .with_columns(rn=pl.col("_commit_version").rank("dense", descending=True).over(key_columns))
        .filter(pl.col("rn") == 1)
        .filter(pl.col("_change_type").is_in(["insert", "update_postimage"]))
        .drop("rn")
    )
    if drop_cdf_columns:
        changed_rows = changed_rows.drop("_change_type", "_commit_version", "_commit_timestamp")

    return changed_rows


def delta_changes_from_version(
        path: str,
        key_columns: List[str],
        starting_version: int = 0,
        ending_version: int = None,
        drop_cdf_columns: bool = True,
) -> pl.DataFrame:
    """Return the latest state of every row inserted or updated after a commit version.

    The Change Data Feed is loaded from ``starting_version`` up to
    ``ending_version``; for each key only the records of its most recent
    commit are kept, and only if they are an insert or an update post-image.

    Note that a non-zero ``starting_version`` is exclusive (that commit is
    filtered out), whereas ``starting_version=0`` skips the filter and
    therefore includes commit 0.

    Args:
        path: location of the Delta table.
        key_columns: columns that identify a logical row.
        starting_version: first version handed to ``load_cdf``.
        ending_version: last version handed to ``load_cdf``.
        drop_cdf_columns: when true, strip ``_change_type``,
            ``_commit_version`` and ``_commit_timestamp`` from the result.

    Returns:
        A polars DataFrame with one row per changed key.
    """
    table = DeltaTable(path)
    cdf_reader = table.load_cdf(
        starting_version=starting_version,
        ending_version=ending_version,
    )
    changes = pl.from_arrow(cdf_reader.read_all())

    if starting_version:
        newer_than_start = pl.col("_commit_version") > starting_version
        changes = changes.filter(newer_than_start)

    # Rank 1 marks the records written by the most recent commit touching a key.
    recency_rank = pl.col("_commit_version").rank("dense", descending=True).over(key_columns)
    is_most_recent = pl.col("rn") == 1
    is_current_value = pl.col("_change_type").is_in(["insert", "update_postimage"])

    latest = changes.with_columns(rn=recency_rank)
    latest = latest.filter(is_most_recent)
    latest = latest.filter(is_current_value)
    latest = latest.drop("rn")

    if not drop_cdf_columns:
        return latest
    return latest.drop("_change_type", "_commit_version", "_commit_timestamp")


def read_watermark(table_path, watermark_table):
    """Look up the stored watermark timestamp for ``table_path``.

    Reads the Delta table at ``watermark_table`` and keeps the rows whose
    ``table_path`` column equals the given path.

    Returns:
        The ``watermark_timestamp`` of the matching row, or ``None`` when no
        row matches.
    """
    matching = pl.read_delta(watermark_table).filter(pl.col("table_path") == table_path)
    if matching.height == 0:
        return None
    return matching.select("watermark_timestamp").item()

## Parameters

In [None]:
# Locations of the Delta tables used throughout the notebook.
source_table_path = "_output/test/incremental"
target_table_path = "_output/target"
watermark_table = "_output/watermark"

# Columns identifying a row, and columns whose change counts as an update.
# The spelling "merge_columnns" is kept as-is because later cells refer to it.
merge_columnns = ["id"]
content_columns = ["value"]

# One comparison per column, combined into the predicates given to the merge.
_key_comparisons = [f"source.{c} == target.{c}" for c in merge_columnns]
_content_comparisons = [f"(source.{f} != target.{f})" for f in content_columns]

merge_predicate = " AND ".join(_key_comparisons)
content_predicate = " OR ".join(_content_comparisons)

## Create and update delta

In [4]:
# Captured before the first write, so every commit made below is newer than t0.
t0 = datetime.now(timezone.utc)
df = pl.DataFrame({
    "id": ["a", "b", "c"],
    "value": [1, 1, 1]
})
# On a fresh run the table does not exist yet, so this call takes the creation path.
try_merge_delta(df, table_path=source_table_path, predicate=merge_predicate, content_predicate=content_predicate)

Table _output/test/incremental11 does not exist, creating it.


In [5]:
# Captured after the initial write and before the first merge.
t1 = datetime.now(timezone.utc)
df = pl.DataFrame({
    "id": ["a", "d"],
    "value": [5, 5],
})
# "a" already exists with a different value (update); "d" is new (insert).
try_merge_delta(df, table_path=source_table_path, predicate=merge_predicate, content_predicate=content_predicate)

{'num_source_rows': 2,
 'num_target_rows_inserted': 1,
 'num_target_rows_updated': 1,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 2,
 'num_output_rows': 4,
 'num_target_files_scanned': 1,
 'num_target_files_skipped_during_scan': 0,
 'num_target_files_added': 5,
 'num_target_files_removed': 1,
 'execution_time_ms': 251,
 'scan_time_ms': 0,
 'rewrite_time_ms': 210}

In [6]:
# Captured after the first merge and before the second one.
t2 = datetime.now(timezone.utc)
df = pl.DataFrame({
    "id": ["b", "a", "d"],
    "value": [6, 6, 5],
})
# "d" matches on id but its value is unchanged, so content_predicate leaves it
# out of the update: only "a" and "b" are reported as updated.
try_merge_delta(df, table_path=source_table_path, predicate=merge_predicate, content_predicate=content_predicate)

{'num_source_rows': 3,
 'num_target_rows_inserted': 0,
 'num_target_rows_updated': 2,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 1,
 'num_output_rows': 3,
 'num_target_files_scanned': 2,
 'num_target_files_skipped_during_scan': 0,
 'num_target_files_added': 5,
 'num_target_files_removed': 1,
 'execution_time_ms': 212,
 'scan_time_ms': 0,
 'rewrite_time_ms': 175}

### Show history of the table and the metadata of CDF

In [7]:
dt = DeltaTable(source_table_path)

# One row per commit: the initial WRITE followed by the MERGE operations.
pl.DataFrame(dt.history())

timestamp,operation,operationParameters,readVersion,operationMetrics,clientVersion,version
i64,str,struct[6],i64,struct[17],str,i64
1732906098043,"""MERGE""","{""[]"",""[{""actionType"":""update"",""predicate"":""source.value != target.value""}]"",""id BETWEEN 'a' AND 'd'"",""source.id = target.id"",""[{""actionType"":""insert""}]"",null}",1.0,"{212,3,3,5,1,2,0,1,0,0,2,175,0,null,null,null,null}","""delta-rs.0.22.1""",2
1732906097792,"""MERGE""","{""[]"",""[{""actionType"":""update"",""predicate"":""source.value != target.value""}]"",""id BETWEEN 'a' AND 'd'"",""source.id = target.id"",""[{""actionType"":""insert""}]"",null}",0.0,"{251,4,2,5,1,1,0,2,0,1,1,210,0,null,null,null,null}","""delta-rs.0.22.1""",1
1732906097513,"""WRITE""","{null,null,null,null,null,""ErrorIfExists""}",,"{4,null,null,null,null,null,null,null,null,null,null,null,null,1,3,0,0}","""delta-rs.0.22.1""",0


In [8]:
dt.metadata().configuration

{'delta.minReaderVersion': '3',
 'delta.enableChangeDataFeed': 'true',
 'delta.minWriterVersion': '7'}

In [9]:
dt.protocol()

ProtocolVersions(min_reader_version=3, min_writer_version=7, writer_features=['changeDataFeed'], reader_features=None)

## Read CDF to read rows from a timestamp

In [10]:
t0.isoformat()

'2024-11-29T18:48:17.491118+00:00'

In [11]:
# t0 precedes every commit, so the latest state of all keys is returned.
delta_changes_from_timestamp(source_table_path, starting_timestamp=t0, key_columns=merge_columnns, drop_cdf_columns=False)

id,value,_change_type,_commit_version,_commit_timestamp
str,i64,str,i64,datetime[ms]
"""b""",6,"""update_postimage""",2,2024-11-29 18:48:18.043
"""a""",6,"""update_postimage""",2,2024-11-29 18:48:18.043
"""d""",5,"""insert""",1,2024-11-29 18:48:17.792
"""c""",1,"""insert""",0,2024-11-29 18:48:17.513


In [12]:
# t1 was taken after the initial write: only keys touched by the two merges come back.
delta_changes_from_timestamp(source_table_path, starting_timestamp=t1, key_columns=merge_columnns)

id,value
str,i64
"""d""",5
"""b""",6
"""a""",6


In [13]:
# t2 was taken after the first merge: only the rows changed by the second merge remain.
delta_changes_from_timestamp(source_table_path, starting_timestamp=t2, key_columns=merge_columnns, drop_cdf_columns=False)

id,value,_change_type,_commit_version,_commit_timestamp
str,i64,str,i64,datetime[ms]
"""b""",6,"""update_postimage""",2,2024-11-29 18:48:18.043
"""a""",6,"""update_postimage""",2,2024-11-29 18:48:18.043


## First time running the incremental pipeline

Create empty watermark the first time

In [14]:
from deltalake import DeltaTable

# Zero-row frame that only carries the watermark table's schema: the target
# table path (used as the merge key when the watermark is written), the
# merge/content column lists, and the last processed commit version and timestamp.
empty_watermark = pl.DataFrame([], schema= pl.Schema([
    ('table_path', pl.String),
    ('merge_columns', pl.List(pl.String)),
    ('content_columns', pl.List(pl.String)),
    ('watermark_version', pl.Int64),
    ('watermark_timestamp', pl.Datetime(time_unit='us', time_zone=None))]
))

# NOTE(review): mode="overwrite" presumably replaces an existing watermark table,
# so re-running this cell would discard stored watermarks -- confirm before reuse.
DeltaTable.create(watermark_table, schema=empty_watermark.to_arrow().schema, mode="overwrite")


DeltaTable()

## On each update
- Read the old watermark
- Use it to read the latest state of all rows created or updated since the watermark
- Do any transformation and write into output
- Update watermark

In [15]:
# No watermark row exists yet for this target, so low_watermark is None and
# the whole change feed is read.
low_watermark = read_watermark(target_table_path, watermark_table)

rows = delta_changes_from_timestamp(source_table_path, starting_timestamp=low_watermark, key_columns=merge_columnns, drop_cdf_columns=False)
# The new watermark is the newest commit present in the rows just read,
# together with that commit's timestamp.
new_watermark_version = rows.select(pl.max("_commit_version")).item()
new_watermark_ts = rows.filter(pl.col("_commit_version") == new_watermark_version).select(pl.max("_commit_timestamp")).item()

print(new_watermark_version, new_watermark_ts)
rows

2 2024-11-29 18:48:18.043000


id,value,_change_type,_commit_version,_commit_timestamp
str,i64,str,i64,datetime[ms]
"""d""",5,"""insert""",1,2024-11-29 18:48:17.792
"""b""",6,"""update_postimage""",2,2024-11-29 18:48:18.043
"""a""",6,"""update_postimage""",2,2024-11-29 18:48:18.043
"""c""",1,"""insert""",0,2024-11-29 18:48:17.513


In [None]:
# Do something with rows to write rows into target_table_path
...

In [17]:
# Persist the new watermark for this target, keyed by table_path.
# Skip the write when nothing was read (both values are None in that case),
# so an empty run cannot replace a stored watermark with nulls.
watermark_merge_result = None
if new_watermark_ts is not None:
    watermark_update = pl.DataFrame(
        data=[[target_table_path, merge_columnns, content_columns, new_watermark_version, new_watermark_ts]],
        schema=["table_path", "merge_columns", "content_columns", "watermark_version", "watermark_timestamp"],
        orient="row"
    )

    watermark_merge_result = try_merge_delta(watermark_update, watermark_table, "source.table_path == target.table_path")

# Last expression of the cell: shows the merge metrics, or nothing when skipped.
watermark_merge_result

{'num_source_rows': 1,
 'num_target_rows_inserted': 1,
 'num_target_rows_updated': 0,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 0,
 'num_output_rows': 1,
 'num_target_files_scanned': 0,
 'num_target_files_skipped_during_scan': 0,
 'num_target_files_added': 1,
 'num_target_files_removed': 0,
 'execution_time_ms': 72,
 'scan_time_ms': 0,
 'rewrite_time_ms': 4}

## Another update, and let's see how the watermark works

In [18]:
# A further change to the source after the watermark was stored:
# "b" gets a new value (update) and "e" is new (insert).
df = pl.DataFrame({
    "id": ["b", "e"],
    "value": [8, 9],
})
try_merge_delta(df, table_path=source_table_path, predicate=merge_predicate, content_predicate=content_predicate)

{'num_source_rows': 2,
 'num_target_rows_inserted': 1,
 'num_target_rows_updated': 1,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 2,
 'num_output_rows': 4,
 'num_target_files_scanned': 2,
 'num_target_files_skipped_during_scan': 0,
 'num_target_files_added': 5,
 'num_target_files_removed': 1,
 'execution_time_ms': 236,
 'scan_time_ms': 0,
 'rewrite_time_ms': 179}

This is the input watermark

In [19]:
pl.read_delta(watermark_table)

table_path,merge_columns,content_columns,watermark_version,watermark_timestamp
str,list[str],list[str],i64,datetime[μs]
"""_output/target""","[""id""]","[""value""]",2,2024-11-29 18:48:18.043


In [20]:
# This time a watermark row exists, so only commits newer than it are returned.
low_watermark = read_watermark(target_table_path, watermark_table)

rows = delta_changes_from_timestamp(source_table_path, starting_timestamp=low_watermark, key_columns=merge_columnns, drop_cdf_columns=False)
# The new watermark is the newest commit present in the rows just read,
# together with that commit's timestamp.
new_watermark_version = rows.select(pl.max("_commit_version")).item()
new_watermark_ts = rows.filter(pl.col("_commit_version") == new_watermark_version).select(pl.max("_commit_timestamp")).item()

print(new_watermark_version, new_watermark_ts)
rows

3 2024-11-29 18:48:18.793000


id,value,_change_type,_commit_version,_commit_timestamp
str,i64,str,i64,datetime[ms]
"""b""",8,"""update_postimage""",3,2024-11-29 18:48:18.793
"""e""",9,"""insert""",3,2024-11-29 18:48:18.793


In [None]:
# Do something with rows to write rows into target_table_path
...

In [22]:
# Only advance the stored watermark when the read above returned a timestamp.
if new_watermark_ts is not None:
    watermark_update = pl.DataFrame(
        data=[[target_table_path, merge_columnns, content_columns, new_watermark_version, new_watermark_ts]], 
        schema=["table_path", "merge_columns", "content_columns", "watermark_version", "watermark_timestamp"], 
        orient="row"
    )

    # Upsert keyed on table_path, so each target keeps a single watermark row.
    try_merge_delta(watermark_update, watermark_table, "source.table_path == target.table_path")

### Let's try to read the watermark again without updates

In [23]:
read_watermark(target_table_path, watermark_table)

datetime.datetime(2024, 11, 29, 18, 48, 18, 793000)

In [24]:
# Nothing was written to the source since the last watermark, so this read
# is expected to come back empty.
low_watermark = read_watermark(target_table_path, watermark_table)

rows = delta_changes_from_timestamp(source_table_path, starting_timestamp=low_watermark, key_columns=merge_columnns, drop_cdf_columns=False)
# max() over an empty frame yields None, which signals "nothing new".
new_watermark_version = rows.select(pl.max("_commit_version")).item()
# Compare against None explicitly: commit version 0 is a valid version but is
# falsy, so a truthiness test would wrongly discard its timestamp.
new_watermark_ts = (
    rows.filter(pl.col("_commit_version") == new_watermark_version).select(pl.max("_commit_timestamp")).item()
    if new_watermark_version is not None
    else None
)

print(new_watermark_version, new_watermark_ts)
rows

None None


id,value,_change_type,_commit_version,_commit_timestamp
str,i64,str,i64,datetime[ms]


## Deep dive on CDF data

In [25]:
from pathlib import Path
import json

# Glob covering every file under the table's _change_data directory.
change_data_path = (Path(source_table_path) / "_change_data/*")
# First commit-log file whose name ends in "0001.json"; next() raises
# StopIteration when no such file exists.
log_data_path = next((Path(source_table_path) / "_delta_log").glob("*0001.json"))

# The commit log is newline-delimited JSON, one action per line.
delta_log = pl.read_ndjson(log_data_path)
# Loads the change rows matched by the glob, i.e. from all change-data files
# rather than only those referenced by the single log file read above.
change_data_details = pl.read_parquet(str(change_data_path))

In [26]:
print(log_data_path.read_text())

{"add":{"path":"part-00001-439e3da8-75f9-4cf6-ade2-02c9a9787a16-c000.snappy.parquet","partitionValues":{},"size":745,"modificationTime":1732906097587,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":\"d\",\"value\":5},\"maxValues\":{\"value\":5,\"id\":\"d\"},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"add":{"path":"part-00001-0df68d68-d8a0-42bc-836d-209e76e0c4b9-c000.snappy.parquet","partitionValues":{},"size":768,"modificationTime":1732906097588,"dataChange":true,"stats":"{\"numRecords\":3,\"minValues\":{\"id\":\"a\",\"value\":1},\"maxValues\":{\"value\":5,\"id\":\"c\"},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"cdc":{"path":"_change_data/part-00001-6733794a-170e-483c-a9e5-fd7b8c223a42-c000.snappy.parquet","size":1117,"partitionValues":{},"dataChange":fals

In [27]:
delta_log

add,cdc,remove,commitInfo
struct[11],struct[4],struct[6],struct[6]
"{""part-00001-439e3da8-75f9-4cf6-ade2-02c9a9787a16-c000.snappy.parquet"",{},745,1732906097587,true,""{""numRecords"":1,""minValues"":{""id"":""d"",""value"":5},""maxValues"":{""value"":5,""id"":""d""},""nullCount"":{""value"":0,""id"":0}}"",null,null,null,null,null}",,,
"{""part-00001-0df68d68-d8a0-42bc-836d-209e76e0c4b9-c000.snappy.parquet"",{},768,1732906097588,true,""{""numRecords"":3,""minValues"":{""id"":""a"",""value"":1},""maxValues"":{""value"":5,""id"":""c""},""nullCount"":{""value"":0,""id"":0}}"",null,null,null,null,null}",,,
,"{""_change_data/part-00001-6733794a-170e-483c-a9e5-fd7b8c223a42-c000.snappy.parquet"",1117,{},false}",,
,"{""_change_data/part-00001-97325e1e-9c00-47c9-86fd-491631169281-c000.snappy.parquet"",1124,{},false}",,
,"{""_change_data/part-00001-bfef71f4-cb9e-43c9-ab62-fa227e41b331-c000.snappy.parquet"",1054,{},false}",,
,,"{""part-00001-6932f434-1dd2-4006-85cc-3ba81e732aea-c000.snappy.parquet"",true,1732906097792,true,{},759}",
,,,"{1732906097792,""MERGE"",{""[]"",""[{""actionType"":""update"",""predicate"":""source.value != target.value""}]"",""[{""actionType"":""insert""}]"",""id BETWEEN 'a' AND 'd'"",""source.id = target.id""},""delta-rs.0.22.1"",0,{251,4,2,5,1,1,0,2,0,1,1,210,0}}"


In [28]:
change_data_details

id,value,_change_type
str,i64,str
"""a""",1,"""update_preimage"""
"""b""",6,"""update_postimage"""
"""a""",5,"""update_preimage"""
"""a""",5,"""update_postimage"""
"""e""",9,"""insert"""
"""a""",6,"""update_postimage"""
"""b""",8,"""update_postimage"""
"""d""",5,"""insert"""
"""b""",6,"""update_preimage"""
"""b""",1,"""update_preimage"""
