From 1d3c01a8a97727f7b79ab3ddc8cac5a532a8888a Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Thu, 29 Aug 2024 15:19:30 -0400 Subject: [PATCH 1/4] remove lock as branching removes its necessity --- quixstreams/dataframe/dataframe.py | 53 +--------- quixstreams/dataframe/exceptions.py | 4 - .../test_dataframe/test_dataframe.py | 96 ++++++++++++------- 3 files changed, 64 insertions(+), 89 deletions(-) diff --git a/quixstreams/dataframe/dataframe.py b/quixstreams/dataframe/dataframe.py index b79b65370..ed7083f67 100644 --- a/quixstreams/dataframe/dataframe.py +++ b/quixstreams/dataframe/dataframe.py @@ -17,10 +17,9 @@ Tuple, Literal, Collection, - TypeVar, ) -from typing_extensions import Self, ParamSpec +from typing_extensions import Self from quixstreams.context import ( message_context, @@ -48,7 +47,7 @@ from quixstreams.sinks import BaseSink from quixstreams.state.types import State from .base import BaseStreaming -from .exceptions import InvalidOperation, DataFrameLocked +from .exceptions import InvalidOperation from .registry import DataframeRegistry from .series import StreamingSeries from .utils import ensure_milliseconds @@ -62,27 +61,6 @@ FilterWithMetadataCallbackStateful = Callable[[Any, Any, int, Any, State], bool] -_T = TypeVar("_T") -_P = ParamSpec("_P") - - -def _ensure_unlocked(func: Callable[_P, _T]) -> Callable[_P, _T]: - """ - Ensure the SDF instance is not locked by the sink() call before adding new - operations to it. - """ - - @functools.wraps(func) - def wrapper(self: StreamingDataFrame, *args, **kwargs): - if self._locked: - raise DataFrameLocked( - "StreamingDataFrame is already sinked and cannot be modified" - ) - return func(self, *args, **kwargs) - - return wrapper - - class StreamingDataFrame(BaseStreaming): """ `StreamingDataFrame` is the main object you will use for ETL work. @@ -189,7 +167,6 @@ def apply( expand: bool = ..., ) -> Self: ... - @_ensure_unlocked def apply( self, func: Union[ @@ -279,7 +256,6 @@ def update( metadata: Literal[True], ) -> Self: ... - @_ensure_unlocked def update( self, func: Union[ @@ -372,7 +348,6 @@ def filter( metadata: Literal[True], ) -> Self: ... - @_ensure_unlocked def filter( self, func: Union[ @@ -459,7 +434,6 @@ def group_by( key_serializer: Optional[SerializerType] = ..., ) -> Self: ... - @_ensure_unlocked def group_by( self, key: Union[str, Callable[[Any], Any]], @@ -559,7 +533,6 @@ def contains(key: str) -> StreamingSeries: lambda value, key_, timestamp, headers: key in value ) - @_ensure_unlocked def to_topic( self, topic: Topic, key: Optional[Callable[[Any], Any]] = None ) -> Self: @@ -605,7 +578,6 @@ def to_topic( metadata=True, ) - @_ensure_unlocked def set_timestamp(self, func: Callable[[Any, Any, int, Any], int]) -> Self: """ Set a new timestamp based on the current message value and its metadata. 
@@ -647,7 +619,6 @@ def _set_timestamp_callback( stream = self.stream.add_transform(func=_set_timestamp_callback) return self.__dataframe_clone__(stream=stream) - @_ensure_unlocked def set_headers( self, func: Callable[ @@ -699,7 +670,6 @@ def _set_headers_callback( stream = self.stream.add_transform(func=_set_headers_callback) return self.__dataframe_clone__(stream=stream) - @_ensure_unlocked def print(self, pretty: bool = True, metadata: bool = False) -> Self: """ Print out the current message value (and optionally, the message metadata) to @@ -813,7 +783,6 @@ def test( context.run(composed[topic.name], value, key, timestamp, headers) return result - @_ensure_unlocked def tumbling_window( self, duration_ms: Union[int, timedelta], @@ -890,7 +859,6 @@ def tumbling_window( duration_ms=duration_ms, grace_ms=grace_ms, dataframe=self, name=name ) - @_ensure_unlocked def hopping_window( self, duration_ms: Union[int, timedelta], @@ -983,7 +951,6 @@ def hopping_window( name=name, ) - @_ensure_unlocked def drop( self, columns: Union[str, List[str]], @@ -1028,7 +995,6 @@ def drop( metadata=False, ) - @_ensure_unlocked def sink(self, sink: BaseSink): """ Sink the processed data to the specified destination. @@ -1044,8 +1010,8 @@ def sink(self, sink: BaseSink): and resume again after the timeout. The backpressure handling and timeouts are defined by the specific sinks. - Note: `sink()` is a terminal operation, and you cannot add new operations - to the same StreamingDataFrame after it's called. + Note: `sink()` is a terminal operation - it cannot receive any additional + operations, but branches can still be generated from its originating SDF. """ self._processing_context.sink_manager.register(sink) @@ -1064,15 +1030,8 @@ def _sink_callback( offset=ctx.offset, ) - # even though using apply, don't return since we lock afterward anyway + # uses apply without returning to make this operation terminal self.apply(_sink_callback, metadata=True) - self._lock() - - def _lock(self): - """ - Lock the StreamingDataFrame to prevent adding new operations to it. - """ - self._locked = True def _produce( self, @@ -1140,7 +1099,6 @@ def __dataframe_clone__( ) return clone - @_ensure_unlocked def __setitem__(self, item_key: Any, item: Union[Self, object]): if isinstance(item, self.__class__): # Update an item key with a result of another sdf.apply() @@ -1173,7 +1131,6 @@ def __getitem__(self, item: str) -> StreamingSeries: ... @overload def __getitem__(self, item: Union[StreamingSeries, List[str], Self]) -> Self: ... - @_ensure_unlocked def __getitem__( self, item: Union[str, List[str], StreamingSeries, Self] ) -> Union[Self, StreamingSeries]: diff --git a/quixstreams/dataframe/exceptions.py b/quixstreams/dataframe/exceptions.py index b2dfcd319..1367902c9 100644 --- a/quixstreams/dataframe/exceptions.py +++ b/quixstreams/dataframe/exceptions.py @@ -6,7 +6,6 @@ "GroupByNestingLimit", "InvalidColumnReference", "ColumnDoesNotExist", - "DataFrameLocked", "StreamingDataFrameDuplicate", "GroupByDuplicate", ) @@ -27,7 +26,4 @@ class GroupByNestingLimit(QuixException): ... class GroupByDuplicate(QuixException): ... -class DataFrameLocked(QuixException): ... - - class StreamingDataFrameDuplicate(QuixException): ... 
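With `_ensure_unlocked` and `DataFrameLocked` removed, an un-reassigned call on a `StreamingDataFrame` simply opens a new branch instead of raising, as the updated tests below exercise. A minimal sketch of that model follows; the broker address, topic name, and lambdas are illustrative and not part of the patch, and `StreamingDataFrame.test()` is used (as in the tests below) so the topology runs in-process without producing to Kafka:

```python
from quixstreams import Application

# Illustrative sketch only: broker address, topic name and lambdas are assumptions.
app = Application(broker_address="localhost:9092")
sdf = app.dataframe(app.topic("numbers"))

sdf = sdf.apply(lambda v: v + 120).apply(lambda v: v // 2)  # reassigned: main branch -> 60
sdf.apply(lambda v: v // 3).apply(lambda v: v + 2)          # un-reassigned: side branch -> 22
sdf = sdf.apply(lambda v: v + 40)                           # main branch continues -> 100

# Each terminal branch yields one (value, key, timestamp, headers) tuple.
results = sdf.test(value=0, key=b"key", timestamp=0, headers=[])
assert sorted(r[0] for r in results) == [22, 100]
```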
diff --git a/tests/test_quixstreams/test_dataframe/test_dataframe.py b/tests/test_quixstreams/test_dataframe/test_dataframe.py index d06262478..552585d66 100644 --- a/tests/test_quixstreams/test_dataframe/test_dataframe.py +++ b/tests/test_quixstreams/test_dataframe/test_dataframe.py @@ -10,7 +10,6 @@ InvalidOperation, GroupByNestingLimit, GroupByDuplicate, - DataFrameLocked, ) from quixstreams.dataframe.registry import DataframeRegistry from quixstreams.dataframe.windows import WindowResult @@ -1642,33 +1641,25 @@ def test_group_by_name_clash(self, dataframe_factory, topic_manager_factory): with pytest.raises(GroupByDuplicate): sdf.group_by("col_a") - @pytest.mark.parametrize( - "operation", - [ - lambda sdf: sdf["a"], - lambda sdf: operator.setitem(sdf, "a", 1), - lambda sdf: sdf.apply(...), - lambda sdf: sdf.update(...), - lambda sdf: sdf.filter(...), - lambda sdf: sdf.print(), - lambda sdf: sdf.drop(), - lambda sdf: sdf.group_by(), - lambda sdf: sdf.tumbling_window(1), - lambda sdf: sdf.hopping_window(1, 1), - lambda sdf: sdf.to_topic(...), - lambda sdf: sdf.sink(...), - lambda sdf: sdf.set_headers(...), - lambda sdf: sdf.set_timestamp(...), - ], - ) - def test_sink_locks_sdf(self, operation, dataframe_factory, topic_manager_factory): + def test_sink_cannot_be_added_to(self, dataframe_factory, topic_manager_factory): + """ + A sink cannot be added to or branched. + """ topic_manager = topic_manager_factory() topic = topic_manager.topic(str(uuid.uuid4())) sdf = dataframe_factory(topic, topic_manager=topic_manager) + assert len(sdf.stream.children) == 0 sdf.sink(DummySink()) - - with pytest.raises(DataFrameLocked): - operation(sdf) + # sink operation was added + assert len(sdf.stream.children) == 1 + sdf_sink_node = list(sdf.stream.children)[0] + # do random stuff + sdf.apply(lambda x: x).update(lambda x: x) + sdf = sdf.apply(lambda x: x) + sdf.update(lambda x: x).apply(lambda x: x) + sdf.update(lambda x: x) + # no children should be added to the sink operation + assert not sdf_sink_node.children def add_n(n): @@ -1714,19 +1705,15 @@ def test_basic_branching(self, dataframe_factory): def test_multiple_branches(self, dataframe_factory): """ - --< is a split - "S'" denotes the continuation of the sdf that was split from - - sdf ---[ add_120, div_2 ]---< (sdf', sdf2), 60 - sdf_2 ---[ div_3 ]---< (sdf_2', sdf_3, sdf_4), 20 - sdf_3 ---[ add_10, add_3 ]---|END 33 - sdf_4 ---[ add_24 ]---|END 44 - sdf_2' ---[ add_2 ]---|END 22 - sdf' ---[ add_40 ]---< (sdf'', sdf_5), 100 - sdf_5 ---[ div_2, add_5 ]---|END 55 - sdf'' ---[ div_100, add_10 ]---|END 11 - - :return: + INPUT: 0 + └── SDF_1 = (add 120, div 2) -> 60 + ├── SDF_2 = (div 3) -> 20 + │ ├── SDF_3 = (add 10, add 3 ) -> 33 + │ ├── SDF_4 = ( add 24 ) -> 44 + │ └── SDF_2 = ( add 2 ) -> 22 + └── SDF_1 = (add 40) -> 100 + ├── SDF_5 = ( div 2, add 5 ) -> 55 + └── SDF_1 = (div 100, add 10) -> 11 """ sdf = dataframe_factory().apply(add_n(120)).apply(div_n(2)) # 60 @@ -1751,6 +1738,41 @@ def test_multiple_branches(self, dataframe_factory): assert results == expected + def test_multiple_branches_skip_assigns(self, dataframe_factory): + """ + INPUT: 0 + └── SDF_1 = (add 120, div 2) -> 60 + ├── SDF_2 = (div 3) -> 20 + │ ├── (add 10, add 3 ) -> 33 + │ ├── ( add 24 ) -> 44 + │ └── ( add 2 ) -> 22 + └── SDF_1 = (add 40) -> 100 + ├── ( div 2, add 5 ) -> 55 + └── (div 100, add 10) -> 11 + """ + + sdf = dataframe_factory().apply(add_n(120)).apply(div_n(2)) # 60 + sdf_2 = sdf.apply(div_n(3)) # 20 + sdf_2.apply(add_n(10)).apply(add_n(3)) # 33 + 
sdf_2.apply(add_n(24)) # 44 + sdf_2.apply(add_n(2)) # 22 + sdf = sdf.apply(add_n(40)) # 100 + sdf.apply(div_n(2)).apply(add_n(5)) # 55 + sdf.apply(div_n(100)).apply(add_n(10)) # 11 + + _extras = {"key": b"key", "timestamp": 0, "headers": []} + extras = list(_extras.values()) + expected = [ + (33, *extras), + (44, *extras), + (22, *extras), + (55, *extras), + (11, *extras), + ] + results = sdf.test(value=0, **_extras) + + assert results == expected + def test_filter(self, dataframe_factory): sdf = dataframe_factory().apply(add_n(10)) sdf2 = sdf.apply(add_n(5)).filter(less_than(0)).apply(add_n(200)) From b1f44cc46f49e1f5b8a7ec648be27a595063baf7 Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Fri, 30 Aug 2024 14:03:30 -0400 Subject: [PATCH 2/4] revert to_topic to update, add more docs around sink and a test for sink + branching --- docs/connectors/sinks/README.md | 47 ++++++++++++++-- quixstreams/dataframe/dataframe.py | 4 +- tests/test_quixstreams/test_app.py | 87 ++++++++++++++++++++++++++++++ 3 files changed, 131 insertions(+), 7 deletions(-) diff --git a/docs/connectors/sinks/README.md b/docs/connectors/sinks/README.md index 4bcccbf17..8def229c2 100644 --- a/docs/connectors/sinks/README.md +++ b/docs/connectors/sinks/README.md @@ -30,11 +30,50 @@ sdf = app.dataframe(topic) sdf.sink(influx_sink) ``` -## Sinks Are Destinations -When `.sink()` is called on a StreamingDataFrame instance, it marks the end of the processing pipeline, and - the StreamingDataFrame can't be changed anymore. +## Sinks Are Terminal Operations +`StreamingDataFrame.sink()` is special in that its "terminal": +**no additional operations can be added to it once called** (with branching, the branch +becomes terminal). + +This is to ensure no further mutations can be applied to the outbound data. + +_However_, you can continue other operations with other branches, including using +the same `Sink` to push another set of data (with another `SDF.sink()` call). + +[Learn more about _branching_ here](../../advanced/branching.md). + +### Branching after SDF.sink() + +It is still possible to branch after using `SDF.sink()` assuming _you do NOT reassign +with it_ (it returns `None`): + +```python +sdf = app.dataframe(topic) +sdf = sdf.apply() + +# Approach 1... Allows branching from `sdf` +sdf.sink() + +# Approach 2...Disables branching from `sdf` +sdf = sdf.sink() +``` + +### Suggested Use of SDF.sink() + +If further operations are required (or you want to preserve various operations for +other branches), it's recommended to use `SDF.sink()` as a standalone operation: + +```python +sdf = app.dataframe(topic) +# [other operations here...] +sdf = sdf.apply().apply() # last transforms before a sink +sdf.sink(influx_sink) # do sink as a standalone call, no reassignment +sdf = sdf.apply() # continue different operations with another branch... +``` + +([learn more: branching](../../advanced/branching.md)) + -Make sure you call `StreamingDataFrame.sink()` as the last operation. ## Supported Sinks diff --git a/quixstreams/dataframe/dataframe.py b/quixstreams/dataframe/dataframe.py index ed7083f67..603017d15 100644 --- a/quixstreams/dataframe/dataframe.py +++ b/quixstreams/dataframe/dataframe.py @@ -567,7 +567,7 @@ def to_topic( By default, the current message key will be used. :return: the updated StreamingDataFrame instance (reassignment NOT required). 
""" - return self.apply( + return self._add_update( lambda value, orig_key, timestamp, headers: self._produce( topic=topic, value=value, @@ -1046,8 +1046,6 @@ def _produce( value=value, key=key, timestamp=timestamp, context=ctx, headers=headers ) self._producer.produce_row(row=row, topic=topic, key=key, timestamp=timestamp) - # return value so produce can be an "apply" function (no branch copy required) - return value def _add_update( self, diff --git a/tests/test_quixstreams/test_app.py b/tests/test_quixstreams/test_app.py index 6d580bae7..8996ef875 100644 --- a/tests/test_quixstreams/test_app.py +++ b/tests/test_quixstreams/test_app.py @@ -2090,6 +2090,93 @@ def write(self, batch: SinkBatch): ) assert committed.offset == total_messages + def test_run_with_sink_branches_success( + self, + app_factory, + executor, + ): + + processed_count = 0 + total_messages = 3 + + def on_message_processed(topic_, partition, offset): + # Set the callback to track total messages processed + # The callback is not triggered if processing fails + nonlocal processed_count + + processed_count += 1 + # Stop processing after consuming all the messages + if processed_count == total_messages: + done.set_result(True) + + app = app_factory( + auto_offset_reset="earliest", + on_message_processed=on_message_processed, + ) + sink = DummySink() + + topic = app.topic( + str(uuid.uuid4()), + value_deserializer="str", + config=TopicConfig(num_partitions=3, replication_factor=1), + ) + sdf = app.dataframe(topic) + sdf = sdf.apply(lambda x: x + "_branch") + sdf.apply(lambda x: x + "0").sink(sink) + sdf.apply(lambda x: x + "1").sink(sink) + sdf = sdf.apply(lambda x: x + "2") + sdf.sink(sink) + + key, value, timestamp_ms = b"key", "value", 1000 + headers = [("key", b"value")] + + # Produce messages to different topic partitions and flush + with app.get_producer() as producer: + for i in range(total_messages): + producer.produce( + topic=topic.name, + partition=i, + key=key, + value=value, + timestamp=timestamp_ms, + headers=headers, + ) + + done = Future() + + # Stop app when the future is resolved + executor.submit(_stop_app_on_future, app, done, 15.0) + app.run(sdf) + + # Check that all messages have been processed + assert processed_count == total_messages + + # Ensure all messages were flushed to the sink + assert len(sink.results) == 9 + for i in range(3): + assert ( + len([r for r in sink.results if f"_branch{i}" in r.value]) + == total_messages + ) + for item in sink.results: + assert item.key == key + assert value in item.value + assert item.timestamp == timestamp_ms + assert item.headers == headers + + # Ensure that the offsets are committed + with app.get_consumer() as consumer: + committed0, committed1, committed2 = consumer.committed( + [ + TopicPartition(topic=topic.name, partition=0), + TopicPartition(topic=topic.name, partition=1), + TopicPartition(topic=topic.name, partition=2), + ] + ) + assert committed0.offset == 1 + assert committed1.offset == 1 + assert committed2.offset == 1 + class TestApplicationMultipleSdf: def test_multiple_sdfs( From 287efe196eb2166ac2c8d1d6388355429e737a3b Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Fri, 30 Aug 2024 14:04:41 -0400 Subject: [PATCH 3/4] fix leftover doc stuff --- docs/connectors/sinks/README.md | 5 ----- 1 file changed, 5 deletions(-) diff --git a/docs/connectors/sinks/README.md b/docs/connectors/sinks/README.md index 8def229c2..165ba6eba 100644 --- a/docs/connectors/sinks/README.md +++ b/docs/connectors/sinks/README.md @@ -71,11 +71,6 @@ sdf.sink(influx_sink) # 
do sink as a standalone call, no reassignment sdf = sdf.apply() # continue different operations with another branch... ``` -([learn more: branching](../../advanced/branching.md)) - - - - ## Supported Sinks Currently, Quix Streams provides these sinks out of the box: From d98234239bf147c2b8940091503036c954803d63 Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Fri, 30 Aug 2024 14:06:45 -0400 Subject: [PATCH 4/4] typo, wording fix --- docs/connectors/sinks/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/connectors/sinks/README.md b/docs/connectors/sinks/README.md index 165ba6eba..e5f6b5a47 100644 --- a/docs/connectors/sinks/README.md +++ b/docs/connectors/sinks/README.md @@ -31,14 +31,14 @@ sdf.sink(influx_sink) ``` ## Sinks Are Terminal Operations -`StreamingDataFrame.sink()` is special in that its "terminal": +`StreamingDataFrame.sink()` is special in that it's "terminal": **no additional operations can be added to it once called** (with branching, the branch becomes terminal). This is to ensure no further mutations can be applied to the outbound data. _However_, you can continue other operations with other branches, including using -the same `Sink` to push another set of data (with another `SDF.sink()` call). +the same `Sink` to push another value (with another `SDF.sink()` call). [Learn more about _branching_ here](../../advanced/branching.md).
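
As a closing illustration of the README guidance above: `sink()` returns `None`, so it should be called without reassignment whenever further branches are needed. A minimal sketch, assuming an `app`, a `topic`, and a configured `influx_sink` as in the README snippet (none of the names below come from the patch):

```python
# Sketch only: `app`, `topic` and `influx_sink` are assumed to already exist,
# and the output topic name is illustrative.
sdf = app.dataframe(topic)
sdf = sdf.apply(lambda value: value)   # shared transforms before the sink

sdf.sink(influx_sink)                  # standalone call: the sink branch is terminal,
                                       # but `sdf` itself is left untouched

sdf = sdf.apply(lambda value: value)   # further branching from `sdf` still works
sdf.to_topic(app.topic("post-sink-output"))

# Anti-pattern described in the README:
# sdf = sdf.sink(influx_sink)          # sink() returns None, so `sdf` becomes None
#                                      # and nothing more can be added to it
```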