# Improved code for pyiron core 

In [8]:
from collections.abc import MutableMapping
from typing import Any, Dict, List, Optional

import pandas as pd

PORT_LABEL = 'label'
PORT_TYPE = 'type'
PORT_DEFAULT = 'default'
PORT_VALUE = 'value'

# Dummy Node class for demonstration.
class Node:
    """Empty stand-in for a workflow node; it carries no state or behaviour."""

class _NotDataType:
    """Type of the ``NotData`` sentinel, which marks a value as "not set".

    The sentinel is falsy and is meant to be compared by identity
    (``value is NotData``).  Copying and pickling therefore hand back the
    very same object instead of creating a look-alike instance that would
    fail identity checks.
    """

    def __repr__(self) -> str:
        return "NotData"

    def __bool__(self) -> bool:
        return False

    def __copy__(self) -> "_NotDataType":
        # Identity matters for a sentinel, so a "copy" is the object itself.
        return self

    def __deepcopy__(self, memo: dict) -> "_NotDataType":
        return self

    def __reduce__(self) -> str:
        # Returning a string tells pickle to store a reference to the
        # module-level name, so unpickling yields the existing singleton.
        return "NotData"


NotData = _NotDataType()

class Port:
    """A labelled slot that stores a type, a default and a current value."""

    def __init__(
        self,
        label: str,
        type_: Optional[Any] = None,
        default: Optional[Any] = None,
        value: Any = NotData
    ) -> None:
        # All arguments are stored unchanged; ``NotData`` marks "no value yet".
        self.label, self.type = label, type_
        self.default, self.value = default, value

    def __repr__(self) -> str:
        template = "Port(label={!r}, type={!r}, default={!r}, value={!r})"
        return template.format(self.label, self.type, self.default, self.value)

    @property
    def ready(self) -> bool:
        """Whether a value has been assigned (anything other than ``NotData``)."""
        return not (self.value is NotData)

    @property
    def connected(self) -> bool:
        """Whether the current value is itself a ``Port`` or a ``Node``."""
        link_types = (Port, Node)
        return isinstance(self.value, link_types)

class PortCollection(MutableMapping):
    """Dict-like container of ``Port`` objects keyed by their label.

    Ports can be read by key (``ports['x']``) or by attribute (``ports.x``).
    Assigning through either route (``ports['x'] = 1`` / ``ports.x = 1``)
    replaces the *value* of an existing port, never the port object itself.
    Ports cannot be added or removed after construction.

    Attribute reads are only a fallback used when normal lookup fails, so a
    label that collides with a real attribute or mapping method (for example
    ``keys``) must be read with ``ports['keys']``; attribute *assignment* to
    such a label still sets the port value.  Names starting with ``_`` are
    always stored as real instance attributes.
    """

    def __init__(self, port_dict: Dict[str, List[Any]]):
        """Create one port per entry of ``port_dict[PORT_LABEL]``.

        ``port_dict`` is column oriented: each key maps to a list that is
        read at the index of the label.  Only the label, type, default and
        value columns are used; any other key is ignored.  A missing type or
        default column yields ``None``, a missing value column yields
        ``NotData``, and a column shorter than the label list is padded with
        ``NotData``.  A repeated label replaces the earlier port.
        """
        self._ports: Dict[str, "Port"] = {}
        for index, label in enumerate(port_dict.get(PORT_LABEL, [])):
            self._ports[label] = Port(
                label=label,
                type_=self._cell(port_dict, PORT_TYPE, index, None),
                default=self._cell(port_dict, PORT_DEFAULT, index, None),
                value=self._cell(port_dict, PORT_VALUE, index, NotData),
            )

    @staticmethod
    def _cell(port_dict: Dict[str, List[Any]], key: str, index: int, missing: Any) -> Any:
        """Return ``port_dict[key][index]`` with the fallbacks described in ``__init__``."""
        if key not in port_dict:
            return missing
        column = port_dict[key]
        return column[index] if index < len(column) else NotData

    # MutableMapping required methods:
    def __getitem__(self, key: str) -> "Port":
        return self._ports[key]

    def __setitem__(self, key: str, value: Any):
        """Set the value of the existing port ``key``; unknown keys raise ``KeyError``."""
        if key not in self._ports:
            raise KeyError(f"No port labeled '{key}'")
        self._ports[key].value = value

    def __delitem__(self, key: str):
        raise NotImplementedError("Removing ports is not supported.")

    def __iter__(self):
        return iter(self._ports)

    def __len__(self):
        return len(self._ports)

    # Attribute-style access to ports.
    def __getattr__(self, name: str) -> "Port":
        # Only called when normal lookup fails.  ``_ports`` is fetched from
        # the instance dict directly: going through ``self._ports`` would
        # re-enter this method and recurse forever on an instance whose
        # ``__init__`` has not run yet (copy.copy, unpickling, ...).
        ports = self.__dict__.get("_ports", {})
        if name in ports:
            return ports[name]
        raise AttributeError(f"No port named '{name}'")

    def __setattr__(self, name: str, value: Any):
        # Underscore-prefixed names are real attributes; a public name that
        # matches a port label sets that port's value instead.
        if not name.startswith("_"):
            ports = self.__dict__.get("_ports", {})
            if name in ports:
                ports[name].value = value
                return
        super().__setattr__(name, value)

    def __repr__(self) -> str:
        return repr(self.to_dataframe())

    def _repr_html_(self):
        return self.to_dataframe()._repr_html_()

    def to_dataframe(self) -> "pd.DataFrame":
        """Return one row per port with its fields and its ready/connected flags."""
        columns = [PORT_LABEL, PORT_TYPE, PORT_DEFAULT, PORT_VALUE, "ready", "connected"]
        rows = [
            {
                PORT_LABEL: port.label,
                PORT_TYPE: port.type,
                PORT_DEFAULT: port.default,
                PORT_VALUE: port.value,
                "ready": port.ready,
                "connected": port.connected,
            }
            for port in self._ports.values()
        ]
        # Passing the columns explicitly keeps the header for an empty collection.
        return pd.DataFrame(rows, columns=columns)

