diff --git a/python/reactivedataflow/pyproject.toml b/python/reactivedataflow/pyproject.toml index d6bb947f..08040b0f 100644 --- a/python/reactivedataflow/pyproject.toml +++ b/python/reactivedataflow/pyproject.toml @@ -130,7 +130,7 @@ ignore = [ "*.ipynb" = ["T201"] [tool.ruff.lint.pydocstyle] -convention = "numpy" +convention = "google" [tool.pytest.ini_options] log_cli = true diff --git a/python/reactivedataflow/reactivedataflow/bindings.py b/python/reactivedataflow/reactivedataflow/bindings.py index 3678bd9f..964b764d 100644 --- a/python/reactivedataflow/reactivedataflow/bindings.py +++ b/python/reactivedataflow/reactivedataflow/bindings.py @@ -11,9 +11,11 @@ class ArrayInput(BaseModel, extra="allow"): - """Specification for an array-based input port.""" + """Specification for an array-based input binding.""" - type: str | None = Field(default=None, description="The type of the port.") + type: str | None = Field( + default=None, description="The item-type of the array input port." + ) required: bool = Field( default=False, description="Whether the input port is required." ) @@ -81,7 +83,7 @@ class Config(BaseModel, extra="allow"): class Bindings: - """Node Binding Managemer class. + """Node Binding Manager class. Node bindings are used to map processing-graph inputs, outputs, and configuration values into the appropriate function parameters. This class is used to manage the bindings for a node. @@ -89,8 +91,13 @@ class Bindings: _bindings: list[Binding] - def __init__(self, ports: list[Binding]): - self._bindings = ports + def __init__(self, bindings: list[Binding]): + """Initialize the Bindings object. + + Args: + bindings: The list of bindings for the node. + """ + self._bindings = bindings self._validate() def _validate(self): @@ -111,24 +118,24 @@ def bindings(self) -> list[Binding]: @property def config(self) -> list[Config]: """Return the configuration bindings.""" - return [port for port in self.bindings if isinstance(port, Config)] + return [b for b in self.bindings if isinstance(b, Config)] @property def input(self) -> list[Input]: """Return the input bindings.""" - return [port for port in self.bindings if isinstance(port, Input)] + return [b for b in self.bindings if isinstance(b, Input)] @property def outputs(self) -> list[Output]: """Return the output bindings.""" - return [port for port in self._bindings if isinstance(port, Output)] + return [b for b in self._bindings if isinstance(b, Output)] @property def array_input(self) -> ArrayInput | None: - """Return the array input port.""" + """Return the array input binding.""" return next((p for p in self._bindings if isinstance(p, ArrayInput)), None) @property def named_inputs(self) -> NamedInputs | None: - """Return the named inputs port.""" + """Return the named inputs binding.""" return next((p for p in self._bindings if isinstance(p, NamedInputs)), None) diff --git a/python/reactivedataflow/reactivedataflow/decorators/apply_decorators.py b/python/reactivedataflow/reactivedataflow/decorators/apply_decorators.py index 2f6ba7b6..38fe4b2b 100644 --- a/python/reactivedataflow/reactivedataflow/decorators/apply_decorators.py +++ b/python/reactivedataflow/reactivedataflow/decorators/apply_decorators.py @@ -10,9 +10,12 @@ def apply_decorators(fn: AnyFn, decorators: list[Decorator]) -> AnyFn: - """ - Apply a series of decorators to a function reference. + """Apply a series of decorators to a function reference. This is useful for splitting apart verb registration from the verb implementation. + + Args: + fn: The function to decorate. + decorators: The decorators to apply. These will be applied in reverse order so that they can be copied/pasted from a decorated function. """ return reduce(lambda x, y: y(x), reversed(decorators), fn) diff --git a/python/reactivedataflow/reactivedataflow/decorators/connect_input.py b/python/reactivedataflow/reactivedataflow/decorators/connect_input.py index 0d689643..d617dc02 100644 --- a/python/reactivedataflow/reactivedataflow/decorators/connect_input.py +++ b/python/reactivedataflow/reactivedataflow/decorators/connect_input.py @@ -17,7 +17,7 @@ def connect_input( """Decorate an execution function with input conditions. Args: - required (list[str] | None): The list of required input names. Defaults to None. If present, the function will only execute if all required inputs are present. + bindings (Bindings): The input bindings for the function. """ def wrap_fn( diff --git a/python/reactivedataflow/reactivedataflow/decorators/connect_output.py b/python/reactivedataflow/reactivedataflow/decorators/connect_output.py index 70ff5d52..9968f5b9 100644 --- a/python/reactivedataflow/reactivedataflow/decorators/connect_output.py +++ b/python/reactivedataflow/reactivedataflow/decorators/connect_output.py @@ -25,7 +25,8 @@ def connect_output( """Decorate an execution function with output conditions. Args: - default_output (bool): The default output of the function. + mode (OutputMode, optional): The output mode. Defaults to OutputMode.Value. + output_names (list[str] | None, optional): The output names. Defaults to None. """ if mode == OutputMode.Tuple and output_names is None: raise OutputNamesMissingInTupleOutputModeError diff --git a/python/reactivedataflow/reactivedataflow/errors.py b/python/reactivedataflow/reactivedataflow/errors.py index 3e7ac716..da05ec52 100644 --- a/python/reactivedataflow/reactivedataflow/errors.py +++ b/python/reactivedataflow/reactivedataflow/errors.py @@ -6,6 +6,7 @@ class NodeAlreadyDefinedError(ValueError): """An exception for adding a node that already exists.""" def __init__(self, nid: str): + """Initialize the NodeAlreadyDefinedError.""" super().__init__(f"Node {nid} already defined.") @@ -13,6 +14,7 @@ class NodeNotFoundError(KeyError): """An exception for unknown node.""" def __init__(self, nid: str): + """Initialize the NodeNotFoundError.""" super().__init__(f"Node {nid} not found.") @@ -28,6 +30,7 @@ class OutputNotFoundError(ValueError): """An exception for output not defined.""" def __init__(self, output_name: str): + """Initialize the OutputNotFoundError.""" super().__init__(f"Output '{output_name}' not found.") @@ -35,6 +38,7 @@ class OutputNamesMissingInTupleOutputModeError(ValueError): """An exception for missing output names in tuple output mode.""" def __init__(self): + """Initialize the OutputNamesMissingInTupleOutputModeError.""" super().__init__("Output names are required in tuple output mode.") @@ -42,6 +46,7 @@ class OutputNamesNotValidInValueOutputModeError(ValueError): """An exception for output names in value output mode.""" def __init__(self): + """Initialize the OutputNamesNotValidInValueOutputModeError.""" super().__init__("Output names are not allowed in Value output mode") @@ -49,6 +54,7 @@ class VerbNotFoundError(KeyError): """An exception for unknown verb.""" def __init__(self, name: str): + """Initialize the VerbNotFoundError.""" super().__init__(f"Unknown verb: {name}") @@ -56,6 +62,7 @@ class VerbAlreadyDefinedError(ValueError): """An exception for already defined verb.""" def __init__(self, name: str): + """Initialize the VerbAlreadyDefinedError.""" super().__init__(f"Verb {name} already defined.") @@ -63,4 +70,5 @@ class PortNamesMustBeUniqueError(ValueError): """An exception for non-unique port names.""" def __init__(self): + """Initialize the PortNamesMustBeUniqueError.""" super().__init__("Port names must be unique.") diff --git a/python/reactivedataflow/reactivedataflow/execution_graph.py b/python/reactivedataflow/reactivedataflow/execution_graph.py index a4dc08b8..57da5a80 100644 --- a/python/reactivedataflow/reactivedataflow/execution_graph.py +++ b/python/reactivedataflow/reactivedataflow/execution_graph.py @@ -17,6 +17,12 @@ class ExecutionGraph: _outputs: dict[str, Output] def __init__(self, nodes: dict[str, Node], outputs: dict[str, Output]): + """Initialize the execution graph. + + Args: + nodes: The nodes in the graph. + outputs: The outputs of the graph. + """ self._nodes = nodes self._outputs = outputs @@ -30,7 +36,7 @@ def output(self, name: str) -> rx.Observable[Any]: output = self._outputs.get(name) if output is None: raise OutputNotFoundError(name) - node = self._nodes[output.node] + node = self._nodes[output.node or output.name] return node.output(output.port) def output_value(self, name: str) -> rx.Observable[Any]: @@ -38,5 +44,5 @@ def output_value(self, name: str) -> rx.Observable[Any]: output = self._outputs.get(name) if output is None: raise OutputNotFoundError(name) - node = self._nodes[output.node] + node = self._nodes[output.node or output.name] return node.output_value(output.port) diff --git a/python/reactivedataflow/reactivedataflow/graph_builder.py b/python/reactivedataflow/reactivedataflow/graph_builder.py index 0265989b..439c3326 100644 --- a/python/reactivedataflow/reactivedataflow/graph_builder.py +++ b/python/reactivedataflow/reactivedataflow/graph_builder.py @@ -19,7 +19,11 @@ class GraphBuilder: - """GraphBuilder class.""" + """GraphBuilder class. + + This class can be used to iteratively construct a graph by adding nodes and edges, or by ingesting a Graph model directly. + Once a graph has been built, run the `build` command with the global configuration object to use, and it will return an ExecutionGraph object. + """ _graph: nx.DiGraph _outputs: dict[str, Output] @@ -37,13 +41,12 @@ def add_input(self, nid: str) -> "GraphBuilder": def add_output( self, name: str, node: str | None = None, port: str = default_output ) -> "GraphBuilder": - """ - Add an output to the graph. + """Add an output to the graph. - --- - name: The unique name of the output. - node: The node identifier, if None, then the name is the node identifier. - port: The node output port. + Args: + name (str): The unique name of the output. + node (str | None): The node identifier, if None, then the name is the node identifier. + port (str | none): The node output port. """ if node is None: node = name @@ -60,7 +63,14 @@ def add_node( config: dict[str, Any] | None = None, override: bool = False, ) -> "GraphBuilder": - """Add a node to the graph.""" + """Add a node to the graph. + + Args: + nid: The unique identifier of the node. + verb: The verb to execute. + config: The configuration for the verb. + override: Whether to override the node if it already exists. + """ if self._graph.has_node(nid) and not override: raise NodeAlreadyDefinedError(nid) self._graph.add_node(nid, verb=verb, config=config) @@ -73,11 +83,13 @@ def add_edge( from_port: str | None = None, to_port: str | None = None, ) -> "GraphBuilder": - """ - Add an edge to the graph. + """Add an edge to the graph. - If from_port is None, then the default output port will be used. - If to_port is None, then this input will be used as an array input. + Args: + from_node: The node to connect from. + to_node: The node to connect to. + from_port (str | None): The output port to connect from. If None, then the default output port will be used. + to_port: The input port to connect to. If None, then this input will be used as an array input. """ if not self._graph.has_node(from_node): raise NodeNotFoundError(from_node) @@ -87,7 +99,11 @@ def add_edge( return self def load(self, model: Graph) -> "GraphBuilder": - """Load a graph model.""" + """Load a graph model. + + Args: + model: The graph model to load. + """ for node in model.inputs: self.add_input(node.id) for output in model.outputs: @@ -113,7 +129,13 @@ def build( config: dict[str, Any] | None = None, registry: Registry | None = None, ) -> ExecutionGraph: - """Build the graph.""" + """Build the graph. + + Args: + inputs: The inputs to the graph. + config: The global configuration for the graph. + registry: The registry to use for verb lookup. + """ inputs = inputs or {} registry = registry or Registry.get_instance() config = config or {} diff --git a/python/reactivedataflow/reactivedataflow/model.py b/python/reactivedataflow/reactivedataflow/model.py index 6bcc0f93..ae68d59a 100644 --- a/python/reactivedataflow/reactivedataflow/model.py +++ b/python/reactivedataflow/reactivedataflow/model.py @@ -45,7 +45,7 @@ class Output(BaseModel): """Output model.""" name: str = Field(..., description="The unique name of the output.") - node: str = Field(..., description="Node identifier.") + node: str | None = Field(default=None, description="Node identifier.") port: str = Field(default=default_output, description="Port identifier.") diff --git a/python/reactivedataflow/reactivedataflow/nodes/execution_node.py b/python/reactivedataflow/reactivedataflow/nodes/execution_node.py index 9064e179..f58867b2 100644 --- a/python/reactivedataflow/reactivedataflow/nodes/execution_node.py +++ b/python/reactivedataflow/reactivedataflow/nodes/execution_node.py @@ -38,7 +38,9 @@ def __init__( """Initialize the ExecutionNode. Args: + nid (str): The node identifier. fn (VerbFunction): The execution logic for the function. The input is a dictionary of input names to their latest values. + config (dict[str, Any], optional): The configuration for the node. Defaults to None. """ self._id = nid self._fn = fn diff --git a/python/reactivedataflow/reactivedataflow/registry/registry.py b/python/reactivedataflow/reactivedataflow/registry/registry.py index 38afe713..078cf43e 100644 --- a/python/reactivedataflow/reactivedataflow/registry/registry.py +++ b/python/reactivedataflow/reactivedataflow/registry/registry.py @@ -28,6 +28,12 @@ def __init__( verb_constructor: VerbConstructor = verb_constructor, verbs: dict[str, Registration] | None = None, ): + """Initialize the registry. + + Args: + verb_constructor: The constructor function for creating verb-functions from Registrations. + verbs: The initial set of verbs to register. + """ self._verbs = verbs or {} self._verb_fns = {} self._verb_constructor = verb_constructor