fixes based on code review, mostly minor stuff besides removing offset fixing
tim-quix committed Jun 28, 2024
1 parent be43ce3 commit 3f16292
Showing 15 changed files with 181 additions and 158 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -18,7 +18,7 @@ Quix Streams has the following benefits:
- Support for many serialization formats, including JSON (and Quix-specific).
- Support for stateful operations using RocksDB.
- Support for aggregations over tumbling and hopping time windows.
- "At-least-once" Kafka processing guarantees.
- "At-least-once" and "exactly-once" Kafka processing guarantees.
- Designed to run and scale resiliently via container orchestration (like Kubernetes).
- Easily runs locally and in Jupyter Notebook for convenient development and debugging.
- Seamless integration with the fully managed [Quix Cloud](https://quix.io/product) platform.
@@ -160,9 +160,9 @@ Here are some of the planned improvements:
- [x] [Windowed aggregations over Tumbling & Hopping windows](https://quix.io/docs/quix-streams/v2-0-latest/windowing.html)
- [x] [State recovery based on Kafka changelog topics](https://quix.io/docs/quix-streams/advanced/stateful-processing.html#fault-tolerance-recovery)
- [x] [Group-by operation](https://quix.io/docs/quix-streams/groupby.html)
- [X] ["Exactly Once" delivery guarantees for Kafka message processing (AKA transactions)](https://quix.io/docs/quix-streams/configuration.html#processing-guarantees)
- [ ] Joins
- [ ] Windowed aggregations over Sliding windows
- [ ] "Exactly Once" delivery guarantees for Kafka message processing (AKA transactions)
- [ ] Support for Avro and Protobuf formats
- [ ] Schema Registry support

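The newly checked roadmap item corresponds to the `processing_guarantee` option this PR adds, which is selected by name when constructing the app. A minimal usage sketch, assuming a local broker at `localhost:9092` and the v2-era API where `Application.run()` takes the dataframe (topic and group names are illustrative):

```python
from quixstreams import Application

app = Application(
    broker_address="localhost:9092",
    consumer_group="demo-group",
    processing_guarantee="exactly-once",  # or "at-least-once"
)

# Read from an input topic, apply a placeholder transformation, and run.
sdf = app.dataframe(app.topic("input-topic"))
sdf = sdf.apply(lambda value: value)
app.run(sdf)
```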
32 changes: 15 additions & 17 deletions quixstreams/app.py
@@ -4,7 +4,7 @@
import os
import signal
import warnings
-from typing import Optional, List, Callable, Union, get_args
+from typing import Optional, List, Callable, Union, Literal, get_args

from confluent_kafka import TopicPartition
from typing_extensions import Self
@@ -40,11 +40,11 @@
from .state import StateStoreManager
from .state.recovery import RecoveryManager
from .state.rocksdb import RocksDBOptionsType
-from .types import ProcessingGuarantee, ExactlyOnceSemantics, AtLeastOnceSemantics

__all__ = ("Application",)

logger = logging.getLogger(__name__)
+ProcessingGuarantee = Literal["at-least-once", "exactly-once"]
MessageProcessedCallback = Callable[[str, int, int], None]

# Enforce idempotent producing for the internal RowProducer
@@ -184,15 +184,10 @@ def __init__(
"""
configure_logging(loglevel=loglevel)

-        if processing_guarantee in get_args(ExactlyOnceSemantics):
-            exactly_once = True
-        elif processing_guarantee in get_args(AtLeastOnceSemantics):
-            exactly_once = False
-        else:
+        if processing_guarantee not in get_args(ProcessingGuarantee):
            raise ValueError(
-                f'Must provide a valid "processing_guarantee"; expected: '
-                f"{[*get_args(ExactlyOnceSemantics), *get_args(AtLeastOnceSemantics)]}"
-                f', got "{processing_guarantee}"'
+                f'Must provide a valid "processing_guarantee"; expected one of: '
+                f'{get_args(ProcessingGuarantee)}, got "{processing_guarantee}"'
            )

producer_extra_config = producer_extra_config or {}
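The inlined `ProcessingGuarantee` definition doubles as the validation source of truth, since `typing.get_args` on a `Literal` returns exactly the allowed strings. A small self-contained sketch of the same pattern (the `validate` helper is hypothetical, not library code):

```python
from typing import Literal, get_args

ProcessingGuarantee = Literal["at-least-once", "exactly-once"]


def validate(guarantee: str) -> None:
    # get_args(ProcessingGuarantee) == ("at-least-once", "exactly-once")
    if guarantee not in get_args(ProcessingGuarantee):
        raise ValueError(
            f'Must provide a valid "processing_guarantee"; expected one of: '
            f'{get_args(ProcessingGuarantee)}, got "{guarantee}"'
        )


validate("exactly-once")    # passes silently
# validate("at-most-once")  # would raise ValueError
```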
@@ -263,6 +258,7 @@ def __init__(
self._commit_interval = commit_interval
self._producer_extra_config = producer_extra_config
self._consumer_extra_config = consumer_extra_config
+        self._processing_guarantee = processing_guarantee
self._consumer = RowConsumer(
broker_address=broker_address,
consumer_group=consumer_group,
@@ -279,7 +275,7 @@
"max.poll.interval.ms", _default_max_poll_interval_ms
)
/ 1000, # convert to seconds
-            transactional=exactly_once,
+            transactional=self._uses_exactly_once,
)
self._consumer_poll_timeout = consumer_poll_timeout
self._producer_poll_timeout = producer_poll_timeout
@@ -288,7 +284,6 @@
self._auto_create_topics = auto_create_topics
self._running = False
self._failed = False
-        self._exactly_once = exactly_once

if not topic_manager:
topic_manager = topic_manager_factory(
@@ -320,13 +315,17 @@ def __init__(
producer=self._producer,
consumer=self._consumer,
state_manager=self._state_manager,
-            exactly_once=exactly_once,
+            exactly_once=self._uses_exactly_once,
)

@property
-    def is_quix_app(self):
+    def is_quix_app(self) -> bool:
return self._is_quix_app

+    @property
+    def _uses_exactly_once(self) -> bool:
+        return self._processing_guarantee == "exactly-once"

@classmethod
def Quix(
cls,
@@ -349,7 +348,7 @@ def Quix(
topic_manager: Optional[QuixTopicManager] = None,
request_timeout: float = 30,
topic_create_timeout: float = 60,
-        processing_guarantee: ProcessingGuarantee = "exactly-once",
+        processing_guarantee: Literal["at-least-once", "exactly-once"] = "exactly-once",
) -> Self:
"""
>***NOTE:*** DEPRECATED: use Application with `quix_sdk_token` argument instead.
@@ -721,14 +720,13 @@ def run(
"""
self._setup_signal_handlers()

-        guarantee = "exactly-once" if self._exactly_once else "at-least-once"
logger.info(
f"Starting the Application with the config: "
f'broker_address="{self._broker_address}" '
f'consumer_group="{self._consumer_group}" '
f'auto_offset_reset="{self._auto_offset_reset}" '
f"commit_interval={self._commit_interval}s "
-            f'processing_guarantee="{guarantee}"'
+            f'processing_guarantee="{self._processing_guarantee}"'
)
if self.is_quix_app:
self._quix_runtime_init()
17 changes: 10 additions & 7 deletions quixstreams/checkpointing/checkpoint.py
@@ -107,6 +107,15 @@ def get_store_transaction(
self._store_transactions[(topic, partition, store_name)] = transaction
return transaction

+    def close(self):
+        """
+        Perform cleanup (when the checkpoint is empty) instead of committing.
+        Needed for exactly-once, as Kafka transactions are timeboxed.
+        """
+        if self._exactly_once:
+            self._producer.abort_transaction()

def commit(self):
"""
Commit the checkpoint.
Expand All @@ -118,12 +127,6 @@ def commit(self):
4. Flush each state store partition to the disk.
"""

-        if not self._tp_offsets:
-            logger.debug("Nothing to commit")
-            if self._exactly_once:
-                self._producer.abort_transaction()
-            return

# Step 1. Produce the changelogs
for (
topic,
Expand Down Expand Up @@ -160,7 +163,7 @@ def commit(self):
offsets, self._consumer.consumer_group_metadata()
)
else:
logger.debug("Checkpoint: commiting consumer")
logger.debug("Checkpoint: committing consumer")
try:
partitions = self._consumer.commit(offsets=offsets, asynchronous=False)
except KafkaException as e:
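The heart of the change in this file is the commit-path split shown above: under exactly-once, consumer offsets are attached to the producer's open transaction rather than committed by the consumer. A hedged sketch of that branch using plain `confluent_kafka` objects, which the quixstreams wrappers delegate to (the `commit_offsets` helper and its wiring are illustrative, not library code):

```python
from typing import List

from confluent_kafka import Consumer, Producer, TopicPartition


def commit_offsets(
    consumer: Consumer,
    producer: Producer,
    offsets: List[TopicPartition],
    exactly_once: bool,
) -> None:
    if exactly_once:
        # Offsets join the producer's open transaction and become visible
        # to the group only if commit_transaction() later succeeds.
        producer.send_offsets_to_transaction(
            offsets, consumer.consumer_group_metadata()
        )
    else:
        # At-least-once: a plain synchronous consumer commit.
        consumer.commit(offsets=offsets, asynchronous=False)
```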
5 changes: 0 additions & 5 deletions quixstreams/kafka/producer.py
@@ -213,7 +213,6 @@ def _producer(self) -> ConfluentProducer:
return self._inner_producer

def begin_transaction(self):
logger.debug("Starting Kafka transaction...")
self._producer.begin_transaction()

def send_offsets_to_transaction(
@@ -227,11 +226,7 @@ def send_offsets_to_transaction(
)

def abort_transaction(self, timeout: Optional[float] = None):
logger.debug("Aborting Kafka transaction...")
self._producer.abort_transaction(timeout if timeout is not None else -1)
logger.debug("Kafka transaction aborted successfully!")

def commit_transaction(self, timeout: Optional[float] = None):
logger.debug("Committing Kafka transaction...")
self._producer.commit_transaction(timeout if timeout is not None else -1)
logger.debug("Kafka transaction committed successfully!")
28 changes: 15 additions & 13 deletions quixstreams/processing_context.py
@@ -64,23 +64,25 @@ def init_checkpoint(self):

def commit_checkpoint(self, force: bool = False):
"""
-        Commit the current checkpoint.
+        Attempts finalizing the current Checkpoint only if the Checkpoint is "expired",
+        or `force=True` is passed, otherwise do nothing.
-        The actual commit will happen only when:
+        To finalize: the Checkpoint will be committed if it has any stored offsets,
+        else just close it. A new Checkpoint is then created.
-        1. The checkpoint has at least one stored offset
-        2. The checkpoint is expired or `force=True` is passed
-        :param force: if `True`, commit the checkpoint before its expiration deadline.
+        :param force: if `True`, commit the Checkpoint before its expiration deadline.
"""
if self._checkpoint.expired() or force:
logger.debug(f"Attempting checkpoint commit; forced={force}")
start = time.monotonic()
self._checkpoint.commit()
elapsed = round(time.monotonic() - start, 2)
logger.debug(
f"Committed a checkpoint; forced={force}, time_elapsed={elapsed}s"
)
if self._checkpoint.empty():
self._checkpoint.close()
else:
logger.debug(f"Committing a checkpoint; forced={force}")
start = time.monotonic()
self._checkpoint.commit()
elapsed = round(time.monotonic() - start, 2)
logger.debug(
f"Committed a checkpoint; forced={force}, time_elapsed={elapsed}s"
)
self.init_checkpoint()

def __enter__(self):
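Taken together with the new `Checkpoint.close()`, the flow above finalizes a checkpoint in one of two ways. A hedged restatement as a standalone helper (the `checkpoint` and `make_checkpoint` names are illustrative, not the library API):

```python
def finalize_checkpoint(checkpoint, make_checkpoint, force: bool = False):
    """Finalize `checkpoint` if it is due, then always start a fresh one."""
    if not (checkpoint.expired() or force):
        return checkpoint  # not due yet; keep accumulating state
    if checkpoint.empty():
        # Nothing was processed: abort the timeboxed Kafka transaction
        # instead of committing an empty checkpoint.
        checkpoint.close()
    else:
        # Changelogs, offsets, and store flushes happen inside commit().
        checkpoint.commit()
    return make_checkpoint()  # a new Checkpoint is then created
```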