In [9]:
# Build a collection of three ports; the third label deliberately collides
# with the mapping method ``keys``.
ports = PortCollection({
    PORT_LABEL:   ['label_1', 'label_2', 'keys'],
    PORT_TYPE:    ['int', 'float', 'object'],
    PORT_DEFAULT: [0, 1.0, None],
    PORT_VALUE:   [NotData, NotData, NotData],
})

# Attribute assignment sets the *value* of the matching port.
ports.label_1 = 42
ports.label_2 = ports.label_1  # value is a Port
ports.keys = Node()            # value is a Node; ports.keys() below still works

# This tuple is evaluated but not displayed: only the last expression of a
# notebook cell is shown.
[k for k in ports.keys()], 'label_1' in ports, ports['label_2'].value #, ports['label_3'].connected
ports

Unnamed: 0,label,type,default,value,ready,connected
0,label_1,int,0.0,42,True,False
1,label_2,float,1.0,"Port(label='label_1', type='int', default=0, v...",True,True
2,keys,object,,<__main__.Node object at 0x1694ebce0>,True,True


In [10]:
import functools
from typing import (
    Any,
    Callable,
    Dict,
    Generic,
    List,
    Optional,
    TypeVar,
)


# --- Dummy Node and Data classes for demonstration ---
# class Port: pass
# class Data(dict):
#     def __init__(self, *a, **k): super().__init__(*a, **k)
# PORT_LABEL = 'label'
# PORT_TYPE = 'type'
# PORT_DEFAULT = 'default'
# PORT_VALUE = 'value'
class Node:
    """Demonstration node that simply records its constructor arguments."""

    def __init__(self, *args, **kwargs):
        # Positional and keyword arguments are kept together as one
        # (tuple, dict) pair.
        recorded = (args, kwargs)
        self.data = recorded

    def __repr__(self):
        return "Node({})".format(self.data)


# --- Typing for functions and nodes ---
F = TypeVar("F", bound=Callable[..., Any])


