diff --git a/README.md b/README.md
index 1fc3900c..25484525 100644
--- a/README.md
+++ b/README.md
@@ -18,7 +18,7 @@ Quix Streams has the following benefits:
 - Support for many serialization formats, including JSON (and Quix-specific).
 - Support for stateful operations using RocksDB.
 - Support for aggregations over tumbling and hopping time windows.
-- "At-least-once" Kafka processing guarantees.
+- "At-least-once" and "exactly-once" Kafka processing guarantees.
 - Designed to run and scale resiliently via container orchestration (like Kubernetes).
 - Easily runs locally and in Jupyter Notebook for convenient development and debugging.
 - Seamless integration with the fully managed [Quix Cloud](https://quix.io/product) platform.
@@ -160,9 +160,9 @@ Here are some of the planned improvements:
 - [x] [Windowed aggregations over Tumbling & Hopping windows](https://quix.io/docs/quix-streams/v2-0-latest/windowing.html)
 - [x] [State recovery based on Kafka changelog topics](https://quix.io/docs/quix-streams/advanced/stateful-processing.html#fault-tolerance-recovery)
 - [x] [Group-by operation](https://quix.io/docs/quix-streams/groupby.html)
+- [x] ["Exactly Once" delivery guarantees for Kafka message processing (AKA transactions)](https://quix.io/docs/quix-streams/configuration.html#processing-guarantees)
 - [ ] Joins
 - [ ] Windowed aggregations over Sliding windows
-- [ ] "Exactly Once" delivery guarantees for Kafka message processing (AKA transactions)
 - [ ] Support for Avro and Protobuf formats
 - [ ] Schema Registry support
 
diff --git a/quixstreams/app.py b/quixstreams/app.py
index d7884716..e6ae0cda 100644
--- a/quixstreams/app.py
+++ b/quixstreams/app.py
@@ -4,7 +4,7 @@
 import os
 import signal
 import warnings
-from typing import Optional, List, Callable, Union, get_args
+from typing import Optional, List, Callable, Union, Literal, get_args
 
 from confluent_kafka import TopicPartition
 from typing_extensions import Self
@@ -40,11 +40,11 @@
 from .state import StateStoreManager
 from .state.recovery import RecoveryManager
 from .state.rocksdb import RocksDBOptionsType
-from .types import ProcessingGuarantee, ExactlyOnceSemantics, AtLeastOnceSemantics
 
 __all__ = ("Application",)
 
 logger = logging.getLogger(__name__)
+ProcessingGuarantee = Literal["at-least-once", "exactly-once"]
 MessageProcessedCallback = Callable[[str, int, int], None]
 
 # Enforce idempotent producing for the internal RowProducer
@@ -184,15 +184,10 @@ def __init__(
         """
         configure_logging(loglevel=loglevel)
 
-        if processing_guarantee in get_args(ExactlyOnceSemantics):
-            exactly_once = True
-        elif processing_guarantee in get_args(AtLeastOnceSemantics):
-            exactly_once = False
-        else:
+        if processing_guarantee not in get_args(ProcessingGuarantee):
             raise ValueError(
-                f'Must provide a valid "processing_guarantee"; expected: '
-                f"{[*get_args(ExactlyOnceSemantics), *get_args(AtLeastOnceSemantics)]}"
-                f', got "{processing_guarantee}"'
+                f'Must provide a valid "processing_guarantee"; expected one of: '
+                f'{get_args(ProcessingGuarantee)}, got "{processing_guarantee}"'
             )
 
         producer_extra_config = producer_extra_config or {}
@@ -263,6 +258,7 @@ def __init__(
         self._commit_interval = commit_interval
         self._producer_extra_config = producer_extra_config
         self._consumer_extra_config = consumer_extra_config
+        self._processing_guarantee = processing_guarantee
         self._consumer = RowConsumer(
             broker_address=broker_address,
             consumer_group=consumer_group,
@@ -279,7 +275,7 @@ def __init__(
                 "max.poll.interval.ms", _default_max_poll_interval_ms
             )
             / 1000,  # convert to seconds
-            transactional=exactly_once,
+            transactional=self._uses_exactly_once,
         )
         self._consumer_poll_timeout = consumer_poll_timeout
         self._producer_poll_timeout = producer_poll_timeout
@@ -288,7 +284,6 @@ def __init__(
         self._auto_create_topics = auto_create_topics
         self._running = False
         self._failed = False
-        self._exactly_once = exactly_once
 
         if not topic_manager:
             topic_manager = topic_manager_factory(
@@ -320,13 +315,17 @@ def __init__(
             producer=self._producer,
             consumer=self._consumer,
             state_manager=self._state_manager,
-            exactly_once=exactly_once,
+            exactly_once=self._uses_exactly_once,
         )
 
     @property
-    def is_quix_app(self):
+    def is_quix_app(self) -> bool:
         return self._is_quix_app
 
+    @property
+    def _uses_exactly_once(self) -> bool:
+        return self._processing_guarantee == "exactly-once"
+
     @classmethod
     def Quix(
         cls,
@@ -349,7 +348,7 @@ def Quix(
         topic_manager: Optional[QuixTopicManager] = None,
         request_timeout: float = 30,
         topic_create_timeout: float = 60,
-        processing_guarantee: ProcessingGuarantee = "exactly-once",
+        processing_guarantee: Literal["at-least-once", "exactly-once"] = "exactly-once",
     ) -> Self:
         """
         >***NOTE:*** DEPRECATED: use Application with `quix_sdk_token` argument instead.
@@ -721,14 +720,13 @@ def run(
         """
         self._setup_signal_handlers()
 
-        guarantee = "exactly-once" if self._exactly_once else "at-least-once"
         logger.info(
             f"Starting the Application with the config: "
             f'broker_address="{self._broker_address}" '
             f'consumer_group="{self._consumer_group}" '
             f'auto_offset_reset="{self._auto_offset_reset}" '
             f"commit_interval={self._commit_interval}s "
-            f'processing_guarantee="{guarantee}"'
+            f'processing_guarantee="{self._processing_guarantee}"'
         )
         if self.is_quix_app:
             self._quix_runtime_init()
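
The `processing_guarantee` option above is the only user-facing switch; the transactional producer and offsets-in-transaction commits are wired up internally. A minimal sketch of opting in (the broker address, topic names, and transformation are hypothetical, not part of this changeset):

```python
from quixstreams import Application

# Hypothetical example: with "exactly-once", produced messages and consumer
# offsets are committed atomically in a single Kafka transaction, so a crash
# between processing and committing cannot emit duplicates downstream.
app = Application(
    broker_address="localhost:9092",  # assumed local broker
    consumer_group="words-counter",
    processing_guarantee="exactly-once",  # or "at-least-once"
)

topic_in = app.topic("words-in", value_deserializer="json")
topic_out = app.topic("words-out", value_serializer="json")

sdf = app.dataframe(topic_in)
sdf = sdf.apply(lambda value: {"word": str(value["word"]).lower()})
sdf = sdf.to_topic(topic_out)

if __name__ == "__main__":
    app.run(sdf)
```
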
diff --git a/quixstreams/checkpointing/checkpoint.py b/quixstreams/checkpointing/checkpoint.py
index 2954655c..02ddccbc 100644
--- a/quixstreams/checkpointing/checkpoint.py
+++ b/quixstreams/checkpointing/checkpoint.py
@@ -107,6 +107,21 @@ def get_store_transaction(
         self._store_transactions[(topic, partition, store_name)] = transaction
         return transaction
 
+    def empty(self) -> bool:
+        """
+        Returns `True` if the checkpoint has no offsets stored yet.
+        """
+        return not bool(self._tp_offsets)
+
+    def close(self):
+        """
+        Perform cleanup (when the checkpoint is empty) instead of committing.
+
+        Needed for exactly-once, as Kafka transactions are timeboxed.
+        """
+        if self._exactly_once:
+            self._producer.abort_transaction()
+
     def commit(self):
         """
         Commit the checkpoint.
@@ -118,12 +133,6 @@ def commit(self):
         4. Flush each state store partition to the disk.
         """
 
-        if not self._tp_offsets:
-            logger.debug("Nothing to commit")
-            if self._exactly_once:
-                self._producer.abort_transaction()
-            return
-
         # Step 1. Produce the changelogs
         for (
             topic,
             partition,
             store_name,
         ), transaction in self._store_transactions.items():
@@ -160,7 +169,7 @@ def commit(self):
                 offsets, self._consumer.consumer_group_metadata()
             )
         else:
-            logger.debug("Checkpoint: commiting consumer")
+            logger.debug("Checkpoint: committing consumer")
             try:
                 partitions = self._consumer.commit(offsets=offsets, asynchronous=False)
             except KafkaException as e:
diff --git a/quixstreams/kafka/producer.py b/quixstreams/kafka/producer.py
index 18c7282f..9c1269d0 100644
--- a/quixstreams/kafka/producer.py
+++ b/quixstreams/kafka/producer.py
@@ -213,7 +213,6 @@ def _producer(self) -> ConfluentProducer:
         return self._inner_producer
 
     def begin_transaction(self):
-        logger.debug("Starting Kafka transaction...")
         self._producer.begin_transaction()
 
     def send_offsets_to_transaction(
@@ -227,11 +226,7 @@ def send_offsets_to_transaction(
         )
 
     def abort_transaction(self, timeout: Optional[float] = None):
-        logger.debug("Aborting Kafka transaction...")
         self._producer.abort_transaction(timeout if timeout is not None else -1)
-        logger.debug("Kafka transaction aborted successfully!")
 
     def commit_transaction(self, timeout: Optional[float] = None):
-        logger.debug("Committing Kafka transaction...")
         self._producer.commit_transaction(timeout if timeout is not None else -1)
-        logger.debug("Kafka transaction committed successfully!")
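
`Checkpoint.close()` exists because Kafka transactions are timeboxed: the broker aborts any transaction left open longer than the producer's `transaction.timeout.ms` (60 seconds by default in librdkafka), after which the producer is effectively unusable. Proactively aborting empty checkpoints keeps an idle application healthy. A sketch of raising that ceiling for slow-moving topics — a hypothetical override, not a new default:

```python
from quixstreams import Application

# Hypothetical tuning: allow a transaction to stay open for up to 5 minutes.
# The value must stay under the broker-side cap (transaction.max.timeout.ms,
# 15 minutes by default), or the producer will fail to initialize.
app = Application(
    broker_address="localhost:9092",  # assumed local broker
    consumer_group="my-group",
    processing_guarantee="exactly-once",
    producer_extra_config={"transaction.timeout.ms": 300_000},
)
```
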
diff --git a/quixstreams/processing_context.py b/quixstreams/processing_context.py
index 22b1173e..ff48087e 100644
--- a/quixstreams/processing_context.py
+++ b/quixstreams/processing_context.py
@@ -64,23 +64,25 @@ def init_checkpoint(self):
 
     def commit_checkpoint(self, force: bool = False):
         """
-        Commit the current checkpoint.
+        Attempt to finalize the current Checkpoint if it is expired or `force=True`
+        is passed; otherwise, do nothing.
 
-        The actual commit will happen only when:
+        To finalize: commit the Checkpoint if it has any stored offsets,
+        else close it. A new Checkpoint is then created.
 
-        1. The checkpoint has at least one stored offset
-        2. The checkpoint is expired or `force=True` is passed
-
-        :param force: if `True`, commit the checkpoint before its expiration deadline.
+        :param force: if `True`, commit the Checkpoint before its expiration deadline.
         """
         if self._checkpoint.expired() or force:
-            logger.debug(f"Attempting checkpoint commit; forced={force}")
-            start = time.monotonic()
-            self._checkpoint.commit()
-            elapsed = round(time.monotonic() - start, 2)
-            logger.debug(
-                f"Committed a checkpoint; forced={force}, time_elapsed={elapsed}s"
-            )
+            if self._checkpoint.empty():
+                self._checkpoint.close()
+            else:
+                logger.debug(f"Committing a checkpoint; forced={force}")
+                start = time.monotonic()
+                self._checkpoint.commit()
+                elapsed = round(time.monotonic() - start, 2)
+                logger.debug(
+                    f"Committed a checkpoint; forced={force}, time_elapsed={elapsed}s"
+                )
             self.init_checkpoint()
 
     def __enter__(self):
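
`commit_checkpoint()` is driven by the Application's `commit_interval`: once a checkpoint is older than the interval, it is finalized on the next pass through the poll loop. An illustration of the knob (values here are hypothetical):

```python
from quixstreams import Application

# Hypothetical example: finalize a checkpoint roughly every 5 seconds.
# Empty checkpoints are close()d (aborting the idle transaction under
# exactly-once); non-empty ones are commit()ted.
app = Application(
    broker_address="localhost:9092",  # assumed local broker
    consumer_group="my-group",
    commit_interval=5.0,  # seconds
    processing_guarantee="exactly-once",
)
```

A lower interval means smaller transactions and less reprocessing after a crash, at the cost of more frequent commits.
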
diff --git a/quixstreams/rowproducer.py b/quixstreams/rowproducer.py
index 94f59418..4e858d1a 100644
--- a/quixstreams/rowproducer.py
+++ b/quixstreams/rowproducer.py
@@ -1,4 +1,5 @@
 import logging
+from functools import wraps
 from time import sleep
 from typing import Optional, Any, Union, Dict, Tuple, List, Callable
 
@@ -18,6 +19,55 @@
 
 _KEY_UNSET = object()
 
+
+def _retriable_transaction_op(attempts: int, backoff_seconds: float):
+    """
+    Some specific failure cases from sending offsets or committing a transaction
+    are retriable, which is worth re-attempting since the transaction is
+    almost complete (we flushed before attempting to commit).
+
+    Intended as a wrapper for these methods.
+
+    NOTE: During testing, most other operations (including producing)
+    did not generate "retriable" errors.
+    """
+
+    def decorator(kafka_op: Callable):
+        @wraps(kafka_op)
+        def wrapper(*args, **kwargs):
+            attempts_remaining = attempts
+            op_name = kafka_op.__name__
+            while attempts_remaining:
+                try:
+                    return kafka_op(*args, **kwargs)
+                except KafkaException as e:
+                    error = e.args[0]
+                    if error.retriable():
+                        attempts_remaining -= 1
+                        logger.debug(
+                            f"Kafka transaction operation {op_name} failed, but "
+                            f"can retry; attempts remaining: {attempts_remaining}. "
+                        )
+                        if attempts_remaining:
+                            logger.debug(
+                                f"Sleeping for {backoff_seconds}s before retrying."
+                            )
+                            sleep(backoff_seconds)
+                    else:
+                        # Just treat all errors besides retriable as fatal.
+                        logger.error(
+                            f"Error during Kafka transaction operation {op_name}"
+                        )
+                        raise
+            raise KafkaProducerTransactionCommitFailed(
+                f"All Kafka {op_name} attempts failed; "
+                "aborting transaction and shutting down Application..."
+            )
+
+        return wrapper
+
+    return decorator
+
+
 class KafkaProducerTransactionCommitFailed(QuixException): ...
 
@@ -47,7 +97,7 @@ def __init__(
         broker_address: Union[str, ConnectionConfig],
         extra_config: dict = None,
         on_error: Optional[ProducerErrorCallback] = None,
-        flush_timeout: Optional[int] = None,
+        flush_timeout: Optional[float] = None,
         transactional: bool = False,
     ):
 
@@ -197,62 +247,37 @@ def abort_transaction(self, timeout: Optional[float] = None):
         the Checkpoint inits another immediately after committing.
         """
         if self._active_transaction:
+            if self._tp_offsets:
+                # Only log here to avoid polluting the logs with empty checkpoint aborts
+                logger.debug("Aborting Kafka transaction and clearing producer offsets")
+                self._tp_offsets = {}
             self._producer.abort_transaction(timeout)
             self._active_transaction = False
         else:
             logger.debug("No Kafka transaction to abort")
 
-    def _retriable_commit_op(self, operation: Callable, args: list):
-        """
-        Some specific failure cases from sending offsets or committing a transaction
-        are retriable, which is worth re-attempting since the transaction is
-        almost complete (we flushed before attempting to commit).
-
-        NOTE: During testing, most other operations (including producing)
-        did not generate "retriable" errors.
-        """
-        attempts_remaining = 3
-        backoff_seconds = 1
-        op_name = operation.__name__
-        while attempts_remaining:
-            try:
-                return operation(*args)
-            except KafkaException as e:
-                error = e.args[0]
-                if error.retriable():
-                    attempts_remaining -= 1
-                    logger.debug(
-                        f"Kafka transaction operation {op_name} failed, but can retry; "
-                        f"attempts remaining: {attempts_remaining}. "
-                    )
-                    if attempts_remaining:
-                        logger.debug(
-                            f"Sleeping for {backoff_seconds} seconds before retrying."
-                        )
-                        sleep(backoff_seconds)
-                else:
-                    # Just treat all errors besides retriable as fatal.
-                    logger.error(
-                        f"Error occurred during Kafka transaction operation {op_name}"
-                    )
-                    raise
-        raise KafkaProducerTransactionCommitFailed(
-            f"All Kafka {op_name} attempts failed; "
-            "aborting transaction and shutting down Application..."
-        )
-
-    def commit_transaction(
+    @_retriable_transaction_op(attempts=3, backoff_seconds=1.0)
+    def _send_offsets_to_transaction(
         self,
         positions: List[TopicPartition],
         group_metadata: GroupMetadata,
         timeout: Optional[float] = None,
     ):
-        self._retriable_commit_op(
-            self._producer.send_offsets_to_transaction,
-            [positions, group_metadata, timeout],
-        )
-        self._retriable_commit_op(self._producer.commit_transaction, [timeout])
+        self._producer.send_offsets_to_transaction(positions, group_metadata, timeout)
+
+    @_retriable_transaction_op(attempts=3, backoff_seconds=1.0)
+    def _commit_transaction(self, timeout: Optional[float] = None):
+        self._producer.commit_transaction(timeout)
         self._active_transaction = False
 
+    def commit_transaction(
+        self,
+        positions: List[TopicPartition],
+        group_metadata: GroupMetadata,
+        timeout: Optional[float] = None,
+    ):
+        self._send_offsets_to_transaction(positions, group_metadata, timeout)
+        self._commit_transaction(timeout)
+
     def __enter__(self):
         return self
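
For readers unfamiliar with parametrized decorators, below is a stripped-down, self-contained sketch of the retry pattern that `_retriable_transaction_op` implements. The names (`retry_transient`, `FakeRetriableError`, `flaky_commit`) are illustrative only; the real decorator inspects `KafkaError.retriable()` on the wrapped `KafkaException`:

```python
import logging
from functools import wraps
from time import sleep
from typing import Callable

logger = logging.getLogger(__name__)


class FakeRetriableError(Exception):
    """Stand-in for a KafkaException whose inner error is retriable()."""


def retry_transient(attempts: int, backoff_seconds: float):
    """Simplified shape of _retriable_transaction_op, for illustration."""

    def decorator(op: Callable):
        @wraps(op)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return op(*args, **kwargs)
                except FakeRetriableError:
                    logger.debug("%s failed (attempt %d/%d)", op.__name__, attempt, attempts)
                    if attempt < attempts:
                        sleep(backoff_seconds)
            raise RuntimeError(f"All {attempts} attempts of {op.__name__} failed")

        return wrapper

    return decorator


@retry_transient(attempts=3, backoff_seconds=0.1)
def flaky_commit():
    raise FakeRetriableError("broker busy")


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    flaky_commit()  # fails three times (sleeping twice), then raises RuntimeError
```
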
diff --git a/quixstreams/state/exceptions.py b/quixstreams/state/exceptions.py
index 7ec21621..5d758d52 100644
--- a/quixstreams/state/exceptions.py
+++ b/quixstreams/state/exceptions.py
@@ -17,3 +17,6 @@ class InvalidStoreTransactionStateError(QuixException): ...
 
 
 class StoreTransactionFailed(QuixException): ...
+
+
+class InvalidStoreChangelogOffset(QuixException): ...
diff --git a/quixstreams/state/recovery.py b/quixstreams/state/recovery.py
index 7712bbe1..eaa6c5e1 100644
--- a/quixstreams/state/recovery.py
+++ b/quixstreams/state/recovery.py
@@ -10,6 +10,7 @@
 from quixstreams.rowproducer import RowProducer
 from quixstreams.state.types import StorePartition
 from quixstreams.utils.dicts import dict_values
+from .exceptions import InvalidStoreChangelogOffset
 
 logger = logging.getLogger(__name__)
 
@@ -84,11 +85,9 @@ def needs_recovery_check(self) -> bool:
         return has_consumable_offsets and state_potentially_behind
 
     @property
-    def needs_offset_update(self) -> bool:
+    def has_invalid_offset(self) -> bool:
         """
-        Determine if an offset update is required.
-
-        Usually checked during assign if recovery was not required.
+        Determine if the current changelog offset stored in state is invalid.
         """
         return self._changelog_highwater and (self._changelog_highwater <= self.offset)
 
@@ -100,26 +99,6 @@ def recovery_consume_position(self) -> Optional[int]:
     @property
     def had_recovery_changes(self) -> bool:
         return self._initial_offset != self.offset
 
-    def update_offset(self):
-        """
-        Update only the changelog offset of a StorePartition.
-        """
-        logger.info(
-            f"changelog partition {self.changelog_name}[{self.partition_num}] "
-            f"requires an offset update"
-        )
-        if self.offset >= self._changelog_highwater:
-            logger.warning(
-                f"{self} - the changelog offset "
-                f"{self.offset} in state was greater or equal to its actual highwater "
-                f"{self._changelog_highwater}, possibly due to previous Kafka or "
-                f"network issues. State may be inaccurate for any affected keys. "
-                f"The offset will now be set to its highwater - 1."
-            )
-        self._store_partition.set_changelog_offset(
-            changelog_offset=self._changelog_highwater - 1
-        )
-
     def recover_from_changelog_message(
         self, changelog_message: ConfluentKafkaMessageProto
     ):
@@ -379,10 +358,14 @@ def assign_partition(
             self._consumer.incremental_assign(
                 [ConfluentPartition(changelog_name, partition, rp.offset)]
             )
-        elif rp.needs_offset_update:
-            # nothing to recover, but offset is off...likely that offset >
-            # highwater due to At Least Once processing behavior.
-            rp.update_offset()
+        elif rp.has_invalid_offset:
+            raise InvalidStoreChangelogOffset(
+                "The offset in the state store is greater than or equal to its "
+                "respective changelog highwater. This can happen if the changelog "
+                "was deleted (and recreated) but the state store was not. The "
+                "invalid state store can be deleted by manually calling "
+                "Application.clear_state() before running the application again."
+            )
 
         # Figure out if we need to pause any topic partitions
         if self._recovery_partitions:
@@ -426,8 +409,9 @@ def revoke_partition(self, partition_num: int):
         :param partition_num: partition number of source topic
         """
         if changelogs := self._recovery_partitions.get(partition_num, {}):
-            logger.debug(f"Stopping recovery for {changelogs}")
-            self._revoke_recovery_partitions(list(changelogs.values()))
+            recovery_partitions = list(changelogs.values())
+            logger.debug(f"Stopping recovery for {list(map(str, recovery_partitions))}")
+            self._revoke_recovery_partitions(recovery_partitions)
 
     def _update_recovery_status(self):
         rp_revokes = []
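
`InvalidStoreChangelogOffset` replaces the old silent offset rewrite: a state store claiming to be at or past its changelog highwater now fails fast instead of guessing at a safe offset. The remediation suggested by the error message would look roughly like this (a hypothetical one-off script; the broker and group names are placeholders):

```python
from quixstreams import Application

app = Application(
    broker_address="localhost:9092",  # assumed local broker
    consumer_group="my-group",
)

# Wipe the local RocksDB state for this consumer group so that it is
# rebuilt from the changelog topics the next time the app runs.
app.clear_state()
```
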
- """ - logger.info( - f"changelog partition {self.changelog_name}[{self.partition_num}] " - f"requires an offset update" - ) - if self.offset >= self._changelog_highwater: - logger.warning( - f"{self} - the changelog offset " - f"{self.offset} in state was greater or equal to its actual highwater " - f"{self._changelog_highwater}, possibly due to previous Kafka or " - f"network issues. State may be inaccurate for any affected keys. " - f"The offset will now be set to its highwater - 1." - ) - self._store_partition.set_changelog_offset( - changelog_offset=self._changelog_highwater - 1 - ) - def recover_from_changelog_message( self, changelog_message: ConfluentKafkaMessageProto ): @@ -379,10 +358,14 @@ def assign_partition( self._consumer.incremental_assign( [ConfluentPartition(changelog_name, partition, rp.offset)] ) - elif rp.needs_offset_update: - # nothing to recover, but offset is off...likely that offset > - # highwater due to At Least Once processing behavior. - rp.update_offset() + elif rp.has_invalid_offset: + raise InvalidStoreChangelogOffset( + "The offset in the state store is greater than or equal to its " + "respective changelog highwater. This can happen if the changelog " + "was deleted (and recreated) but the state store was not. The " + "invalid state store can be deleted by manually calling " + "Application.clear_state() before running the application again." + ) # Figure out if we need to pause any topic partitions if self._recovery_partitions: @@ -426,8 +409,9 @@ def revoke_partition(self, partition_num: int): :param partition_num: partition number of source topic """ if changelogs := self._recovery_partitions.get(partition_num, {}): - logger.debug(f"Stopping recovery for {changelogs}") - self._revoke_recovery_partitions(list(changelogs.values())) + recovery_partitions = list(changelogs.values()) + logger.debug(f"Stopping recovery for {list(map(str, recovery_partitions))}") + self._revoke_recovery_partitions(recovery_partitions) def _update_recovery_status(self): rp_revokes = [] diff --git a/quixstreams/types.py b/quixstreams/types.py index ffbebf30..6a268826 100644 --- a/quixstreams/types.py +++ b/quixstreams/types.py @@ -1,8 +1,4 @@ -from typing import Protocol, Literal, Union - -ExactlyOnceSemantics = Literal["exactly-once", "EO", "EOS"] -AtLeastOnceSemantics = Literal["at-least-once", "ALO", "ALOS"] -ProcessingGuarantee = Literal[Union[ExactlyOnceSemantics, AtLeastOnceSemantics]] +from typing import Protocol class TopicPartition(Protocol): diff --git a/tests/test_quixstreams/fixtures.py b/tests/test_quixstreams/fixtures.py index 9bb0561c..31fb1786 100644 --- a/tests/test_quixstreams/fixtures.py +++ b/tests/test_quixstreams/fixtures.py @@ -10,7 +10,7 @@ NewPartitions, ) -from quixstreams.app import Application, MessageProcessedCallback +from quixstreams.app import Application, MessageProcessedCallback, ProcessingGuarantee from quixstreams.error_callbacks import ( ConsumerErrorCallback, ProducerErrorCallback, @@ -48,7 +48,6 @@ from quixstreams.rowproducer import RowProducer from quixstreams.state import StateStoreManager from quixstreams.state.recovery import RecoveryManager -from quixstreams.types import ProcessingGuarantee @pytest.fixture() @@ -294,7 +293,7 @@ def factory( auto_create_topics: bool = True, use_changelog_topics: bool = True, topic_manager: Optional[TopicManager] = None, - processing_guarantee: ProcessingGuarantee = "ALOS", + processing_guarantee: ProcessingGuarantee = "at-least-once", ) -> Application: state_dir = state_dir or (tmp_path / 
"state").absolute() return Application( diff --git a/tests/test_quixstreams/test_app.py b/tests/test_quixstreams/test_app.py index 86d534ba..3d511ab4 100644 --- a/tests/test_quixstreams/test_app.py +++ b/tests/test_quixstreams/test_app.py @@ -757,7 +757,7 @@ def get_app(fail: bool): auto_offset_reset="earliest", on_message_processed=on_message_processed, consumer_group=consumer_group, - processing_guarantee="EOS", + processing_guarantee="exactly-once", ) topic_in = app.topic(topic_in_name, value_deserializer="json") topic_out = app.topic(topic_out_name, value_serializer="json") diff --git a/tests/test_quixstreams/test_checkpointing.py b/tests/test_quixstreams/test_checkpointing.py index c73cfee8..8d227230 100644 --- a/tests/test_quixstreams/test_checkpointing.py +++ b/tests/test_quixstreams/test_checkpointing.py @@ -216,17 +216,14 @@ def test_commit_with_state_and_changelog_no_updates_success( assert not store_partition.get_processed_offset() @pytest.mark.parametrize("exactly_once", [False, True]) - def test_commit_no_offsets_stored_noop( + def test_close_no_offsets( self, checkpoint_factory, - state_manager_factory, - topic_factory, rowproducer_mock, exactly_once, ): - topic_name, _ = topic_factory() consumer_mock = MagicMock(spec_set=Consumer) - state_manager = state_manager_factory(producer=rowproducer_mock) + state_manager = MagicMock(spec_set=StateStoreManager) checkpoint = checkpoint_factory( consumer_=consumer_mock, state_manager_=state_manager, @@ -234,18 +231,13 @@ def test_commit_no_offsets_stored_noop( exactly_once=exactly_once, ) # Commit the checkpoint without processing any messages - checkpoint.commit() + checkpoint.close() - # The producer should not flush - assert not rowproducer_mock.flush.call_count - - # Check nothing is committed if exactly_once: # transaction should also be aborted assert rowproducer_mock.abort_transaction.call_count - assert not rowproducer_mock.commit_transaction.call_count else: - assert not consumer_mock.commit.call_count + assert not rowproducer_mock.abort_transaction.call_count @pytest.mark.parametrize("exactly_once", [False, True]) def test_commit_has_failed_transactions_fails( diff --git a/tests/test_quixstreams/test_rowproducer.py b/tests/test_quixstreams/test_rowproducer.py index 30cdfe80..6c78ce6f 100644 --- a/tests/test_quixstreams/test_rowproducer.py +++ b/tests/test_quixstreams/test_rowproducer.py @@ -302,6 +302,7 @@ def consume_and_produce(consumer, producer): ) as consumer: _ = consume_and_produce(consumer, producer) producer.abort_transaction(2) + assert not producer.offsets # repeat, only this time we commit the transaction with row_consumer_factory( diff --git a/tests/test_quixstreams/test_state/test_recovery/test_recovery_manager.py b/tests/test_quixstreams/test_state/test_recovery/test_recovery_manager.py index b126c351..339b32c5 100644 --- a/tests/test_quixstreams/test_state/test_recovery/test_recovery_manager.py +++ b/tests/test_quixstreams/test_state/test_recovery/test_recovery_manager.py @@ -1,10 +1,11 @@ from unittest.mock import patch, MagicMock +import pytest from confluent_kafka import TopicPartition as ConfluentPartition from quixstreams.kafka import Consumer from quixstreams.models import TopicManager, TopicConfig -from quixstreams.state import RecoveryPartition +from quixstreams.state.exceptions import InvalidStoreChangelogOffset from quixstreams.state.rocksdb import RocksDBStorePartition from quixstreams.state.rocksdb.metadata import CHANGELOG_CF_MESSAGE_HEADER from tests.utils import ConfluentKafkaMessageStub @@ -92,7 
+93,7 @@ def test_assign_partition( # Check that consumer paused all assigned partitions consumer.pause.assert_called_with(assignment) - def test_assign_partition_fix_offset_only( + def test_assign_partition_invalid_offset( self, recovery_manager_factory, recovery_partition_factory, @@ -100,7 +101,7 @@ def test_assign_partition_fix_offset_only( ): """ Try to recover store partition with changelog offset AHEAD of the watermark. - The offset should be adjusted in this case, but recovery should not be triggered + This is invalid and should raise exception. """ topic_name = "topic_name" @@ -127,7 +128,7 @@ def test_assign_partition_fix_offset_only( consumer=consumer, topic_manager=topic_manager ) - with patch.object(RecoveryPartition, "update_offset") as update_offset: + with pytest.raises(InvalidStoreChangelogOffset): recovery_manager.assign_partition( topic=topic_name, partition=partition_num, @@ -135,9 +136,6 @@ def test_assign_partition_fix_offset_only( committed_offset=-1001, ) - # "update_offset()" should be called - update_offset.assert_called() - # No pause or assignments should happen consumer.pause.assert_not_called() consumer.incremental_assign.assert_not_called() diff --git a/tests/test_quixstreams/test_state/test_recovery/test_recovery_partition.py b/tests/test_quixstreams/test_state/test_recovery/test_recovery_partition.py index 0da14ba4..c7c0071c 100644 --- a/tests/test_quixstreams/test_state/test_recovery/test_recovery_partition.py +++ b/tests/test_quixstreams/test_state/test_recovery/test_recovery_partition.py @@ -1,4 +1,3 @@ -import logging from unittest.mock import MagicMock import pytest @@ -24,7 +23,7 @@ def test_needs_recovery_check( assert recovery_partition.needs_recovery_check == needs_check @pytest.mark.parametrize( - "offset, needs_check, needs_update", + "offset, needs_check, invalid_offset", ( [21, False, True], [20, False, True], @@ -34,7 +33,7 @@ def test_needs_recovery_check( ), ) def test_needs_recovery_check_no_valid_offsets( - self, recovery_partition_factory, offset, needs_check, needs_update + self, recovery_partition_factory, offset, needs_check, invalid_offset ): # Create a RecoveryPartition with the offset ahead of the watermark store_partition = MagicMock(RocksDBStorePartition) @@ -43,7 +42,7 @@ def test_needs_recovery_check_no_valid_offsets( recovery_partition.set_watermarks(20, 20) assert recovery_partition.needs_recovery_check == needs_check - assert recovery_partition.needs_offset_update == needs_update + assert recovery_partition.has_invalid_offset == invalid_offset def test_recover_from_changelog_message(self, recovery_partition_factory): store_partition = MagicMock(RocksDBStorePartition) @@ -58,18 +57,3 @@ def test_recover_from_changelog_message(self, recovery_partition_factory): store_partition.recover_from_changelog_message.assert_called_with( changelog_message=msg, committed_offset=1 ) - - def test_update_offset(self, recovery_partition_factory, caplog): - store_partition = MagicMock(RocksDBStorePartition) - store_partition.get_changelog_offset.return_value = 10 - lowwater, highwater = 0, 9 - recovery_partition = recovery_partition_factory(store_partition=store_partition) - recovery_partition.set_watermarks(lowwater, highwater) - recovery_partition.update_offset() - - store_partition.set_changelog_offset.assert_called_with( - changelog_offset=highwater - 1 - ) - with caplog.at_level(level=logging.WARNING): - recovery_partition.update_offset() - assert caplog.text
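
A migration note implied by the test changes above: the shorthand aliases (`"EO"`, `"EOS"`, `"ALO"`, `"ALOS"`) are gone along with `quixstreams.types.ProcessingGuarantee`, and only the two long-form values are accepted. Existing callers would be updated like this sketch (broker and group names are hypothetical):

```python
from quixstreams import Application

# Before (now raises ValueError):
#   app = Application(processing_guarantee="EOS")
#   app = Application(processing_guarantee="ALOS")

# After:
app = Application(
    broker_address="localhost:9092",
    consumer_group="my-group",
    processing_guarantee="exactly-once",  # or "at-least-once"
)
```

Code that imported the `ProcessingGuarantee` type alias should now import it from `quixstreams.app`, as `tests/test_quixstreams/fixtures.py` does above.
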