Skip to content

Commit

Permalink
make async a first-class primitive by defining the VerbFunction callb…
Browse files Browse the repository at this point in the history
…ack as async
  • Loading branch information
darthtrevino committed Jul 1, 2024
1 parent 3c3e5ce commit 1f17199
Show file tree
Hide file tree
Showing 16 changed files with 77 additions and 73 deletions.
2 changes: 1 addition & 1 deletion python/reactivedataflow/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "reactivedataflow"
version = "0.1.5"
version = "0.1.6"
description = "Reactive Dataflow Graphs"
license = "MIT"
authors = ["Chris Trevino <chtrevin@microsoft.com>"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) 2024 Microsoft Corporation.
"""reactivedataflow Outputs Decorator."""

from collections.abc import Callable
from collections.abc import Awaitable, Callable
from typing import Any, ParamSpec, TypeVar, cast

from reactivedataflow.constants import default_output
Expand All @@ -21,7 +21,7 @@
def connect_output(
mode: OutputMode = OutputMode.Value,
output_names: list[str] | None = None,
) -> Callable[[Callable[P, Any]], Callable[P, VerbOutput]]:
) -> Callable[[Callable[P, Awaitable[Any]]], Callable[P, Awaitable[VerbOutput]]]:
"""Decorate an execution function with output conditions.
Args:
Expand All @@ -34,9 +34,9 @@ def connect_output(
if mode == OutputMode.Value and output_names is not None:
raise OutputNamesNotValidInValueOutputModeError

def wrap_fn(fn: Callable[P, Any]) -> Callable[P, VerbOutput]:
def wrapped_fn(*args: P.args, **kwargs: P.kwargs) -> VerbOutput:
result = fn(*args, **kwargs)
def wrap_fn(fn: Callable[P, Awaitable[Any]]) -> Callable[P, Awaitable[VerbOutput]]:
async def wrapped_fn(*args: P.args, **kwargs: P.kwargs) -> VerbOutput:
result = await fn(*args, **kwargs)
match mode:
case OutputMode.Raw:
return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def emit_conditions(
"""Conditionally emit to output ports."""

def wrap_fn(fn: VerbFunction) -> VerbFunction:
def wrapped_fn(inputs: VerbInput) -> VerbOutput:
result = fn(inputs)
async def wrapped_fn(inputs: VerbInput) -> VerbOutput:
result = await fn(inputs)
are_conditions_met = all(
condition(inputs, result) for condition in conditions
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ def fire_conditions(
"""Apply conditional firing conditions to a function."""

def wrap_fn(fn: VerbFunction) -> VerbFunction:
def wrapped_fn(inputs: VerbInput) -> VerbOutput:
async def wrapped_fn(inputs: VerbInput) -> VerbOutput:
are_conditions_met = all(condition(inputs) for condition in conditions)
if not are_conditions_met:
return VerbOutput(no_output=True)
return fn(inputs)
return await fn(inputs)

wrapped_fn.__qualname__ = f"{fn.__qualname__}_wrapfirecond"
return wrapped_fn
Expand Down
2 changes: 0 additions & 2 deletions python/reactivedataflow/reactivedataflow/decorators/verb.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ def verb(
input_mode: InputMode | None = None,
output_mode: OutputMode | None = None,
output_names: list[str] | None = None,
is_async: bool | None = None,
override: bool | None = None,
include_default_output: bool | None = None,
strict: bool | None = None,
Expand Down Expand Up @@ -62,7 +61,6 @@ def wrap_fn(verb: Callable[P, Any]) -> Callable[P, Any]:
fire_conditions=fire_conditions or [],
emit_conditions=emit_conditions or [],
adapters=adapters or [],
is_async=is_async or False,
strict=strict or False,
input_mode=input_mode or InputMode.PortMapped,
output_mode=output_mode or OutputMode.Value,
Expand Down
3 changes: 1 addition & 2 deletions python/reactivedataflow/reactivedataflow/nodes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
VerbOutput,
)
from .node import Node
from .types import AsyncVerbFunction, EmitCondition, FireCondition, VerbFunction
from .types import EmitCondition, FireCondition, VerbFunction

__all__ = [
"AsyncVerbFunction",
"EmitCondition",
"EmitMode",
"ExecutionNode",
Expand Down
10 changes: 3 additions & 7 deletions python/reactivedataflow/reactivedataflow/nodes/execution_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@
"""The reactivedataflow ExecutionNode class."""

import asyncio
from collections.abc import Awaitable
from inspect import iscoroutine
from typing import Any, cast
from typing import Any

import reactivex as rx

from reactivedataflow.constants import default_output

from .io import VerbInput, VerbOutput
from .io import VerbInput
from .node import Node
from .types import VerbFunction

Expand Down Expand Up @@ -168,9 +166,7 @@ async def _recompute(self) -> None:
previous_output={name: obs.value for name, obs in self._outputs.items()},
)

result = self._fn(inputs)
if iscoroutine(result):
result = await cast(Awaitable[VerbOutput], result)
result = await self._fn(inputs)
if not result.no_output:
for name, value in result.outputs.items():
self._output(name).on_next(value)
3 changes: 1 addition & 2 deletions python/reactivedataflow/reactivedataflow/nodes/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from .io import VerbInput, VerbOutput

VerbFunction = Callable[[VerbInput], VerbOutput]
AsyncVerbFunction = Callable[[VerbInput], Awaitable[VerbOutput]]
VerbFunction = Callable[[VerbInput], Awaitable[VerbOutput]]
FireCondition = Callable[[VerbInput], bool]
EmitCondition = Callable[[VerbInput, VerbOutput], bool]
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,4 @@ class Registration:
input_mode: InputMode
output_mode: OutputMode
output_names: list[str] | None
is_async: bool
strict: bool
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ def verb_constructor(
def push(x):
decorators.insert(0, x)

if registration.is_async:
push(handle_async_output())
push(handle_async_output())

if registration.output_mode != OutputMode.Raw:
push(
Expand Down
3 changes: 1 addition & 2 deletions python/reactivedataflow/reactivedataflow/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@
"""reactivedataflow Types."""

from .decorators import AnyFn, Decorator
from .nodes import AsyncVerbFunction, EmitCondition, FireCondition, VerbFunction
from .nodes import EmitCondition, FireCondition, VerbFunction
from .ports import PortBinding
from .registry import VerbConstructor
from .utils.equality import IsEqualCheck

__all__ = [
"AnyFn",
"AsyncVerbFunction",
"Decorator",
"EmitCondition",
"FireCondition",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright (c) 2024 Microsoft Corporation.
"""Output decorator tests."""

import asyncio

import pytest

from reactivedataflow import (
Expand All @@ -18,56 +20,62 @@
def test_outputs_raises_when_output_names_are_missing():
def decorate():
@connect_output(mode=OutputMode.Tuple)
def stub():
async def stub():
await asyncio.sleep(0.001)
return 1, 2, 3

with pytest.raises(OutputNamesMissingInTupleOutputModeError):
decorate()


def test_outputs_raises_when_output_names_mismatch_num_outputs():
async def test_outputs_raises_when_output_names_mismatch_num_outputs():
@connect_output(mode=OutputMode.Tuple, output_names=["a"])
def stub():
async def stub():
await asyncio.sleep(0.001)
return 1, 2, 3

with pytest.raises(ValueError): # noqa PT011 (emitted from zip() function)
stub()
await stub()


def test_tuple_mode_output():
async def test_tuple_mode_output():
@connect_output(mode=OutputMode.Tuple, output_names=[default_output, "a", "b"])
def stub():
async def stub():
await asyncio.sleep(0.001)
return 1, 2, 3

result = stub()
result = await stub()
assert result.outputs == {default_output: 1, "a": 2, "b": 3}


def test_value_mode_output_with_output_names_throws():
def decorate():
@connect_output(mode=OutputMode.Value, output_names=["a", "b"])
def stub():
async def stub():
await asyncio.sleep(0.001)
return 123

with pytest.raises(OutputNamesNotValidInValueOutputModeError):
decorate()


def test_value_mode_output():
async def test_value_mode_output():
@connect_output(mode=OutputMode.Value)
def stub():
async def stub():
await asyncio.sleep(0.001)
return 123

result = stub()
result = await stub()
assert result.outputs == {default_output: 123}
assert not result.no_output


def test_raw_mode_output():
async def test_raw_mode_output():
@connect_output(mode=OutputMode.Raw)
def stub() -> VerbOutput:
async def stub() -> VerbOutput:
await asyncio.sleep(0.001)
return VerbOutput(outputs={default_output: 123})

result = stub()
result = await stub()
assert result.outputs == {default_output: 123}
assert not result.no_output
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright (c) 2024 Microsoft Corporation.
"""reactivedataflow Emit Conditions Tests."""

import asyncio

from reactivedataflow import VerbInput, VerbOutput, emit_conditions
from reactivedataflow.conditions import output_changed

Expand All @@ -13,42 +15,45 @@ def returns_false(inputs: VerbInput, result: VerbOutput) -> bool:
return False


def test_emits_output_when_conditions_pass() -> None:
async def test_emits_output_when_conditions_pass() -> None:
@emit_conditions(returns_true)
def test_fn2(inputs):
async def test_fn2(inputs):
await asyncio.sleep(0.001)
return VerbOutput(outputs={"result": "result"})

result = test_fn2(VerbInput())
result = await test_fn2(VerbInput())
assert result.no_output is False
assert result.outputs["result"] == "result"


def test_emits_no_output_if_conditions_fail() -> None:
async def test_emits_no_output_if_conditions_fail() -> None:
@emit_conditions(returns_true, returns_false)
def test_fn(inputs: VerbInput) -> VerbOutput:
async def test_fn(inputs: VerbInput) -> VerbOutput:
await asyncio.sleep(0.001)
return VerbOutput(outputs={"result": "result"})

result = test_fn(VerbInput())
result = await test_fn(VerbInput())
assert result.no_output is True
assert result.outputs == {}


def test_output_changed():
async def test_output_changed():
return_value = None

@emit_conditions(output_changed("result"))
def test_fn(inputs: VerbInput) -> VerbOutput:
async def test_fn(inputs: VerbInput) -> VerbOutput:
await asyncio.sleep(0.001)
return VerbOutput(outputs={"result": return_value})

result = test_fn(VerbInput())
result = await test_fn(VerbInput())
assert result.no_output is True
assert result.outputs == {}

return_value = "result"
result = test_fn(VerbInput())
result = await test_fn(VerbInput())
assert result.no_output is False
assert result.outputs["result"] == "result"

result = test_fn(VerbInput(previous_output={"result": return_value}))
result = await test_fn(VerbInput(previous_output={"result": return_value}))
assert result.no_output is True
assert result.outputs == {}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright (c) 2024 Microsoft Corporation.
"""reactivedataflow Emit Conditions Tests."""

import asyncio

from reactivedataflow import VerbInput, VerbOutput, fire_conditions


Expand All @@ -12,21 +14,23 @@ def returns_false(inputs: VerbInput) -> bool:
return False


def test_emits_output_when_conditions_pass() -> None:
async def test_emits_output_when_conditions_pass() -> None:
@fire_conditions(returns_true)
def test_fn2(inputs):
async def test_fn2(inputs):
await asyncio.sleep(0.001)
return VerbOutput(outputs={"result": "result"})

result = test_fn2(VerbInput())
result = await test_fn2(VerbInput())
assert result.no_output is False
assert result.outputs["result"] == "result"


def test_emits_no_output_if_conditions_fail() -> None:
async def test_emits_no_output_if_conditions_fail() -> None:
@fire_conditions(returns_true, returns_false)
def test_fn(inputs: VerbInput) -> VerbOutput:
async def test_fn(inputs: VerbInput) -> VerbOutput:
await asyncio.sleep(0.001)
return VerbOutput(outputs={"result": "result"})

result = test_fn(VerbInput())
result = await test_fn(VerbInput())
assert result.no_output is True
assert result.outputs == {}
Loading

0 comments on commit 1f17199

Please sign in to comment.