Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/pynumaflow-lite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ path = "tests/bin/session_reduce.rs"
name = "test_accumulator"
path = "tests/bin/accumulator.rs"

[[bin]]
name = "test_reducestream"
path = "tests/bin/reducestream.rs"

[[bin]]
name = "test_sink"
path = "tests/bin/sink.rs"
Expand Down
16 changes: 14 additions & 2 deletions packages/pynumaflow-lite/pynumaflow_lite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
except Exception: # pragma: no cover
session_reducer = None

try:
reducestreamer = _import_module(__name__ + ".reducestreamer")
except Exception: # pragma: no cover
reducestreamer = None

try:
accumulator = _import_module(__name__ + ".accumulator")
except Exception: # pragma: no cover
Expand All @@ -43,12 +48,13 @@
except Exception: # pragma: no cover
sourcer = None

# Surface the Python Mapper, BatchMapper, MapStreamer, Reducer, SessionReducer, Accumulator, Sinker, and Sourcer classes under the extension submodules for convenient access
# Surface the Python Mapper, BatchMapper, MapStreamer, Reducer, SessionReducer, ReduceStreamer, Accumulator, Sinker, and Sourcer classes under the extension submodules for convenient access
from ._map_dtypes import Mapper
from ._batchmapper_dtypes import BatchMapper
from ._mapstream_dtypes import MapStreamer
from ._reduce_dtypes import Reducer
from ._session_reduce_dtypes import SessionReducer
from ._reducestreamer_dtypes import ReduceStreamer
from ._accumulator_dtypes import Accumulator
from ._sink_dtypes import Sinker
from ._source_dtypes import Sourcer
Expand Down Expand Up @@ -83,6 +89,12 @@
except Exception:
pass

if reducestreamer is not None:
try:
setattr(reducestreamer, "ReduceStreamer", ReduceStreamer)
except Exception:
pass

if accumulator is not None:
try:
setattr(accumulator, "Accumulator", Accumulator)
Expand All @@ -102,7 +114,7 @@
pass

# Public API
__all__ = ["mapper", "batchmapper", "mapstreamer", "reducer", "session_reducer", "accumulator", "sinker", "sourcer"]
__all__ = ["mapper", "batchmapper", "mapstreamer", "reducer", "session_reducer", "reducestreamer", "accumulator", "sinker", "sourcer"]

__doc__ = pynumaflow_lite.__doc__
if hasattr(pynumaflow_lite, "__all__"):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from abc import ABCMeta, abstractmethod
from pynumaflow_lite.reducestreamer import Datum, Message, Metadata
from collections.abc import AsyncIterable, AsyncIterator


class ReduceStreamer(metaclass=ABCMeta):
    """
    Abstract base class for reduce-stream handlers. A new instance will be
    created per window.

    A regular Reducer returns all of its messages at once; a ReduceStreamer
    instead produces them one at a time through an async iterator, so output
    can be emitted while the window's input is still being consumed.
    """

    def __call__(self, *args, **kwargs):
        # Calling the instance is shorthand for calling `handler` with the
        # same arguments; whatever `handler` returns is handed back untouched.
        result = self.handler(*args, **kwargs)
        return result

    @abstractmethod
    async def handler(
        self,
        keys: list[str],
        datums: AsyncIterable[Datum],
        md: Metadata,
    ) -> AsyncIterator[Message]:
        """
        Override this in a subclass: read from `datums` and yield Messages as
        they become available.

        Args:
            keys: List of keys for this window
            datums: Async iterable of Datum objects
            md: Metadata containing window information

        Yields:
            Message objects to be sent to the next vertex
        """

78 changes: 78 additions & 0 deletions packages/pynumaflow-lite/pynumaflow_lite/reducestreamer.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from __future__ import annotations

import datetime as _dt
from typing import Optional, List, Dict, Awaitable, AsyncIterator

# Re-export the Python ABC for user convenience and typing
from ._reducestreamer_dtypes import ReduceStreamer as ReduceStreamer


