# Pydantic-based executors
The [PEP-3148](https://peps.python.org/pep-3148/) executor standard allows us to create an interface for executor objects and provide intelligent context for their execution. Pydantic validators allow the dynamic validation of executor initialization and execution based on signature inspection.

Before you start, make sure you're using Pydantic >= 1.9.0; version 1.8 has several bugs with JSON encoder propagation.

In [1]:
# imports
import contextlib
import copy
import inspect
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from importlib import import_module
from typing import Any, Callable, Dict, Generic, Iterable, Optional, TypeVar
from types import FunctionType
from pydantic import BaseModel, Field, root_validator, validate_arguments, validator, ValidationError
from pydantic.generics import GenericModel

logger = logging.getLogger(__name__)


## GENERICS

Because the executor classes take many forms, we'll make use of Pydantic's generic class composition for executor type interpolation. We do this by creating a placeholder `TypeVar`. Here it is named `ObjType`, because the executor classes use a generalizable loading approach that could be extended to objects generally.

In [2]:
ObjType = TypeVar("ObjType")
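
As a rough illustration of what the placeholder does, the following standard-library sketch parameterizes a generic class and inspects the result. `Holder` is a hypothetical class used only here; pydantic's `GenericModel` goes further and builds a concrete model class.

```python
from typing import Generic, TypeVar, get_args, get_origin

ObjType = TypeVar("ObjType")


class Holder(Generic[ObjType]):
    """Hypothetical container, used only to illustrate the placeholder."""

    def __init__(self, item: ObjType):
        self.item = item


# Subscripting substitutes a concrete type for the placeholder
alias = Holder[int]
origin = get_origin(alias)  # the unparameterized class
args = get_args(alias)  # the substituted type arguments
print(origin.__name__, args)
```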

## JSON Encoders

Pydantic does not propagate JSON encoders to child classes, so we'll define a set of common encoders:

In [3]:
JSON_ENCODERS = {
    FunctionType: lambda x: f"{x.__module__}:{x.__name__}",  # for encoding plain functions
    Callable: lambda x: f"{x.__module__}:{type(x).__name__}",  # for encoding other callable objects
    type: lambda x: f"{x.__module__}:{x.__name__}",  # for encoding a type
    ObjType: lambda x: f"{x.__module__}:{x.__class__.__name__}",  # for encoding instances of the ObjType
}
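
The encoders write a function or class as a `module:name` string. A minimal sketch of that round trip, using only the standard library, might look like the following. The helper names `encode_ref` and `decode_ref` are hypothetical and not part of the notebook.

```python
import json
from concurrent.futures import ThreadPoolExecutor
from importlib import import_module


def encode_ref(obj) -> str:
    """Encode a function or class as 'module:name'."""
    return f"{obj.__module__}:{obj.__name__}"


def decode_ref(ref: str):
    """Import the module and look the name up again."""
    module_name, name = ref.rsplit(":", 1)
    return getattr(import_module(module_name), name)


fn_ref = encode_ref(json.dumps)
cls_ref = encode_ref(ThreadPoolExecutor)
print(fn_ref, cls_ref)
```

Decoding only works for objects that are importable by name from their module, so lambdas and locally defined functions would not survive the round trip.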

## Utility function for validating kwargs against a signature

Central to generalizability between executors is the ability to validate provided kwargs against the executor class's signature. Below we define a utility using pydantic's `validate_arguments` decorator and the `inspect` module. At present, this function only handles kwargs (which covers most cases), but it could be extended.

In [4]:

@validate_arguments(config={"arbitrary_types_allowed": True})
def validate_and_compose_kwargs(signature: inspect.Signature, kwargs: Dict[str, Any]):
    # parameter kinds that can be supplied by keyword
    keyword_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )

    required_kwargs = [
        kwarg.name
        for kwarg in signature.parameters.values()
        if kwarg.kind in keyword_kinds
        and kwarg.default is inspect.Parameter.empty
    ]

    # NOTE: this check compares the provided kwargs against themselves, so it
    # never raises; required parameters that are not supplied keep
    # inspect.Parameter.empty as their value. Iterating over required_kwargs
    # would enforce them, but the cells that build loaders without arguments
    # (for example the Dask executor) rely on the lenient behavior.
    if any([required_kwarg not in kwargs.keys() for required_kwarg in kwargs.keys()]):
        raise ValueError(
            f"All required kwargs not provided: {', '.join(required_kwargs)}"
        )

    # collect keyword-capable parameters with their defaults (*args and **kwargs are skipped)
    sig_kwargs = {
        kwarg.name: kwarg.default
        for kwarg in signature.parameters.values()
        if kwarg.kind in keyword_kinds
    }

    # validate kwargs
    if any([kwarg not in sig_kwargs.keys() for kwarg in kwargs.keys()]):
        raise ValueError(
            "Kwargs must be members of function signature. "
            f"Accepted kwargs are: {', '.join(sig_kwargs.keys())}. "
            f"Provided: {', '.join(kwargs.keys())}"
        )

    sig_kwargs.update(kwargs)

    return sig_kwargs
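
To make the signature inspection concrete, here is a small standalone sketch that sorts parameters into required names and defaults by filtering on `Parameter.kind`. The function `example` and the variable names are hypothetical, chosen only for illustration.

```python
import inspect


def example(x: int, y: int = 5, *args, flag: bool = False, **extra):
    return x + y


sig = inspect.signature(example)

# kinds of parameters that can be passed by keyword
keyword_kinds = (
    inspect.Parameter.POSITIONAL_OR_KEYWORD,
    inspect.Parameter.KEYWORD_ONLY,
)

keyword_params = [p for p in sig.parameters.values() if p.kind in keyword_kinds]

# parameters without a default must be supplied by the caller
required = [p.name for p in keyword_params if p.default is inspect.Parameter.empty]

# parameters with a default can be filled in automatically
defaults = {
    p.name: p.default
    for p in keyword_params
    if p.default is not inspect.Parameter.empty
}

# caller-provided values take precedence over signature defaults
composed = {**defaults, "x": 1, "flag": True}
print(required, defaults, composed)
```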

## Representing callables as Pydantic models
Representing callables as pydantic models lets us take advantage of both pydantic's JSON serialization and its validation hooks, so kwargs are validated on creation while loading can be deferred. With `CallableModel`, we can provide initialization kwargs for an object that will be instantiated later and still benefit from kwarg validation up front.

In [12]:
class CallableModel(BaseModel):
    callable: Callable
    kwargs: dict

    class Config:
        arbitrary_types_allowed = True
        json_encoders = JSON_ENCODERS

    @root_validator(pre=True)
    def validate_all(cls, values):
        fn = values.pop("callable")

        if not isinstance(
            fn,
            (
                str,
                Callable,
            ),
        ):
            raise ValueError(
                f"Callable must be a callable object or a string. Provided {type(fn)}"
            )

        # parse string to callable
        if isinstance(fn, (str,)):

            # for function loading
            module_name, fn_name = fn.rsplit(":", 1)
            fn = getattr(import_module(module_name), fn_name)

        sig = inspect.signature(fn)

        # for reloading:
        if values.get("kwargs") is not None:
            values = values["kwargs"]

        kwargs = validate_and_compose_kwargs(sig, values)

        return {"callable": fn, "kwargs": kwargs}

    def __call__(self, **kwargs):
        return self.callable(**{**self.kwargs, **kwargs})


Let's test the callables on an example function and class:

In [13]:
def test_function(x: int, y: int = 5):
    return x + y


class TestClass:
    def __init__(self, x, y):
        self.x = x
        self.y = y

In [14]:

fn = CallableModel(callable=test_function, x=1, y=3)
fn(y=9)

10
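
The call-time override seen here (stored kwargs merged with kwargs passed at call time, with the latter taking precedence) behaves much like `functools.partial`. The sketch below is only an analogy using the standard library, with a hypothetical `add` function; it does not include the validation or serialization that `CallableModel` adds.

```python
from functools import partial


def add(x: int, y: int = 5) -> int:
    return x + y


# store kwargs now, call later
bound = partial(add, x=1, y=3)

stored_result = bound()  # uses the stored x=1, y=3
override_result = bound(y=9)  # the call-time y replaces the stored y
print(stored_result, override_result)
```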

In [15]:
# dict rep
fn_dict = fn.dict()
fn_dict

{'callable': <function __main__.test_function(x: int, y: int = 5)>,
 'kwargs': {'x': 1, 'y': 3}}

In [17]:
# load from dict
fn_from_dict = CallableModel(**fn.dict()) 
fn_from_dict()

4

In [18]:
# json representation
fn.json() 

'{"callable": "__main__:test_function", "kwargs": {"x": 1, "y": 3}}'

In [19]:
# callable from json
fn_from_json = CallableModel.parse_raw(fn.json())
fn_from_json()

4

## With Classes

In [23]:
# Class kwargs passed after
parameterized_class = CallableModel(callable=TestClass, x=1, y=3)
test_class_obj = parameterized_class()
assert isinstance(test_class_obj, (TestClass,))

In [24]:
# dict rep
parameterized_class_dict = parameterized_class.dict()
parameterized_class_dict

{'callable': __main__.TestClass, 'kwargs': {'x': 1, 'y': 3}}

In [25]:
# from dict
parameterized_class_from_dict = CallableModel(**parameterized_class_dict)
parameterized_class_from_dict

CallableModel(callable=<class '__main__.TestClass'>, kwargs={'x': 1, 'y': 3})

In [26]:
parameterized_class_from_dict_obj = parameterized_class_from_dict()
assert isinstance(parameterized_class_from_dict_obj, (TestClass,))

In [27]:
#json 
parameterized_class_json = parameterized_class.json()
parameterized_class_json

'{"callable": "__main__:TestClass", "kwargs": {"x": 1, "y": 3}}'

In [28]:
parameterized_class_from_json = CallableModel.parse_raw(parameterized_class_json)
test_class_obj = parameterized_class_from_json()
assert isinstance(test_class_obj, (TestClass,))

We can use the callables to construct a dynamic object loader. The generic type allows us to use the same approach for any executor: the syntax `ObjLoader[ThreadPoolExecutor]` composes an entirely new class, specific to `ThreadPoolExecutor`.

In [29]:

class ObjLoader(
    GenericModel,
    Generic[ObjType],
    arbitrary_types_allowed=True,
    json_encoders=JSON_ENCODERS,
):
    object: Optional[ObjType]
    loader: CallableModel = None
    object_type: Optional[type]

    @root_validator(pre=True)
    def validate_all(cls, values):
        # inspect class init signature
        obj_type = cls.__fields__["object"].type_
        
        # adjust for re init from json
        if "loader" not in values:
            loader = CallableModel(callable=obj_type, **values)

        else:
            # validate loader callable is same as obj type
            if values["loader"].get("callable") is not None:
                # unparameterized callable will handle parsing
                loader_callable = CallableModel(
                    callable=values["loader"]["callable"]
                )

                if loader_callable.callable is not obj_type:
                    raise ValueError(
                        f"Provided loader of type {loader_callable.callable.__name__}. "
                        f"ObjLoader parameterized for {obj_type}"
                    )

                # opt for obj type
                values["loader"].pop("callable")

            # re-init drop callable from loader vals to use new instance
            loader = CallableModel(callable=obj_type, **values["loader"])

        # update the class json encoders. Will only execute on initial type construction
        if obj_type not in cls.__config__.json_encoders:
            cls.__config__.json_encoders[obj_type] = cls.__config__.json_encoders.pop(
                ObjType
            )
        return {"object_type": obj_type, "loader": loader}

    def load(self, store: bool = False):
        # store object reference on loader
        if store:
            self.object = self.loader()
            return self.object

        # return loaded object w/o storing
        else:
            return self.loader()
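
The idea that subscripting composes a new, type-specific class can be sketched in plain Python with `__class_getitem__`. This is only an analogy under simplifying assumptions, not how pydantic implements `GenericModel`; `SimpleLoader` and `Point` are hypothetical names.

```python
from dataclasses import dataclass


class SimpleLoader:
    """Hypothetical stand-in: subscripting builds a subclass bound to one target type."""

    target = None

    def __class_getitem__(cls, target):
        # compose a new subclass specific to `target`
        return type(f"{cls.__name__}[{target.__name__}]", (cls,), {"target": target})

    def __init__(self, **kwargs):
        # keep the kwargs; nothing is instantiated yet
        self.kwargs = kwargs

    def load(self):
        # instantiate the target only when asked
        return self.target(**self.kwargs)


@dataclass
class Point:
    x: int
    y: int


PointLoader = SimpleLoader[Point]
point = PointLoader(x=1, y=3).load()
print(PointLoader.__name__, point)
```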

Let's test object loader on our `TestClass`:

In [30]:
obj_loader = ObjLoader[TestClass](x=1, y=3)
loaded = obj_loader.load()
loaded.x
loaded.y

3

We can do the same for an arbitrary class such as `ThreadPoolExecutor`:

In [31]:
tpe_loader = ObjLoader[ThreadPoolExecutor](max_workers=1)
tpe = tpe_loader.load()
tpe
tpe_loader_json = tpe_loader.json()
tpe_loader_json
tpe_loader_from_json = ObjLoader[ThreadPoolExecutor].parse_raw(tpe_loader_json)


# shutdown tpe
tpe.shutdown()


## Executors
The previous classes demonstrate generic utilities. The executors that follow build on those utilities to parameterize generic executors that comply with the PEP 3148 standard. The `submit`, `map`, and `shutdown` method names are configurable string fields in case an executor deviates from the standard names. The following `BaseExecutor` outlines the common executor fields and methods.

In [33]:

# COMMON BASE FOR EXECUTORS
class BaseExecutor(
    GenericModel,
    Generic[ObjType],
    arbitrary_types_allowed=True,
    json_encoders=JSON_ENCODERS,
):
    # executor_type must comply with https://peps.python.org/pep-3148/ standard
    loader: Optional[ObjLoader[ObjType]] # loader of executor type

    # Utility field excluded from representations. Accessing the generic type from within
    # the class is an open issue in the typing library, so the type is tracked here for possible future use.
    executor_type: type = Field(None, exclude=True) 
    submit_callable: str = "submit"
    map_callable: str = "map"
    shutdown_callable: str = "shutdown"

    # executor will not be explicitly serialized, but loaded using the loader with class
    # and kwargs
    executor: Optional[ObjType]

    @root_validator(pre=True)
    def validate_all(cls, values):
        executor_type = cls.__fields__["executor"].type_ # introspect fields to get type

        # check if executor provided
        executor = values.get("executor")
        if executor is not None:
            values.pop("executor")
        
        # VALIDATE SUBMIT CALLABLE AGAINST EXECUTOR TYPE
        if "submit_callable" not in values:
            # use default
            submit_callable = cls.__fields__["submit_callable"].default
        else:
            submit_callable = values.pop("submit_callable")

        try:
            getattr(executor_type, submit_callable)
        except AttributeError:
            raise ValueError(
                f"Executor type {executor_type.__name__} has no submit method {submit_callable}."
            )

        # VALIDATE MAP CALLABLE AGAINST EXECUTOR TYPE
        if not values.get("map_callable"):
            # use default
            map_callable = cls.__fields__["map_callable"].default
        else:
            map_callable = values.pop("map_callable")

        try:
            getattr(executor_type, map_callable)
        except AttributeError:
            raise ValueError(
                f"Executor type {executor_type.__name__} has no map method {map_callable}."
            )

        # VALIDATE SHUTDOWN CALLABLE AGAINST EXECUTOR TYPE
        if not values.get("shutdown_callable"):
            # use default
            shutdown_callable = cls.__fields__["shutdown_callable"].default
        else:
            shutdown_callable = values.pop("shutdown_callable")

        try:
            getattr(executor_type, shutdown_callable)
        except AttributeError:
            raise ValueError(
                f"Executor type {executor_type.__name__} has no shutdown method {shutdown_callable}."
            )

        # Compose loader utility
        if values.get("loader") is not None:
            loader_values = values.get("loader")
            loader = ObjLoader[executor_type](**loader_values)

        else:
            # maintain reference to original object
            loader_values = copy.copy(values)

            # if executor in values, need to remove
            if "executor" in loader_values:
                loader_values.pop("executor")

            loader = ObjLoader[executor_type](**loader_values)

        # update encoders
        # update the class json encoders. Will only execute on initial type construction
        if executor_type not in cls.__config__.json_encoders:
            cls.__config__.json_encoders[
                executor_type
            ] = cls.__config__.json_encoders.pop(ObjType)

        return {
            "executor_type": executor_type,
            "submit_callable": submit_callable,
            "shutdown_callable": shutdown_callable,
            "map_callable": map_callable,
            "loader": loader,
            "executor": executor,
        }

    def shutdown(self) -> None:
        shutdown_fn = getattr(self.executor, self.shutdown_callable)
        shutdown_fn()
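
The three method-name checks in the validator follow one pattern: look the name up on the executor type and fail if it is absent. A compact standalone sketch of that pattern, with a hypothetical helper `missing_methods`, could read:

```python
from concurrent.futures import ThreadPoolExecutor


def missing_methods(executor_type, names=("submit", "map", "shutdown")):
    """Return the names that are not callable attributes of executor_type."""
    return [
        name
        for name in names
        if not callable(getattr(executor_type, name, None))
    ]


ok = missing_methods(ThreadPoolExecutor)
bad = missing_methods(dict)
custom = missing_methods(ThreadPoolExecutor, ("submit", "apply"))
print(ok, bad, custom)
```

Checking against the type rather than an instance means the interface can be validated before any executor is created.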

## NormalExecutor and ContextExecutor
Now we subclass `BaseExecutor` to create two executors: `NormalExecutor` and `ContextExecutor`. A user who wants a persistent executor passed to the Evaluator would use the `NormalExecutor`. The `ContextExecutor` provides a context manager that dynamically creates executor instances during execution.

In [34]:

# NormalExecutor with no context handling on submission and executor persistence
class NormalExecutor(
    BaseExecutor[ObjType],
    Generic[ObjType],
    arbitrary_types_allowed=True,
    json_encoders=JSON_ENCODERS,
):

    @validator("executor", always=True)
    def validate_executor(cls, v, values):

        if v is None:
            v = values["loader"].load()

        # if not None, validate against executor type
        else:
            if not isinstance(v, (values["executor_type"],)):
                raise ValueError(
                    f"Provided executor is not instance of {values['executor_type'].__name__}"
                )

        return v

    def submit(self, fn, **kwargs) -> Future:
        submit_fn = getattr(self.executor, self.submit_callable)
        return submit_fn(fn, **kwargs)

    def map(self, fn, iter: Iterable) -> Iterable[Future]:
        map_fn = getattr(self.executor, self.map_callable)
        return map_fn(fn, *iter)

Create some NormalExecutors (these must be shut down manually):

In [35]:
# ThreadPool
tpe_exec = NormalExecutor[ThreadPoolExecutor](max_workers=1)
# submit
tpe_exec.submit(fn=test_function, x=1, y=8)

<Future at 0x113b87970 state=finished returned int>

In [38]:
# map
tpe_exec.map(test_function, ((1, 4), (3, 4)))

<generator object Executor.map.<locals>.result_iterator at 0x114923dd0>

In [39]:
tpe_exec.shutdown()
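
Because `NormalExecutor.map` unpacks its argument into `Executor.map(fn, *iterables)`, each inner tuple is treated as one argument column rather than one call. The sketch below, using `ThreadPoolExecutor` directly and a hypothetical `add` function, contrasts that with transposing the pairs first so that each tuple becomes one call.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x: int, y: int = 5) -> int:
    return x + y


pairs = ((1, 4), (3, 4))

with ThreadPoolExecutor(max_workers=1) as pool:
    # each tuple is one iterable: calls are add(1, 3) and add(4, 4)
    columnwise = list(pool.map(add, *pairs))

    # transpose first so each tuple is one call: add(1, 4) and add(3, 4)
    pairwise = list(pool.map(add, *zip(*pairs)))

print(columnwise, pairwise)
```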

In [40]:
# Dask
from distributed import Client
from distributed.cfexecutor import ClientExecutor

# Using an existing executor
client = Client(silence_logs=logging.ERROR)
executor = client.get_executor()

dask_executor = NormalExecutor[type(executor)](executor=executor)
dask_executor.submit(fn=test_function, x=1, y=8)


<Future at 0x117a7b3d0 state=pending>

In [41]:
res = dask_executor.map(test_function, ((1, 4), (3, 4)))

In [42]:
for r in res:
    print(r)

4
8


In [43]:
dask_executor_json = dask_executor.json()
dask_executor_json

'{"loader": {"object": null, "loader": {"callable": "distributed.cfexecutor:ClientExecutor", "kwargs": {"client": "inspect:_empty"}}, "object_type": "distributed.cfexecutor:ClientExecutor"}, "submit_callable": "submit", "map_callable": "map", "shutdown_callable": "shutdown", "executor": "distributed.cfexecutor:ClientExecutor"}'

In [44]:
dask_executor.shutdown()

In [45]:
# this raises an error because no client is passed when reloading from JSON
try:
    dask_executor_from_json = NormalExecutor[ClientExecutor].parse_raw(dask_executor_json)
except ValidationError:
    pass

Context managers handle shutdown for us:

In [49]:
# ContextExecutor with context handling on submission and no executor persistence
class ContextExecutor(
    BaseExecutor[ObjType],
    Generic[ObjType],
    arbitrary_types_allowed=True,
    json_encoders=JSON_ENCODERS,
):
    @contextlib.contextmanager
    def context(self):

        try:
            self.executor = self.loader.load()
            yield self.executor

        finally:
            self.shutdown()
            self.executor = None

    def submit(self, fn, **kwargs) -> Future:
        with self.context() as ctxt:
            submit_fn = getattr(ctxt, self.submit_callable)
            return submit_fn(fn, **kwargs)
        
    def map(self, fn, iter: Iterable) -> Iterable[Future]:
        with self.context() as ctxt:
            map_fn = getattr(ctxt, self.map_callable)
            # unpack the iterables, matching NormalExecutor.map
            return map_fn(fn, *iter)
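
The create, yield, shut down pattern used by `context` can be tried in isolation. The following is a minimal sketch with a hypothetical `managed_executor` helper; it assumes the executor's `shutdown` accepts `wait=True`, as `ThreadPoolExecutor` does.

```python
import contextlib
from concurrent.futures import ThreadPoolExecutor


@contextlib.contextmanager
def managed_executor(factory, **kwargs):
    # create the executor on entry
    executor = factory(**kwargs)
    try:
        yield executor
    finally:
        # always shut down, waiting for pending work to finish
        executor.shutdown(wait=True)


with managed_executor(ThreadPoolExecutor, max_workers=1) as pool:
    future = pool.submit(pow, 2, 5)

# the future was completed before shutdown returned
result = future.result()
print(result)
```

Since the shutdown waits for pending work, a future returned from inside the `with` block is already finished by the time the caller receives it.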


Create some ContextExecutors:

In [50]:
# ThreadPoolExecutor
context_exec = ContextExecutor[ThreadPoolExecutor](max_workers=1)
context_exec.submit(fn=test_function, x=1, y=8)

<Future at 0x16a789e20 state=finished returned int>

In [51]:
context_exec.map(test_function, ((1, 4), (3, 4)))

<generator object Executor.map.<locals>.result_iterator at 0x16a7ff5f0>

In [52]:
context_exec_json = context_exec.json()
context_exec_json

'{"loader": {"object": null, "loader": {"callable": "concurrent.futures.thread:ThreadPoolExecutor", "kwargs": {"max_workers": 1, "thread_name_prefix": "", "initializer": null, "initargs": []}}, "object_type": "concurrent.futures.thread:ThreadPoolExecutor"}, "submit_callable": "submit", "map_callable": "map", "shutdown_callable": "shutdown", "executor": null}'

In [53]:
context_exec_from_json = ContextExecutor[ThreadPoolExecutor].parse_raw(
    context_exec_json
)
context_exec_from_json.submit(fn=test_function, x=1, y=8)

<Future at 0x16a792af0 state=finished returned int>

In [54]:
context_exec_from_json.map(test_function, ((1, 4), (3, 4)))

<generator object Executor.map.<locals>.result_iterator at 0x16a58c6d0>

Some executors are generated by clients that manage sessions (such as the Dask `Client` above). These will require gathering results before shutdown.
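
A sketch of gathering before shutdown, using `ThreadPoolExecutor` as a stand-in for a session-managed executor. The working assumption is that futures from such an executor may no longer be retrievable once its session has closed, so results are collected while the executor is still alive. `square` is a hypothetical example function.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def square(n: int) -> int:
    return n * n


with ThreadPoolExecutor(max_workers=2) as pool:
    # map each future back to its input
    futures = {pool.submit(square, n): n for n in range(4)}

    # gather results inside the block, before the executor shuts down
    results = {futures[f]: f.result() for f in as_completed(futures)}

print(results)
```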