diff --git a/python/reactivedataflow/reactivedataflow/__init__.py b/python/reactivedataflow/reactivedataflow/__init__.py index 4aa72543..13f2c3f6 100644 --- a/python/reactivedataflow/reactivedataflow/__init__.py +++ b/python/reactivedataflow/reactivedataflow/__init__.py @@ -1,6 +1,14 @@ # Copyright (c) 2024 Microsoft Corporation. """The reactivedataflow Library.""" +from .bindings import ( + ArrayInput, + Bindings, + Config, + Input, + NamedInputs, + Output, +) from .decorators import ( apply_decorators, connect_input, @@ -22,14 +30,6 @@ VerbInput, VerbOutput, ) -from .ports import ( - ArrayInputPort, - ConfigPort, - InputPort, - NamedInputsPort, - OutputPort, - Ports, -) from .registry import Registry # Public API @@ -43,20 +43,20 @@ # - reactivedataflow.conditions - this contains a listing of emit and fire condition functions for users of the library. # - recativedataflow.model - this contains the Pydantic model for graph assembly. __all__ = [ - "ArrayInputPort", - "ConfigPort", + "ArrayInput", + "Bindings", + "Config", "EmitMode", "ExecutionGraph", "ExecutionNode", "GraphAssembler", + "Input", "InputMode", "InputNode", - "InputPort", - "NamedInputsPort", + "NamedInputs", "Node", + "Output", "OutputMode", - "OutputPort", - "Ports", "Registry", "VerbInput", "VerbOutput", diff --git a/python/reactivedataflow/reactivedataflow/ports.py b/python/reactivedataflow/reactivedataflow/bindings.py similarity index 64% rename from python/reactivedataflow/reactivedataflow/ports.py rename to python/reactivedataflow/reactivedataflow/bindings.py index 759c1ea8..3678bd9f 100644 --- a/python/reactivedataflow/reactivedataflow/ports.py +++ b/python/reactivedataflow/reactivedataflow/bindings.py @@ -10,7 +10,7 @@ from reactivedataflow.utils.equality import IsEqualCheck -class ArrayInputPort(BaseModel, extra="allow"): +class ArrayInput(BaseModel, extra="allow"): """Specification for an array-based input port.""" type: str | None = Field(default=None, description="The type of the port.") @@ -23,7 +23,7 @@ class ArrayInputPort(BaseModel, extra="allow"): ) -class NamedInputsPort(BaseModel, extra="allow"): +class NamedInputs(BaseModel, extra="allow"): """Specification for injecting all named inputs as a single dictionary.""" type: dict[str, str] | None = Field( @@ -38,7 +38,7 @@ class NamedInputsPort(BaseModel, extra="allow"): ) -class InputPort(BaseModel, extra="allow"): +class Input(BaseModel, extra="allow"): """Specification for a named input port.""" name: str = Field(..., description="The name of the port.") @@ -52,7 +52,7 @@ class InputPort(BaseModel, extra="allow"): ) -class OutputPort(BaseModel, extra="allow"): +class Output(BaseModel, extra="allow"): """Specification for an output port.""" name: str = Field(..., description="The name of the port.") @@ -65,7 +65,7 @@ class OutputPort(BaseModel, extra="allow"): ) -class ConfigPort(BaseModel, extra="allow"): +class Config(BaseModel, extra="allow"): """Specification for a configuration field of an verb function.""" name: str = Field(..., description="The name of the port.") @@ -77,56 +77,58 @@ class ConfigPort(BaseModel, extra="allow"): ) -Port = InputPort | ArrayInputPort | NamedInputsPort | ConfigPort | OutputPort +Binding = Input | ArrayInput | NamedInputs | Config | Output -class Ports: - """PortMapper class.""" +class Bindings: + """Node Binding Managemer class. - _ports: list[Port] + Node bindings are used to map processing-graph inputs, outputs, and configuration values into the appropriate + function parameters. This class is used to manage the bindings for a node. + """ - def __init__(self, ports: list[Port]): - self._ports = ports + _bindings: list[Binding] + + def __init__(self, ports: list[Binding]): + self._bindings = ports self._validate() def _validate(self): """Validate the ports.""" - input_names = [port.name for port in self.ports if isinstance(port, InputPort)] + input_names = [port.name for port in self.bindings if isinstance(port, Input)] if len(input_names) != len(set(input_names)): raise PortNamesMustBeUniqueError - config_names = [ - port.name for port in self.ports if isinstance(port, ConfigPort) - ] + config_names = [port.name for port in self.bindings if isinstance(port, Config)] if len(config_names) != len(set(config_names)): raise PortNamesMustBeUniqueError @property - def ports(self) -> list[Port]: - """Return the ports.""" - return self._ports + def bindings(self) -> list[Binding]: + """Return the bindings.""" + return self._bindings @property - def config(self) -> list[ConfigPort]: - """Return the configuration ports.""" - return [port for port in self.ports if isinstance(port, ConfigPort)] + def config(self) -> list[Config]: + """Return the configuration bindings.""" + return [port for port in self.bindings if isinstance(port, Config)] @property - def input(self) -> list[InputPort]: - """Return the input ports.""" - return [port for port in self.ports if isinstance(port, InputPort)] + def input(self) -> list[Input]: + """Return the input bindings.""" + return [port for port in self.bindings if isinstance(port, Input)] @property - def outputs(self) -> list[OutputPort]: - """Return the output ports.""" - return [port for port in self._ports if isinstance(port, OutputPort)] + def outputs(self) -> list[Output]: + """Return the output bindings.""" + return [port for port in self._bindings if isinstance(port, Output)] @property - def array_input(self) -> ArrayInputPort | None: + def array_input(self) -> ArrayInput | None: """Return the array input port.""" - return next((p for p in self._ports if isinstance(p, ArrayInputPort)), None) + return next((p for p in self._bindings if isinstance(p, ArrayInput)), None) @property - def named_inputs(self) -> NamedInputsPort | None: + def named_inputs(self) -> NamedInputs | None: """Return the named inputs port.""" - return next((p for p in self._ports if isinstance(p, NamedInputsPort)), None) + return next((p for p in self._bindings if isinstance(p, NamedInputs)), None) diff --git a/python/reactivedataflow/reactivedataflow/decorators/__init__.py b/python/reactivedataflow/reactivedataflow/decorators/__init__.py index 922076fb..d31999ce 100644 --- a/python/reactivedataflow/reactivedataflow/decorators/__init__.py +++ b/python/reactivedataflow/reactivedataflow/decorators/__init__.py @@ -1,7 +1,7 @@ # Copyright (c) 2024 Microsoft Corporation. """reactivedataflow Decorators.""" -from .apply_decorators import apply_decorators +from .apply_decorators import AnyFn, Decorator, apply_decorators from .connect_input import connect_input from .connect_output import connect_output from .emit_conditions import emit_conditions @@ -10,6 +10,8 @@ from .verb import verb __all__ = [ + "AnyFn", + "Decorator", "apply_decorators", "connect_input", "connect_output", diff --git a/python/reactivedataflow/reactivedataflow/decorators/apply_decorators.py b/python/reactivedataflow/reactivedataflow/decorators/apply_decorators.py index 61fe8f9b..2f6ba7b6 100644 --- a/python/reactivedataflow/reactivedataflow/decorators/apply_decorators.py +++ b/python/reactivedataflow/reactivedataflow/decorators/apply_decorators.py @@ -5,12 +5,11 @@ from functools import reduce from typing import Any -Decorator = Callable[[Callable[..., Any]], Callable[..., Any]] +AnyFn = Callable[..., Any] +Decorator = Callable[[AnyFn], AnyFn] -def apply_decorators( - fn: Callable[..., Any], decorators: list[Decorator] -) -> Callable[..., Any]: +def apply_decorators(fn: AnyFn, decorators: list[Decorator]) -> AnyFn: """ Apply a series of decorators to a function reference. diff --git a/python/reactivedataflow/reactivedataflow/decorators/connect_input.py b/python/reactivedataflow/reactivedataflow/decorators/connect_input.py index 830665e2..0d689643 100644 --- a/python/reactivedataflow/reactivedataflow/decorators/connect_input.py +++ b/python/reactivedataflow/reactivedataflow/decorators/connect_input.py @@ -4,15 +4,15 @@ from collections.abc import Callable from typing import Any, ParamSpec, TypeVar, cast +from reactivedataflow.bindings import Bindings from reactivedataflow.nodes import VerbInput -from reactivedataflow.ports import Ports T = TypeVar("T") P = ParamSpec("P") def connect_input( - ports: Ports, + bindings: Bindings, ) -> Callable[[Callable[P, T]], Callable[[VerbInput], T]]: """Decorate an execution function with input conditions. @@ -27,26 +27,27 @@ def wrapped_fn(inputs: VerbInput, *args: P.args, **kwargs: P.kwargs) -> T: fn_kwargs = {**kwargs} # Inject named-input Dictionary - named_inputs_port = ports.named_inputs + named_inputs_port = bindings.named_inputs if ( named_inputs_port is not None and named_inputs_port.parameter is not None ): fn_kwargs[named_inputs_port.parameter] = inputs.named_inputs - # Inject array-ports - array_port = ports.array_input + # Inject array input + array_port = bindings.array_input if array_port is not None and array_port.parameter is not None: fn_kwargs[array_port.parameter] = inputs.array_inputs - # Inject named ports + # Inject named parameters fn_kwargs.update({ p.parameter or p.name: inputs.named_inputs.get(p.name) - for p in ports.input + for p in bindings.input }) - # Inject config ports + # Inject configuration values fn_kwargs.update({ - p.parameter or p.name: inputs.config.get(p.name) for p in ports.config + p.parameter or p.name: inputs.config.get(p.name) + for p in bindings.config }) return cast(Any, fn)(*args, **fn_kwargs) diff --git a/python/reactivedataflow/reactivedataflow/decorators/verb.py b/python/reactivedataflow/reactivedataflow/decorators/verb.py index fef7abd5..9a81774d 100644 --- a/python/reactivedataflow/reactivedataflow/decorators/verb.py +++ b/python/reactivedataflow/reactivedataflow/decorators/verb.py @@ -2,56 +2,28 @@ """reactivedataflow Verb Decorator.""" from collections.abc import Callable -from dataclasses import dataclass from typing import Any, ParamSpec -from reactivedataflow.conditions import ( - array_input_values_are_defined, - output_changed, - require_config, - require_inputs, -) +from reactivedataflow.bindings import Binding, Bindings from reactivedataflow.nodes import ( EmitCondition, - EmitMode, FireCondition, InputMode, OutputMode, - VerbFunction, ) -from reactivedataflow.ports import ConfigPort, InputPort, Port, Ports from reactivedataflow.registry import Registration, Registry -from .apply_decorators import apply_decorators -from .connect_input import connect_input -from .connect_output import connect_output -from .emit_conditions import emit_conditions as emitting_conditions_decorator -from .fire_conditions import fire_conditions as firing_conditions_decorator -from .handle_async_output import handle_async_output +from .apply_decorators import Decorator P = ParamSpec("P") -@dataclass -class VerbSpecification: - """A container class for verb specifications and augmentation.""" - - ports: Ports - adapters: list[Callable[[Callable[..., Any]], Callable[..., Any]]] - fire_conditions: list[FireCondition] - emit_conditions: list[EmitCondition] - input_mode: InputMode - output_mode: OutputMode - output_names: list[str] | None - is_async: bool - - def verb( name: str, - adapters: list[Callable[[Callable[..., Any]], Callable[..., Any]]] | None = None, + adapters: list[Decorator] | None = None, fire_conditions: list[FireCondition] | None = None, emit_conditions: list[EmitCondition] | None = None, - ports: list[Port] | None = None, + bindings: list[Binding] | None = None, registry: Registry | None = None, input_mode: InputMode | None = None, output_mode: OutputMode | None = None, @@ -61,20 +33,19 @@ def verb( ) -> Callable[[Callable[P, Any]], Callable[P, Any]]: """Register an verb function with the given name.""" registry = registry or Registry.get_instance() - spec = VerbSpecification( - ports=Ports(ports or []), - fire_conditions=fire_conditions or [], - emit_conditions=emit_conditions or [], - adapters=adapters or [], - is_async=is_async or False, - input_mode=input_mode or InputMode.PortMapped, - output_mode=output_mode or OutputMode.Value, - output_names=output_names, - ) def wrap_fn(verb: Callable[P, Any]) -> Callable[P, Any]: - fn = _wrap_verb_fn(verb, spec) - registration = Registration(fn, spec.ports) + registration = Registration( + fn=verb, + bindings=Bindings(bindings or []), + fire_conditions=fire_conditions or [], + emit_conditions=emit_conditions or [], + adapters=adapters or [], + is_async=is_async or False, + input_mode=input_mode or InputMode.PortMapped, + output_mode=output_mode or OutputMode.Value, + output_names=output_names, + ) registry.register( name, registration, @@ -83,102 +54,3 @@ def wrap_fn(verb: Callable[P, Any]) -> Callable[P, Any]: return verb return wrap_fn - - -def _wrap_verb_fn( - fn: Callable[P, Any], - spec: VerbSpecification, -) -> VerbFunction: - fire_conditions = spec.fire_conditions + _infer_firing_conditions(spec) - emit_conditions = spec.emit_conditions + _infer_emit_conditions(spec) - decorators: list = spec.adapters.copy() - - def push(x): - decorators.insert(0, x) - - if spec.is_async: - push(handle_async_output()) - - if spec.output_mode != OutputMode.Raw: - push(connect_output(mode=spec.output_mode, output_names=spec.output_names)) - - if spec.input_mode == InputMode.PortMapped: - input_parameter_map = _input_parameter_map(spec.ports.input) - config_parameter_map = _config_parameter_map(spec.ports.config) - array_inputs_parameter: str | None = ( - spec.ports.array_input and spec.ports.array_input.parameter - ) - dict_inputs_parameter: str | None = ( - spec.ports.named_inputs and spec.ports.named_inputs.parameter - ) - is_input_connection_required = ( - len(input_parameter_map) > 0 - or len(config_parameter_map) > 0 - or array_inputs_parameter - or dict_inputs_parameter - ) - if is_input_connection_required: - push(connect_input(ports=spec.ports)) - - if len(fire_conditions) > 0: - push(firing_conditions_decorator(*fire_conditions)) - if len(emit_conditions) > 0: - push(emitting_conditions_decorator(*emit_conditions)) - - return apply_decorators(fn, decorators) - - -def _infer_firing_conditions( - spec: VerbSpecification, -) -> list[FireCondition]: - firing_conditions = [] - required_inputs: list[str] = [p.name for p in spec.ports.input if p.required] - required_config: list[str] = [p.name for p in spec.ports.config if p.required] - - if spec.ports.array_input and spec.ports.array_input.required: - firing_conditions.append(array_input_values_are_defined()) - if spec.ports.named_inputs and spec.ports.named_inputs.required: - required_inputs.extend(spec.ports.named_inputs.required) - - if len(required_inputs) > 0: - firing_conditions.append(require_inputs(*required_inputs)) - if len(required_config) > 0: - firing_conditions.append(require_config(*required_config)) - - return firing_conditions - - -def _infer_emit_conditions( - spec: VerbSpecification, -) -> list[EmitCondition]: - result: list[EmitCondition] = [] - - # Create a composite emit condition. Any output that emits a changed value will allow an emit. - ports = [p for p in spec.ports.outputs if p.emits_on == EmitMode.OnChange] - if len(ports) > 0: - change_checks = [output_changed(p.name) for p in ports] - - def any_output_changed(inputs, outputs): - return any(c(inputs, outputs) for c in change_checks) - - result.append(any_output_changed) - - return result - - -def _input_parameter_map( - inputs: list[InputPort], -) -> dict[str, str]: - result: dict[str, str] = {} - for port in inputs: - result[port.name] = port.parameter or port.name - return result - - -def _config_parameter_map( - config: list[ConfigPort], -) -> dict[str, str]: - result: dict[str, str] = {} - for port in config: - result[port.name] = port.parameter or port.name - return result diff --git a/python/reactivedataflow/reactivedataflow/graph_assembler.py b/python/reactivedataflow/reactivedataflow/graph_assembler.py index 2a86c1df..054b7cce 100644 --- a/python/reactivedataflow/reactivedataflow/graph_assembler.py +++ b/python/reactivedataflow/reactivedataflow/graph_assembler.py @@ -96,7 +96,7 @@ def build_nodes() -> dict[str, Node]: nodes[nid] = InputNode(nid) continue - verb = registry.get(node["verb"]).fn + verb = registry.get_verb_function(node["verb"]) node_final_config = {**config, **node_config} execution_node = ExecutionNode(nid, verb, node_final_config) nodes[nid] = execution_node diff --git a/python/reactivedataflow/reactivedataflow/registry.py b/python/reactivedataflow/reactivedataflow/registry.py deleted file mode 100644 index bb66d148..00000000 --- a/python/reactivedataflow/reactivedataflow/registry.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright (c) 2024 Microsoft Corporation. -"""reactivedataflow Verb Registry.""" - -from dataclasses import dataclass -from typing import ClassVar - -from reactivedataflow.errors import VerbAlreadyRegisteredError, VerbNotFoundError - -from .nodes import VerbFunction -from .ports import Ports - - -@dataclass -class Registration: - """Registration of an verb function.""" - - fn: VerbFunction - """The verb function.""" - - ports: Ports - """The ports of the verb function.""" - - -class Registry: - """A registry for verbs.""" - - _instance: ClassVar["Registry | None"] = None - _verbs: dict[str, Registration] - - def __init__(self): - self._verbs = {} - - def register( - self, - name: str, - registration: Registration, - override: bool | None = None, - ) -> None: - """Register a verb.""" - if name in self._verbs and not override: - raise VerbAlreadyRegisteredError(name) - self._verbs[name] = registration - - def get(self, name: str) -> Registration: - """Get a verb by name.""" - result = self._verbs.get(name) - if result is None: - raise VerbNotFoundError(name) - return result - - @staticmethod - def get_instance() -> "Registry": - """Get the singleton instance of the registry.""" - if Registry._instance is None: - Registry._instance = Registry() - return Registry._instance diff --git a/python/reactivedataflow/reactivedataflow/registry/__init__.py b/python/reactivedataflow/reactivedataflow/registry/__init__.py new file mode 100644 index 00000000..2aa1fe44 --- /dev/null +++ b/python/reactivedataflow/reactivedataflow/registry/__init__.py @@ -0,0 +1,7 @@ +# Copyright (c) 2024 Microsoft Corporation. +"""reactivedataflow Verb Registry.""" + +from .registration import Registration +from .registry import Registry, VerbConstructor + +__all__ = ["Registration", "Registry", "VerbConstructor"] diff --git a/python/reactivedataflow/reactivedataflow/registry/registration.py b/python/reactivedataflow/reactivedataflow/registry/registration.py new file mode 100644 index 00000000..f6e4b383 --- /dev/null +++ b/python/reactivedataflow/reactivedataflow/registry/registration.py @@ -0,0 +1,35 @@ +# Copyright (c) 2024 Microsoft Corporation. +"""Registration of a verb function.""" + +from collections.abc import Callable +from dataclasses import dataclass + +from reactivedataflow.bindings import Bindings +from reactivedataflow.decorators import Decorator +from reactivedataflow.nodes import ( + EmitCondition, + FireCondition, + InputMode, + OutputMode, +) + + +@dataclass +class Registration: + """Registration of an verb function.""" + + fn: Callable + """The verb function.""" + + bindings: Bindings + """The ports of the verb function.""" + + adapters: list[Decorator] + """A list of decorators to apply to the raw verb function.""" + + fire_conditions: list[FireCondition] + emit_conditions: list[EmitCondition] + input_mode: InputMode + output_mode: OutputMode + output_names: list[str] | None + is_async: bool diff --git a/python/reactivedataflow/reactivedataflow/registry/registry.py b/python/reactivedataflow/reactivedataflow/registry/registry.py new file mode 100644 index 00000000..b34c03a1 --- /dev/null +++ b/python/reactivedataflow/reactivedataflow/registry/registry.py @@ -0,0 +1,70 @@ +# Copyright (c) 2024 Microsoft Corporation. +"""reactivedataflow Verb Registry.""" + +from collections.abc import Callable +from typing import ClassVar + +from reactivedataflow.errors import VerbAlreadyRegisteredError, VerbNotFoundError +from reactivedataflow.nodes import ( + VerbFunction, +) + +from .registration import Registration +from .verb_constructor import verb_constructor + +VerbConstructor = Callable[[Registration], VerbFunction] + + +class Registry: + """A registry for verbs.""" + + _instance: ClassVar["Registry | None"] = None + _verbs: dict[str, Registration] + _verb_fns: dict[str, VerbFunction] + _verb_constructor: VerbConstructor + + def __init__( + self, + verb_constructor: VerbConstructor = verb_constructor, + verbs: dict[str, Registration] | None = None, + ): + self._verbs = verbs or {} + self._verb_fns = {} + self._verb_constructor = verb_constructor + + def clone(self) -> "Registry": + """Create a new, duplicate registry with the registrations of the current registry.""" + return Registry(self._verb_constructor, self._verbs.copy()) + + def register( + self, + name: str, + registration: Registration, + override: bool | None = None, + ) -> None: + """Register a verb.""" + if name in self._verbs and not override: + raise VerbAlreadyRegisteredError(name) + self._verbs[name] = registration + + def get(self, name: str) -> Registration: + """Get a verb by name.""" + result = self._verbs.get(name) + if result is None: + raise VerbNotFoundError(name) + return result + + def get_verb_function(self, name: str) -> VerbFunction: + """Get a built verb function.""" + if self._verb_fns.get(name) is None: + registration = self.get(name) + verb_fn = self._verb_constructor(registration) + self._verb_fns[name] = verb_fn + return self._verb_fns[name] + + @staticmethod + def get_instance() -> "Registry": + """Get the singleton instance of the registry.""" + if Registry._instance is None: + Registry._instance = Registry() + return Registry._instance diff --git a/python/reactivedataflow/reactivedataflow/registry/verb_constructor.py b/python/reactivedataflow/reactivedataflow/registry/verb_constructor.py new file mode 100644 index 00000000..15de3e89 --- /dev/null +++ b/python/reactivedataflow/reactivedataflow/registry/verb_constructor.py @@ -0,0 +1,149 @@ +# Copyright (c) 2024 Microsoft Corporation. +"""reactivedataflow Verb Registry.""" + +from reactivedataflow.bindings import Config, Input +from reactivedataflow.conditions import ( + array_input_values_are_defined, + output_changed, + require_config, + require_inputs, +) +from reactivedataflow.decorators.apply_decorators import apply_decorators +from reactivedataflow.decorators.connect_input import connect_input +from reactivedataflow.decorators.connect_output import connect_output +from reactivedataflow.decorators.emit_conditions import ( + emit_conditions as emitting_conditions_decorator, +) +from reactivedataflow.decorators.fire_conditions import ( + fire_conditions as firing_conditions_decorator, +) +from reactivedataflow.decorators.handle_async_output import handle_async_output +from reactivedataflow.nodes import ( + EmitCondition, + EmitMode, + FireCondition, + InputMode, + OutputMode, + VerbFunction, +) + +from .registry import Registration + + +def verb_constructor( + registration: Registration, +) -> VerbFunction: + """Wrap a verb function with reactive-dataflow plumbing.""" + fn = registration.fn + fire_conditions = registration.fire_conditions + _infer_firing_conditions( + registration + ) + emit_conditions = registration.emit_conditions + _infer_emit_conditions( + registration + ) + decorators: list = registration.adapters.copy() + + def push(x): + decorators.insert(0, x) + + if registration.is_async: + push(handle_async_output()) + + if registration.output_mode != OutputMode.Raw: + push( + connect_output( + mode=registration.output_mode, output_names=registration.output_names + ) + ) + + if registration.input_mode == InputMode.PortMapped: + input_parameter_map = _input_parameter_map(registration.bindings.input) + config_parameter_map = _config_parameter_map(registration.bindings.config) + array_inputs_parameter: str | None = ( + registration.bindings.array_input + and registration.bindings.array_input.parameter + ) + dict_inputs_parameter: str | None = ( + registration.bindings.named_inputs + and registration.bindings.named_inputs.parameter + ) + is_input_connection_required = ( + len(input_parameter_map) > 0 + or len(config_parameter_map) > 0 + or array_inputs_parameter + or dict_inputs_parameter + ) + if is_input_connection_required: + push(connect_input(bindings=registration.bindings)) + + if len(fire_conditions) > 0: + push(firing_conditions_decorator(*fire_conditions)) + if len(emit_conditions) > 0: + push(emitting_conditions_decorator(*emit_conditions)) + + return apply_decorators(fn, decorators) + + +def _infer_firing_conditions( + registration: Registration, +) -> list[FireCondition]: + firing_conditions = [] + required_inputs: list[str] = [ + p.name for p in registration.bindings.input if p.required + ] + required_config: list[str] = [ + p.name for p in registration.bindings.config if p.required + ] + + if registration.bindings.array_input and registration.bindings.array_input.required: + firing_conditions.append(array_input_values_are_defined()) + if ( + registration.bindings.named_inputs + and registration.bindings.named_inputs.required + ): + required_inputs.extend(registration.bindings.named_inputs.required) + + if len(required_inputs) > 0: + firing_conditions.append(require_inputs(*required_inputs)) + if len(required_config) > 0: + firing_conditions.append(require_config(*required_config)) + + return firing_conditions + + +def _infer_emit_conditions( + registration: Registration, +) -> list[EmitCondition]: + result: list[EmitCondition] = [] + + # Create a composite emit condition. Any output that emits a changed value will allow an emit. + ports = [ + p for p in registration.bindings.outputs if p.emits_on == EmitMode.OnChange + ] + if len(ports) > 0: + change_checks = [output_changed(p.name) for p in ports] + + def any_output_changed(inputs, outputs): + return any(c(inputs, outputs) for c in change_checks) + + result.append(any_output_changed) + + return result + + +def _input_parameter_map( + inputs: list[Input], +) -> dict[str, str]: + result: dict[str, str] = {} + for port in inputs: + result[port.name] = port.parameter or port.name + return result + + +def _config_parameter_map( + config: list[Config], +) -> dict[str, str]: + result: dict[str, str] = {} + for port in config: + result[port.name] = port.parameter or port.name + return result diff --git a/python/reactivedataflow/reactivedataflow/types.py b/python/reactivedataflow/reactivedataflow/types.py index efa54bb0..a28850ca 100644 --- a/python/reactivedataflow/reactivedataflow/types.py +++ b/python/reactivedataflow/reactivedataflow/types.py @@ -1,14 +1,19 @@ # Copyright (c) 2024 Microsoft Corporation. """reactivedataflow Types.""" +from .bindings import Binding +from .decorators import AnyFn, Decorator from .nodes import EmitCondition, FireCondition, VerbFunction -from .ports import Port +from .registry import VerbConstructor from .utils.equality import IsEqualCheck __all__ = [ + "AnyFn", + "Binding", + "Decorator", "EmitCondition", "FireCondition", "IsEqualCheck", - "Port", + "VerbConstructor", "VerbFunction", ] diff --git a/python/reactivedataflow/tests/unit/decorators/test_connect_input.py b/python/reactivedataflow/tests/unit/decorators/test_connect_input.py index aabc9225..75ed0c37 100644 --- a/python/reactivedataflow/tests/unit/decorators/test_connect_input.py +++ b/python/reactivedataflow/tests/unit/decorators/test_connect_input.py @@ -2,11 +2,11 @@ """reactivedataflow Inputs Decorator Tests.""" from reactivedataflow import ( - ArrayInputPort, - ConfigPort, - InputPort, - NamedInputsPort, - Ports, + ArrayInput, + Bindings, + Config, + Input, + NamedInputs, VerbInput, connect_input, ) @@ -14,9 +14,9 @@ def test_named_input_mapping(): @connect_input( - ports=Ports([ - InputPort(name="input_1", parameter="a"), - InputPort(name="input_2", parameter="b"), + bindings=Bindings([ + Input(name="input_1", parameter="a"), + Input(name="input_2", parameter="b"), ]) ) def stub(a: int, b: int) -> int: @@ -28,9 +28,9 @@ def stub(a: int, b: int) -> int: def test_input_with_default_parameter_names(): @connect_input( - ports=Ports([ - InputPort(name="a"), - InputPort(name="b"), + bindings=Bindings([ + Input(name="a"), + Input(name="b"), ]) ) def stub(a: int, b: int) -> int: @@ -41,7 +41,7 @@ def stub(a: int, b: int) -> int: def test_input_dict_mapping(): - @connect_input(Ports([NamedInputsPort(parameter="inputs", required=["a", "b"])])) + @connect_input(Bindings([NamedInputs(parameter="inputs", required=["a", "b"])])) def stub(inputs: dict[str, int]) -> int: return sum(inputs.values()) @@ -51,9 +51,9 @@ def stub(inputs: dict[str, int]) -> int: def test_config_parameters_mapping(): @connect_input( - ports=Ports([ - ConfigPort(name="in_1", parameter="a"), - ConfigPort(name="in_2", parameter="b"), + bindings=Bindings([ + Config(name="in_1", parameter="a"), + Config(name="in_2", parameter="b"), ]) ) def stub(a: str, b: str) -> str: @@ -64,7 +64,7 @@ def stub(a: str, b: str) -> str: def test_array_parameter_mapping(): - @connect_input(ports=Ports([ArrayInputPort(parameter="values")])) + @connect_input(bindings=Bindings([ArrayInput(parameter="values")])) def stub(values: list[int]) -> int: return sum(values) diff --git a/python/reactivedataflow/tests/unit/decorators/test_verb.py b/python/reactivedataflow/tests/unit/decorators/test_verb.py index eb739fdf..737005c5 100644 --- a/python/reactivedataflow/tests/unit/decorators/test_verb.py +++ b/python/reactivedataflow/tests/unit/decorators/test_verb.py @@ -6,7 +6,7 @@ import pytest from reactivedataflow import ( - InputPort, + Input, OutputMode, Registry, VerbInput, @@ -51,7 +51,7 @@ async def test_fn(inputs: VerbInput): await asyncio.sleep(0.001) return VerbOutput(outputs={default_output: result}) - decorated_fn = registry.get("test_fn").fn + decorated_fn = registry.get_verb_function("test_fn") result = decorated_fn(VerbInput(named_inputs={})) assert result.outputs[default_output] == 200 @@ -71,16 +71,16 @@ def wrapper(*args, **kwargs): @verb( name="test_fn", registry=registry, - ports=[ - InputPort(name="a", required=True), - InputPort(name="b", required=True), + bindings=[ + Input(name="a", required=True), + Input(name="b", required=True), ], adapters=[custom_decorator], ) def test_fn(a: int, b: int): return a + b - decorated_fn = registry.get("test_fn").fn + decorated_fn = registry.get_verb_function("test_fn") assert test_fn(1, 2) == 3 decorated_output = decorated_fn(VerbInput(named_inputs={"a": 1, "b": 2})) @@ -97,16 +97,16 @@ def condition(inputs): @verb( name="test_fn", registry=registry, - ports=[ - InputPort(name="a", required=True), - InputPort(name="b", required=True), + bindings=[ + Input(name="a", required=True), + Input(name="b", required=True), ], fire_conditions=[condition], ) def test_fn(a: int, b: int) -> int: return a + b - wrapped_fn = registry.get("test_fn").fn + wrapped_fn = registry.get_verb_function("test_fn") result = wrapped_fn(VerbInput(named_inputs={"a": 1, "b": 2})) assert result.no_output @@ -125,9 +125,9 @@ def condition(inputs, results): @verb( name="test_fn", - ports=[ - InputPort(name="a", required=True), - InputPort(name="b", required=True), + bindings=[ + Input(name="a", required=True), + Input(name="b", required=True), ], emit_conditions=[condition], registry=registry, @@ -135,7 +135,7 @@ def condition(inputs, results): def test_fn(a: int, b: int) -> int: return a + b - wrapped_fn = registry.get("test_fn").fn + wrapped_fn = registry.get_verb_function("test_fn") result = wrapped_fn(VerbInput(named_inputs={"a": 1, "b": 2})) assert result.no_output diff --git a/python/reactivedataflow/tests/unit/define_math_ops.py b/python/reactivedataflow/tests/unit/define_math_ops.py new file mode 100644 index 00000000..724d4b43 --- /dev/null +++ b/python/reactivedataflow/tests/unit/define_math_ops.py @@ -0,0 +1,43 @@ +# Copyright (c) 2024 Microsoft Corporation. +"""reactivedataflow Graph Assembler Tests.""" + +from reactivedataflow import ( + ArrayInput, + Config, + Input, + Registry, + verb, +) +from reactivedataflow.conditions import ( + array_input_not_empty, +) + + +def define_math_ops(registry: Registry) -> None: + @verb( + name="add", + registry=registry, + bindings=[ArrayInput(required=True, parameter="values")], + fire_conditions=[array_input_not_empty()], + ) + def add(values: list[int]) -> int: + return sum(values) + + @verb( + name="multiply", + registry=registry, + bindings=[ + Input(name="a", required=True), + Input(name="b", required=True), + ], + ) + def multiply(a: int, b: int) -> int: + return a * b + + @verb( + name="constant", + registry=registry, + bindings=[Config(name="value", required=True)], + ) + def constant(value: int) -> int: + return value diff --git a/python/reactivedataflow/tests/unit/test_execution_node.py b/python/reactivedataflow/tests/unit/test_execution_node.py index 24090ca7..0bac8a67 100644 --- a/python/reactivedataflow/tests/unit/test_execution_node.py +++ b/python/reactivedataflow/tests/unit/test_execution_node.py @@ -11,14 +11,14 @@ VerbInput, verb, ) -from reactivedataflow.nodes import InputMode -from reactivedataflow.ports import ( - ArrayInputPort, - ConfigPort, - InputPort, - NamedInputsPort, - OutputPort, +from reactivedataflow.bindings import ( + ArrayInput, + Config, + Input, + NamedInputs, + Output, ) +from reactivedataflow.nodes import InputMode from reactivedataflow.registry import Registry @@ -27,14 +27,14 @@ def test_configure_and_reconfigure(): @verb( "execute", - ports=[ConfigPort(name="value", required=True)], + bindings=[Config(name="value", required=True)], input_mode=InputMode.Raw, registry=registry, ) def execute(inputs: VerbInput) -> Any: return inputs.config.get("value") - wrapped_fn = registry.get("execute").fn + wrapped_fn = registry.get_verb_function("execute") node = ExecutionNode("a", wrapped_fn, config={"value": "Hello"}) assert node.config.get("value") == "Hello" assert node.output_value() == "Hello" @@ -58,7 +58,7 @@ def execute(inputs: VerbInput) -> str: + inputs.named_inputs.get("input_2", "") ) - wrapped_fn = registry.get("execute").fn + wrapped_fn = registry.get_verb_function("execute") output: str | None = None node = ExecutionNode("a", wrapped_fn) @@ -80,13 +80,13 @@ def test_execution_node_with_named_inputs(): @verb( "execute", - ports=[NamedInputsPort(parameter="inputs")], + bindings=[NamedInputs(parameter="inputs")], registry=registry, ) def execute(inputs: dict[str, str]) -> str: return inputs.get("input_1", "") + " " + inputs.get("input_2", "") - wrapped_fn = registry.get("execute").fn + wrapped_fn = registry.get_verb_function("execute") output: str | None = None node = ExecutionNode("a", wrapped_fn) @@ -109,13 +109,13 @@ def test_execution_node_with_named_required_inputs(): @verb( "execute", - ports=[NamedInputsPort(parameter="inputs", required=["input_1", "input_2"])], + bindings=[NamedInputs(parameter="inputs", required=["input_1", "input_2"])], registry=registry, ) def execute(inputs: dict[str, str]) -> str: return inputs["input_1"] + " " + inputs["input_2"] - wrapped_fn = registry.get("execute").fn + wrapped_fn = registry.get_verb_function("execute") output: str | None = None node = ExecutionNode("a", wrapped_fn) @@ -138,9 +138,9 @@ def test_execution_node_with_required_inputs(): @verb( "execute_with_required_inputs", - ports=[ - InputPort(name="input_1", required=True), - InputPort(name="input_2", required=True), + bindings=[ + Input(name="input_1", required=True), + Input(name="input_2", required=True), ], input_mode=InputMode.Raw, registry=registry, @@ -149,7 +149,7 @@ def execute_with_required_inputs(inputs: VerbInput) -> str: return inputs.named_inputs["input_1"] + " " + inputs.named_inputs["input_2"] output: str | None = None - fn = registry.get("execute_with_required_inputs").fn + fn = registry.get_verb_function("execute_with_required_inputs") node = ExecutionNode("a", fn) assert node.id == "a" @@ -170,9 +170,9 @@ def test_execution_node_with_required_config(): @verb( "execute_with_required_config", - ports=[ - ConfigPort(name="conf_1", required=True), - ConfigPort(name="conf_2", required=True), + bindings=[ + Config(name="conf_1", required=True), + Config(name="conf_2", required=True), ], input_mode=InputMode.Raw, registry=registry, @@ -181,7 +181,7 @@ def execute_with_required_inputs(inputs: VerbInput) -> str: return inputs.config["conf_1"] + " " + inputs.config["conf_2"] output: str | None = None - fn = registry.get("execute_with_required_config").fn + fn = registry.get_verb_function("execute_with_required_config") node = ExecutionNode("a", fn) assert node.id == "a" @@ -202,10 +202,10 @@ def test_execution_node_with_required_config_and_inputs(): @verb( "execute_with_required_config", - ports=[ - InputPort(name="input_1", required=True), - ConfigPort(name="conf_1", required=True), - ConfigPort(name="conf_2", required=True), + bindings=[ + Input(name="input_1", required=True), + Config(name="conf_1", required=True), + Config(name="conf_2", required=True), ], input_mode=InputMode.Raw, registry=registry, @@ -218,7 +218,7 @@ def execute_with_required_inputs(inputs: VerbInput) -> str: ]) output: str | None = None - fn = registry.get("execute_with_required_config").fn + fn = registry.get_verb_function("execute_with_required_config") node = ExecutionNode("a", fn) assert node.id == "a" @@ -243,9 +243,9 @@ def test_execution_node_with_optional_inputs(): @verb( "execute_with_optional_inputs", - ports=[ - InputPort(name="input_1", parameter="x"), - InputPort(name="input_2", parameter="y"), + bindings=[ + Input(name="input_1", parameter="x"), + Input(name="input_2", parameter="y"), ], registry=registry, ) @@ -254,7 +254,7 @@ def execute_with_optional_named_inputs(x: str | None, y: str | None) -> str: y = y or "" return f"{x} {y}" - wrapped_fn = registry.get("execute_with_optional_inputs").fn + wrapped_fn = registry.get_verb_function("execute_with_optional_inputs") output: str | None = None node = ExecutionNode("a", wrapped_fn) @@ -275,11 +275,11 @@ def test_execution_node_with_multiple_outputs(): @verb( "execute_with_two_outputs", - ports=[ - InputPort(name="input_1", parameter="x"), - InputPort(name="input_2", parameter="y"), - OutputPort(name="output_1"), - OutputPort(name="output_2"), + bindings=[ + Input(name="input_1", parameter="x"), + Input(name="input_2", parameter="y"), + Output(name="output_1"), + Output(name="output_2"), ], output_names=["output_1", "output_2"], output_mode=OutputMode.Tuple, @@ -290,7 +290,7 @@ def execute_with_two_outputs(x: str | None, y: str | None): output_2 = f"{y} {x}" return output_1, output_2 - wrapped_fn = registry.get("execute_with_two_outputs").fn + wrapped_fn = registry.get_verb_function("execute_with_two_outputs") node = ExecutionNode("a", wrapped_fn) output_1: str | None = None @@ -319,7 +319,7 @@ def test_execution_node_with_array_inputs(): @verb( "execute_with_array_inputs", - ports=[ArrayInputPort(parameter="values")], + bindings=[ArrayInput(parameter="values")], registry=registry, ) def execute_with_array_inputs(values: list[int]) -> int: @@ -327,7 +327,7 @@ def execute_with_array_inputs(values: list[int]) -> int: return 0 return sum(values) - wrapped_fn = registry.get("execute_with_array_inputs").fn + wrapped_fn = registry.get_verb_function("execute_with_array_inputs") node = ExecutionNode("a", wrapped_fn) output: str | None = None diff --git a/python/reactivedataflow/tests/unit/test_graph_assembler.py b/python/reactivedataflow/tests/unit/test_graph_assembler.py index 546986df..2bf4e6ae 100644 --- a/python/reactivedataflow/tests/unit/test_graph_assembler.py +++ b/python/reactivedataflow/tests/unit/test_graph_assembler.py @@ -7,10 +7,6 @@ from reactivedataflow import ( GraphAssembler, Registry, - verb, -) -from reactivedataflow.conditions import ( - array_input_not_empty, ) from reactivedataflow.constants import default_output from reactivedataflow.errors import NodeIdAlreadyExistsError, OutputNotDefinedError @@ -20,37 +16,8 @@ InputNode, ProcessingNode, ) -from reactivedataflow.ports import ArrayInputPort, ConfigPort, InputPort - -def define_math_ops(registry: Registry): - @verb( - name="add", - registry=registry, - ports=[ArrayInputPort(required=True, parameter="values")], - fire_conditions=[array_input_not_empty()], - ) - def add(values: list[int]) -> int: - return sum(values) - - @verb( - name="multiply", - registry=registry, - ports=[ - InputPort(name="a", required=True), - InputPort(name="b", required=True), - ], - ) - def multiply(a: int, b: int) -> int: - return a * b - - @verb( - name="constant", - registry=registry, - ports=[ConfigPort(name="value", required=True)], - ) - def constant(value: int) -> int: - return value +from .define_math_ops import define_math_ops def test_double_add_node_raises_error(): diff --git a/python/reactivedataflow/tests/unit/test_ports.py b/python/reactivedataflow/tests/unit/test_ports.py index 6ea83593..91ae7af3 100644 --- a/python/reactivedataflow/tests/unit/test_ports.py +++ b/python/reactivedataflow/tests/unit/test_ports.py @@ -3,22 +3,22 @@ import pytest -from reactivedataflow.errors import PortNamesMustBeUniqueError -from reactivedataflow.ports import ( - ArrayInputPort, - ConfigPort, - InputPort, - OutputPort, - Ports, +from reactivedataflow.bindings import ( + ArrayInput, + Bindings, + Config, + Input, + Output, ) +from reactivedataflow.errors import PortNamesMustBeUniqueError def test_port_map_can_separate_port_types() -> None: - ports = Ports([ - InputPort(name="value", type="str"), - ConfigPort(name="value", type="str"), - OutputPort(name="output", type="str"), - ArrayInputPort(type="str"), + ports = Bindings([ + Input(name="value", type="str"), + Config(name="value", type="str"), + Output(name="output", type="str"), + ArrayInput(type="str"), ]) assert len(ports.input) == 1 assert len(ports.config) == 1 @@ -28,15 +28,15 @@ def test_port_map_can_separate_port_types() -> None: def test_port_map_throws_on_duplicate_input_ports() -> None: with pytest.raises(PortNamesMustBeUniqueError): - Ports([ - InputPort(name="input", type="str"), - InputPort(name="input", type="str"), + Bindings([ + Input(name="input", type="str"), + Input(name="input", type="str"), ]) def test_port_map_throws_on_duplicate_config_ports() -> None: with pytest.raises(PortNamesMustBeUniqueError): - Ports([ - ConfigPort(name="input", type="str"), - ConfigPort(name="input", type="str"), + Bindings([ + Config(name="input", type="str"), + Config(name="input", type="str"), ]) diff --git a/python/reactivedataflow/tests/unit/test_executor_registry.py b/python/reactivedataflow/tests/unit/test_registry.py similarity index 69% rename from python/reactivedataflow/tests/unit/test_executor_registry.py rename to python/reactivedataflow/tests/unit/test_registry.py index 61aebf17..8e655636 100644 --- a/python/reactivedataflow/tests/unit/test_executor_registry.py +++ b/python/reactivedataflow/tests/unit/test_registry.py @@ -6,7 +6,7 @@ import pytest from reactivedataflow import OutputMode, VerbInput, VerbOutput, verb -from reactivedataflow.errors import VerbAlreadyRegisteredError +from reactivedataflow.errors import VerbAlreadyRegisteredError, VerbNotFoundError from reactivedataflow.registry import Registry if TYPE_CHECKING: @@ -53,7 +53,7 @@ def test_fn2(inputs: VerbInput) -> VerbOutput: e2 = test_fn2 double_register() - found = registry.get("test_fn").fn + found = registry.get_verb_function("test_fn") assert found == e2 @@ -61,3 +61,22 @@ def test_static_registry_instance(): registry = Registry.get_instance() registry2 = Registry.get_instance() assert registry == registry2 + + +def test_child(): + registry = Registry() + + @verb(name="test_fn", registry=registry, output_mode=OutputMode.Raw) + def test_fn(inputs: VerbInput) -> VerbOutput: + return VerbOutput(no_output=True) + + clone = registry.clone() + assert clone.get("test_fn") == registry.get("test_fn") + + @verb(name="test_fn2", registry=clone, output_mode=OutputMode.Raw) + def test_fn2(inputs: VerbInput) -> VerbOutput: + return VerbOutput(no_output=True) + + assert clone.get("test_fn2") is not None + with pytest.raises(VerbNotFoundError): + registry.get("test_fn2")