Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ lint: format


test:
poetry run pytest tests/
poetry run pytest tests/ -rA


requirements:
Expand Down
47 changes: 37 additions & 10 deletions examples/source/simple_source/example.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import uuid
from datetime import datetime
import logging

from pynumaflow.shared.asynciter import NonBlockingIterator
from pynumaflow.sourcer import (
Expand All @@ -12,21 +13,29 @@
get_default_partitions,
Sourcer,
SourceAsyncServer,
NackRequest,
)

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-8s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)


class AsyncSource(Sourcer):
"""
AsyncSource is a class for User Defined Source implementation.
"""

def __init__(self):
"""
to_ack_set: Set to maintain a track of the offsets yet to be acknowledged
read_idx : the offset idx till where the messages have been read
"""
self.to_ack_set = set()
self.read_idx = 0
# The offset idx till where the messages have been read
self.read_idx: int = 0
# Set that keeps track of the offsets yet to be acknowledged
self.to_ack_set: set[int] = set()
# Set that keeps track of the offsets that have been negatively acknowledged
self.nacked: set[int] = set()

async def read_handler(self, datum: ReadRequest, output: NonBlockingIterator):
"""
Expand All @@ -38,25 +47,42 @@ async def read_handler(self, datum: ReadRequest, output: NonBlockingIterator):
return

for x in range(datum.num_records):
# If there are any nacked offsets, re-deliver them
if self.nacked:
idx = self.nacked.pop()
else:
idx = self.read_idx
self.read_idx += 1
headers = {"x-txn-id": str(uuid.uuid4())}
await output.put(
Message(
payload=str(self.read_idx).encode(),
offset=Offset.offset_with_default_partition_id(str(self.read_idx).encode()),
offset=Offset.offset_with_default_partition_id(str(idx).encode()),
event_time=datetime.now(),
headers=headers,
)
)
self.to_ack_set.add(str(self.read_idx))
self.read_idx += 1
self.to_ack_set.add(idx)

async def ack_handler(self, ack_request: AckRequest):
"""
The ack handler is used to acknowledge the offsets that have been read, and remove them
from the to_ack_set
"""
for req in ack_request.offsets:
self.to_ack_set.remove(str(req.offset, "utf-8"))
offset = int(req.offset)
self.to_ack_set.remove(offset)

async def nack_handler(self, ack_request: NackRequest):
    """
    Move negatively acknowledged offsets from to_ack_set to the nacked set.

    Offsets placed in the nacked set are re-delivered by read_handler before
    any new offset is produced.

    Args:
        ack_request: the nack request; each entry in ``ack_request.offsets``
            carries the offset value in its ``offset`` attribute.
    """
    for req in ack_request.offsets:
        offset = int(req.offset)
        # Only re-queue offsets that are actually awaiting acknowledgement.
        # A repeated or unknown nack would otherwise make set.remove() raise
        # KeyError and abort the batch half-applied.
        if offset not in self.to_ack_set:
            logger.warning("Ignoring nack for offset that is not pending: %s", offset)
            continue
        self.to_ack_set.remove(offset)
        self.nacked.add(offset)
    logger.info("Negatively acknowledged offsets: %s", self.nacked)

async def pending_handler(self) -> PendingResponse:
"""
Expand All @@ -74,4 +100,5 @@ async def partitions_handler(self) -> PartitionsResponse:
if __name__ == "__main__":
ud_source = AsyncSource()
grpc_server = SourceAsyncServer(ud_source)
logger.info("Starting grpc server")
grpc_server.start()
22 changes: 22 additions & 0 deletions pynumaflow/proto/sourcer/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ service Source {
// The client sends n requests and expects n responses.
rpc AckFn(stream AckRequest) returns (stream AckResponse);

// NackFn negatively acknowledges a batch of offsets. Invoked during a critical error in the monovertex or pipeline.
// Unlike AckFn, it is not a streaming RPC because it is only invoked when there is a critical error (error path).
rpc NackFn(NackRequest) returns (NackResponse);

// PendingFn returns the number of pending records at the user defined source.
rpc PendingFn(google.protobuf.Empty) returns (PendingResponse);

Expand Down Expand Up @@ -139,6 +143,24 @@ message AckResponse {
optional Handshake handshake = 2;
}

message NackRequest {
message Request {
// Required field holding the offsets to be nacked
repeated Offset offsets = 1;
}
// Required field holding the request. The list will be ordered and will have the same order as the original Read response.
Request request = 1;
}

message NackResponse {
message Result {
// Required field indicating the nack request is successful.
google.protobuf.Empty success = 1;
}
// Required field holding the result.
Result result = 1;
}

/*
* ReadyResponse is the health check result for user defined source.
*/
Expand Down
38 changes: 23 additions & 15 deletions pynumaflow/proto/sourcer/source_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions pynumaflow/proto/sourcer/source_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,28 @@ class AckResponse(_message.Message):
handshake: Handshake
def __init__(self, result: _Optional[_Union[AckResponse.Result, _Mapping]] = ..., handshake: _Optional[_Union[Handshake, _Mapping]] = ...) -> None: ...

class NackRequest(_message.Message):
__slots__ = ("request",)
class Request(_message.Message):
__slots__ = ("offsets",)
OFFSETS_FIELD_NUMBER: _ClassVar[int]
offsets: _containers.RepeatedCompositeFieldContainer[Offset]
def __init__(self, offsets: _Optional[_Iterable[_Union[Offset, _Mapping]]] = ...) -> None: ...
REQUEST_FIELD_NUMBER: _ClassVar[int]
request: NackRequest.Request
def __init__(self, request: _Optional[_Union[NackRequest.Request, _Mapping]] = ...) -> None: ...

class NackResponse(_message.Message):
__slots__ = ("result",)
class Result(_message.Message):
__slots__ = ("success",)
SUCCESS_FIELD_NUMBER: _ClassVar[int]
success: _empty_pb2.Empty
def __init__(self, success: _Optional[_Union[_empty_pb2.Empty, _Mapping]] = ...) -> None: ...
RESULT_FIELD_NUMBER: _ClassVar[int]
result: NackResponse.Result
def __init__(self, result: _Optional[_Union[NackResponse.Result, _Mapping]] = ...) -> None: ...

class ReadyResponse(_message.Message):
__slots__ = ("ready",)
READY_FIELD_NUMBER: _ClassVar[int]
Expand Down
45 changes: 45 additions & 0 deletions pynumaflow/proto/sourcer/source_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ def __init__(self, channel):
request_serializer=source__pb2.AckRequest.SerializeToString,
response_deserializer=source__pb2.AckResponse.FromString,
_registered_method=True)
self.NackFn = channel.unary_unary(
'/source.v1.Source/NackFn',
request_serializer=source__pb2.NackRequest.SerializeToString,
response_deserializer=source__pb2.NackResponse.FromString,
_registered_method=True)
self.PendingFn = channel.unary_unary(
'/source.v1.Source/PendingFn',
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
Expand Down Expand Up @@ -88,6 +93,14 @@ def AckFn(self, request_iterator, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def NackFn(self, request, context):
"""NackFn negatively acknowledges a batch of offsets. Invoked during a critical error in the monovertex or pipeline.
Unlike AckFn its not a streaming rpc because this is only invoked when there is a critical error (error path).
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def PendingFn(self, request, context):
"""PendingFn returns the number of pending records at the user defined source.
"""
Expand Down Expand Up @@ -122,6 +135,11 @@ def add_SourceServicer_to_server(servicer, server):
request_deserializer=source__pb2.AckRequest.FromString,
response_serializer=source__pb2.AckResponse.SerializeToString,
),
'NackFn': grpc.unary_unary_rpc_method_handler(
servicer.NackFn,
request_deserializer=source__pb2.NackRequest.FromString,
response_serializer=source__pb2.NackResponse.SerializeToString,
),
'PendingFn': grpc.unary_unary_rpc_method_handler(
servicer.PendingFn,
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
Expand Down Expand Up @@ -202,6 +220,33 @@ def AckFn(request_iterator,
metadata,
_registered_method=True)

@staticmethod
def NackFn(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/source.v1.Source/NackFn',
source__pb2.NackRequest.SerializeToString,
source__pb2.NackResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

@staticmethod
def PendingFn(request,
target,
Expand Down
4 changes: 4 additions & 0 deletions pynumaflow/sourcer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
ReadRequest,
PendingResponse,
AckRequest,
NackRequest,
Offset,
PartitionsResponse,
get_default_partitions,
Sourcer,
SourceCallable,
)
from pynumaflow.sourcer.async_server import SourceAsyncServer

Expand All @@ -15,9 +17,11 @@
"ReadRequest",
"PendingResponse",
"AckRequest",
"NackRequest",
"Offset",
"PartitionsResponse",
"get_default_partitions",
"Sourcer",
"SourceAsyncServer",
"SourceCallable",
]
Loading