This SDK provides the interface for writing user-defined functions (UDFs) and user-defined sinks (UDSinks) in Python.
Install the package using pip.
pip install pynumaflow
This project uses Poetry for dependency management and packaging. To build the package locally, run the following command from the root of the project.
make setup
To run unit tests:
make test
To format and lint the code with black and ruff:
make lint
To set up the pre-commit hooks:
pre-commit install
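from pynumaflow.mapper import Messages, Message, Datum, Mapper


def my_handler(keys: list[str], datum: Datum) -> Messages:
    # Map UDF: forward each message unchanged, preserving its keys.
    val = datum.value
    # The event time and watermark are also available on the datum.
    _ = datum.event_time
    _ = datum.watermark
    return Messages(Message(value=val, keys=keys))


if __name__ == "__main__":
    # Start the gRPC server that serves the handler.
    grpc_server = Mapper(handler=my_handler)
    grpc_server.start()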
In addition to the regular Map function, SourceTransformer can assign a new event time to the message. SourceTransformer is supported only at the source vertex, where it enables (a) early data filtering and (b) watermark assignment by extracting a new event time from the message payload.
from datetime import datetime

from pynumaflow.sourcetransformer import Messages, Message, Datum, SourceTransformer


def transform_handler(keys: list[str], datum: Datum) -> Messages:
    val = datum.value
    # Assign a new event time; the current time stands in here for one parsed from the payload.
    new_event_time = datetime.now()
    _ = datum.watermark
    return Messages(Message(val, event_time=new_event_time, keys=keys))


if __name__ == "__main__":
    grpc_server = SourceTransformer(handler=transform_handler)
    grpc_server.start()
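The example above uses the current time as the new event time. As a rough sketch of what "extracting a new event time from the message payload" could look like, the helper below parses a timestamp out of a JSON payload. It uses only the standard library and is not part of pynumaflow; the function name `extract_event_time` and the `event_time` field in the payload are assumptions made for illustration.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def extract_event_time(payload: bytes) -> Optional[datetime]:
    """Return the event time carried in the payload, or None if absent or malformed.

    Assumes (hypothetically) a JSON object with an ISO 8601 "event_time" field.
    """
    try:
        record = json.loads(payload)
        parsed = datetime.fromisoformat(record["event_time"])
    except (ValueError, KeyError, TypeError):
        # Invalid JSON, a missing field, or a value that is not an ISO 8601 string.
        return None
    # Treat naive timestamps as UTC so every result is timezone-aware.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
```

Inside a transform handler, the returned value could take the place of `datetime.now()`, and a `None` result could mark the message as a candidate for early filtering. How a message is actually dropped depends on the SDK version and is not shown here.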
from typing import AsyncIterable, List

import aiorun
from pynumaflow.reducer import Messages, Message, Datum, Metadata, AsyncReducer


async def my_handler(
    keys: List[str], datums: AsyncIterable[Datum], md: Metadata
) -> Messages:
    # Reduce UDF: count the messages received for these keys in the window.
    interval_window = md.interval_window
    counter = 0
    # datums is consumed with "async for", so it is typed as an async iterable.
    async for _ in datums:
        counter += 1
    msg = (
        f"counter:{counter} interval_window_start:{interval_window.start} "
        f"interval_window_end:{interval_window.end}"
    )
    return Messages(Message(str.encode(msg), keys))


if __name__ == "__main__":
    grpc_server = AsyncReducer(handler=my_handler)
    aiorun.run(grpc_server.start())
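The counting loop in the reduce handler can be tried locally without starting a gRPC server. The sketch below is independent of pynumaflow: `fake_datums` and `count_items` are hypothetical names, and plain `bytes` values stand in for `Datum` objects. It only illustrates how an async stream is consumed with `async for`.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_datums(values: List[bytes]) -> AsyncIterator[bytes]:
    """Hypothetical stand-in for the stream of datums in one window."""
    for value in values:
        yield value


async def count_items(items: AsyncIterator[bytes]) -> int:
    """Consume the async stream and return how many items it produced."""
    counter = 0
    async for _ in items:
        counter += 1
    return counter


result = asyncio.run(count_items(fake_datums([b"a", b"b", b"c"])))
print(result)  # prints 3
```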
A sample UDF Dockerfile is provided under examples.
from typing import Iterator

from pynumaflow.sinker import Datum, Responses, Response, Sinker


def my_handler(datums: Iterator[Datum]) -> Responses:
    # UDSink: print each message and acknowledge it as successfully written.
    responses = Responses()
    for msg in datums:
        print("User Defined Sink", msg.value.decode("utf-8"))
        responses.append(Response.as_success(msg.id))
    return responses


if __name__ == "__main__":
    grpc_server = Sinker(my_handler)
    grpc_server.start()
A sample UDSink Dockerfile is provided under examples.