diff --git a/examples/source/simple_source/example.py b/examples/source/simple_source/example.py index 7f61405a..2dfd65c4 100644 --- a/examples/source/simple_source/example.py +++ b/examples/source/simple_source/example.py @@ -55,7 +55,8 @@ async def ack_handler(self, ack_request: AckRequest): The ack handler is used acknowledge the offsets that have been read, and remove them from the to_ack_set """ - self.to_ack_set.remove(str(ack_request.offset.offset, "utf-8")) + for req in ack_request.offsets: + self.to_ack_set.remove(str(req.offset, "utf-8")) async def pending_handler(self) -> PendingResponse: """ diff --git a/pynumaflow/proto/sinker/sink.proto b/pynumaflow/proto/sinker/sink.proto index 0cb2f69b..71dbb418 100644 --- a/pynumaflow/proto/sinker/sink.proto +++ b/pynumaflow/proto/sinker/sink.proto @@ -78,7 +78,7 @@ message SinkResponse { // err_msg is the error message, set it if success is set to false. string err_msg = 3; } - Result result = 1; + repeated Result results = 1; optional Handshake handshake = 2; optional TransmissionStatus status = 3; } \ No newline at end of file diff --git a/pynumaflow/proto/sinker/sink_pb2.py b/pynumaflow/proto/sinker/sink_pb2.py index 3dcfcd9b..27082a0e 100644 --- a/pynumaflow/proto/sinker/sink_pb2.py +++ b/pynumaflow/proto/sinker/sink_pb2.py @@ -18,7 +18,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\nsink.proto\x12\x07sink.v1\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1fgoogle/protobuf/timestamp.proto"\xa3\x03\n\x0bSinkRequest\x12-\n\x07request\x18\x01 \x01(\x0b\x32\x1c.sink.v1.SinkRequest.Request\x12+\n\x06status\x18\x02 \x01(\x0b\x32\x1b.sink.v1.TransmissionStatus\x12*\n\thandshake\x18\x03 \x01(\x0b\x32\x12.sink.v1.HandshakeH\x00\x88\x01\x01\x1a\xfd\x01\n\x07Request\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12-\n\twatermark\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\n\n\x02id\x18\x05 \x01(\t\x12:\n\x07headers\x18\x06 \x03(\x0b\x32).sink.v1.SinkRequest.Request.HeadersEntry\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x0c\n\n_handshake"\x18\n\tHandshake\x12\x0b\n\x03sot\x18\x01 \x01(\x08"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08"!\n\x12TransmissionStatus\x12\x0b\n\x03\x65ot\x18\x01 \x01(\x08"\xfb\x01\n\x0cSinkResponse\x12,\n\x06result\x18\x01 \x01(\x0b\x32\x1c.sink.v1.SinkResponse.Result\x12*\n\thandshake\x18\x02 \x01(\x0b\x32\x12.sink.v1.HandshakeH\x00\x88\x01\x01\x12\x30\n\x06status\x18\x03 \x01(\x0b\x32\x1b.sink.v1.TransmissionStatusH\x01\x88\x01\x01\x1a\x46\n\x06Result\x12\n\n\x02id\x18\x01 \x01(\t\x12\x1f\n\x06status\x18\x02 \x01(\x0e\x32\x0f.sink.v1.Status\x12\x0f\n\x07\x65rr_msg\x18\x03 \x01(\tB\x0c\n\n_handshakeB\t\n\x07_status*0\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07\x46\x41ILURE\x10\x01\x12\x0c\n\x08\x46\x41LLBACK\x10\x02\x32|\n\x04Sink\x12\x39\n\x06SinkFn\x12\x14.sink.v1.SinkRequest\x1a\x15.sink.v1.SinkResponse(\x01\x30\x01\x12\x39\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a\x16.sink.v1.ReadyResponseb\x06proto3' + b'\n\nsink.proto\x12\x07sink.v1\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1fgoogle/protobuf/timestamp.proto"\xa3\x03\n\x0bSinkRequest\x12-\n\x07request\x18\x01 \x01(\x0b\x32\x1c.sink.v1.SinkRequest.Request\x12+\n\x06status\x18\x02 \x01(\x0b\x32\x1b.sink.v1.TransmissionStatus\x12*\n\thandshake\x18\x03 \x01(\x0b\x32\x12.sink.v1.HandshakeH\x00\x88\x01\x01\x1a\xfd\x01\n\x07Request\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12-\n\twatermark\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\n\n\x02id\x18\x05 \x01(\t\x12:\n\x07headers\x18\x06 \x03(\x0b\x32).sink.v1.SinkRequest.Request.HeadersEntry\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x0c\n\n_handshake"\x18\n\tHandshake\x12\x0b\n\x03sot\x18\x01 \x01(\x08"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08"!\n\x12TransmissionStatus\x12\x0b\n\x03\x65ot\x18\x01 \x01(\x08"\xfc\x01\n\x0cSinkResponse\x12-\n\x07results\x18\x01 \x03(\x0b\x32\x1c.sink.v1.SinkResponse.Result\x12*\n\thandshake\x18\x02 \x01(\x0b\x32\x12.sink.v1.HandshakeH\x00\x88\x01\x01\x12\x30\n\x06status\x18\x03 \x01(\x0b\x32\x1b.sink.v1.TransmissionStatusH\x01\x88\x01\x01\x1a\x46\n\x06Result\x12\n\n\x02id\x18\x01 \x01(\t\x12\x1f\n\x06status\x18\x02 \x01(\x0e\x32\x0f.sink.v1.Status\x12\x0f\n\x07\x65rr_msg\x18\x03 \x01(\tB\x0c\n\n_handshakeB\t\n\x07_status*0\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07\x46\x41ILURE\x10\x01\x12\x0c\n\x08\x46\x41LLBACK\x10\x02\x32|\n\x04Sink\x12\x39\n\x06SinkFn\x12\x14.sink.v1.SinkRequest\x1a\x15.sink.v1.SinkResponse(\x01\x30\x01\x12\x39\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a\x16.sink.v1.ReadyResponseb\x06proto3' ) _globals = globals() @@ -28,8 +28,8 @@ DESCRIPTOR._options = None _globals["_SINKREQUEST_REQUEST_HEADERSENTRY"]._options = None _globals["_SINKREQUEST_REQUEST_HEADERSENTRY"]._serialized_options = b"8\001" - _globals["_STATUS"]._serialized_start = 854 - _globals["_STATUS"]._serialized_end = 902 + _globals["_STATUS"]._serialized_start = 855 + _globals["_STATUS"]._serialized_end = 903 _globals["_SINKREQUEST"]._serialized_start = 86 _globals["_SINKREQUEST"]._serialized_end = 505 _globals["_SINKREQUEST_REQUEST"]._serialized_start = 238 @@ -43,9 +43,9 @@ _globals["_TRANSMISSIONSTATUS"]._serialized_start = 565 _globals["_TRANSMISSIONSTATUS"]._serialized_end = 598 _globals["_SINKRESPONSE"]._serialized_start = 601 - _globals["_SINKRESPONSE"]._serialized_end = 852 - _globals["_SINKRESPONSE_RESULT"]._serialized_start = 757 - _globals["_SINKRESPONSE_RESULT"]._serialized_end = 827 - _globals["_SINK"]._serialized_start = 904 - _globals["_SINK"]._serialized_end = 1028 + _globals["_SINKRESPONSE"]._serialized_end = 853 + _globals["_SINKRESPONSE_RESULT"]._serialized_start = 758 + _globals["_SINKRESPONSE_RESULT"]._serialized_end = 828 + _globals["_SINK"]._serialized_start = 905 + _globals["_SINK"]._serialized_end = 1029 # @@protoc_insertion_point(module_scope) diff --git a/pynumaflow/proto/sinker/sink_pb2.pyi b/pynumaflow/proto/sinker/sink_pb2.pyi index 70b24c22..78926321 100644 --- a/pynumaflow/proto/sinker/sink_pb2.pyi +++ b/pynumaflow/proto/sinker/sink_pb2.pyi @@ -90,7 +90,7 @@ class TransmissionStatus(_message.Message): def __init__(self, eot: bool = ...) -> None: ... class SinkResponse(_message.Message): - __slots__ = ("result", "handshake", "status") + __slots__ = ("results", "handshake", "status") class Result(_message.Message): __slots__ = ("id", "status", "err_msg") @@ -106,15 +106,15 @@ class SinkResponse(_message.Message): status: _Optional[_Union[Status, str]] = ..., err_msg: _Optional[str] = ..., ) -> None: ... - RESULT_FIELD_NUMBER: _ClassVar[int] + RESULTS_FIELD_NUMBER: _ClassVar[int] HANDSHAKE_FIELD_NUMBER: _ClassVar[int] STATUS_FIELD_NUMBER: _ClassVar[int] - result: SinkResponse.Result + results: _containers.RepeatedCompositeFieldContainer[SinkResponse.Result] handshake: Handshake status: TransmissionStatus def __init__( self, - result: _Optional[_Union[SinkResponse.Result, _Mapping]] = ..., + results: _Optional[_Iterable[_Union[SinkResponse.Result, _Mapping]]] = ..., handshake: _Optional[_Union[Handshake, _Mapping]] = ..., status: _Optional[_Union[TransmissionStatus, _Mapping]] = ..., ) -> None: ... diff --git a/pynumaflow/proto/sourcer/source.proto b/pynumaflow/proto/sourcer/source.proto index 8878ac66..33f73104 100644 --- a/pynumaflow/proto/sourcer/source.proto +++ b/pynumaflow/proto/sourcer/source.proto @@ -110,8 +110,8 @@ message ReadResponse { */ message AckRequest { message Request { - // Required field holding the offset to be acked - Offset offset = 1; + // Required field holding the offsets to be acked + repeated Offset offsets = 1; } // Required field holding the request. The list will be ordered and will have the same order as the original Read response. Request request = 1; diff --git a/pynumaflow/proto/sourcer/source_pb2.py b/pynumaflow/proto/sourcer/source_pb2.py index 59e4c181..6cd6f0b2 100644 --- a/pynumaflow/proto/sourcer/source_pb2.py +++ b/pynumaflow/proto/sourcer/source_pb2.py @@ -18,7 +18,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x0csource.proto\x12\tsource.v1\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1bgoogle/protobuf/empty.proto"\x18\n\tHandshake\x12\x0b\n\x03sot\x18\x01 \x01(\x08"\xb1\x01\n\x0bReadRequest\x12/\n\x07request\x18\x01 \x01(\x0b\x32\x1e.source.v1.ReadRequest.Request\x12,\n\thandshake\x18\x02 \x01(\x0b\x32\x14.source.v1.HandshakeH\x00\x88\x01\x01\x1a\x35\n\x07Request\x12\x13\n\x0bnum_records\x18\x01 \x01(\x04\x12\x15\n\rtimeout_in_ms\x18\x02 \x01(\rB\x0c\n\n_handshake"\x81\x05\n\x0cReadResponse\x12.\n\x06result\x18\x01 \x01(\x0b\x32\x1e.source.v1.ReadResponse.Result\x12.\n\x06status\x18\x02 \x01(\x0b\x32\x1e.source.v1.ReadResponse.Status\x12,\n\thandshake\x18\x03 \x01(\x0b\x32\x14.source.v1.HandshakeH\x00\x88\x01\x01\x1a\xe8\x01\n\x06Result\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12!\n\x06offset\x18\x02 \x01(\x0b\x32\x11.source.v1.Offset\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0c\n\x04keys\x18\x04 \x03(\t\x12<\n\x07headers\x18\x05 \x03(\x0b\x32+.source.v1.ReadResponse.Result.HeadersEntry\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\xe9\x01\n\x06Status\x12\x0b\n\x03\x65ot\x18\x01 \x01(\x08\x12\x31\n\x04\x63ode\x18\x02 \x01(\x0e\x32#.source.v1.ReadResponse.Status.Code\x12\x38\n\x05\x65rror\x18\x03 \x01(\x0e\x32$.source.v1.ReadResponse.Status.ErrorH\x00\x88\x01\x01\x12\x10\n\x03msg\x18\x04 \x01(\tH\x01\x88\x01\x01" \n\x04\x43ode\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07\x46\x41ILURE\x10\x01"\x1f\n\x05\x45rror\x12\x0b\n\x07UNACKED\x10\x00\x12\t\n\x05OTHER\x10\x01\x42\x08\n\x06_errorB\x06\n\x04_msgB\x0c\n\n_handshake"\xa6\x01\n\nAckRequest\x12.\n\x07request\x18\x01 \x01(\x0b\x32\x1d.source.v1.AckRequest.Request\x12,\n\thandshake\x18\x02 \x01(\x0b\x32\x14.source.v1.HandshakeH\x00\x88\x01\x01\x1a,\n\x07Request\x12!\n\x06offset\x18\x01 \x01(\x0b\x32\x11.source.v1.OffsetB\x0c\n\n_handshake"\xab\x01\n\x0b\x41\x63kResponse\x12-\n\x06result\x18\x01 \x01(\x0b\x32\x1d.source.v1.AckResponse.Result\x12,\n\thandshake\x18\x02 \x01(\x0b\x32\x14.source.v1.HandshakeH\x00\x88\x01\x01\x1a\x31\n\x06Result\x12\'\n\x07success\x18\x01 \x01(\x0b\x32\x16.google.protobuf.EmptyB\x0c\n\n_handshake"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08"]\n\x0fPendingResponse\x12\x31\n\x06result\x18\x01 \x01(\x0b\x32!.source.v1.PendingResponse.Result\x1a\x17\n\x06Result\x12\r\n\x05\x63ount\x18\x01 \x01(\x03"h\n\x12PartitionsResponse\x12\x34\n\x06result\x18\x01 \x01(\x0b\x32$.source.v1.PartitionsResponse.Result\x1a\x1c\n\x06Result\x12\x12\n\npartitions\x18\x01 \x03(\x05".\n\x06Offset\x12\x0e\n\x06offset\x18\x01 \x01(\x0c\x12\x14\n\x0cpartition_id\x18\x02 \x01(\x05\x32\xc8\x02\n\x06Source\x12=\n\x06ReadFn\x12\x16.source.v1.ReadRequest\x1a\x17.source.v1.ReadResponse(\x01\x30\x01\x12:\n\x05\x41\x63kFn\x12\x15.source.v1.AckRequest\x1a\x16.source.v1.AckResponse(\x01\x30\x01\x12?\n\tPendingFn\x12\x16.google.protobuf.Empty\x1a\x1a.source.v1.PendingResponse\x12\x45\n\x0cPartitionsFn\x12\x16.google.protobuf.Empty\x1a\x1d.source.v1.PartitionsResponse\x12;\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a\x18.source.v1.ReadyResponseb\x06proto3' + b'\n\x0csource.proto\x12\tsource.v1\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1bgoogle/protobuf/empty.proto"\x18\n\tHandshake\x12\x0b\n\x03sot\x18\x01 \x01(\x08"\xb1\x01\n\x0bReadRequest\x12/\n\x07request\x18\x01 \x01(\x0b\x32\x1e.source.v1.ReadRequest.Request\x12,\n\thandshake\x18\x02 \x01(\x0b\x32\x14.source.v1.HandshakeH\x00\x88\x01\x01\x1a\x35\n\x07Request\x12\x13\n\x0bnum_records\x18\x01 \x01(\x04\x12\x15\n\rtimeout_in_ms\x18\x02 \x01(\rB\x0c\n\n_handshake"\x81\x05\n\x0cReadResponse\x12.\n\x06result\x18\x01 \x01(\x0b\x32\x1e.source.v1.ReadResponse.Result\x12.\n\x06status\x18\x02 \x01(\x0b\x32\x1e.source.v1.ReadResponse.Status\x12,\n\thandshake\x18\x03 \x01(\x0b\x32\x14.source.v1.HandshakeH\x00\x88\x01\x01\x1a\xe8\x01\n\x06Result\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12!\n\x06offset\x18\x02 \x01(\x0b\x32\x11.source.v1.Offset\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0c\n\x04keys\x18\x04 \x03(\t\x12<\n\x07headers\x18\x05 \x03(\x0b\x32+.source.v1.ReadResponse.Result.HeadersEntry\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\xe9\x01\n\x06Status\x12\x0b\n\x03\x65ot\x18\x01 \x01(\x08\x12\x31\n\x04\x63ode\x18\x02 \x01(\x0e\x32#.source.v1.ReadResponse.Status.Code\x12\x38\n\x05\x65rror\x18\x03 \x01(\x0e\x32$.source.v1.ReadResponse.Status.ErrorH\x00\x88\x01\x01\x12\x10\n\x03msg\x18\x04 \x01(\tH\x01\x88\x01\x01" \n\x04\x43ode\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07\x46\x41ILURE\x10\x01"\x1f\n\x05\x45rror\x12\x0b\n\x07UNACKED\x10\x00\x12\t\n\x05OTHER\x10\x01\x42\x08\n\x06_errorB\x06\n\x04_msgB\x0c\n\n_handshake"\xa7\x01\n\nAckRequest\x12.\n\x07request\x18\x01 \x01(\x0b\x32\x1d.source.v1.AckRequest.Request\x12,\n\thandshake\x18\x02 \x01(\x0b\x32\x14.source.v1.HandshakeH\x00\x88\x01\x01\x1a-\n\x07Request\x12"\n\x07offsets\x18\x01 \x03(\x0b\x32\x11.source.v1.OffsetB\x0c\n\n_handshake"\xab\x01\n\x0b\x41\x63kResponse\x12-\n\x06result\x18\x01 \x01(\x0b\x32\x1d.source.v1.AckResponse.Result\x12,\n\thandshake\x18\x02 \x01(\x0b\x32\x14.source.v1.HandshakeH\x00\x88\x01\x01\x1a\x31\n\x06Result\x12\'\n\x07success\x18\x01 \x01(\x0b\x32\x16.google.protobuf.EmptyB\x0c\n\n_handshake"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08"]\n\x0fPendingResponse\x12\x31\n\x06result\x18\x01 \x01(\x0b\x32!.source.v1.PendingResponse.Result\x1a\x17\n\x06Result\x12\r\n\x05\x63ount\x18\x01 \x01(\x03"h\n\x12PartitionsResponse\x12\x34\n\x06result\x18\x01 \x01(\x0b\x32$.source.v1.PartitionsResponse.Result\x1a\x1c\n\x06Result\x12\x12\n\npartitions\x18\x01 \x03(\x05".\n\x06Offset\x12\x0e\n\x06offset\x18\x01 \x01(\x0c\x12\x14\n\x0cpartition_id\x18\x02 \x01(\x05\x32\xc8\x02\n\x06Source\x12=\n\x06ReadFn\x12\x16.source.v1.ReadRequest\x1a\x17.source.v1.ReadResponse(\x01\x30\x01\x12:\n\x05\x41\x63kFn\x12\x15.source.v1.AckRequest\x1a\x16.source.v1.AckResponse(\x01\x30\x01\x12?\n\tPendingFn\x12\x16.google.protobuf.Empty\x1a\x1a.source.v1.PendingResponse\x12\x45\n\x0cPartitionsFn\x12\x16.google.protobuf.Empty\x1a\x1d.source.v1.PartitionsResponse\x12;\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a\x18.source.v1.ReadyResponseb\x06proto3' ) _globals = globals() @@ -47,25 +47,25 @@ _globals["_READRESPONSE_STATUS_ERROR"]._serialized_start = 874 _globals["_READRESPONSE_STATUS_ERROR"]._serialized_end = 905 _globals["_ACKREQUEST"]._serialized_start = 940 - _globals["_ACKREQUEST"]._serialized_end = 1106 + _globals["_ACKREQUEST"]._serialized_end = 1107 _globals["_ACKREQUEST_REQUEST"]._serialized_start = 1048 - _globals["_ACKREQUEST_REQUEST"]._serialized_end = 1092 - _globals["_ACKRESPONSE"]._serialized_start = 1109 - _globals["_ACKRESPONSE"]._serialized_end = 1280 - _globals["_ACKRESPONSE_RESULT"]._serialized_start = 1217 - _globals["_ACKRESPONSE_RESULT"]._serialized_end = 1266 - _globals["_READYRESPONSE"]._serialized_start = 1282 - _globals["_READYRESPONSE"]._serialized_end = 1312 - _globals["_PENDINGRESPONSE"]._serialized_start = 1314 - _globals["_PENDINGRESPONSE"]._serialized_end = 1407 - _globals["_PENDINGRESPONSE_RESULT"]._serialized_start = 1384 - _globals["_PENDINGRESPONSE_RESULT"]._serialized_end = 1407 - _globals["_PARTITIONSRESPONSE"]._serialized_start = 1409 - _globals["_PARTITIONSRESPONSE"]._serialized_end = 1513 - _globals["_PARTITIONSRESPONSE_RESULT"]._serialized_start = 1485 - _globals["_PARTITIONSRESPONSE_RESULT"]._serialized_end = 1513 - _globals["_OFFSET"]._serialized_start = 1515 - _globals["_OFFSET"]._serialized_end = 1561 - _globals["_SOURCE"]._serialized_start = 1564 - _globals["_SOURCE"]._serialized_end = 1892 + _globals["_ACKREQUEST_REQUEST"]._serialized_end = 1093 + _globals["_ACKRESPONSE"]._serialized_start = 1110 + _globals["_ACKRESPONSE"]._serialized_end = 1281 + _globals["_ACKRESPONSE_RESULT"]._serialized_start = 1218 + _globals["_ACKRESPONSE_RESULT"]._serialized_end = 1267 + _globals["_READYRESPONSE"]._serialized_start = 1283 + _globals["_READYRESPONSE"]._serialized_end = 1313 + _globals["_PENDINGRESPONSE"]._serialized_start = 1315 + _globals["_PENDINGRESPONSE"]._serialized_end = 1408 + _globals["_PENDINGRESPONSE_RESULT"]._serialized_start = 1385 + _globals["_PENDINGRESPONSE_RESULT"]._serialized_end = 1408 + _globals["_PARTITIONSRESPONSE"]._serialized_start = 1410 + _globals["_PARTITIONSRESPONSE"]._serialized_end = 1514 + _globals["_PARTITIONSRESPONSE_RESULT"]._serialized_start = 1486 + _globals["_PARTITIONSRESPONSE_RESULT"]._serialized_end = 1514 + _globals["_OFFSET"]._serialized_start = 1516 + _globals["_OFFSET"]._serialized_end = 1562 + _globals["_SOURCE"]._serialized_start = 1565 + _globals["_SOURCE"]._serialized_end = 1893 # @@protoc_insertion_point(module_scope) diff --git a/pynumaflow/proto/sourcer/source_pb2.pyi b/pynumaflow/proto/sourcer/source_pb2.pyi index c47a75c1..f2cdc70e 100644 --- a/pynumaflow/proto/sourcer/source_pb2.pyi +++ b/pynumaflow/proto/sourcer/source_pb2.pyi @@ -122,10 +122,12 @@ class AckRequest(_message.Message): __slots__ = ("request", "handshake") class Request(_message.Message): - __slots__ = ("offset",) - OFFSET_FIELD_NUMBER: _ClassVar[int] - offset: Offset - def __init__(self, offset: _Optional[_Union[Offset, _Mapping]] = ...) -> None: ... + __slots__ = ("offsets",) + OFFSETS_FIELD_NUMBER: _ClassVar[int] + offsets: _containers.RepeatedCompositeFieldContainer[Offset] + def __init__( + self, offsets: _Optional[_Iterable[_Union[Offset, _Mapping]]] = ... + ) -> None: ... REQUEST_FIELD_NUMBER: _ClassVar[int] HANDSHAKE_FIELD_NUMBER: _ClassVar[int] request: AckRequest.Request diff --git a/pynumaflow/sinker/_dtypes.py b/pynumaflow/sinker/_dtypes.py index 1f436a85..c90f1f2e 100644 --- a/pynumaflow/sinker/_dtypes.py +++ b/pynumaflow/sinker/_dtypes.py @@ -215,10 +215,10 @@ def handler(self, datums: Iterator[Datum]) -> Responses: pass -# SyncSinkCallable is a callable which can be used as a handler for the Synchronous UDSink. +# SinkSyncCallable is a callable which can be used as a handler for the Synchronous UDSink. SinkHandlerCallable = Callable[[Iterator[Datum]], Responses] -SyncSinkCallable = Union[Sinker, SinkHandlerCallable] +SinkSyncCallable = Union[Sinker, SinkHandlerCallable] -# AsyncSinkCallable is a callable which can be used as a handler for the Asynchronous UDSink. +# SinkAsyncCallable is a callable which can be used as a handler for the Asynchronous UDSink. AsyncSinkHandlerCallable = Callable[[AsyncIterable[Datum]], Awaitable[Responses]] -AsyncSinkCallable = Union[Sinker, AsyncSinkHandlerCallable] +SinkAsyncCallable = Union[Sinker, AsyncSinkHandlerCallable] diff --git a/pynumaflow/sinker/async_server.py b/pynumaflow/sinker/async_server.py index 336f8cbe..a329e47e 100644 --- a/pynumaflow/sinker/async_server.py +++ b/pynumaflow/sinker/async_server.py @@ -22,7 +22,7 @@ ) from pynumaflow.shared.server import NumaflowServer, start_async_server -from pynumaflow.sinker._dtypes import AsyncSinkCallable +from pynumaflow.sinker._dtypes import SinkAsyncCallable class SinkAsyncServer(NumaflowServer): @@ -75,7 +75,7 @@ async def udsink_handler(datums: AsyncIterable[Datum]) -> Responses: def __init__( self, - sinker_instance: AsyncSinkCallable, + sinker_instance: SinkAsyncCallable, sock_path=SINK_SOCK_PATH, max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, diff --git a/pynumaflow/sinker/server.py b/pynumaflow/sinker/server.py index 10d3dade..dc3a4788 100644 --- a/pynumaflow/sinker/server.py +++ b/pynumaflow/sinker/server.py @@ -18,7 +18,7 @@ ) from pynumaflow.shared.server import NumaflowServer, sync_server_start -from pynumaflow.sinker._dtypes import SyncSinkCallable +from pynumaflow.sinker._dtypes import SinkSyncCallable class SinkServer(NumaflowServer): @@ -28,7 +28,7 @@ class SinkServer(NumaflowServer): def __init__( self, - sinker_instance: SyncSinkCallable, + sinker_instance: SinkSyncCallable, sock_path=SINK_SOCK_PATH, max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, diff --git a/pynumaflow/sinker/servicer/async_servicer.py b/pynumaflow/sinker/servicer/async_servicer.py index 96d6a62f..df9b6e7f 100644 --- a/pynumaflow/sinker/servicer/async_servicer.py +++ b/pynumaflow/sinker/servicer/async_servicer.py @@ -5,7 +5,7 @@ from pynumaflow.shared.asynciter import NonBlockingIterator from pynumaflow.shared.server import exit_on_error -from pynumaflow.sinker._dtypes import Datum, AsyncSinkCallable +from pynumaflow.sinker._dtypes import Datum, SinkAsyncCallable from pynumaflow.proto.sinker import sink_pb2_grpc, sink_pb2 from pynumaflow.sinker.servicer.utils import ( datum_from_sink_req, @@ -25,10 +25,10 @@ class AsyncSinkServicer(sink_pb2_grpc.SinkServicer): def __init__( self, - handler: AsyncSinkCallable, + handler: SinkAsyncCallable, ): self.background_tasks = set() - self.__sink_handler: AsyncSinkCallable = handler + self.__sink_handler: SinkAsyncCallable = handler self.cleanup_coroutines = [] async def SinkFn( @@ -72,8 +72,7 @@ async def SinkFn( await req_queue.put(STREAM_EOF) await cur_task ret = cur_task.result() - for r in ret: - yield sink_pb2.SinkResponse(result=r) + yield sink_pb2.SinkResponse(results=ret) # send EOT after each finishing sink responses yield sink_pb2.SinkResponse(status=sink_pb2.TransmissionStatus(eot=True)) cur_task = None diff --git a/pynumaflow/sinker/servicer/sync_servicer.py b/pynumaflow/sinker/servicer/sync_servicer.py index 629ad96b..b3fc669a 100644 --- a/pynumaflow/sinker/servicer/sync_servicer.py +++ b/pynumaflow/sinker/servicer/sync_servicer.py @@ -6,7 +6,7 @@ from pynumaflow.shared.server import exit_on_error from pynumaflow.shared.synciter import SyncIterator from pynumaflow.shared.thread_with_return import ThreadWithReturnValue -from pynumaflow.sinker._dtypes import SyncSinkCallable +from pynumaflow.sinker._dtypes import SinkSyncCallable from pynumaflow.sinker.servicer.utils import ( datum_from_sink_req, _create_read_handshake_response, @@ -22,8 +22,8 @@ class SyncSinkServicer(sink_pb2_grpc.SinkServicer): Provides the functionality for the required rpc methods. """ - def __init__(self, handler: SyncSinkCallable): - self.handler: SyncSinkCallable = handler + def __init__(self, handler: SinkSyncCallable): + self.handler: SinkSyncCallable = handler def SinkFn( self, request_iterator: Iterable[sink_pb2.SinkRequest], context: NumaflowServicerContext @@ -65,8 +65,7 @@ def SinkFn( if d.status and d.status.eot: req_queue.put(STREAM_EOF) ret = cur_task.join() - for resp in ret: - yield sink_pb2.SinkResponse(result=resp) + yield sink_pb2.SinkResponse(results=ret) # send EOT after each finishing sink responses yield sink_pb2.SinkResponse(status=sink_pb2.TransmissionStatus(eot=True)) cur_task = None diff --git a/pynumaflow/sinker/servicer/utils.py b/pynumaflow/sinker/servicer/utils.py index ddd6b863..e3d648c2 100644 --- a/pynumaflow/sinker/servicer/utils.py +++ b/pynumaflow/sinker/servicer/utils.py @@ -66,6 +66,5 @@ def _create_read_handshake_response() -> sink_pb2.SinkResponse: sink_pb2.SinkResponse: A SinkResponse object indicating a successful handshake. """ return sink_pb2.SinkResponse( - result=sink_pb2.SinkResponse.Result(status=sink_pb2.SUCCESS), handshake=sink_pb2.Handshake(sot=True), ) diff --git a/pynumaflow/sourcer/_dtypes.py b/pynumaflow/sourcer/_dtypes.py index 8d7a7dac..b2b34810 100644 --- a/pynumaflow/sourcer/_dtypes.py +++ b/pynumaflow/sourcer/_dtypes.py @@ -149,25 +149,25 @@ def timeout_in_ms(self) -> int: class AckRequest: """ Class for defining the request for acknowledging datum. - It takes an offset that need to be acknowledged. + It takes a list of offsets that need to be acknowledged. Args: - offset: the offset to be acknowledged. + offsets: the offsets to be acknowledged. >>> # Example usage >>> from pynumaflow.sourcer import AckRequest, Offset - >>> offset = Offset(offset=b"123", partition_id="0") - >>> ack_request = AckRequest(offsets=[offset, offset]) + >>> offset_val = Offset(offset=b"123", partition_id=0) + >>> ack_request = AckRequest(offsets=[offset_val, offset_val]) """ - __slots__ = ("_offset",) - _offset: Offset + __slots__ = ("_offsets",) + _offsets: list[Offset] - def __init__(self, offset: Offset): - self._offset = offset + def __init__(self, offsets: list[Offset]): + self._offsets = offsets @property - def offset(self) -> Offset: + def offsets(self) -> list[Offset]: """Returns the offsets to be acknowledged.""" - return self._offset + return self._offsets @dataclass(init=False) diff --git a/pynumaflow/sourcer/servicer/async_servicer.py b/pynumaflow/sourcer/servicer/async_servicer.py index e0d9c61d..31c00f21 100644 --- a/pynumaflow/sourcer/servicer/async_servicer.py +++ b/pynumaflow/sourcer/servicer/async_servicer.py @@ -6,7 +6,7 @@ from pynumaflow.shared.asynciter import NonBlockingIterator from pynumaflow.shared.server import exit_on_error, handle_async_error -from pynumaflow.sourcer._dtypes import ReadRequest +from pynumaflow.sourcer._dtypes import ReadRequest, Offset from pynumaflow.sourcer._dtypes import AckRequest, SourceCallable from pynumaflow.proto.sourcer import source_pb2 from pynumaflow.proto.sourcer import source_pb2_grpc @@ -155,7 +155,13 @@ async def AckFn( # process the incoming Ack requests async for req in request_iterator: - await self.__source_ack_handler(AckRequest(req.request.offset)) + # proto repeated field(offsets) is of + # type google._upb._message.RepeatedScalarContainer + # we need to explicitly convert it to list + offsets = [ + Offset(offset.offset, offset.partition_id) for offset in req.request.offsets + ] + await self.__source_ack_handler(AckRequest(offsets=offsets)) yield _create_ack_response() except BaseException as err: _LOGGER.critical("User-Defined Source AckFn error", exc_info=True) diff --git a/tests/sink/test_async_sink.py b/tests/sink/test_async_sink.py index 14cc2008..af29932c 100644 --- a/tests/sink/test_async_sink.py +++ b/tests/sink/test_async_sink.py @@ -149,18 +149,23 @@ def test_sink(self) -> None: handshake = next(generator_response) # assert that handshake response is received. self.assertTrue(handshake.handshake.sot) + data_resp = [] for r in generator_response: data_resp.append(r) + + # 1 sink data response + 1 EOT response + self.assertEqual(2, len(data_resp)) + idx = 0 - while idx < len(data_resp) - 1: - # capture the output from the SinkFn generator and assert. - self.assertEqual(data_resp[idx].result.status, sink_pb2.Status.SUCCESS) + # capture the output from the SinkFn generator and assert. + for resp in data_resp[0].results: + self.assertEqual(resp.id, str(idx)) + self.assertEqual(resp.status, sink_pb2.Status.SUCCESS) idx += 1 # EOT Response - self.assertEqual(data_resp[len(data_resp) - 1].status.eot, True) - # 10 sink responses + 1 EOT response - self.assertEqual(11, len(data_resp)) + self.assertEqual(data_resp[1].status.eot, True) + except grpc.RpcError as e: logging.error(e) grpc_exception = e @@ -214,19 +219,23 @@ def test_sink_fallback(self) -> None: handshake = next(generator_response) # assert that handshake response is received. self.assertTrue(handshake.handshake.sot) + data_resp = [] for r in generator_response: data_resp.append(r) + # 1 sink data response + 1 EOT response + self.assertEqual(2, len(data_resp)) + idx = 0 - while idx < len(data_resp) - 1: - # capture the output from the SinkFn generator and assert. - self.assertEqual(data_resp[idx].result.status, sink_pb2.Status.FALLBACK) + # capture the output from the SinkFn generator and assert. + for resp in data_resp[0].results: + self.assertEqual(resp.id, str(idx)) + self.assertEqual(resp.status, sink_pb2.Status.FALLBACK) idx += 1 # EOT Response - self.assertEqual(data_resp[len(data_resp) - 1].status.eot, True) - # 10 sink responses + 1 EOT response - self.assertEqual(11, len(data_resp)) + self.assertEqual(data_resp[1].status.eot, True) + except grpc.RpcError as e: logging.error(e) diff --git a/tests/sink/test_server.py b/tests/sink/test_server.py index 1382b335..b67145c0 100644 --- a/tests/sink/test_server.py +++ b/tests/sink/test_server.py @@ -234,21 +234,21 @@ def test_forward_message(self): if "No more responses!" in err.__str__(): break - # 1 handshake + 2 data messages + 1 EOT - self.assertEqual(4, len(responses)) + # 1 handshake + 1 data messages + 1 EOT + self.assertEqual(3, len(responses)) # first message should be handshake response self.assertTrue(responses[0].handshake.sot) # assert the values for the corresponding messages - self.assertEqual("test_id_0", responses[1].result.id) - self.assertEqual("test_id_1", responses[2].result.id) - self.assertEqual(responses[1].result.status, sink_pb2.Status.SUCCESS) - self.assertEqual(responses[2].result.status, sink_pb2.Status.FAILURE) - self.assertEqual("", responses[1].result.err_msg) - self.assertEqual("mock sink message error", responses[2].result.err_msg) + self.assertEqual("test_id_0", responses[1].results[0].id) + self.assertEqual("test_id_1", responses[1].results[1].id) + self.assertEqual(responses[1].results[0].status, sink_pb2.Status.SUCCESS) + self.assertEqual(responses[1].results[1].status, sink_pb2.Status.FAILURE) + self.assertEqual("", responses[1].results[0].err_msg) + self.assertEqual("mock sink message error", responses[1].results[1].err_msg) # last message should be EOT response - self.assertTrue(responses[3].status.eot) + self.assertTrue(responses[2].status.eot) _, code, _ = method.termination() self.assertEqual(code, StatusCode.OK) diff --git a/tests/source/utils.py b/tests/source/utils.py index 0da5f4e9..5a63683d 100644 --- a/tests/source/utils.py +++ b/tests/source/utils.py @@ -72,7 +72,7 @@ def read_req_source_fn() -> ReadRequest: def ack_req_source_fn(): msg = source_pb2.Offset(offset=mock_offset().offset, partition_id=mock_offset().partition_id) - request = source_pb2.AckRequest.Request(offset=msg) + request = source_pb2.AckRequest.Request(offsets=[msg]) return request