class Message:
    """Type stub for the ``Message`` class registered in the native ``reducestreamer`` module."""

    # Only `value` is required by the constructor; `keys` and `tags` may be omitted.
    value: bytes
    keys: list[str] | None
    tags: list[str] | None

    def __init__(self, value: bytes, keys: list[str] | None = ..., tags: list[str] | None = ...) -> None: ...

    # NOTE(review): presumably builds a Message marking the output as dropped — confirm
    # against the Rust implementation in the `reduce` module.
    @staticmethod
    def message_to_drop() -> Message: ...


class Datum:
    """Type stub for the ``Datum`` class registered in the native ``reducestreamer`` module."""

    # Payload and routing information.
    keys: list[str]
    value: bytes
    headers: dict[str, str]

    # Timestamps attached to the datum.
    eventtime: _dt.datetime
    watermark: _dt.datetime

    def __str__(self) -> str: ...

    def __repr__(self) -> str: ...


class IntervalWindow:
    """Type stub for the ``IntervalWindow`` class: a pair of ``start``/``end`` datetimes."""

    # NOTE(review): whether `end` is inclusive or exclusive is not visible from this stub.
    start: _dt.datetime
    end: _dt.datetime


class Metadata:
    """Type stub for the ``Metadata`` class; it carries the ``IntervalWindow`` passed to a handler."""

    interval_window: IntervalWindow


class PyAsyncDatumStream:
    """
    Python-visible async iterator that yields Datum items from a Tokio mpsc channel.

    Intended to be consumed with ``async for datum in stream``.
    """

    def __init__(self) -> None: ...

    # An async iterator returns itself from `__aiter__`.
    def __aiter__(self) -> PyAsyncDatumStream: ...

    # `__anext__` is a plain (non-`async`) method here, so its declared return type must be
    # the awaitable that resolves to the next Datum, not the Datum itself; otherwise type
    # checkers reject `async for` / `await stream.__anext__()` on this class.
    def __anext__(self) -> Awaitable[Datum]: ...


class ReduceStreamAsyncServer:
    """Type stub for the native reduce-stream server that is driven from Python code."""

    # Both paths are optional; the native constructor supplies defaults when they are omitted.
    def __init__(self, sock_file: str = ..., info_file: str = ...) -> None: ...

    # `py_creator` is the handler class and `init_args` an optional tuple of positional
    # constructor arguments for it. The returned awaitable must be awaited by the caller.
    def start(
        self,
        py_creator: type,
        init_args: tuple | None = ...,
    ) -> Awaitable[None]: ...

    # Requests shutdown of a server previously launched with `start`.
    def stop(self) -> None: ...


__all__ = [
"Message",
"Datum",
"IntervalWindow",
"Metadata",
"PyAsyncDatumStream",
"ReduceStreamAsyncServer",
"ReduceStreamer",
]

18 changes: 18 additions & 0 deletions packages/pynumaflow-lite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod mapstream;
pub mod pyiterables;
pub mod pyrs;
pub mod reduce;
pub mod reducestream;
pub mod session_reduce;
pub mod sink;
pub mod source;
Expand Down Expand Up @@ -46,6 +47,13 @@ fn session_reducer(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
Ok(())
}

/// Submodule: pynumaflow_lite.reducestreamer
///
/// Delegates all class registration to `crate::reducestream::populate_py_module`.
#[pymodule]
fn reducestreamer(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
    // The helper already returns `PyResult<()>`, so its outcome is passed through as-is.
    crate::reducestream::populate_py_module(m)
}

