Skip to content

Commit

Permalink
fix: Null replication values are now handled when incrementing bookma…
Browse files Browse the repository at this point in the history
…rks (#2438)

* fix: Handle null replication values

* Add test
  • Loading branch information
edgarrmondragon committed May 19, 2024
1 parent 0132f0b commit 08cddd2
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
5 changes: 5 additions & 0 deletions singer_sdk/helpers/_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ def increment_state(
progress_dict = stream_or_partition_state[PROGRESS_MARKERS]
old_rk_value = to_json_compatible(progress_dict.get("replication_key_value"))
new_rk_value = to_json_compatible(latest_record[replication_key])

if new_rk_value is None:
logger.warning("New replication value is null")
return

if old_rk_value is None or not check_sorted or new_rk_value >= old_rk_value:
progress_dict["replication_key"] = replication_key
progress_dict["replication_key_value"] = new_rk_value
Expand Down
28 changes: 28 additions & 0 deletions tests/core/test_state_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

import logging

import pytest

from singer_sdk.helpers import _state
Expand Down Expand Up @@ -127,3 +129,29 @@ def test_irresumable_state():
"replication_key_value": "2021-05-17T20:41:16Z",
},
}


def test_null_replication_value(caplog):
stream_state = {
"replication_key": "updated_at",
"replication_key_value": "2021-05-17T20:41:16Z",
}
latest_record = {"updated_at": None}
replication_key = "updated_at"
is_sorted = True
check_sorted = False

with caplog.at_level(logging.WARNING):
_state.increment_state(
stream_state,
latest_record=latest_record,
replication_key=replication_key,
is_sorted=is_sorted,
check_sorted=check_sorted,
)

assert (
stream_state["replication_key_value"] == "2021-05-17T20:41:16Z"
), "State should not be updated."
assert caplog.records[0].levelname == "WARNING"
assert "is null" in caplog.records[0].message

0 comments on commit 08cddd2

Please sign in to comment.