Update Application.run() behavior #932

Merged
merged 7 commits into from
Jun 18, 2025
104 changes: 68 additions & 36 deletions docs/debugging.md
@@ -91,72 +91,104 @@ sdf.print_table(title="Final Output")
## Interacting with Data

If you'd like to store or manipulate data from `Application` processing directly, you can
do an "interactive" `Application` by executing `Application.run()` within an
iPython session (terminal/Pycharm/VS Code etc.) or Jupiter Notebook cell.
do an "interactive" `Application` by executing `Application.run()` with stop conditions within an
iPython session (terminal/Pycharm/VS Code etc.) or Jupyter Notebook cell.

Basically:

1. The `Application` runs for the specified period in the interactive session.
2. Once the Application stops, the `run` method can return the accumulated outputs of the application for all registered dataframes.

The details of this pattern are further explained below.

### Stopping the application early and collecting the outputs

In a production setting, `Application.run()` should be called only once with no
arguments, which means it will run indefinitely (until it encounters an error or
is manually stopped by the user).

However, for debugging, the following kwargs can be passed to `Application.run()` to stop it
when the applicable condition is met:

- `timeout`: maximum time to wait for a new message to arrive (default `0.0` == infinite)
- `count`: the number of outputs to process across all dataframes and input topics (default `0` == infinite)

If `timeout` and `count` are passed together (which is the recommended pattern for debugging), either condition
will trigger the stop.


Also, you can collect the outputs of your application and examine them after
the `Application.run()` call:

- `collect`: if `True` (default), collect the outputs and return them as a list of dictionaries from the `Application.run()` call.
This setting is effective only when `timeout` or `count` are passed.
- `metadata`: if `True`, the collected outputs will include values, keys, timestamps, topics, partitions, and offsets.
Otherwise, only values are included (the default).

**Example**:

```python
from quixstreams import Application

app = Application(broker_address="localhost:9092")

topic = app.topic("some-topic")
# Assume the topic has one partition and three JSON messages:
# {"temperature": 30}
# {"temperature": 40}
# {"temperature": 50}

sdf = app.dataframe(topic=topic)

# Process one output and collect the value (stops if no messages for 10s)
result_values_only = app.run(count=1, timeout=10, collect=True)
# >>> result_values_only = [
# {"temperature": 30}
# ]

# Process one output and collect the value with metadata (stops if no messages for 10s)
result_values_and_metadata = app.run(count=1, timeout=10, collect=True, metadata=True)
# >>> result_values_and_metadata = [
# {"temperature": 40, "_key": "<message_key>", "_timestamp": 123, "_offset": 1, "_topic": "some-topic", "_partition": 1, "_headers": None},
# ]


# Process one output without collecting (stops if no messages for 10s)
result_empty = app.run(count=1, timeout=10, collect=False)
# >>> result_empty = []


```

#### Count Behavior

There are a few things to be aware of with `count`:

- It counts _outputs_ processed by **all** `StreamingDataFrames`.
  Under the hood, every message may generate from 0 to N outputs as it passes through the topology built by the `StreamingDataFrames`:
    - Operations like [filtering](processing.md#filtering-data) (e.g., `StreamingDataFrame.filter()` or `dataframe[dataframe["<field>"] == "<value>"]`) and ["final" windowed aggregations](windowing.md#emitting-after-the-window-is-closed-) reduce the number of outputs.
    - Operations like [`StreamingDataFrame.apply(..., expand=True)`](processing.md#expanding-collections-into-items) and [branching](branching.md) may increase the total number of outputs.

- The total number of outputs may be higher than the passed `count` parameter because every message is processed fully before stopping the app.
- After the count is reached, the `Application` stops and returns the accumulated outputs according to the `collect` and `metadata` settings (see the sketch below).
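
For illustration, here is a minimal sketch of how a single message can contribute several (or zero) outputs toward `count`; the broker address, topic name, and message shape are hypothetical:

```python
from quixstreams import Application

app = Application(broker_address="localhost:9092")

# Hypothetical topic carrying batched messages like {"readings": [10, 20, 30]}
sdf = app.dataframe(app.topic("sensor-batches"))

# expand=True splits each list into separate outputs, so one input message
# contributes 3 toward `count`, not 1.
sdf = sdf.apply(lambda value: value["readings"], expand=True)

# Filtering can shrink a message's contribution (possibly down to zero outputs).
sdf = sdf.filter(lambda reading: reading > 15)

# Even with count=2, both 20 and 30 are returned: the message that crossed
# the threshold is processed fully before the app stops.
outputs = app.run(count=2, timeout=10)
```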


#### Timeout Behavior

A couple of things to note about `timeout`:

- Though it can be used standalone, it's recommended to be paired with a `count`.

- Tracking starts once the first partition assignment (or recovery, if needed) finishes.
- There is a 60s wait buffer for the first assignment to trigger.

- Using only `timeout` when collecting data from high-volume topics may cause out-of-memory errors when `collect=True` (the default); see the sketch below.
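
For example, a minimal sketch of that workaround, reusing the `app` from the example above (the timeout value is arbitrary):

```python
# Run until the topic stays quiet for 30s without accumulating outputs in memory.
outputs = app.run(timeout=30, collect=False)
# >>> outputs == []
```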

#### Multiple Application.run() calls

It is safe to do subsequent `Application.run()` calls with different arguments; it will simply pick up where it left off.

There is no need to do any manual cleanup when finished; each run cleans up after itself.
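
For instance, a quick sketch of two back-to-back runs in the same interactive session, reusing the `app` from the example above (counts and timeouts are arbitrary):

```python
# First run: grab a couple of outputs with metadata attached.
first_batch = app.run(count=2, timeout=10, metadata=True)

# Second run: continues from where the first one stopped; values only this time.
second_batch = app.run(count=3, timeout=10)
```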

@@ -166,8 +198,8 @@

`ListSink` primarily exists to be used alongside `Application.run()` stop conditions.

You can use it to collect and examine data after specific operations in the dataframe.
It can be interacted with like a list once the `Application` stops.
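
A rough sketch of the pattern, assuming `ListSink` is imported from `quixstreams.sinks.core.list` (the broker address and topic name are placeholders):

```python
from quixstreams import Application
from quixstreams.sinks.core.list import ListSink

app = Application(broker_address="localhost:9092")
sdf = app.dataframe(app.topic("some-topic"))

list_sink = ListSink()  # list-like container for records
sdf.sink(list_sink)     # dump records that reach this point in the pipeline

app.run(count=50, timeout=10)  # stop after 50 outputs or 10s of silence
print(list_sink[0])            # afterwards, interact with it like a normal list
```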

#### Using ListSink

@@ -187,11 +219,11 @@ app.run(count=50, timeout=10)  # get up to 50 records (stops if no messages for 10s)

You can then interact with it once the `Application` stops:

```pycon
>>> print(list_sink)
[{"thing": "x"}, {"thing": "y"}, {"thing": "z"}]

>>> list_sink[0]
{"thing": "x"}
```

Expand All @@ -214,7 +246,7 @@ as additional fields as `_{param}`, like so:
- You can use any number of `ListSink` in an `Application`
- each one must have its own variable.

- `ListSink` does not limit its own size
- Be sure to use it with `Application.run()` stopping conditions.

- `ListSink` does not "refresh" itself per `.run()`; it collects data indefinitely.
107 changes: 61 additions & 46 deletions quixstreams/app.py
@@ -14,6 +14,7 @@
from pydantic_settings import PydanticBaseSettingsSource, SettingsConfigDict

from .context import copy_context, set_message_context
from .core.stream.functions.types import VoidExecutor
from .dataframe import DataFrameRegistry, StreamingDataFrame
from .error_callbacks import (
ConsumerErrorCallback,
@@ -375,7 +376,7 @@ def __init__(
sink_manager=self._sink_manager,
dataframe_registry=self._dataframe_registry,
)
self._run_tracker = RunTracker()

@property
def config(self) -> "ApplicationConfig":
@@ -568,7 +569,7 @@ def stop(self, fail: bool = False):
to unhandled exception, and it shouldn't commit the current checkpoint.
"""

self._run_tracker.stop()
if fail:
# Update "_failed" only when fail=True to prevent stop(failed=False) from
# resetting it
@@ -747,34 +748,36 @@ def run(
dataframe: Optional[StreamingDataFrame] = None,
timeout: float = 0.0,
count: int = 0,
collect: bool = True,
metadata: bool = False,
) -> list[dict]:
"""
Start processing data from Kafka using provided `StreamingDataFrame`

Once started, it can be safely terminated with a `SIGTERM` signal
(like Kubernetes does) or a typical `KeyboardInterrupt` (`Ctrl+C`).

Alternatively, stop conditions can be set (typically for debugging purposes);
has the option of stopping after a number of outputs, timeout, or both.

Not setting a timeout or count limit will result in the Application running
indefinitely (expected production behavior).


Stop Condition Details:

A `timeout` will immediately stop an Application once no new messages have
been consumed after T seconds (after rebalance and recovery).

A `count` will make the application wait until N total outputs
are processed from all the input topics after an initial rebalance
and recovery.
Note that each message may produce from 0 to N outputs depending
on the processing code.

If `timeout` is not set, the Application runs until the `count` is hit.

If `timeout` and `count` are used together (which is the recommended pattern for
debugging), either condition will trigger a stop.


@@ -794,10 +797,20 @@ def run(
app.run() # could pass `timeout=5` here, for example
```
:param dataframe: DEPRECATED - do not use; sdfs are now automatically tracked.

:param timeout: maximum time to wait for a new message.
Default: 0.0 (infinite)
:param count: stop the application after processing N outputs.
Default: 0 (infinite)
:param collect: if True, collect the outputs and return them as a list of dictionaries
in the format defined by the `metadata` parameter.
This parameter is effective only when `timeout` or `count` are passed.
Default: `True`.
:param metadata: if True, the collected outputs will contain values, keys,
timestamps, offsets, topics and partitions.
Otherwise, only values are collected.
This parameter is effective only if `collect=True` and `timeout` or `count` are passed.
Default: `False`.
"""
if dataframe is not None:
warnings.warn(
@@ -813,24 +826,8 @@ def run(
"Can only provide a timeout to .run() when running "
"a plain Source (no StreamingDataFrame)."
)
self._run_tracker.reset()
self._run_tracker.set_stop_condition(timeout=timeout, count=count)
self._setup_signal_handlers()

logger.info(
@@ -855,27 +852,45 @@ def _run(self):
with exit_stack:
# Subscribe to topics in Kafka and start polling
if self._dataframe_registry.consumer_topics:
collector = self._run_tracker.get_collector(
collect=collect, metadata=metadata
)
self._run_dataframe(sink=collector)
else:
self._run_sources()

return self._run_tracker.collected

def _exception_handler(self, exc_type, exc_val, exc_tb):
fail = False

# Sources and the application are independent.
# If a source fails, the application can shutdown gracefully.
if exc_val is not None and exc_type is not SourceException:
fail = True

self.stop(fail=fail)

def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
changelog_topics = self._topic_manager.changelog_topics_list

# set refs for performance improvements
state_manager = self._state_manager
processing_context = self._processing_context
source_manager = self._source_manager
process_message = self._process_message
printer = self._processing_context.printer
run_tracker = self._run_tracker
consumer = self._consumer

consumer.subscribe(
topics=self._dataframe_registry.consumer_topics + changelog_topics,
on_assign=self._on_assign,
on_revoke=self._on_revoke,
on_lost=self._on_lost,
)

dataframes_composed = self._dataframe_registry.compose_all(sink=sink)

processing_context.init_checkpoint()
run_tracker.set_as_running()
@@ -888,7 +903,7 @@ def _run_dataframe(self):
else:
process_message(dataframes_composed)
processing_context.commit_checkpoint()
consumer.resume_backpressured()
source_manager.raise_for_error()
printer.print()
run_tracker.update_status()
@@ -927,7 +942,7 @@ def _process_message(self, dataframe_composed):
)

if rows is None:
self._run_tracker.set_message_consumed(False)
return

# Deserializer may return multiple rows for a single message
@@ -965,7 +980,7 @@ def _process_message(self, dataframe_composed):
self._processing_context.store_offset(
topic=topic_name, partition=partition, offset=offset
)
self._run_tracker.set_message_consumed(True)

if self._on_message_processed is not None:
self._on_message_processed(topic_name, partition, offset)
5 changes: 4 additions & 1 deletion quixstreams/core/stream/exceptions.py
@@ -1,6 +1,9 @@
from quixstreams.exceptions import QuixException

__all__ = ("InvalidTopology",)
__all__ = ("InvalidTopology", "InvalidOperation")


class InvalidTopology(QuixException): ...


class InvalidOperation(QuixException): ...