Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/sink/async_log/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description = ""
authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = "~3.10"
python = ">=3.10,<3.13"
pynumaflow = { path = "../../../"}

[tool.poetry.dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion examples/sink/log/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description = ""
authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = "~3.10"
python = ">=3.10,<3.13"
pynumaflow = { path = "../../../"}

[tool.poetry.dev-dependencies]
Expand Down
43 changes: 35 additions & 8 deletions pynumaflow/proto/sinker/sink.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

Expand All @@ -7,7 +8,7 @@ package sink.v1;

service Sink {
// SinkFn writes the request to a user defined sink.
rpc SinkFn(stream SinkRequest) returns (SinkResponse);
rpc SinkFn(stream SinkRequest) returns (stream SinkResponse);

// IsReady is the heartbeat endpoint for gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
Expand All @@ -17,12 +18,29 @@ service Sink {
* SinkRequest represents a request element.
*/
message SinkRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
string id = 5;
map<string, string> headers = 6;
message Request {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
string id = 5;
map<string, string> headers = 6;
}
// Required field indicating the request.
Request request = 1;
// Required field indicating the status of the request.
// If eot is set to true, it indicates the end of transmission.
TransmissionStatus status = 2;
// optional field indicating the handshake message.
optional Handshake handshake = 3;
}

/*
* Handshake message between client and server to indicate the start of transmission.
*/
message Handshake {
// Required field indicating the start of transmission.
bool sot = 1;
}

/**
Expand All @@ -32,6 +50,13 @@ message ReadyResponse {
bool ready = 1;
}

/**
* TransmissionStatus is the status of the transmission.
*/
message TransmissionStatus {
bool eot = 1;
}

/*
* Status is the status of the response.
*/
Expand All @@ -53,5 +78,7 @@ message SinkResponse {
// err_msg is the error message, set it if success is set to false.
string err_msg = 3;
}
repeated Result results = 1;
Result result = 1;
optional Handshake handshake = 2;
optional TransmissionStatus status = 3;
}
38 changes: 22 additions & 16 deletions pynumaflow/proto/sinker/sink_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

92 changes: 63 additions & 29 deletions pynumaflow/proto/sinker/sink_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -25,45 +25,72 @@ FAILURE: Status
FALLBACK: Status

class SinkRequest(_message.Message):
__slots__ = ("keys", "value", "event_time", "watermark", "id", "headers")
__slots__ = ("request", "status", "handshake")

class HeadersEntry(_message.Message):
__slots__ = ("key", "value")
KEY_FIELD_NUMBER: _ClassVar[int]
class Request(_message.Message):
__slots__ = ("keys", "value", "event_time", "watermark", "id", "headers")

class HeadersEntry(_message.Message):
__slots__ = ("key", "value")
KEY_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
key: str
value: str
def __init__(self, key: _Optional[str] = ..., value: _Optional[str] = ...) -> None: ...
KEYS_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
key: str
value: str
def __init__(self, key: _Optional[str] = ..., value: _Optional[str] = ...) -> None: ...
KEYS_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
EVENT_TIME_FIELD_NUMBER: _ClassVar[int]
WATERMARK_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
HEADERS_FIELD_NUMBER: _ClassVar[int]
keys: _containers.RepeatedScalarFieldContainer[str]
value: bytes
event_time: _timestamp_pb2.Timestamp
watermark: _timestamp_pb2.Timestamp
id: str
headers: _containers.ScalarMap[str, str]
EVENT_TIME_FIELD_NUMBER: _ClassVar[int]
WATERMARK_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
HEADERS_FIELD_NUMBER: _ClassVar[int]
keys: _containers.RepeatedScalarFieldContainer[str]
value: bytes
event_time: _timestamp_pb2.Timestamp
watermark: _timestamp_pb2.Timestamp
id: str
headers: _containers.ScalarMap[str, str]
def __init__(
self,
keys: _Optional[_Iterable[str]] = ...,
value: _Optional[bytes] = ...,
event_time: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...,
watermark: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...,
id: _Optional[str] = ...,
headers: _Optional[_Mapping[str, str]] = ...,
) -> None: ...
REQUEST_FIELD_NUMBER: _ClassVar[int]
STATUS_FIELD_NUMBER: _ClassVar[int]
HANDSHAKE_FIELD_NUMBER: _ClassVar[int]
request: SinkRequest.Request
status: TransmissionStatus
handshake: Handshake
def __init__(
self,
keys: _Optional[_Iterable[str]] = ...,
value: _Optional[bytes] = ...,
event_time: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...,
watermark: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...,
id: _Optional[str] = ...,
headers: _Optional[_Mapping[str, str]] = ...,
request: _Optional[_Union[SinkRequest.Request, _Mapping]] = ...,
status: _Optional[_Union[TransmissionStatus, _Mapping]] = ...,
handshake: _Optional[_Union[Handshake, _Mapping]] = ...,
) -> None: ...

class Handshake(_message.Message):
__slots__ = ("sot",)
SOT_FIELD_NUMBER: _ClassVar[int]
sot: bool
def __init__(self, sot: bool = ...) -> None: ...

class ReadyResponse(_message.Message):
__slots__ = ("ready",)
READY_FIELD_NUMBER: _ClassVar[int]
ready: bool
def __init__(self, ready: bool = ...) -> None: ...

class TransmissionStatus(_message.Message):
__slots__ = ("eot",)
EOT_FIELD_NUMBER: _ClassVar[int]
eot: bool
def __init__(self, eot: bool = ...) -> None: ...

class SinkResponse(_message.Message):
__slots__ = ("results",)
__slots__ = ("result", "handshake", "status")

class Result(_message.Message):
__slots__ = ("id", "status", "err_msg")
Expand All @@ -79,8 +106,15 @@ class SinkResponse(_message.Message):
status: _Optional[_Union[Status, str]] = ...,
err_msg: _Optional[str] = ...,
) -> None: ...
RESULTS_FIELD_NUMBER: _ClassVar[int]
results: _containers.RepeatedCompositeFieldContainer[SinkResponse.Result]
RESULT_FIELD_NUMBER: _ClassVar[int]
HANDSHAKE_FIELD_NUMBER: _ClassVar[int]
STATUS_FIELD_NUMBER: _ClassVar[int]
result: SinkResponse.Result
handshake: Handshake
status: TransmissionStatus
def __init__(
self, results: _Optional[_Iterable[_Union[SinkResponse.Result, _Mapping]]] = ...
self,
result: _Optional[_Union[SinkResponse.Result, _Mapping]] = ...,
handshake: _Optional[_Union[Handshake, _Mapping]] = ...,
status: _Optional[_Union[TransmissionStatus, _Mapping]] = ...,
) -> None: ...
6 changes: 3 additions & 3 deletions pynumaflow/proto/sinker/sink_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def __init__(self, channel):
Args:
channel: A grpc.Channel.
"""
self.SinkFn = channel.stream_unary(
self.SinkFn = channel.stream_stream(
"/sink.v1.Sink/SinkFn",
request_serializer=sink__pb2.SinkRequest.SerializeToString,
response_deserializer=sink__pb2.SinkResponse.FromString,
Expand Down Expand Up @@ -45,7 +45,7 @@ def IsReady(self, request, context):

def add_SinkServicer_to_server(servicer, server):
rpc_method_handlers = {
"SinkFn": grpc.stream_unary_rpc_method_handler(
"SinkFn": grpc.stream_stream_rpc_method_handler(
servicer.SinkFn,
request_deserializer=sink__pb2.SinkRequest.FromString,
response_serializer=sink__pb2.SinkResponse.SerializeToString,
Expand Down Expand Up @@ -77,7 +77,7 @@ def SinkFn(
timeout=None,
metadata=None,
):
return grpc.experimental.stream_unary(
return grpc.experimental.stream_stream(
request_iterator,
target,
"/sink.v1.Sink/SinkFn",
Expand Down
24 changes: 4 additions & 20 deletions pynumaflow/reducestreamer/servicer/async_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from collections.abc import AsyncIterable
from typing import Union

import grpc
from google.protobuf import empty_pb2 as _empty_pb2

from pynumaflow.proto.reducer import reduce_pb2, reduce_pb2_grpc
Expand All @@ -13,7 +12,7 @@
ReduceRequest,
)
from pynumaflow.reducestreamer.servicer.task_manager import TaskManager
from pynumaflow.shared.server import exit_on_error, handle_error
from pynumaflow.shared.server import handle_async_error
from pynumaflow.types import NumaflowServicerContext


Expand Down Expand Up @@ -95,35 +94,20 @@
async for msg in consumer:
# If the message is an exception, we raise the exception
if isinstance(msg, BaseException):
handle_error(context, msg)
await asyncio.gather(
context.abort(grpc.StatusCode.UNKNOWN, details=repr(msg)),
return_exceptions=True,
)
exit_on_error(
err=repr(msg), parent=False, context=context, update_context=False
)
await handle_async_error(context, msg)
return
# Send window EOF response or Window result response
# back to the client
else:
yield msg
except BaseException as e:
handle_error(context, e)
await asyncio.gather(
context.abort(grpc.StatusCode.UNKNOWN, details=repr(e)), return_exceptions=True
)
exit_on_error(err=repr(e), parent=False, context=context, update_context=False)
await handle_async_error(context, e)

Check warning on line 104 in pynumaflow/reducestreamer/servicer/async_servicer.py

View check run for this annotation

Codecov / codecov/patch

pynumaflow/reducestreamer/servicer/async_servicer.py#L104

Added line #L104 was not covered by tests
return
# Wait for the process_input_stream task to finish for a clean exit
try:
await producer
except BaseException as e:
handle_error(context, e)
await asyncio.gather(
context.abort(grpc.StatusCode.UNKNOWN, details=repr(e)), return_exceptions=True
)
exit_on_error(err=repr(e), parent=False, context=context, update_context=False)
await handle_async_error(context, e)

Check warning on line 110 in pynumaflow/reducestreamer/servicer/async_servicer.py

View check run for this annotation

Codecov / codecov/patch

pynumaflow/reducestreamer/servicer/async_servicer.py#L110

Added line #L110 was not covered by tests
return

async def IsReady(
Expand Down
4 changes: 2 additions & 2 deletions pynumaflow/shared/asynciter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ class NonBlockingIterator:

__slots__ = "_queue"

def __init__(self):
self._queue = asyncio.Queue()
def __init__(self, size=0):
self._queue = asyncio.Queue(maxsize=size)

async def read_iterator(self):
item = await self._queue.get()
Expand Down
17 changes: 16 additions & 1 deletion pynumaflow/shared/server.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import contextlib
import io
import multiprocessing
Expand Down Expand Up @@ -266,7 +267,10 @@ def exit_on_error(
p.kill()


def handle_error(context: NumaflowServicerContext, e: BaseException):
def update_context_err(context: NumaflowServicerContext, e: BaseException):
"""
Update the context with the error and log the exception.
"""
trace = get_exception_traceback_str(e)
_LOGGER.critical(trace)
_LOGGER.critical(e.__str__())
Expand All @@ -278,3 +282,14 @@ def get_exception_traceback_str(exc) -> str:
file = io.StringIO()
traceback.print_exception(exc, value=exc, tb=exc.__traceback__, file=file)
return file.getvalue().rstrip()


async def handle_async_error(context: NumaflowServicerContext, exception: BaseException):
"""
Handle exceptions for async servers by updating the context and exiting.
"""
update_context_err(context, exception)
await asyncio.gather(
context.abort(grpc.StatusCode.UNKNOWN, details=repr(exception)), return_exceptions=True
)
exit_on_error(err=repr(exception), parent=False, context=context, update_context=False)
Loading
Loading