Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,36 @@ Pure Python SDK implementation for Numaflow - [pynumaflow](packages/pynumaflow/R
## `pynumaflow-lite`

Coming shortly (Rust based Python SDK) with better performance

## Example Use Cases
### AsyncIO ExecutorPool Example

Note: This example uses the `asyncio` library together with the `ExecutorPool` class to run tasks concurrently.

```python
import asyncio
from pynumaflow import ExecutorPool

async def worker(num):
    """Simulate one second of asynchronous work, then return the square of *num*."""
    await asyncio.sleep(1)
    squared = num * num
    return squared

async def main():
    """Fan ten worker tasks out through an ExecutorPool and print their results."""
    pool = ExecutorPool()

    # Queue one squaring task per input value.
    pending = []
    for i in range(10):
        pending.append(pool.submit(worker, i))

    # Block until every submitted task has completed.
    results = await asyncio.gather(*pending)
    print(results)

# Script entry point.
asyncio.run(main())
```

Note: The `ExecutorPool` class manages a pool of workers that execute submitted tasks concurrently. In this example, we create an `ExecutorPool` instance, submit tasks to it with its `submit` method, and then wait for all of them to complete with `asyncio.gather`.
10 changes: 10 additions & 0 deletions README.md.bak.20260318031915
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# numaflow-python

Python SDK for Numaflow.

## `pynumaflow`
Pure Python SDK implementation for Numaflow - [pynumaflow](packages/pynumaflow/README.md)

## `pynumaflow-lite`

Coming shortly (Rust based Python SDK) with better performance
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@
This accumulator buffers incoming data and sorts it by event time,
flushing sorted data when the watermark advances.
"""

import asyncio
from datetime import datetime
from typing import AsyncIterator

from pynumaflow_lite.accumulator import Datum, Message, AccumulatorAsyncServer, Accumulator
from pynumaflow_lite.accumulator import (
Datum,
Message,
AccumulatorAsyncServer,
Accumulator,
)


class StreamSorter(Accumulator):
Expand All @@ -19,6 +25,7 @@ class StreamSorter(Accumulator):

def __init__(self):
from datetime import timezone

# Initialize with a very old timestamp (timezone-aware)
self.latest_wm = datetime.fromtimestamp(-1, tz=timezone.utc)
self.sorted_buffer: list[Datum] = []
Expand All @@ -33,8 +40,10 @@ async def handler(self, datums: AsyncIterator[Datum]) -> AsyncIterator[Message]:

async for datum in datums:
datum_count += 1
print(f"Received datum #{datum_count}: event_time={datum.event_time}, "
f"watermark={datum.watermark}, value={datum.value}")
print(
f"Received datum #{datum_count}: event_time={datum.event_time}, "
f"watermark={datum.watermark}, value={datum.value}"
)

# If watermark has moved forward
if datum.watermark and datum.watermark > self.latest_wm:
Expand Down Expand Up @@ -123,6 +132,7 @@ async def main():

# Optional: ensure default signal handlers are in place so asyncio.run can handle them cleanly.
import signal

signal.signal(signal.SIGINT, signal.default_int_handler)
try:
signal.signal(signal.SIGTERM, signal.SIG_DFL)
Expand Down
10 changes: 8 additions & 2 deletions packages/pynumaflow-lite/manifests/batchmap/batchmap_cat.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@


class SimpleBatchCat(batchmapper.BatchMapper):
async def handler(self, batch: AsyncIterable[batchmapper.Datum]) -> batchmapper.BatchResponses:
async def handler(
self, batch: AsyncIterable[batchmapper.Datum]
) -> batchmapper.BatchResponses:
responses = batchmapper.BatchResponses()
async for d in batch:
resp = batchmapper.BatchResponse(d.id)
Expand All @@ -29,7 +31,11 @@ async def handler(self, batch: AsyncIterable[batchmapper.Datum]) -> batchmapper.
pass


async def start(f: Callable[[AsyncIterable[batchmapper.Datum]], Awaitable[batchmapper.BatchResponses]]):
async def start(
f: Callable[
[AsyncIterable[batchmapper.Datum]], Awaitable[batchmapper.BatchResponses]
],
):
server = batchmapper.BatchMapAsyncServer()

# Register loop-level signal handlers so we control shutdown and avoid asyncio.run
Expand Down
4 changes: 1 addition & 3 deletions packages/pynumaflow-lite/manifests/map/map_cat.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@


class SimpleCat(mapper.Mapper):
async def handler(
self, keys: list[str], payload: mapper.Datum
) -> mapper.Messages:
async def handler(self, keys: list[str], payload: mapper.Datum) -> mapper.Messages:

messages = mapper.Messages()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@


class SimpleStreamCat(mapstreamer.MapStreamer):
async def handler(self, keys: list[str], datum: mapstreamer.Datum) -> AsyncIterator[Message]:
async def handler(
self, keys: list[str], datum: mapstreamer.Datum
) -> AsyncIterator[Message]:
parts = datum.value.decode("utf-8").split(",")
if not parts:
yield Message.to_drop()
Expand Down Expand Up @@ -51,4 +53,3 @@ async def start(f: Callable[[list[str], mapstreamer.Datum], AsyncIterator[Messag
if __name__ == "__main__":
async_handler = SimpleStreamCat()
asyncio.run(start(async_handler))

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ def __init__(self, initial: int = 0) -> None:
self.counter = initial

async def handler(
self, keys: list[str], datums: AsyncIterable[reducer.Datum], md: reducer.Metadata
self,
keys: list[str],
datums: AsyncIterable[reducer.Datum],
md: reducer.Metadata,
) -> reducer.Messages:
iw = md.interval_window
self.counter = 0
Expand Down Expand Up @@ -57,4 +60,3 @@ async def start(creator: type[reducer.Reducer], init_args: tuple):

if __name__ == "__main__":
asyncio.run(start(ReduceCounter, (0,)))

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
The counter increments for each datum and emits a message every 10 items,
plus a final message at the end.
"""

import asyncio
import signal
from collections.abc import AsyncIterable, AsyncIterator
Expand All @@ -17,12 +18,12 @@
class ReduceCounter(reducestreamer.ReduceStreamer):
"""
A reduce streaming counter that emits intermediate results.

This demonstrates the key difference from regular Reducer:
- Regular Reducer: waits for all data, then returns Messages
- ReduceStreamer: yields Message objects incrementally as an async iterator
"""

def __init__(self, initial: int = 0) -> None:
self.counter = initial

Expand All @@ -34,21 +35,21 @@ async def handler(
) -> AsyncIterator[reducestreamer.Message]:
"""
Process datums and yield messages incrementally.

Args:
keys: List of keys for this window
datums: Async iterable of incoming data
md: Metadata containing window information

Yields:
Message objects to send to the next vertex
"""
iw = md.interval_window
print(f"Handler started for keys={keys}, window=[{iw.start}, {iw.end}]")

async for _ in datums:
self.counter += 1

# Emit intermediate result every 10 items
if self.counter % 10 == 0:
msg = (
Expand All @@ -59,7 +60,7 @@ async def handler(
print(f"Yielding intermediate result: counter={self.counter}")
# Early release of data - this is the key feature of reduce streaming!
yield reducestreamer.Message(msg, keys=keys)

# Emit final result
msg = (
f"counter:{self.counter} (FINAL) "
Expand Down Expand Up @@ -105,4 +106,3 @@ async def start(creator: type, init_args: tuple):

if __name__ == "__main__":
asyncio.run(start(ReduceCounter, (0,)))

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self, initial: int = 0) -> None:
self.counter = initial

async def session_reduce(
self, keys: list[str], datums: AsyncIterable[session_reducer.Datum]
self, keys: list[str], datums: AsyncIterable[session_reducer.Datum]
) -> AsyncIterator[session_reducer.Message]:
"""
Count all incoming messages in this session and yield the count.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- If MAPPER is set to "true", runs as a Mapper that reads side input files
- Otherwise, runs as a SideInput retriever that broadcasts values
"""

import asyncio
import os
import signal
Expand Down Expand Up @@ -142,4 +143,3 @@ async def start_mapper():
else:
print("Starting as SideInput retriever...")
asyncio.run(start_sideinput())

5 changes: 3 additions & 2 deletions packages/pynumaflow-lite/manifests/sink/sink_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ async def handler(self, datums: AsyncIterable[sinker.Datum]) -> sinker.Responses
pass


async def start(f: Callable[[AsyncIterator[sinker.Datum]], Awaitable[sinker.Responses]]):
async def start(
f: Callable[[AsyncIterator[sinker.Datum]], Awaitable[sinker.Responses]],
):
server = sinker.SinkAsyncServer()

# Register loop-level signal handlers so we control shutdown and avoid asyncio.run
Expand All @@ -61,4 +63,3 @@ async def start(f: Callable[[AsyncIterator[sinker.Datum]], Awaitable[sinker.Resp
if __name__ == "__main__":
async_handler = SimpleLogSink()
asyncio.run(start(async_handler))

20 changes: 14 additions & 6 deletions packages/pynumaflow-lite/manifests/source/simple_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ def __init__(self):
self.counter = 0
self.partition_idx = 0

async def read_handler(self, datum: sourcer.ReadRequest) -> AsyncIterator[sourcer.Message]:
async def read_handler(
self, datum: sourcer.ReadRequest
) -> AsyncIterator[sourcer.Message]:
"""
The simple source generates messages with incrementing numbers.
"""
_LOGGER.info(f"Read request: num_records={datum.num_records}, timeout_ms={datum.timeout_ms}")
_LOGGER.info(
f"Read request: num_records={datum.num_records}, timeout_ms={datum.timeout_ms}"
)

# Generate the requested number of messages
for i in range(datum.num_records):
Expand All @@ -36,7 +40,7 @@ async def read_handler(self, datum: sourcer.ReadRequest) -> AsyncIterator[source
# Create offset
offset = sourcer.Offset(
offset=str(self.counter).encode("utf-8"),
partition_id=self.partition_idx
partition_id=self.partition_idx,
)

# Create message
Expand All @@ -45,7 +49,7 @@ async def read_handler(self, datum: sourcer.ReadRequest) -> AsyncIterator[source
offset=offset,
event_time=datetime.now(timezone.utc),
keys=["key1"],
headers={"source": "simple"}
headers={"source": "simple"},
)

_LOGGER.info(f"Generated message: {self.counter}")
Expand All @@ -62,15 +66,19 @@ async def ack_handler(self, request: sourcer.AckRequest) -> None:
"""
_LOGGER.info(f"Acknowledging {len(request.offsets)} offsets")
for offset in request.offsets:
_LOGGER.debug(f"Acked offset: {offset.offset.decode('utf-8')}, partition: {offset.partition_id}")
_LOGGER.debug(
f"Acked offset: {offset.offset.decode('utf-8')}, partition: {offset.partition_id}"
)

async def nack_handler(self, request: sourcer.NackRequest) -> None:
"""
The simple source negatively acknowledges the offsets.
"""
_LOGGER.info(f"Negatively acknowledging {len(request.offsets)} offsets")
for offset in request.offsets:
_LOGGER.warning(f"Nacked offset: {offset.offset.decode('utf-8')}, partition: {offset.partition_id}")
_LOGGER.warning(
f"Nacked offset: {offset.offset.decode('utf-8')}, partition: {offset.partition_id}"
)

async def pending_handler(self) -> sourcer.PendingResponse:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
class EventFilter(sourcetransformer.SourceTransformer):
"""
A source transformer that filters and routes messages based on event time.

- Messages before 2022 are dropped
- Messages within 2022 are tagged with "within_year_2022"
- Messages after 2022 are tagged with "after_year_2022"
"""

async def handler(
self, keys: list[str], datum: sourcetransformer.Datum
self, keys: list[str], datum: sourcetransformer.Datum
) -> sourcetransformer.Messages:
val = datum.value
event_time = datum.event_time
Expand All @@ -30,23 +30,27 @@ async def handler(
print(f"Got event time: {event_time}, it is before 2022, so dropping")
messages.append(sourcetransformer.Message.message_to_drop(event_time))
elif event_time < january_first_2023:
print(f"Got event time: {event_time}, it is within year 2022, so forwarding to within_year_2022")
print(
f"Got event time: {event_time}, it is within year 2022, so forwarding to within_year_2022"
)
messages.append(
sourcetransformer.Message(
value=val,
event_time=january_first_2022,
keys=keys,
tags=["within_year_2022"]
tags=["within_year_2022"],
)
)
else:
print(f"Got event time: {event_time}, it is after year 2022, so forwarding to after_year_2022")
print(
f"Got event time: {event_time}, it is after year 2022, so forwarding to after_year_2022"
)
messages.append(
sourcetransformer.Message(
value=val,
event_time=january_first_2023,
keys=keys,
tags=["after_year_2022"]
tags=["after_year_2022"],
)
)

Expand All @@ -61,7 +65,9 @@ async def handler(
pass


async def start(f: Callable[[list[str], sourcetransformer.Datum], sourcetransformer.Messages]):
async def start(
f: Callable[[list[str], sourcetransformer.Datum], sourcetransformer.Messages],
):
server = sourcetransformer.SourceTransformAsyncServer()

# Register loop-level signal handlers so we control shutdown and avoid asyncio.run
Expand Down Expand Up @@ -92,4 +98,3 @@ async def start(f: Callable[[list[str], sourcetransformer.Datum], sourcetransfor
if __name__ == "__main__":
async_handler = EventFilter()
asyncio.run(start(async_handler))

Loading