class NodeFunction(Generic[F]):
    """
    Represents a wrapped function which, when called, returns a Node.

    Calling the instance does not run ``func``.  It forwards ``func``, the
    optional ``label`` keyword, the configured output labels, the node type
    and the remaining call arguments to ``inner_wrap_return_func`` and
    returns whatever that builder returns.

    The keyword ``label`` is reserved: it is removed from the call arguments
    and passed to the builder separately, so a wrapped function cannot
    receive a keyword argument of that name through this wrapper.
    """

    def __init__(
        self,
        func: F,
        inner_wrap_return_func: Callable[..., "Node"],
        name_postfix: str,
        node_type: str,
        output_labels: Optional[List[str]],
    ) -> None:
        # Copy __name__, __doc__, __wrapped__ ... from ``func`` onto this
        # instance; attributes the callable lacks are skipped.
        functools.update_wrapper(self, func)
        self._func = func
        self._inner_wrap_return_func = inner_wrap_return_func
        self._name_postfix = name_postfix
        self._node_type = node_type
        self._output_labels = output_labels

    def __call__(self, *f_args: Any, **f_kwargs: Any) -> "Node":
        # ``f_kwargs`` is a fresh dict for every call, so popping from it
        # cannot affect the caller.
        label = f_kwargs.pop("label", None)
        return self._inner_wrap_return_func(
            self._func,
            label,
            self._output_labels,
            self._node_type,
            *f_args,
            **f_kwargs,
        )

    def __repr__(self) -> str:
        # Not every callable has __name__ (e.g. functools.partial objects);
        # fall back to the callable's own repr instead of raising.
        name = getattr(self._func, "__name__", None) or repr(self._func)
        return f"<NodeFunction wrapping {name}>"

    @property
    def func(self) -> F:
        """The original, undecorated callable."""
        return self._func

    @property
    def output_labels(self) -> Optional[List[str]]:
        """Output labels given to the decorator, or ``None`` if none were given."""
        return self._output_labels


# --- Decorator factory ---
def _parse_output_labels(args: tuple, kwargs: Dict[str, Any]) -> Optional[List[str]]:
    """Extract the output labels from the arguments given to a node decorator.

    Precedence: the ``labels`` keyword, then positional strings
    (``@dec("a", "b")``), then a single positional list or tuple
    (``@dec(["a", "b"])``).  A single string becomes a one-element list and
    list/tuple input is copied, so the result never aliases the caller's
    object.  Anything else (including the bare ``@dec`` form, where the
    first positional argument is the function itself) yields ``None``.
    """
    if "labels" in kwargs:
        labels = kwargs["labels"]
    elif args and isinstance(args[0], str):
        labels = args
    elif args and isinstance(args[0], (list, tuple)):
        labels = args[0]
    else:
        return None
    if isinstance(labels, str):
        return [labels]
    if isinstance(labels, (list, tuple)):
        return list(labels)
    # ``None`` or an unrecognised object: hand it through unchanged.
    return labels


def make_node_decorator(
    inner_wrap_return_func: Callable[..., Node],
    name_postfix: str,
    node_type: str = "function_node",
) -> Callable[..., NodeFunction]:
    """Create a decorator that turns functions into ``NodeFunction`` objects.

    Args:
        inner_wrap_return_func: builder passed to every ``NodeFunction``.
        name_postfix: stored on every ``NodeFunction`` created.
        node_type: node type string passed to every ``NodeFunction``.

    Returns:
        A decorator usable bare (``@dec``) or configured
        (``@dec("a", "b")``, ``@dec(["a", "b"])``, ``@dec(labels=[...])``).
    """

    def _node_decorator(*args: Any, **kwargs: Any) -> Any:
        def wrapper(func: F) -> NodeFunction[F]:
            # Parsed per wrapped function so each NodeFunction owns its list.
            output_labels = _parse_output_labels(args, kwargs)
            # Return a NodeFunction; not just a raw function
            return NodeFunction(
                func, inner_wrap_return_func, name_postfix, node_type, output_labels
            )

        # Support both @decorator and @decorator(args): in the bare form the
        # decorated function arrives as the first positional argument.
        if args and callable(args[0]):
            return wrapper(args[0])
        return wrapper

    return _node_decorator


# --- Example inner_wrap_return_func, replace with your own logic ---
def _return_as_function_node(
    func: Callable[..., Any],
    label: Optional[str],
    output_labels: Optional[List[str]],
    node_type: str,
    *f_args: Any,
    **f_kwargs: Any,
) -> Node:
    """Demonstration builder: pack everything it receives into a ``Node``.

    The function itself is not called; the positional and keyword call
    arguments are handed to ``Node`` as ``call_args`` and ``call_kwargs``.
    """
    node_spec = {
        "func": func,
        "label": label,
        "output_labels": output_labels,
        "node_type": node_type,
        "call_args": f_args,
        "call_kwargs": f_kwargs,
    }
    return Node(**node_spec)


# --- Exported decorator ---
# Public decorator: wraps a function in a NodeFunction that builds its node
# with ``_return_as_function_node`` and uses the node type "function_node".
as_function_node: Callable[..., NodeFunction] = make_node_decorator(
    _return_as_function_node, "_postfix", "function_node"
)