/// Submodule: pynumaflow_lite.accumulator
#[pymodule]
fn accumulator(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
Expand Down Expand Up @@ -76,6 +84,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_wrapped(pyo3::wrap_pymodule!(mapstreamer))?;
m.add_wrapped(pyo3::wrap_pymodule!(reducer))?;
m.add_wrapped(pyo3::wrap_pymodule!(session_reducer))?;
m.add_wrapped(pyo3::wrap_pymodule!(reducestreamer))?;
m.add_wrapped(pyo3::wrap_pymodule!(accumulator))?;
m.add_wrapped(pyo3::wrap_pymodule!(sinker))?;
m.add_wrapped(pyo3::wrap_pymodule!(sourcer))?;
Expand Down Expand Up @@ -125,6 +134,15 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
.getattr("modules")?
.set_item(fullname, &sub)?;

// Ensure it's importable as `pynumaflow_lite.reducestreamer` as well
let binding = m.getattr("reducestreamer")?;
let sub = binding.cast::<PyModule>()?;
let fullname = "pynumaflow_lite.reducestreamer";
sub.setattr("__name__", fullname)?;
py.import("sys")?
.getattr("modules")?
.set_item(fullname, &sub)?;

// Ensure it's importable as `pynumaflow_lite.accumulator` as well
let binding = m.getattr("accumulator")?;
let sub = binding.cast::<PyModule>()?;
Expand Down
74 changes: 74 additions & 0 deletions packages/pynumaflow-lite/src/reducestream/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use std::sync::Mutex;

pub mod server;

use pyo3::prelude::*;

// Re-export types from reduce module that are shared
pub use crate::reduce::{Datum, IntervalWindow, Message, Metadata, PyAsyncDatumStream};

/// Async Reduce Stream Server that can be started from Python code, taking a class (creator).
///
/// Exposed to Python as a class whose module is `pynumaflow_lite.reducestreamer`.
#[pyclass(module = "pynumaflow_lite.reducestreamer")]
pub struct ReduceStreamAsyncServer {
    /// Socket path forwarded to the server when `start` is called.
    sock_file: String,
    /// Server-info file path forwarded to the server when `start` is called.
    info_file: String,
    /// Sender half of the shutdown channel: set by `start`, taken (and fired) by `stop`.
    /// Wrapped in a `Mutex` because both methods only receive `&self`.
    shutdown_tx: Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
}

#[pymethods]
impl ReduceStreamAsyncServer {
#[new]
#[pyo3(signature = (sock_file="/var/run/numaflow/reducestream.sock".to_string(), info_file="/var/run/numaflow/reducestreamer-server-info".to_string()))]
fn new(sock_file: String, info_file: String) -> Self {
Self {
sock_file,
info_file,
shutdown_tx: Mutex::new(None),
}
}

/// Start the server with the given Python class (creator).
/// Only class-based reducers are supported (not function-based).
/// init_args is an optional tuple of positional arguments to pass to the class constructor.
#[pyo3(signature = (py_creator, init_args=None))]
pub fn start<'a>(
&self,
py: Python<'a>,
py_creator: Py<PyAny>,
init_args: Option<Py<PyAny>>,
) -> PyResult<Bound<'a, PyAny>> {
let sock_file = self.sock_file.clone();
let info_file = self.info_file.clone();
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
{
let mut guard = self.shutdown_tx.lock().unwrap();
*guard = Some(tx);
}

pyo3_async_runtimes::tokio::future_into_py(py, async move {
crate::reducestream::server::start(py_creator, init_args, sock_file, info_file, rx)
.await
.expect("server failed to start");
Ok(())
})
}

/// Trigger server shutdown from Python (idempotent).
pub fn stop(&self) -> PyResult<()> {
if let Some(tx) = self.shutdown_tx.lock().unwrap().take() {
let _ = tx.send(());
}
Ok(())
}
}

/// Register every reduce-stream class on the given Python module.
///
/// Registration stops at the first failure and that error is returned to the caller.
pub(crate) fn populate_py_module(m: &Bound<PyModule>) -> PyResult<()> {
    // Data types re-exported from the `reduce` module.
    m.add_class::<Message>()?;
    m.add_class::<Datum>()?;
    m.add_class::<IntervalWindow>()?;
    m.add_class::<Metadata>()?;
    // The server defined in this module, then the shared datum stream; the result of the
    // last registration doubles as the function's result.
    m.add_class::<ReduceStreamAsyncServer>()?;
    m.add_class::<PyAsyncDatumStream>()
}
Loading