Commit 7a55a1e

finished writing tests, added docs
tim-quix committed Jun 25, 2024
1 parent f602f1f commit 7a55a1e
Showing 10 changed files with 221 additions and 119 deletions.
35 changes: 35 additions & 0 deletions docs/configuration.md
@@ -35,6 +35,11 @@ See more `auto.offset.reset` in this [article](https://www.quix.io/blog/kafka-au
**Options**: `"latest"`, `"earliest"`.
**Default** - `"latest"`.

- **`processing_guarantee`** - The processing guarantee to use: "at-least-once" or
"exactly-once".
See [Processing Guarantees](#processing-guarantees) for more information.
**Options**: `"at-least-once"`/`"ALO"` or `"exactly-once"`/`"EO"`.
**Default** - `"exactly-once"`.

## Authentication

@@ -83,6 +88,36 @@ app = Application(
`ConnectionConfig.from_librdkafka_dict(config, ignore_extras=True)` will additionally
ignore irrelevant settings (but you will lose some validation checks).
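
For example, a minimal sketch of this pattern (assuming `ConnectionConfig` is importable
from `quixstreams.kafka`; all settings below are placeholders):

```python
from quixstreams.kafka import ConnectionConfig

# A raw librdkafka-style config dict; "made.up.setting" stands in for
# any key that ConnectionConfig does not recognize.
librdkafka_config = {
    "bootstrap.servers": "localhost:9092",
    "sasl.mechanism": "PLAIN",
    "made.up.setting": "whatever",
}

# ignore_extras=True drops unrecognized keys instead of raising,
# at the cost of some validation checks.
connection = ConnectionConfig.from_librdkafka_dict(
    librdkafka_config, ignore_extras=True
)
```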

## Processing Guarantees

This section concerns the `processing_guarantee` setting.

### What are "processing guarantees"?
Kafka broadly has three guarantee levels/semantics associated with handling messages.

From weakest to strongest: `at-most-once`, `at-least-once`, and `exactly-once`.
Stronger guarantees generally have larger overhead, and thus reduced speed.

These guarantees can be read literally: they describe how many times each consumed
Kafka message is guaranteed to be processed.

Many users expect Kafka processing to be `exactly-once` by nature, but most client
libraries do not provide it by default, which frequently leads to confusing results.
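
To make the weaker guarantee concrete, here is a sketch of `at-least-once` consuming
using plain `confluent-kafka` (not the Quix Streams API; the broker, group, and topic
names are placeholders). Because the offset is committed only after processing, a crash
between the two steps replays the message on restart:

```python
from confluent_kafka import Consumer

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",  # placeholder
        "group.id": "demo-group",               # placeholder
        "enable.auto.commit": False,
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["input-topic"])  # placeholder topic


def handle(msg):
    print(msg.value())  # stand-in for real side effects


while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    handle(msg)                   # side effects happen here
    # A crash at this point means `msg` is handled again after restart.
    consumer.commit(message=msg)  # offset committed only after processing
```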

### What options does Quix Streams offer?

Currently, Quix Streams offers `at-least-once` and `exactly-once`.

`exactly-once` is the default, giving new Kafka users the behavior they likely expect
out of the box.
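
As a sketch, setting the guarantee when creating the `Application` (the broker address
is a placeholder):

```python
from quixstreams import Application

app = Application(
    broker_address="localhost:9092",      # placeholder
    processing_guarantee="exactly-once",  # the default; or "at-least-once"
)
```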

### What is right for me?
If you are unsure, or processing speed has not been an issue, the safe answer is
`exactly-once` (the default).

Otherwise, it is a case-by-case determination: you are ultimately weighing a gain in
speed against the potential to double-process a result, which may require additional
infrastructure to handle appropriately.

## State
- **`state_dir`** - path to the application state directory.
10 changes: 5 additions & 5 deletions quixstreams/app.py
@@ -183,9 +183,9 @@ def __init__(
configure_logging(loglevel=loglevel)

if processing_guarantee in get_args(ExactlyOnceSemantics):
use_kafka_transactions = True
exactly_once = True
elif processing_guarantee in get_args(AtLeastOnceSemantics):
use_kafka_transactions = False
exactly_once = False
else:
raise ValueError(
f'Must provide a valid "processing_guarantee"; expected: '
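
For reference, `ExactlyOnceSemantics` and `AtLeastOnceSemantics` are presumably
`typing.Literal` aliases along these lines (a sketch; the actual definitions live
elsewhere in the codebase):

```python
from typing import Literal, get_args

ExactlyOnceSemantics = Literal["exactly-once", "EO"]
AtLeastOnceSemantics = Literal["at-least-once", "ALO"]

# get_args() unpacks the accepted strings:
assert get_args(ExactlyOnceSemantics) == ("exactly-once", "EO")
assert get_args(AtLeastOnceSemantics) == ("at-least-once", "ALO")
```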
@@ -275,7 +275,7 @@ def __init__(
broker_address=broker_address,
extra_config=producer_extra_config,
on_error=on_producer_error,
transactional=use_kafka_transactions,
transactional=exactly_once,
)
self._consumer_poll_timeout = consumer_poll_timeout
self._producer_poll_timeout = producer_poll_timeout
@@ -284,7 +284,7 @@ def __init__(
self._auto_create_topics = auto_create_topics
self._running = False
self._failed = False
self._exactly_once = processing_guarantee
self._exactly_once = exactly_once

if not topic_manager:
topic_manager = topic_manager_factory(
@@ -316,7 +316,7 @@ def __init__(
producer=self._producer,
consumer=self._consumer,
state_manager=self._state_manager,
exactly_once=use_kafka_transactions,
exactly_once=exactly_once,
)

@classmethod
24 changes: 6 additions & 18 deletions quixstreams/checkpointing/checkpoint.py
@@ -103,7 +103,7 @@ def get_store_transaction(
self._store_transactions[(topic, partition, store_name)] = transaction
return transaction

def commit(self, offset_adjust: bool = False):
def commit(self):
"""
Commit the checkpoint.
@@ -164,23 +164,11 @@ def commit(self, offset_adjust: bool = False):
# Get the changelog topic-partition for the given transaction
# It can be None if changelog topics are disabled in the app config
changelog_tp = transaction.changelog_topic_partition
if offset_adjust:
# Set the changelog offset to its highwater - 1 in Kafka
# As part of graceful revoke.
# This helps avoid unnecessary recovery attempts (mostly with
# exactly-once, where the highwater is at least >=2 from last message).
_, highwater = self._consumer.get_watermark_offsets(
TopicPartition(*changelog_tp)
)
changelog_offset = highwater - 1
else:
# The changelog offset also can be None if no updates happened
# during transaction
changelog_offset = (
produced_offsets.get(changelog_tp)
if changelog_tp is not None
else None
)
# The changelog offset also can be None if no updates happened
# during transaction
changelog_offset = (
produced_offsets.get(changelog_tp) if changelog_tp is not None else None
)
transaction.flush(
processed_offset=offset, changelog_offset=changelog_offset
)
2 changes: 1 addition & 1 deletion quixstreams/processing_context.py
@@ -76,7 +76,7 @@ def commit_checkpoint(self, force: bool = False):
if self._checkpoint.expired() or force:
logger.debug(f"Attempting checkpoint commit; forced={force}")
start = time.monotonic()
self._checkpoint.commit(offset_adjust=force)
self._checkpoint.commit()
elapsed = round(time.monotonic() - start, 2)
logger.debug(
f"Committed a checkpoint; forced={force}, time_elapsed={elapsed}s"
5 changes: 1 addition & 4 deletions quixstreams/rowproducer.py
@@ -194,10 +194,7 @@ def abort_transaction(self, timeout: Optional[float] = None):
self._producer.abort_transaction(timeout)
self._active_transaction = False
else:
logger.debug(
"No Kafka transaction to abort, "
"likely due to some other exception occurring"
)
logger.debug("No Kafka transaction to abort")

def _retriable_commit_op(self, operation: Callable, args: list):
"""
74 changes: 44 additions & 30 deletions quixstreams/state/recovery.py
@@ -43,6 +43,10 @@ def __init__(
self._changelog_highwater: Optional[int] = None
self._committed_offset = committed_offset
self._recovery_consume_position: Optional[int] = None
self._initial_offset: Optional[int] = None

def __str__(self):
return f"{self.changelog_name}[{self.partition_num}]"

@property
def changelog_name(self) -> str:
@@ -52,47 +56,49 @@ def changelog_name(self) -> str:
def partition_num(self) -> int:
return self._partition_num

@property
def changelog_highwater(self) -> Optional[int]:
return self._changelog_highwater

@property
def changelog_lowwater(self) -> Optional[int]:
return self._changelog_lowwater

@property
def offset(self) -> int:
"""
Get the changelog offset from the underlying `StorePartition`.
:return: changelog offset (int)
"""
return self._store_partition.get_changelog_offset() or 0
offset = self._store_partition.get_changelog_offset() or 0
if self._initial_offset is None:  # may legitimately be 0, so check for None
self._initial_offset = offset
return offset

@property
def _has_consumable_offsets(self) -> bool:
if self._recovery_consume_position:
# uses this when at least 1 poll during a recovery loop returned None
return self._recovery_consume_position != self.changelog_highwater
return self._changelog_lowwater != self._changelog_highwater
def finished_recovery_check(self) -> bool:
return self._recovery_consume_position == self._changelog_highwater

@property
def needs_recovery(self):
def needs_recovery_check(self) -> bool:
"""
Determine whether recovery is necessary for underlying `StorePartition`.
Determine whether to attempt recovery for underlying `StorePartition`.
This does NOT mean that anything actually requires recovering.
"""
state_is_behind = self._changelog_highwater - 1 > self.offset
return self._has_consumable_offsets and state_is_behind
is_behind = self.offset < self._changelog_highwater - 1
return (self._changelog_lowwater != self._changelog_highwater) and is_behind

@property
def needs_offset_update(self):
def needs_offset_update(self) -> bool:
"""
Determine if an offset update is required.
Usually checked during assign if recovery was not required.
"""
return self._changelog_highwater and (self._changelog_highwater <= self.offset)

@property
def recovery_consume_position(self) -> Optional[int]:
return self._recovery_consume_position

@property
def had_recovery_changes(self) -> bool:
return self._initial_offset != self.offset

def update_offset(self):
"""
Update only the changelog offset of a StorePartition.
@@ -103,11 +109,11 @@ def update_offset(self):
)
if self.offset >= self._changelog_highwater:
logger.warning(
f"{self.changelog_name}[{self.partition_num}] - the changelog offset "
f"{self} - the changelog offset "
f"{self.offset} in state was greater or equal to its actual highwater "
f"{self._changelog_highwater}, possibly due to previous Kafka or "
f"network issues. State may be inaccurate for any affected keys. "
f"The offset will now be set to {self._changelog_highwater - 1}."
f"The offset will now be set to its highwater - 1."
)
self._store_partition.set_changelog_offset(
changelog_offset=self._changelog_highwater - 1
@@ -139,6 +145,10 @@ def set_recovery_consume_position(self, offset: int):
"""
Update the recovery partition with the consumer's position (whenever
an empty poll is returned during recovery).
It may be set more than once.
:param offset: the consumer's current read position of the changelog
"""
self._recovery_consume_position = offset

@@ -355,14 +365,16 @@ def assign_partition(
)
for rp in recovery_partitions:
changelog_name, partition = rp.changelog_name, rp.partition_num
if rp.needs_recovery:
logger.info(f"Recovery required for {changelog_name}[{partition}]")
if rp.needs_recovery_check:
logger.debug(f"Performing recovery check for {rp}")
self._recovery_partitions.setdefault(partition, {})[changelog_name] = rp
# note: technically it should be rp.offset + 1, but to remain backwards
# compatible with >v2.7 +1 ALOS offsetting, it remains rp.offset.
# compatible with <v2.7 +1 ALOS offsetting, it remains rp.offset.
# This means we will always re-write the "first" recovery message.
# Eventually can be updated, but would otherwise miss a message for now
# (though it is an edge case of upgrading when recovery is required).
# More specifically, this is only covering for a very edge case:
# when first upgrading from <v2.7 AND a recovery was actually needed.
# Once on >=v2.7, this is no longer an issue...so we could eventually
# remove this, potentially.
self._consumer.incremental_assign(
[ConfluentPartition(changelog_name, partition, rp.offset)]
)
@@ -405,7 +417,7 @@ def _revoke_recovery_partitions(
if not self._recovery_partitions[partition_num]:
del self._recovery_partitions[partition_num]
if self.recovering:
logger.debug("Resuming recovery...")
logger.debug("Resuming recovery process...")

def revoke_partition(self, partition_num: int):
"""
@@ -427,12 +439,14 @@ def _update_recovery_status(self):
[ConfluentPartition(rp.changelog_name, rp.partition_num)]
)[0].offset
rp.set_recovery_consume_position(position)
if not rp.needs_recovery:
if rp.finished_recovery_check:
revokes.append(rp)
while revokes:
rp = revokes.pop()
rp.update_offset()
logger.info(f"Finished recovering {rp.changelog_name}[{rp.partition_num}]")
if rp.had_recovery_changes:
logger.info(f"Finished recovering {rp}")
else:
logger.debug(f"No recovery required for {rp}")
self._revoke_recovery_partitions(
[self._recovery_partitions[rp.partition_num].pop(rp.changelog_name)],
rp.partition_num,