In [None]:
# --- Usage and demonstration ---
# Decorating replaces ``myfunc`` with a NodeFunction that wraps it.
@as_function_node(labels=['result'])
def myfunc(x: float, y: int) -> float:
    return x + y

print(type(myfunc))      # <class '__main__.NodeFunction'>
print(myfunc)            # <NodeFunction wrapping myfunc>
res = myfunc(2, 3)       # builds a Node from the call arguments
print(res)               # Node instance

# For type checking: isinstance(myfunc, NodeFunction) is True
print(isinstance(myfunc, NodeFunction)) # True

# Type hints:
# reveal_type(myfunc)   # In mypy/IDE, this shows NodeFunction[Callable[[float, int], float]]

myfunc # Bare expression: evaluates to the NodeFunction object itself; no call is made

<class '__main__.NodeFunction'>
<NodeFunction wrapping myfunc>
Node(((), {'func': <function myfunc at 0x169a071a0>, 'label': None, 'output_labels': ['result'], 'node_type': 'function_node', 'call_args': (2, 3), 'call_kwargs': {}}))
True


AttributeError: 'Node' object has no attribute 'inputs'

In [12]:
xx

NameError: name 'xx' is not defined

In [None]:
import ast
from typing import Any, Dict, List, Tuple

from pyiron_core.pyiron_workflow import Node


### BEGIN: Helper function from previous answer with stricter checks
class ReturnAnalysisError(Exception):
    """Raised when the ``return`` of an analysed function cannot be mapped to names."""


class _ReturnNameCollector(ast.NodeVisitor):
    """Collect the returned variable names of one function body.

    Every ``return`` adds one entry to ``return_vars``: ``None`` for a bare
    ``return``, a string for ``return name`` and a tuple of strings for
    ``return a, b``.  Any other returned expression raises
    ``ReturnAnalysisError``.
    """

    def __init__(self) -> None:
        self.return_vars: List[Any] = []

    def visit_FunctionDef(self, node):
        # A nested function owns its ``return`` statements; do not descend.
        return None

    visit_AsyncFunctionDef = visit_FunctionDef

    def visit_Return(self, node):
        value = node.value
        if value is None:
            self.return_vars.append(None)
        elif isinstance(value, ast.Name):
            self.return_vars.append(value.id)
        elif isinstance(value, ast.Tuple):
            names = []
            for elt in value.elts:
                if not isinstance(elt, ast.Name):
                    raise ReturnAnalysisError(
                        f"Invalid return variable: {ast.unparse(elt)}. Must return variable names, not expressions."
                    )
                names.append(elt.id)
            self.return_vars.append(tuple(names))
        else:
            raise ReturnAnalysisError(
                f"Invalid return value: {ast.unparse(value)}. Must return variable names, not expressions."
            )


def _default_value(node):
    """Return a default as a Python literal, or as source text if it is not a literal.

    ``None`` (no default) is returned unchanged.
    """
    if node is None:
        return None
    try:
        return ast.literal_eval(node)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        return ast.unparse(node)


def analyze_function_code(func_str: str) -> Dict[str, Any]:
    """Statically analyse the first top-level function defined in ``func_str``.

    The source is only parsed, never executed.

    Args:
        func_str: Python source containing at least one top-level ``def``.

    Returns:
        A dict with the keys

        * ``'function_name'``: name of the function,
        * ``'arguments'``: one ``{'name', 'type', 'default'}`` dict per
          positional-only, regular and keyword-only parameter, in signature
          order (``*args`` / ``**kwargs`` are not listed).  ``'type'`` is
          the annotation source text or ``None``.  ``'default'`` is ``None``
          both when there is no default and when the default is the
          literal ``None``.
        * ``'return_type'``: return annotation source text or ``None``,
        * ``'returned_variables'``: ``None``, a name, or a tuple of names.

    Raises:
        SyntaxError: if ``func_str`` is not valid Python.
        ValueError: if ``func_str`` defines no top-level function.
        ReturnAnalysisError: if the function returns an expression rather
            than plain variable names, or has more than one ``return``.
    """
    tree = ast.parse(func_str)
    # Find the first function definition
    func_node = next(
        (node for node in tree.body if isinstance(node, ast.FunctionDef)), None
    )
    if func_node is None:
        raise ValueError("No top-level function definition found in the given source.")

    # ``defaults`` belongs to the *last* positional parameters, counting
    # positional-only and regular ones together; pad the front with None.
    spec = func_node.args
    positional = [*spec.posonlyargs, *spec.args]
    padding = [None] * (len(positional) - len(spec.defaults))
    parameters = list(zip(positional, padding + list(spec.defaults)))
    # ``kw_defaults`` is already aligned with ``kwonlyargs`` (None = required).
    parameters.extend(zip(spec.kwonlyargs, spec.kw_defaults))

    args_info = [
        {
            'name': arg.arg,
            'type': ast.unparse(arg.annotation) if arg.annotation else None,
            'default': _default_value(default),
        }
        for arg, default in parameters
    ]

    # Return type annotation
    return_type = ast.unparse(func_node.returns) if func_node.returns else None

    # Visit the body statement by statement so that only the analysed
    # function's own ``return`` statements are collected.
    collector = _ReturnNameCollector()
    for statement in func_node.body:
        collector.visit(statement)
    if len(collector.return_vars) > 1:
        raise ReturnAnalysisError("Function contains multiple 'return' statements, which is not supported.")
    return_vars = collector.return_vars[0] if collector.return_vars else None

    return {
        'function_name': func_node.name,
        'arguments': args_info,
        'return_type': return_type,
        'returned_variables': return_vars
    }
