Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/source/simple_source/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ async def ack_handler(self, ack_request: AckRequest):
The ack handler is used acknowledge the offsets that have been read, and remove them
from the to_ack_set
"""
self.to_ack_set.remove(str(ack_request.offset.offset, "utf-8"))
for req in ack_request.offsets:
self.to_ack_set.remove(str(req.offset, "utf-8"))

async def pending_handler(self) -> PendingResponse:
"""
Expand Down
2 changes: 1 addition & 1 deletion pynumaflow/proto/sinker/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ message SinkResponse {
// err_msg is the error message, set it if success is set to false.
string err_msg = 3;
}
Result result = 1;
repeated Result results = 1;
optional Handshake handshake = 2;
optional TransmissionStatus status = 3;
}
16 changes: 8 additions & 8 deletions pynumaflow/proto/sinker/sink_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions pynumaflow/proto/sinker/sink_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class TransmissionStatus(_message.Message):
def __init__(self, eot: bool = ...) -> None: ...

class SinkResponse(_message.Message):
__slots__ = ("result", "handshake", "status")
__slots__ = ("results", "handshake", "status")

class Result(_message.Message):
__slots__ = ("id", "status", "err_msg")
Expand All @@ -106,15 +106,15 @@ class SinkResponse(_message.Message):
status: _Optional[_Union[Status, str]] = ...,
err_msg: _Optional[str] = ...,
) -> None: ...
RESULT_FIELD_NUMBER: _ClassVar[int]
RESULTS_FIELD_NUMBER: _ClassVar[int]
HANDSHAKE_FIELD_NUMBER: _ClassVar[int]
STATUS_FIELD_NUMBER: _ClassVar[int]
result: SinkResponse.Result
results: _containers.RepeatedCompositeFieldContainer[SinkResponse.Result]
handshake: Handshake
status: TransmissionStatus
def __init__(
self,
result: _Optional[_Union[SinkResponse.Result, _Mapping]] = ...,
results: _Optional[_Iterable[_Union[SinkResponse.Result, _Mapping]]] = ...,
handshake: _Optional[_Union[Handshake, _Mapping]] = ...,
status: _Optional[_Union[TransmissionStatus, _Mapping]] = ...,
) -> None: ...
4 changes: 2 additions & 2 deletions pynumaflow/proto/sourcer/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ message ReadResponse {
*/
message AckRequest {
message Request {
// Required field holding the offset to be acked
Offset offset = 1;
// Required field holding the offsets to be acked
repeated Offset offsets = 1;
}
// Required field holding the request. The list will be ordered and will have the same order as the original Read response.
Request request = 1;
Expand Down
42 changes: 21 additions & 21 deletions pynumaflow/proto/sourcer/source_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions pynumaflow/proto/sourcer/source_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,12 @@ class AckRequest(_message.Message):
__slots__ = ("request", "handshake")

class Request(_message.Message):
__slots__ = ("offset",)
OFFSET_FIELD_NUMBER: _ClassVar[int]
offset: Offset
def __init__(self, offset: _Optional[_Union[Offset, _Mapping]] = ...) -> None: ...
__slots__ = ("offsets",)
OFFSETS_FIELD_NUMBER: _ClassVar[int]
offsets: _containers.RepeatedCompositeFieldContainer[Offset]
def __init__(
self, offsets: _Optional[_Iterable[_Union[Offset, _Mapping]]] = ...
) -> None: ...
REQUEST_FIELD_NUMBER: _ClassVar[int]
HANDSHAKE_FIELD_NUMBER: _ClassVar[int]
request: AckRequest.Request
Expand Down
8 changes: 4 additions & 4 deletions pynumaflow/sinker/_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,10 @@ def handler(self, datums: Iterator[Datum]) -> Responses:
pass


# SyncSinkCallable is a callable which can be used as a handler for the Synchronous UDSink.
# SinkSyncCallable is a callable which can be used as a handler for the Synchronous UDSink.
SinkHandlerCallable = Callable[[Iterator[Datum]], Responses]
SyncSinkCallable = Union[Sinker, SinkHandlerCallable]
SinkSyncCallable = Union[Sinker, SinkHandlerCallable]

# AsyncSinkCallable is a callable which can be used as a handler for the Asynchronous UDSink.
# SinkAsyncCallable is a callable which can be used as a handler for the Asynchronous UDSink.
AsyncSinkHandlerCallable = Callable[[AsyncIterable[Datum]], Awaitable[Responses]]
AsyncSinkCallable = Union[Sinker, AsyncSinkHandlerCallable]
SinkAsyncCallable = Union[Sinker, AsyncSinkHandlerCallable]
4 changes: 2 additions & 2 deletions pynumaflow/sinker/async_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)

from pynumaflow.shared.server import NumaflowServer, start_async_server
from pynumaflow.sinker._dtypes import AsyncSinkCallable
from pynumaflow.sinker._dtypes import SinkAsyncCallable


class SinkAsyncServer(NumaflowServer):
Expand Down Expand Up @@ -75,7 +75,7 @@ async def udsink_handler(datums: AsyncIterable[Datum]) -> Responses:

def __init__(
self,
sinker_instance: AsyncSinkCallable,
sinker_instance: SinkAsyncCallable,
sock_path=SINK_SOCK_PATH,
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
Expand Down
4 changes: 2 additions & 2 deletions pynumaflow/sinker/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
)

from pynumaflow.shared.server import NumaflowServer, sync_server_start
from pynumaflow.sinker._dtypes import SyncSinkCallable
from pynumaflow.sinker._dtypes import SinkSyncCallable


class SinkServer(NumaflowServer):
Expand All @@ -28,7 +28,7 @@ class SinkServer(NumaflowServer):

def __init__(
self,
sinker_instance: SyncSinkCallable,
sinker_instance: SinkSyncCallable,
sock_path=SINK_SOCK_PATH,
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
Expand Down
9 changes: 4 additions & 5 deletions pynumaflow/sinker/servicer/async_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pynumaflow.shared.asynciter import NonBlockingIterator

from pynumaflow.shared.server import exit_on_error
from pynumaflow.sinker._dtypes import Datum, AsyncSinkCallable
from pynumaflow.sinker._dtypes import Datum, SinkAsyncCallable
from pynumaflow.proto.sinker import sink_pb2_grpc, sink_pb2
from pynumaflow.sinker.servicer.utils import (
datum_from_sink_req,
Expand All @@ -25,10 +25,10 @@ class AsyncSinkServicer(sink_pb2_grpc.SinkServicer):

def __init__(
self,
handler: AsyncSinkCallable,
handler: SinkAsyncCallable,
):
self.background_tasks = set()
self.__sink_handler: AsyncSinkCallable = handler
self.__sink_handler: SinkAsyncCallable = handler
self.cleanup_coroutines = []

async def SinkFn(
Expand Down Expand Up @@ -72,8 +72,7 @@ async def SinkFn(
await req_queue.put(STREAM_EOF)
await cur_task
ret = cur_task.result()
for r in ret:
yield sink_pb2.SinkResponse(result=r)
yield sink_pb2.SinkResponse(results=ret)
# send EOT after each finishing sink responses
yield sink_pb2.SinkResponse(status=sink_pb2.TransmissionStatus(eot=True))
cur_task = None
Expand Down
Loading
Loading