Skip to content

Commit

Permalink
add experimental 'print' function, replace 'update' with 'peek', allo…
Browse files Browse the repository at this point in the history
…w skipping assignment for peek, print, and produce
  • Loading branch information
tim-quix committed Jun 26, 2024
1 parent 835ef27 commit 9db8110
Showing 1 changed file with 24 additions and 7 deletions.
31 changes: 24 additions & 7 deletions quixstreams/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,29 +241,32 @@ def func(d: dict, state: State):
)
return self.__dataframe_clone__(stream=stream)

def print(self, metadata: bool = False) -> Self:
return self.peek(_print, metadata=metadata)

@overload
def update(self, func: UpdateCallback) -> Self: ...
def peek(self, func: UpdateCallback) -> Self: ...

@overload
def update(
def peek(
self, func: UpdateWithMetadataCallback, *, metadata: Literal[True]
) -> Self: ...

@overload
def update(
def peek(
self, func: UpdateCallbackStateful, *, stateful: Literal[True]
) -> Self: ...

@overload
def update(
def peek(
self,
func: UpdateWithMetadataCallbackStateful,
*,
stateful: Literal[True],
metadata: Literal[True],
) -> Self: ...

def update(
def peek(
self,
func: Union[
UpdateCallback,
Expand Down Expand Up @@ -327,7 +330,7 @@ def func(values: list, state: State):
cast(Union[UpdateCallback, UpdateWithMetadataCallback], func),
metadata=metadata,
)
return self.__dataframe_clone__(stream=stream)
return self.__dataframe_inplace__(stream)

@overload
def filter(self, func: FilterCallback) -> Self: ...
Expand Down Expand Up @@ -570,7 +573,7 @@ def to_topic(
By default, the current message key will be used.
"""
return self.update(
return self.peek(
lambda value, orig_key, timestamp, headers: self._produce(
topic=topic,
value=value,
Expand Down Expand Up @@ -927,6 +930,7 @@ def _produce(
row = Row(
value=value, key=key, timestamp=timestamp, context=ctx, headers=headers
)
print("PRODUCE!!!")
self._producer.produce_row(row=row, topic=topic, key=key, timestamp=timestamp)

def _register_store(self):
Expand Down Expand Up @@ -960,6 +964,10 @@ def _groupby_key(
else:
raise TypeError("group_by 'key' must be callable or string (column name)")

def __dataframe_inplace__(self, stream: Stream):
self._stream = stream
return self

def __dataframe_clone__(
self,
stream: Optional[Stream] = None,
Expand Down Expand Up @@ -1071,6 +1079,15 @@ def wrapper(
return wrapper


import pprint

_PRINT_ARGS = ["value", "key", "timestamp", "headers"]


def _print(*args):
pprint.pprint({_PRINT_ARGS[i]: args[i] for i in range(len(args))}, indent=2)


def _as_stateful(
func: Union[
ApplyWithMetadataCallbackStateful,
Expand Down

0 comments on commit 9db8110

Please sign in to comment.