Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ ignore:
- "pynumaflow/function/generated/udfunction_pb2.py"
- "pynumaflow/function/_udfunction_pb2.pyi"
- "pynumaflow/function/generated/udfunction_pb2_grpc.py"
- "pynumaflow/sink/generated/udsink_pb2.py"
- "pynumaflow/sink/_udsink_pb2.pyi"
- "pynumaflow/sink/generated/udsink_pb2_grpc.py"
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ and [UDSinks](https://numaproj.github.io/numaflow/sinks/user-defined-sinks/) in

```python

from pynumaflow.function import Messages, Message, Datum
from pynumaflow.function.server import UserDefinedFunctionServicer
from pynumaflow.function import Messages, Message, Datum, UserDefinedFunctionServicer


def map_handler(key: str, datum: Datum) -> Messages:
Expand Down
3 changes: 1 addition & 2 deletions examples/function/forward_message/example.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from pynumaflow.function import Messages, Message, Datum
from pynumaflow.function.server import UserDefinedFunctionServicer
from pynumaflow.function import Messages, Message, Datum, UserDefinedFunctionServicer


def map_handler(key: str, datum: Datum) -> Messages:
Expand Down
13 changes: 6 additions & 7 deletions examples/sink/simplesink/example.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
from typing import List
from pynumaflow.sink import Datum, Responses, Response, UserDefinedSinkServicer

from pynumaflow.sink import Message, Responses, Response, HTTPSinkHandler


def udsink_handler(messages: List[Message], __) -> Responses:
def udsink_handler(datums: List[Datum], __) -> Responses:
responses = Responses()
for msg in messages:
print("Msg", msg)
for msg in datums:
print("User Defined Sink", msg)
responses.append(Response.as_success(msg.id))
return responses


if __name__ == "__main__":
handler = HTTPSinkHandler(udsink_handler)
handler.start()
grpc_server = UserDefinedSinkServicer(udsink_handler)
grpc_server.start()
7 changes: 0 additions & 7 deletions pynumaflow/_tools.py

This file was deleted.

19 changes: 0 additions & 19 deletions pynumaflow/exceptions.py

This file was deleted.

18 changes: 8 additions & 10 deletions pynumaflow/function/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
FUNCTION_SOCK_PATH,
DATUM_KEY,
)
from pynumaflow.function import Messages
from pynumaflow.function._dtypes import Datum
from pynumaflow.function import Messages, Datum
from pynumaflow.types import NumaflowServicerContext

if environ.get("PYTHONDEBUG"):
Expand All @@ -36,8 +35,7 @@ class UserDefinedFunctionServicer(udfunction_pb2_grpc.UserDefinedFunctionService
sock_path: Path to the UNIX Domain Socket

Example invocation:
>>> from pynumaflow.function import Messages, Message, Datum
>>> from pynumaflow.function.server import UserDefinedFunctionServicer
>>> from pynumaflow.function import Messages, Message, Datum, UserDefinedFunctionServicer
>>> def map_handler(key: str, datum: Datum) -> Messages:
... val = datum.value
... _ = datum.event_time
Expand All @@ -59,7 +57,7 @@ def MapFn(
) -> udfunction_pb2.DatumList:
"""
Applies a function to each datum element.
The camel case function name comes from the generated udfunction_pb2_grpc.py file.
The pascal case function name comes from the generated udfunction_pb2_grpc.py file.
"""
key = ""
for metadata_key, metadata_value in context.invocation_metadata():
Expand All @@ -75,18 +73,18 @@ def MapFn(
),
)

datum_list = []
datums = []
for msg in msgs.items():
datum_list.append(udfunction_pb2.Datum(key=msg.key, value=msg.value))
datums.append(udfunction_pb2.Datum(key=msg.key, value=msg.value))

return udfunction_pb2.DatumList(elements=datum_list)
return udfunction_pb2.DatumList(elements=datums)

def ReduceFn(
self, request_iterator: Iterator[Datum], context: NumaflowServicerContext
) -> udfunction_pb2.DatumList:
"""
Applies a reduce function to a datum stream.
The camel case function name comes from the generated udfunction_pb2_grpc.py file.
The pascal case function name comes from the generated udfunction_pb2_grpc.py file.
"""
# TODO: implement Reduce function
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
Expand All @@ -98,7 +96,7 @@ def IsReady(
) -> udfunction_pb2.ReadyResponse:
"""
IsReady is the heartbeat endpoint for gRPC.
The camel case function name comes from the generated udfunction_pb2_grpc.py file.
The pascal case function name comes from the generated udfunction_pb2_grpc.py file.
"""
return udfunction_pb2.ReadyResponse(ready=True)

Expand Down
6 changes: 3 additions & 3 deletions pynumaflow/sink/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from pynumaflow.sink.handler import HTTPSinkHandler
from pynumaflow.sink._dtypes import Message, Response, Responses
from pynumaflow.sink._dtypes import Response, Responses, Datum
from pynumaflow.sink.server import UserDefinedSinkServicer

__all__ = ["HTTPSinkHandler", "Message", "Response", "Responses"]
__all__ = ["Response", "Responses", "Datum", "UserDefinedSinkServicer"]
77 changes: 58 additions & 19 deletions pynumaflow/sink/_dtypes.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,11 @@
import json
from dataclasses import dataclass
from datetime import datetime
from typing import List, TypeVar, Type, Optional

import msgpack

from pynumaflow._constants import APPLICATION_JSON, APPLICATION_MSG_PACK
from pynumaflow.sink.encoder import NumaflowJSONEncoder, msgpack_encoding
from pynumaflow.exceptions import MarshalError

R = TypeVar("R", bound="Response")
Rs = TypeVar("Rs", bound="Responses")


@dataclass
class Message:
id: str
payload: bytes


@dataclass
class Response:
id: str
Expand Down Expand Up @@ -49,9 +37,60 @@ def append(self, response: R) -> None:
def items(self) -> List[R]:
return self._responses

def dumps(self, udf_content_type: str) -> str:
if udf_content_type == APPLICATION_JSON:
return json.dumps(self._responses, cls=NumaflowJSONEncoder, separators=(",", ":"))
elif udf_content_type == APPLICATION_MSG_PACK:
return msgpack.dumps(self._responses, default=msgpack_encoding)
raise MarshalError(udf_content_type)
def dumps(self) -> str:
return str(self)


class Datum:
"""
Class to define the important information for the event.
Args:
value: the payload of the event.
event_time: the event time of the event.
watermark: the watermark of the event.
>>> # Example usage
>>> from pynumaflow.function import Datum
>>> from datetime import datetime, timezone
>>> payload = bytes("test_mock_message", encoding="utf-8")
>>> t1 = datetime.fromtimestamp(1662998400, timezone.utc)
>>> t2 = datetime.fromtimestamp(1662998460, timezone.utc)
>>> msg_id = "test_id"
>>> d = Datum(sink_msg_id=msg_id, value=payload, event_time=t1, watermark=t2)
"""

def __init__(self, sink_msg_id: str, value: bytes, event_time: datetime, watermark: datetime):
self._id = sink_msg_id or ""
self._value = value or b""
if not isinstance(event_time, datetime):
raise TypeError(f"Wrong data type: {type(event_time)} for Datum.event_time")
self._event_time = event_time
if not isinstance(watermark, datetime):
raise TypeError(f"Wrong data type: {type(watermark)} for Datum.watermark")
self._watermark = watermark

def __str__(self):
value_string = self._value.decode("utf-8")
return f"id: {self._id}, value: {value_string}, event_time: {str(self._event_time)}, watermark: {str(self._watermark)}"

def __repr__(self):
return str(self)

@property
def id(self) -> str:
"""Returns the id of the event."""
return self._id

@property
def value(self) -> bytes:
"""Returns the value of the event."""
return self._value

@property
def event_time(self) -> datetime:
"""Returns the event time of the event."""
return self._event_time

@property
def watermark(self) -> datetime:
"""Returns the watermark of the event."""
return self._watermark
79 changes: 79 additions & 0 deletions pynumaflow/sink/_udfunction_pb2.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import timestamp_pb2 as _timestamp_pb2
from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2
from typing import (
ClassVar as _ClassVar,
Mapping as _Mapping,
Optional as _Optional,
Union as _Union,
List,
)

DESCRIPTOR: _descriptor.FileDescriptor

class ReadyResponse(_message.Message):
__slots__ = ["ready"]
READY_FIELD_NUMBER: _ClassVar[int]
ready: bool
def __init__(self, ready: _Optional[bool] = ...) -> None: ...

class EventTime(_message.Message):
__slots__ = ["event_time"]
EVENT_TIME_FIELD_NUMBER: _ClassVar[int]
event_time: _timestamp_pb2.Timestamp
def __init__(self, event_time: _Optional[_timestamp_pb2.Timestamp] = ...) -> None: ...

class Watermark(_message.Message):
__slots__ = ["watermark"]
WATERMARK_FIELD_NUMBER: _ClassVar[int]
watermark: _timestamp_pb2.Timestamp
def __init__(self, watermark: _Optional[_timestamp_pb2.Timestamp] = ...) -> None: ...

class Datum(_message.Message):
__slots__ = ["key", "value", "event_time", "watermark", "id"]
KEY_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
EVENT_TIME_FIELD_NUMBER: _ClassVar[int]
WATERMARK_FIELD_NUMBER: _ClassVar[int]
key: str
value: bytes
id: str
event_time: _timestamp_pb2.Timestamp
watermark: _timestamp_pb2.Timestamp
def __init__(
self,
key: _Optional[str],
value: _Optional[bytes],
id: _Optional[str],
event_time: _Optional[_timestamp_pb2.Timestamp] = ...,
watermark: _Optional[_timestamp_pb2.Timestamp] = ...,
) -> None: ...

class DatumList(_message.Message):
__slots__ = ["elements"]
ELEMENTS_FIELD_NUMBER: _ClassVar[int]
elements: List[Datum]
def __init__(self, elements: _Optional[List[Datum]]) -> None: ...

class Response(_message.Message):
__slots__ = ["id", "success", "err_msg"]
ID_FIELD_NUMBER: _ClassVar[int]
SUCCESS_FIELD_NUMBER: _ClassVar[int]
ERR_MSG_FIELD_NUMBER: _ClassVar[int]
id: str
success: bool
err_msg: str
def __init__(
self,
id: _Optional[str],
success: _Optional[bool],
err_msg: _Optional[str],
) -> None: ...

class ResponseList(_message.Message):
__slots__ = ["responses"]
RESPONSES_FIELD_NUMBER: _ClassVar[int]
responses: List[Response]
def __init__(self, responses: _Optional[List[Response]]) -> None: ...
38 changes: 0 additions & 38 deletions pynumaflow/sink/decoder.py

This file was deleted.

40 changes: 0 additions & 40 deletions pynumaflow/sink/encoder.py

This file was deleted.

Empty file.
Loading