Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
# Generated by Cargo
# will have compiled files and executables
debug/
target/


# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock

# These are backup files generated by rustfmt
**/*.rs.bk

# Skip wheel
**/pynumaflow_lite-*.whl

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
50 changes: 50 additions & 0 deletions packages/pynumaflow-lite/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
[package]
name = "pynumaflow-lite"
version = "0.1.0"
edition = "2024"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
name = "pynumaflow_lite"
# cdylib: the Python extension module built by maturin.
# rlib: lets the integration-test binaries below link against the crate.
crate-type = ["cdylib", "rlib"]

[dependencies]
# NOTE(review): tracks a feature branch, not a tagged release — pin to a
# tag/rev once `export-accum-items` is merged upstream.
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "export-accum-items" }
pyo3 = { version = "0.26.0", features = ["chrono", "experimental-inspect"] }
tokio = "1.47.1"
tonic = "0.14.2"
tokio-stream = "0.1.17"
tower = "0.5.2"
hyper-util = "0.1.16"
prost-types = "0.14.1"
chrono = "0.4.42"
pyo3-async-runtimes = { version = "0.26.0", features = ["tokio-runtime"] }
futures-core = "0.3.31"
pin-project = "1.1.10"


# Standalone test drivers (one per server kind), run by the test suite.
[[bin]]
name = "test_map"
path = "tests/bin/map.rs"

[[bin]]
name = "test_batchmap"
path = "tests/bin/batchmap.rs"


[[bin]]
name = "test_mapstream"
path = "tests/bin/mapstream.rs"


[[bin]]
name = "test_reduce"
path = "tests/bin/reduce.rs"

[[bin]]
name = "test_session_reduce"
path = "tests/bin/session_reduce.rs"

[[bin]]
name = "test_accumulator"
path = "tests/bin/accumulator.rs"
44 changes: 44 additions & 0 deletions packages/pynumaflow-lite/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Fixed: .PHONY previously declared a nonexistent `stubgen` target, and a
# stray blank line split the `help` recipe in two.
.PHONY: help build develop clean test test-rust

# Default Python package/module name
MODULE ?= pynumaflow_lite

# Optional args to pass through to cargo test, e.g., ARGS="--lib pyiterables::tests::py_async_iter_stream_yields_incrementally"
ARGS ?=

# `help` is the first target, so it is the default goal; make it explicit.
.DEFAULT_GOAL := help

help:
	@echo "Targets:"
	@echo " build - cargo build the Rust library"
	@echo " develop - maturin develop (install in current Python env)"
	@echo " test - run end-to-end pytest (depends on develop)"
	@echo " test-rust - cargo test with PYTHONHOME set; pass args via ARGS=\"...\""
	@echo " clean - cargo clean"

build:
	cargo build

# Installs the extension into the active Python environment.
# You can then discover the installed .so path to run stubgen against it if preferred.
develop:
	maturin develop

# Run pytest end-to-end tests. Assumes a working Python env with pytest installed.
# Example: (cd pynumaflow-lite && make test)
# Note: we do not install pytest here to avoid mutating global envs.
test: develop
	pytest -v

# Run cargo tests with PYTHONHOME pointed at base_prefix so embedded CPython finds stdlib
# Usage examples:
#   make test-rust ARGS="--lib"
#   make test-rust ARGS="--lib pyiterables::tests::py_async_iter_stream_yields_incrementally"
test-rust:
	@export PYTHONHOME="$(shell python -c 'import sys; print(sys.base_prefix)')" && \
	cargo test $(ARGS)

clean:
	cargo clean
37 changes: 37 additions & 0 deletions packages/pynumaflow-lite/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
## Development Setup

```bash
# new venv
uv venv

# activate venv (uv creates it in .venv by default)
source .venv/bin/activate

uv pip install maturin

# install dependencies
uv sync
```

### Testing

```bash
make test
```

### HOWTO create .whl

Go to `pynumaflow-lite` (top level) directory and run the below command.

```bash
docker run --rm -v $(pwd):/io ghcr.io/pyo3/maturin build -i python3.11 --release
```