### END: Helper function

### The requested main function:
def _split_return_annotation(return_type, n_outputs):
    """Return one type string per output.

    A ``Tuple[...]`` / ``tuple[...]`` annotation (optionally module
    qualified) whose element count equals ``n_outputs`` is split into its
    elements.  In every other case each output gets the whole annotation,
    for lack of better information.
    """
    if return_type:
        try:
            expr = ast.parse(return_type, mode="eval").body
        except SyntaxError:
            expr = None
        if (
            isinstance(expr, ast.Subscript)
            and ast.unparse(expr.value).split(".")[-1] in ("Tuple", "tuple")
        ):
            inner = expr.slice
            # Parsing keeps nested generics such as Dict[str, int] intact,
            # which a plain split on ',' would tear apart.
            elements = inner.elts if isinstance(inner, ast.Tuple) else [inner]
            if len(elements) == n_outputs:
                return [ast.unparse(element) for element in elements]
    return [return_type] * n_outputs


def function_string_to_node(func_str):
    """Build a ``Node`` from the source code of a single function.

    The source is analysed with ``analyze_function_code`` to derive the input
    ports (one per argument) and output ports (one per returned variable),
    then executed to obtain the function object.  All port values start as
    ``NotData``, so no port is ready until a value is assigned.

    SECURITY: ``func_str`` is executed with ``exec``.  Only pass source code
    from a trusted origin.

    Args:
        func_str: Python source defining the function.

    Returns:
        ``Node(func=..., inputs=..., outputs=..., label=..., output_labels=...,
        node_type="function_node")``.

    Raises:
        Whatever ``analyze_function_code`` raises for unsupported source, and
        any exception raised while executing ``func_str``.
    """
    # Analyze the function code to extract port specifications and return variable names
    info = analyze_function_code(func_str)
    arg_info = info['arguments']
    return_type = info['return_type']
    return_vars = info['returned_variables']

    # Execute the source in ONE namespace so that the function's globals
    # contain the function itself (needed for recursion) and the typing
    # names that annotations may use.
    namespace = {
        '__builtins__': __builtins__,
        'Tuple': Tuple,
        'List': List,
        'Dict': Dict,
        'Any': Any,
    }
    exec(func_str, namespace)
    # Look the function up by the analysed name instead of guessing by
    # "first callable", which could pick an import or a helper.
    fn = namespace[info['function_name']]

    # Prepare inputs for Node
    inputs = PortCollection({
        PORT_LABEL: [arg['name'] for arg in arg_info],
        PORT_TYPE: [arg['type'] for arg in arg_info],
        PORT_DEFAULT: [arg['default'] for arg in arg_info],
        PORT_VALUE: [NotData] * len(arg_info),
    })

    # Prepare outputs for Node
    if return_vars is None:
        output_labels = []
        output_types = []
    elif isinstance(return_vars, str):
        output_labels = [return_vars]
        output_types = [return_type]
    else:  # tuple of names
        output_labels = list(return_vars)
        output_types = _split_return_annotation(return_type, len(output_labels))
    outputs = PortCollection({
        PORT_LABEL: output_labels,
        PORT_TYPE: output_types,
        PORT_VALUE: [NotData] * len(output_labels),
    })

    node = Node(
        func=fn,
        inputs=inputs,
        outputs=outputs,
        label=info['function_name'],
        output_labels=output_labels if output_labels else None,
        node_type="function_node"
    )
    return node

