Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Verb Functions Async #255

Merged
merged 2 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/reactivedataflow/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "reactivedataflow"
version = "0.1.5"
version = "0.1.6"
description = "Reactive Dataflow Graphs"
license = "MIT"
authors = ["Chris Trevino <chtrevin@microsoft.com>"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) 2024 Microsoft Corporation.
"""reactivedataflow Outputs Decorator."""

from collections.abc import Callable
from collections.abc import Awaitable, Callable
from typing import Any, ParamSpec, TypeVar, cast

from reactivedataflow.constants import default_output
Expand All @@ -21,7 +21,7 @@
def connect_output(
mode: OutputMode = OutputMode.Value,
output_names: list[str] | None = None,
) -> Callable[[Callable[P, Any]], Callable[P, VerbOutput]]:
) -> Callable[[Callable[P, Awaitable[Any]]], Callable[P, Awaitable[VerbOutput]]]:
"""Decorate an execution function with output conditions.

Args:
Expand All @@ -34,9 +34,9 @@ def connect_output(
if mode == OutputMode.Value and output_names is not None:
raise OutputNamesNotValidInValueOutputModeError

def wrap_fn(fn: Callable[P, Any]) -> Callable[P, VerbOutput]:
def wrapped_fn(*args: P.args, **kwargs: P.kwargs) -> VerbOutput:
result = fn(*args, **kwargs)
def wrap_fn(fn: Callable[P, Awaitable[Any]]) -> Callable[P, Awaitable[VerbOutput]]:
async def wrapped_fn(*args: P.args, **kwargs: P.kwargs) -> VerbOutput:
result = await fn(*args, **kwargs)
match mode:
case OutputMode.Raw:
return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def emit_conditions(
"""Conditionally emit to output ports."""

def wrap_fn(fn: VerbFunction) -> VerbFunction:
def wrapped_fn(inputs: VerbInput) -> VerbOutput:
result = fn(inputs)
async def wrapped_fn(inputs: VerbInput) -> VerbOutput:
result = await fn(inputs)
are_conditions_met = all(
condition(inputs, result) for condition in conditions
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ def fire_conditions(
"""Apply conditional firing conditions to a function."""

def wrap_fn(fn: VerbFunction) -> VerbFunction:
def wrapped_fn(inputs: VerbInput) -> VerbOutput:
async def wrapped_fn(inputs: VerbInput) -> VerbOutput:
are_conditions_met = all(condition(inputs) for condition in conditions)
if not are_conditions_met:
return VerbOutput(no_output=True)
return fn(inputs)
return await fn(inputs)

wrapped_fn.__qualname__ = f"{fn.__qualname__}_wrapfirecond"
return wrapped_fn
Expand Down
3 changes: 0 additions & 3 deletions python/reactivedataflow/reactivedataflow/decorators/verb.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ def verb(
input_mode: InputMode | None = None,
output_mode: OutputMode | None = None,
output_names: list[str] | None = None,
is_async: bool | None = None,
override: bool | None = None,
include_default_output: bool | None = None,
strict: bool | None = None,
Expand All @@ -45,7 +44,6 @@ def verb(
input_mode (InputMode | None): The input mode of the verb. If raw, then the function is expected to adhere to the VerbFunction interface.
output_mode (OutputMode | None): The output mode of the verb, either a single-value or tuple.
output_names (list[str] | None): The names of the outputs in tuple output mode.
is_async (bool | None): Whether the verb is an async function.
override (bool | None): Whether to override the verb if it already exists.
strict (bool | None): Whether to enforce strict port names. If True, then errors wil be raised at graph-building time if any inputs to this node don't align with the defined input bindings, or any mapped outputs don't align to the defined outputs list.
include_default_output (bool | None): Whether to include a default output port, default=True.
Expand All @@ -62,7 +60,6 @@ def wrap_fn(verb: Callable[P, Any]) -> Callable[P, Any]:
fire_conditions=fire_conditions or [],
emit_conditions=emit_conditions or [],
adapters=adapters or [],
is_async=is_async or False,
strict=strict or False,
input_mode=input_mode or InputMode.PortMapped,
output_mode=output_mode or OutputMode.Value,
Expand Down
3 changes: 1 addition & 2 deletions python/reactivedataflow/reactivedataflow/nodes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
VerbOutput,
)
from .node import Node
from .types import AsyncVerbFunction, EmitCondition, FireCondition, VerbFunction
from .types import EmitCondition, FireCondition, VerbFunction

__all__ = [
"AsyncVerbFunction",
"EmitCondition",
"EmitMode",
"ExecutionNode",
Expand Down
10 changes: 3 additions & 7 deletions python/reactivedataflow/reactivedataflow/nodes/execution_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@
"""The reactivedataflow ExecutionNode class."""

import asyncio
from collections.abc import Awaitable
from inspect import iscoroutine
from typing import Any, cast
from typing import Any

import reactivex as rx

from reactivedataflow.constants import default_output

from .io import VerbInput, VerbOutput
from .io import VerbInput
from .node import Node
from .types import VerbFunction

Expand Down Expand Up @@ -168,9 +166,7 @@ async def _recompute(self) -> None:
previous_output={name: obs.value for name, obs in self._outputs.items()},
)

result = self._fn(inputs)
if iscoroutine(result):
result = await cast(Awaitable[VerbOutput], result)
result = await self._fn(inputs)
if not result.no_output:
for name, value in result.outputs.items():
self._output(name).on_next(value)
3 changes: 1 addition & 2 deletions python/reactivedataflow/reactivedataflow/nodes/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from .io import VerbInput, VerbOutput

VerbFunction = Callable[[VerbInput], VerbOutput]
AsyncVerbFunction = Callable[[VerbInput], Awaitable[VerbOutput]]
VerbFunction = Callable[[VerbInput], Awaitable[VerbOutput]]
FireCondition = Callable[[VerbInput], bool]
EmitCondition = Callable[[VerbInput, VerbOutput], bool]
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,4 @@ class Registration:
input_mode: InputMode
output_mode: OutputMode
output_names: list[str] | None
is_async: bool
strict: bool
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ def verb_constructor(
def push(x):
decorators.insert(0, x)

if registration.is_async:
push(handle_async_output())
push(handle_async_output())

if registration.output_mode != OutputMode.Raw:
push(
Expand Down
3 changes: 1 addition & 2 deletions python/reactivedataflow/reactivedataflow/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@
"""reactivedataflow Types."""

from .decorators import AnyFn, Decorator
from .nodes import AsyncVerbFunction, EmitCondition, FireCondition, VerbFunction
from .nodes import EmitCondition, FireCondition, VerbFunction
from .ports import PortBinding
from .registry import VerbConstructor
from .utils.equality import IsEqualCheck

__all__ = [
"AnyFn",
"AsyncVerbFunction",
"Decorator",
"EmitCondition",
"FireCondition",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright (c) 2024 Microsoft Corporation.
"""Output decorator tests."""

import asyncio

import pytest

from reactivedataflow import (
Expand All @@ -18,56 +20,62 @@
def test_outputs_raises_when_output_names_are_missing():
def decorate():
@connect_output(mode=OutputMode.Tuple)
def stub():
async def stub():
await asyncio.sleep(0.001)
return 1, 2, 3

with pytest.raises(OutputNamesMissingInTupleOutputModeError):
decorate()


def test_outputs_raises_when_output_names_mismatch_num_outputs():
async def test_outputs_raises_when_output_names_mismatch_num_outputs():
@connect_output(mode=OutputMode.Tuple, output_names=["a"])
def stub():
async def stub():
await asyncio.sleep(0.001)
return 1, 2, 3

with pytest.raises(ValueError): # noqa PT011 (emitted from zip() function)
stub()
await stub()


def test_tuple_mode_output():
async def test_tuple_mode_output():
@connect_output(mode=OutputMode.Tuple, output_names=[default_output, "a", "b"])
def stub():
async def stub():
await asyncio.sleep(0.001)
return 1, 2, 3

result = stub()
result = await stub()
assert result.outputs == {default_output: 1, "a": 2, "b": 3}


def test_value_mode_output_with_output_names_throws():
def decorate():
@connect_output(mode=OutputMode.Value, output_names=["a", "b"])
def stub():
async def stub():
await asyncio.sleep(0.001)
return 123

with pytest.raises(OutputNamesNotValidInValueOutputModeError):
decorate()


def test_value_mode_output():
async def test_value_mode_output():
@connect_output(mode=OutputMode.Value)
def stub():
async def stub():
await asyncio.sleep(0.001)
return 123

result = stub()
result = await stub()
assert result.outputs == {default_output: 123}
assert not result.no_output


def test_raw_mode_output():
async def test_raw_mode_output():
@connect_output(mode=OutputMode.Raw)
def stub() -> VerbOutput:
async def stub() -> VerbOutput:
await asyncio.sleep(0.001)
return VerbOutput(outputs={default_output: 123})

result = stub()
result = await stub()
assert result.outputs == {default_output: 123}
assert not result.no_output
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright (c) 2024 Microsoft Corporation.
"""reactivedataflow Emit Conditions Tests."""

import asyncio

from reactivedataflow import VerbInput, VerbOutput, emit_conditions
from reactivedataflow.conditions import output_changed

Expand All @@ -13,42 +15,45 @@ def returns_false(inputs: VerbInput, result: VerbOutput) -> bool:
return False


def test_emits_output_when_conditions_pass() -> None:
async def test_emits_output_when_conditions_pass() -> None:
@emit_conditions(returns_true)
def test_fn2(inputs):
async def test_fn2(inputs):
await asyncio.sleep(0.001)
return VerbOutput(outputs={"result": "result"})

result = test_fn2(VerbInput())
result = await test_fn2(VerbInput())
assert result.no_output is False
assert result.outputs["result"] == "result"


def test_emits_no_output_if_conditions_fail() -> None:
async def test_emits_no_output_if_conditions_fail() -> None:
@emit_conditions(returns_true, returns_false)
def test_fn(inputs: VerbInput) -> VerbOutput:
async def test_fn(inputs: VerbInput) -> VerbOutput:
await asyncio.sleep(0.001)
return VerbOutput(outputs={"result": "result"})

result = test_fn(VerbInput())
result = await test_fn(VerbInput())
assert result.no_output is True
assert result.outputs == {}


def test_output_changed():
async def test_output_changed():
return_value = None

@emit_conditions(output_changed("result"))
def test_fn(inputs: VerbInput) -> VerbOutput:
async def test_fn(inputs: VerbInput) -> VerbOutput:
await asyncio.sleep(0.001)
return VerbOutput(outputs={"result": return_value})

result = test_fn(VerbInput())
result = await test_fn(VerbInput())
assert result.no_output is True
assert result.outputs == {}

return_value = "result"
result = test_fn(VerbInput())
result = await test_fn(VerbInput())
assert result.no_output is False
assert result.outputs["result"] == "result"

result = test_fn(VerbInput(previous_output={"result": return_value}))
result = await test_fn(VerbInput(previous_output={"result": return_value}))
assert result.no_output is True
assert result.outputs == {}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright (c) 2024 Microsoft Corporation.
"""reactivedataflow Emit Conditions Tests."""

import asyncio

from reactivedataflow import VerbInput, VerbOutput, fire_conditions


Expand All @@ -12,21 +14,23 @@ def returns_false(inputs: VerbInput) -> bool:
return False


def test_emits_output_when_conditions_pass() -> None:
async def test_emits_output_when_conditions_pass() -> None:
@fire_conditions(returns_true)
def test_fn2(inputs):
async def test_fn2(inputs):
await asyncio.sleep(0.001)
return VerbOutput(outputs={"result": "result"})

result = test_fn2(VerbInput())
result = await test_fn2(VerbInput())
assert result.no_output is False
assert result.outputs["result"] == "result"


def test_emits_no_output_if_conditions_fail() -> None:
async def test_emits_no_output_if_conditions_fail() -> None:
@fire_conditions(returns_true, returns_false)
def test_fn(inputs: VerbInput) -> VerbOutput:
async def test_fn(inputs: VerbInput) -> VerbOutput:
await asyncio.sleep(0.001)
return VerbOutput(outputs={"result": "result"})

result = test_fn(VerbInput())
result = await test_fn(VerbInput())
assert result.no_output is True
assert result.outputs == {}
Loading
Loading