This will create the `wheel` file in the `target/wheels/` directory. Copy it to the
directory containing the Python code that references this library.

e.g.,

```bash
cp target/wheels/pynumaflow_lite-0.1.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl manifests/simple-async-map/
```
39 changes: 39 additions & 0 deletions packages/pynumaflow-lite/manifests/accumulator/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Builder stage: base Python image with build tooling and Poetry installed.
FROM python:3.11-slim-bullseye AS builder

ENV PYTHONFAULTHANDLER=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONHASHSEED=random \
    PIP_NO_CACHE_DIR=on \
    PIP_DISABLE_PIP_VERSION_CHECK=on \
    PIP_DEFAULT_TIMEOUT=100 \
    POETRY_HOME="/opt/poetry" \
    POETRY_VIRTUALENVS_IN_PROJECT=true \
    POETRY_NO_INTERACTION=1 \
    PYSETUP_PATH="/opt/pysetup"

# Make the poetry binary (installed below) available on PATH.
ENV PATH="$POETRY_HOME/bin:$PATH"

RUN apt-get update \
    && apt-get install --no-install-recommends -y \
    curl \
    wget \
    # deps for building python deps
    build-essential \
    && apt-get install -y git \
    && apt-get clean && rm -rf /var/lib/apt/lists/* \
    && curl -sSL https://install.python-poetry.org | python3 -

# UDF stage: copies the project in and installs the prebuilt wheel + deps.
FROM builder AS udf

WORKDIR $PYSETUP_PATH
COPY ./ ./

# NOTE: place the built wheel in this directory before building the image
# NOTE(review): wheel filename is hard-coded for cp311/aarch64 — rebuilding
# for another Python version or arch requires updating this line.
RUN pip install $PYSETUP_PATH/pynumaflow_lite-0.1.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl

# Resolve and install the remaining (Poetry-managed) dependencies, then
# drop poetry's cache to keep the layer small.
RUN poetry lock
RUN poetry install --no-cache --no-root && \
    rm -rf ~/.cache/pypoetry/

CMD ["python", "accumulator_stream_sorter.py"]

37 changes: 37 additions & 0 deletions packages/pynumaflow-lite/manifests/accumulator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
To create the `wheel` file, refer to the [root README](../../README.md)

## HOWTO build Image

```bash
docker build . -t quay.io/numaio/numaflow/pynumaflow-lite-accumulator-stream-sorter:v1 --load
```

Load it now to `k3d`

### `k3d`

```bash
k3d image import quay.io/numaio/numaflow/pynumaflow-lite-accumulator-stream-sorter:v1
```

### Minikube

```bash
minikube image load quay.io/numaio/numaflow/pynumaflow-lite-accumulator-stream-sorter:v1
```

#### Delete image from minikube

`minikube` does not overwrite an already-loaded image, so delete it and load it
again if you are reusing the same tag.

```bash
minikube image rm quay.io/numaio/numaflow/pynumaflow-lite-accumulator-stream-sorter:v1
```

## Run the pipeline

```bash
kubectl apply -f pipeline.yaml
```

Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
"""
Stream sorter accumulator example.

This accumulator buffers incoming data and sorts it by event time,
flushing sorted data when the watermark advances.
"""
import asyncio
from datetime import datetime
from typing import AsyncIterator

from pynumaflow_lite.accumulator import Datum, Message, AccumulatorAsyncServer, Accumulator


class StreamSorter(Accumulator):
    """
    A stream sorter that buffers and sorts data by event time,
    flushing when watermark advances.
    """

    def __init__(self):
        from datetime import timezone
        # Initialize with a very old timestamp (timezone-aware) so that the
        # first real watermark always compares as newer.
        self.latest_wm = datetime.fromtimestamp(-1, tz=timezone.utc)
        # Buffer kept permanently sorted by event_time (see insert_sorted).
        self.sorted_buffer: list[Datum] = []
        print("StreamSorter initialized")

    async def handler(self, datums: AsyncIterator[Datum]) -> AsyncIterator[Message]:
        """
        Buffer and sort datums, yielding sorted messages when watermark advances.

        For each incoming datum: if its watermark moved past the last-seen
        watermark, first flush every buffered datum whose event_time is at or
        before the new watermark (in sorted order), then insert the datum into
        the sorted buffer. When the input stream ends, all remaining buffered
        datums are emitted in sorted order.
        """
        print("Handler started, waiting for datums...")
        datum_count = 0

        async for datum in datums:
            datum_count += 1
            print(f"Received datum #{datum_count}: event_time={datum.event_time}, "
                  f"watermark={datum.watermark}, value={datum.value}")

            # If watermark has moved forward
            if datum.watermark and datum.watermark > self.latest_wm:
                old_wm = self.latest_wm
                self.latest_wm = datum.watermark
                print(f"Watermark advanced from {old_wm} to {self.latest_wm}")

                # Flush buffer
                flushed = 0
                async for msg in self.flush_buffer():
                    flushed += 1
                    yield msg

                if flushed > 0:
                    print(f"Flushed {flushed} messages from buffer")

            # Insert into sorted buffer
            self.insert_sorted(datum)
            print(f"Buffer size: {len(self.sorted_buffer)}")

        print(f"Handler finished. Total datums processed: {datum_count}")
        print(f"Remaining in buffer: {len(self.sorted_buffer)}")

        # Flush any remaining items in the buffer at the end. Pop before
        # yielding so that an early close of this generator cannot leave
        # already-emitted datums in the buffer (same fix as flush_buffer).
        if self.sorted_buffer:
            print("Flushing remaining buffer at end...")
            while self.sorted_buffer:
                datum = self.sorted_buffer.pop(0)
                print(f" Flushing: event_time={datum.event_time}, value={datum.value}")
                # Use Message.from_datum to preserve all metadata
                yield Message.from_datum(datum)

    def insert_sorted(self, datum: Datum):
        """Binary insert to keep sorted buffer in order by event_time."""
        left, right = 0, len(self.sorted_buffer)
        while left < right:
            mid = (left + right) // 2
            if self.sorted_buffer[mid].event_time > datum.event_time:
                right = mid
            else:
                # <= comparison keeps the insert stable: equal event_times
                # retain arrival order.
                left = mid + 1
        self.sorted_buffer.insert(left, datum)

    async def flush_buffer(self) -> AsyncIterator[Message]:
        """Flush all items from buffer that are before or at the watermark.

        Bug fix: items are now popped from the buffer *before* being yielded.
        The previous implementation removed flushed items only after the whole
        loop finished, so if the consumer closed this generator early (e.g.
        the output stream was cancelled mid-flush), already-emitted datums
        stayed in the buffer and were emitted again on the next flush.
        """
        while self.sorted_buffer and self.sorted_buffer[0].event_time <= self.latest_wm:
            datum = self.sorted_buffer.pop(0)
            print(f" Flushing: event_time={datum.event_time}, value={datum.value}")
            # Use Message.from_datum to preserve all metadata (id, headers, event_time, watermark)
            yield Message.from_datum(datum)


async def main():
    """
    Start the accumulator server and run it until it stops.
    """
    import signal

    server = AccumulatorAsyncServer()

    # Register graceful-shutdown handlers; some platforms/event loops do not
    # support add_signal_handler, in which case we simply skip registration.
    loop = asyncio.get_running_loop()
    try:
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, server.stop)
    except (NotImplementedError, RuntimeError):
        pass

    try:
        print("Starting Stream Sorter Accumulator Server...")
        await server.start(StreamSorter)
        print("Shutting down gracefully...")
    except asyncio.CancelledError:
        # Cancelled from outside: best-effort stop, then bail out.
        try:
            server.stop()
        except Exception:
            pass
        return


# Optional: ensure default signal handlers are in place so asyncio.run can handle them cleanly.
import signal
signal.signal(signal.SIGINT, signal.default_int_handler)
try:
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
except AttributeError:
    # SIGTERM is not defined on every platform (e.g. some Windows builds).
    pass

# Entry point: run the async server until it finishes or is interrupted.
if __name__ == "__main__":
    asyncio.run(main())
Loading