In [None]:
# --- Exported decorator ---
# Rebuild the decorator in this cell from the factory defined earlier.
as_function_node: Callable[..., NodeFunction] = make_node_decorator(
    _return_as_function_node, "_postfix", "function_node"
)

# --- Usage and demonstration ---
@as_function_node(labels=['result'])
def myfunc(x: float, y: int) -> float:
    return x + y

print(type(myfunc))      # <class '__main__.NodeFunction'>
print(myfunc)            # <NodeFunction wrapping myfunc>
res = myfunc(2, 3)       # forwards call_args/call_kwargs to whatever ``Node`` is in scope
print(res)               # Node instance

# For type checking: isinstance(myfunc, NodeFunction) is True
print(isinstance(myfunc, NodeFunction)) # True

# Type hints:
# reveal_type(myfunc)   # In mypy/IDE, this shows NodeFunction[Callable[[float, int], float]]

<class '__main__.NodeFunction'>
<NodeFunction wrapping myfunc>


TypeError: Node.__init__() got an unexpected keyword argument 'call_args'

In [None]:
# Test with an example

# Source of a two-output function: both returned values are plain variable
# names, as required by analyze_function_code.
func_str = '''
def AddMultiply(x: float, y: int = 2) -> Tuple[float, float]:
    z = x + y
    w = x * y
    return z, w
'''
# Build the node from the source, call it, then read the output port
# labelled after the first returned variable.
n = function_string_to_node(func_str)
n(x=3, y=2)
n.outputs.z.value

AttributeError: No port named 'data'

In [None]:
# --- Example usage ---
# Walk-through of ready/connected on a three-port collection.
if __name__ == "__main__":
    ports = PortCollection({
        PORT_LABEL:   ['label_1', 'label_2', 'label_3'],
        PORT_TYPE:    ['int', 'float', 'object'],
        PORT_DEFAULT: [0, 1.0, None],
        PORT_VALUE:   [NotData, NotData, NotData],
    })

    print(ports.label_1)                      # Port(label=...)
    print("Is ready?", ports.label_1.ready)   # False
    print("Is connected?", ports.label_1.connected) # False
    # Assigning a plain value makes the port ready but not connected.
    ports.label_1 = 42
    print("Is ready after setting:", ports.label_1.ready)  # True
    print("Is connected after setting:", ports.label_1.connected)  # False

    # Connect to a Port
    ports.label_2 = ports.label_1
    print("Now label_2 connected?", ports.label_2.connected)  # True

    # Connect to a Node
    ports.label_3 = Node()
    print("Now label_3 connected?", ports.label_3.connected)  # True


Port(label='label_1', type='int', default=0, value=NotData)
Is ready? False
Is connected? False
Is ready after setting: True
Is connected after setting: False
Now label_2 connected? True
Now label_3 connected? True


In [None]:
# Two-port collection used to show key and attribute access side by side.
ports = PortCollection({
    PORT_LABEL: ['label_1', 'label_2'],
    PORT_TYPE: ['int', 'float'],
    PORT_DEFAULT: [0, 1.0],
    PORT_VALUE: [NotData, NotData],  # explicit, or omit for default
})

# Access by attribute
print(ports.label_1)        # Port(label='label_1', type='int', default=0, value=NotData)
print(ports.label_2.ready)  # False

# Access by key
print(ports['label_2'])     # Port(label='label_2', type='float', default=1.0, value=NotData)

# Assignment by attribute
ports.label_1 = 42
print(ports.label_1.value)  # 42
print(ports.label_1.ready)  # True

# Assignment by key
ports['label_2'] = 3.14
print(ports.label_2.value)  # 3.14
print(ports['label_2'].ready)  # True

# Print the whole set
ports  # displayed as a table built by PortCollection.to_dataframe()

Port(label='label_1', type='int', default=0, value=NotData)
False
Port(label='label_2', type='float', default=1.0, value=NotData)
42
True
3.14
True


Unnamed: 0,label,type,default,value,ready,connected
0,label_1,int,0.0,42.0,True,False
1,label_2,float,1.0,3.14,True,False


In [None]:
# Assigning the NotData sentinel puts the port back into the "not ready" state.
ports.label_1 = NotData

In [None]:
ports.label_1.ready  # False

False

In [None]:
ports

Unnamed: 0,label,type,default,value,ready,connected
0,label_1,int,0.0,NotData,False,False
1,label_2,float,1.0,3.14,True,False
