# Pydantic-based executors
The [PEP 3148](https://peps.python.org/pep-3148/) executor standard defines a common interface for executor objects (`submit`, `map`, and `shutdown`). This lets us wrap any compliant executor and provide context for its execution. Pydantic validators, driven by signature inspection, let us validate both the arguments used to initialize an executor and the arguments passed to the functions it executes.
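Here is a minimal stdlib-only sketch of these two ideas. It is not the xopt implementation. `inspect.signature(...).bind` checks constructor kwargs before an executor exists, and any PEP 3148 executor can then be driven through the shared `Executor` interface. The helper name `check_init_kwargs` is hypothetical.

```python
import inspect
from concurrent.futures import Executor, ThreadPoolExecutor


def check_init_kwargs(executor_cls, **kwargs):
    """Hypothetical helper: bind kwargs to the constructor signature or raise TypeError."""
    bound = inspect.signature(executor_cls).bind(**kwargs)
    return dict(bound.arguments)


# Validation happens before any worker threads are started
init_kwargs = check_init_kwargs(ThreadPoolExecutor, max_workers=2)

try:
    check_init_kwargs(ThreadPoolExecutor, max_worker=2)  # misspelled keyword
    rejected = False
except TypeError:
    rejected = True

# Every PEP 3148 executor exposes submit/map/shutdown via the Executor base class
executor = ThreadPoolExecutor(**init_kwargs)
assert isinstance(executor, Executor)
with executor:  # __exit__ calls shutdown(wait=True)
    future = executor.submit(pow, 2, 10)
    squares = list(executor.map(pow, [1, 2, 3], [2, 2, 2]))
```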

Before you start, make sure you're using Pydantic >= 1.9.0; version 1.8 has several bugs in JSON encoder propagation.

In [1]:
# imports
import contextlib
import copy
import json
import pickle
import inspect
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, Generic, Iterable, Optional, TypeVar, Tuple
from types import FunctionType, MethodType
from pydantic import BaseModel, Field, root_validator, validate_arguments, validator, ValidationError, Extra
from pydantic.generics import GenericModel



from xopt.pydantic import validate_and_compose_signature


logger = logging.getLogger(__name__)

# Print the source of an object as a Markdown code block
from IPython.display import display, Markdown
def sdisplay(obj):
    spec = inspect.getsource(obj)
    display(Markdown(f"```python\n{spec}\n```"))


## Generics

Because executor classes take many forms, we use Pydantic's generic class composition to parameterize models by executor type. We do this with a placeholder `TypeVar`, here named `ObjType`, because the executor classes use a generalizable loading approach that could be extended to objects in general.

In [2]:
ObjType = TypeVar("ObjType")
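For comparison, here is the same placeholder pattern using only the standard `typing` module. Subscripting a plain `Generic` class returns an alias that only records the type argument. Pydantic's `GenericModel` goes further and composes a new model class. The `Holder` class is a hypothetical illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Generic, TypeVar, get_args, get_origin

ObjType = TypeVar("ObjType")


class Holder(Generic[ObjType]):
    """Hypothetical plain-typing generic: ObjType is filled in by subscripting."""

    def __init__(self, obj: ObjType):
        self.obj = obj


TPEHolder = Holder[ThreadPoolExecutor]

# Plain typing records the parameter on an alias; no new class is created
origin = get_origin(TPEHolder)
params = get_args(TPEHolder)
```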

## JSON Encoders

Pydantic does not propagate JSON encoders to child classes, so we define a set of common encoders:

In [3]:
JSON_ENCODERS = {
    # FunctionType/MethodType are listed explicitly because functions and methods are not matched by the Callable entry
    FunctionType: lambda x: f"{x.__module__}.{x.__qualname__}",
    MethodType: lambda x: f"{x.__module__}.{x.__qualname__}",
    Callable: lambda x: f"{x.__module__}.{type(x).__qualname__}",
    type: lambda x: f"{x.__module__}.{x.__name__}",
    # for encoding instances of the ObjType
    ObjType: lambda x: f"{x.__module__}.{x.__class__.__qualname__}",
}
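Under the hood, Pydantic 1.x passes its encoders to `json.dumps` as the `default=` fallback. That fallback is called for any object the standard encoder can't serialize. Here is a minimal stdlib sketch of the same dotted-path encoding, using a hypothetical `encode_dotted` fallback:

```python
import json
from concurrent.futures import ThreadPoolExecutor
from types import FunctionType


def encode_dotted(obj):
    """Hypothetical json.dumps fallback: encode functions and classes as 'module.qualname'."""
    if isinstance(obj, (FunctionType, type)):
        return f"{obj.__module__}.{obj.__qualname__}"
    # Anything else is still an error, matching json's default behaviour
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


encoded = json.dumps(
    {"callable": json.dumps, "executor": ThreadPoolExecutor},
    default=encode_dotted,
)
```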

## Utility functions for validating signatures and getting callables from strings

Central to generalizability between executors is the ability to validate args/kwargs against the signature of a callable, such as an executor class.

In [4]:
from xopt.pydantic import get_callable_from_string

#sdisplay(get_callable_from_string)

In [5]:
def test_fn(x, y=4, *args, m, **kwargs):
    return x

validate_and_compose_signature(test_fn, y=5, x=2, hi=4)

Kwargs_test_fn(args=[], m=<class 'inspect._empty'>, kwarg_order=['x', 'y', 'm'], x=2, y=5)

## Representing callables as Pydantic models
Representing callables as Pydantic models lets us use Pydantic's JSON serialization as well as its validation hooks. Kwargs are validated when the model is created, while loading (calling) can be delayed. With `CallableModel`, we can provide initialization kwargs for an object that will be instantiated later and still benefit from kwarg validation up front.

In [6]:
from xopt.pydantic import CallableModel

Let's test `CallableModel` on an example function and class:

In [7]:
def test_function(x: int, y: int = 5):
    return x + y


class TestClass:
    def __init__(self, x, y):
        self.x = x
        self.y = y

In [8]:
fn = CallableModel(callable=test_function, kwargs={"x":1, "y":3})

fn.signature

Kwargs_test_function(args=[], kwarg_order=['x', 'y'], x=1, y=3)

In [9]:
fn = CallableModel(callable=test_function, args=(1,3,))

fn.signature

Kwargs_test_function(args=[], kwarg_order=['x', 'y'], x=1, y=3)

In [10]:
fn = CallableModel(callable=test_function, args=(1,), kwargs={"y":3})

fn.signature

Kwargs_test_function(args=[], kwarg_order=['x', 'y'], x=1, y=3)

The schema shows the signature model generated from the function:

In [11]:
fn.signature.schema()

{'title': 'Kwargs_test_function',
 'type': 'object',
 'properties': {'args': {'title': 'Args',
   'default': [],
   'type': 'array',
   'items': {}},
  'kwarg_order': {'title': 'Kwarg Order',
   'default': ['x', 'y'],
   'type': 'array',
   'items': {}},
  'x': {'title': 'X', 'default': 1, 'type': 'integer'},
  'y': {'title': 'Y', 'default': 3, 'type': 'integer'}}}

In [12]:
# dict rep
fn_dict = fn.dict()
fn_dict

{'callable': <function __main__.test_function(x: int, y: int = 5)>,
 'signature': {'args': [], 'x': 1, 'y': 3}}

In [13]:
# load from dict
fn_from_dict = CallableModel(**fn.dict()) 
fn_from_dict()

4

In [14]:
# json representation
fn.json() 

'{"callable": "__main__.test_function", "signature": {"args": [], "x": 1, "y": 3}}'

In [15]:
# callable from json
fn_from_json = CallableModel.parse_raw(fn.json())
fn_from_json()

4
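Loading from JSON requires turning a dotted path such as `__main__.test_function` back into an object, which is what `get_callable_from_string` is imported for. Its source isn't shown here, so the following is an independent `importlib` sketch using a hypothetical `resolve_dotted` helper. It imports the longest importable module prefix and resolves the rest with `getattr`, which also handles nested qualified names.

```python
import importlib
import json
from concurrent.futures import ThreadPoolExecutor


def resolve_dotted(path):
    """Hypothetical: resolve 'package.module.Qual.name' to the named object."""
    parts = path.split(".")
    # Try the longest module prefix first, then fall back to shorter ones
    for i in range(len(parts) - 1, 0, -1):
        try:
            obj = importlib.import_module(".".join(parts[:i]))
        except ImportError:
            continue
        for attr in parts[i:]:
            obj = getattr(obj, attr)
        return obj
    raise ImportError(f"cannot resolve {path!r}")


# Round trip: encode as module.qualname, then resolve it again
dotted = f"{json.dumps.__module__}.{json.dumps.__qualname__}"
loaded_fn = resolve_dotted(dotted)
loaded_cls = resolve_dotted("concurrent.futures.thread.ThreadPoolExecutor")
```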

## With classes

In [16]:
# Class kwargs are validated now and passed when the class is instantiated
parameterized_class = CallableModel(callable=TestClass, kwargs={"x":1, "y":3})
test_class_obj = parameterized_class()
assert isinstance(test_class_obj, (TestClass,))

In [17]:
# dict rep
parameterized_class_dict = parameterized_class.dict()
parameterized_class_dict

{'callable': __main__.TestClass, 'signature': {'args': [], 'x': 1, 'y': 3}}

In [18]:
# from dict
parameterized_class_from_dict = CallableModel(**parameterized_class_dict)
parameterized_class_from_dict

CallableModel(callable=<class '__main__.TestClass'>, signature=Kwargs_TestClass(args=[], kwarg_order=['x', 'y'], x=1, y=3))

In [19]:
parameterized_class_from_dict_obj = parameterized_class_from_dict()
assert isinstance(parameterized_class_from_dict_obj, (TestClass,))

In [20]:
#json 
parameterized_class_json = parameterized_class.json()
parameterized_class_json

'{"callable": "__main__.TestClass", "signature": {"args": [], "x": 1, "y": 3}}'

In [21]:
parameterized_class_from_json = CallableModel.parse_raw(parameterized_class_json)
test_class_obj = parameterized_class_from_json()
assert isinstance(test_class_obj, (TestClass,))

We can use callable models to construct a dynamic object loader. The generic type lets us use the same loader for any executor. The syntax `ObjLoader[ThreadPoolExecutor]` composes an entirely new class specific to `ThreadPoolExecutor`.

In [22]:
from xopt.pydantic import ObjLoader

Let's test the object loader on our `TestClass`:

In [23]:
# create type
TestClassLoader = ObjLoader[TestClass]

obj_loader = TestClassLoader(kwargs={"x":1, "y":3})
loaded = obj_loader.load()
loaded

<__main__.TestClass at 0x7fe957c80dc0>

We can do the same for a standard executor class such as `ThreadPoolExecutor`:

In [24]:
# create Type
TPELoader = ObjLoader[ThreadPoolExecutor]

tpe_loader = TPELoader(kwargs={"max_workers":1})
tpe = tpe_loader.load()
tpe
tpe_loader_json  = tpe_loader.json()
tpe_loader_json
tpe_loader_from_json = TPELoader.parse_raw(tpe_loader_json)


# shutdown tpe
tpe.shutdown()
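Pydantic's `GenericModel` is what makes `ObjLoader[ThreadPoolExecutor]` a real class. A stdlib-only approximation can use `__class_getitem__` to compose and cache a subclass bound to the requested type. The `TypedLoader` class below is hypothetical and has no validation or serialization.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, Tuple


class TypedLoader:
    """Hypothetical stand-in for ObjLoader: TypedLoader[T] builds a subclass bound to T."""

    object_type: type = object
    _cache: Dict[Tuple[type, type], type] = {}

    def __class_getitem__(cls, object_type):
        # Cache per (loader class, type) so TypedLoader[T] is TypedLoader[T]
        key = (cls, object_type)
        if key not in cls._cache:
            name = f"{cls.__name__}[{object_type.__name__}]"
            cls._cache[key] = type(name, (cls,), {"object_type": object_type})
        return cls._cache[key]

    def __init__(self, **kwargs: Any):
        self.kwargs = kwargs

    def load(self):
        # Instantiate the bound type with the stored kwargs
        return self.object_type(**self.kwargs)


TPELoader = TypedLoader[ThreadPoolExecutor]
loader = TPELoader(max_workers=1)
tpe = loader.load()
tpe.shutdown()
```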


## Executors
The previous classes demonstrate general-purpose utilities. The executors that follow build on them to parameterize generic executors complying with the PEP 3148 standard. The names of the submit, map, and shutdown methods are stored as fields (`submit_callable`, `map_callable`, `shutdown_callable`) in case an executor deviates from the standard. `BaseExecutor` outlines the fields and methods common to all executors.

In [25]:
from xopt.pydantic import BaseExecutor, NormalExecutor
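The JSON output of `dask_executor` later in this notebook shows the fields `submit_callable`, `map_callable`, and `shutdown_callable` holding method names. The sketch below shows how such names can be dispatched with `getattr`. The `MethodNameAdapter` class is hypothetical and is not `BaseExecutor` itself.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any


@dataclass
class MethodNameAdapter:
    """Hypothetical: forward calls to whichever method names the executor uses."""

    executor: Any
    submit_callable: str = "submit"
    map_callable: str = "map"
    shutdown_callable: str = "shutdown"

    def submit(self, fn, *args, **kwargs):
        return getattr(self.executor, self.submit_callable)(fn, *args, **kwargs)

    def map(self, fn, *iterables):
        return getattr(self.executor, self.map_callable)(fn, *iterables)

    def shutdown(self):
        getattr(self.executor, self.shutdown_callable)()


adapter = MethodNameAdapter(ThreadPoolExecutor(max_workers=1))
future = adapter.submit(pow, 2, 3)
squares = list(adapter.map(pow, [2, 3], [2, 2]))
adapter.shutdown()
```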

## NormalExecutor and ContextExecutor
Now we subclass `BaseExecutor` to create two executors: `NormalExecutor` and `ContextExecutor`. If the user wants a persistent executor passed to the Evaluator, they use `NormalExecutor`. `ContextExecutor` provides a context manager that creates a fresh executor instance for each submission and shuts it down afterwards.

Create some `NormalExecutor`s (these must be shut down manually):

In [26]:
# ThreadPool
# create type
NormTPExecutor = NormalExecutor[ThreadPoolExecutor]

tpe_exec = NormTPExecutor(kwargs={"max_workers":1})
# submit
tpe_exec.submit(fn=test_function, x=1, y=8)

<Future at 0x7fe957c4af40 state=finished returned int>

In [27]:
# map: Executor.map passes each item as a single argument, so unpack the (x, y) pairs
tpe_exec.map(lambda xy: test_function(*xy), ((1, 4), (3, 4)))

<generator object Executor.map.<locals>.result_iterator at 0x7fe957bf10b0>

In [28]:
tpe_exec.shutdown()
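With a persistent executor, the caller owns its lifetime. `Executor.map` returns a lazy iterator, hence the generator above, and results are retrieved only when it is consumed. Note that `map` passes each item of a single iterable as one positional argument. `(x, y)` pairs must therefore be unpacked or split into one iterable per parameter. Here is a stdlib sketch using a hypothetical `add` function:

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y=5):
    return x + y


executor = ThreadPoolExecutor(max_workers=1)
try:
    futures = [executor.submit(add, 1, y=8), executor.submit(add, 2)]
    results = [f.result() for f in futures]
    # One iterable per positional parameter: add(1, 4), add(3, 4)
    mapped = list(executor.map(add, [1, 3], [4, 4]))
    # Or keep the pairs and unpack them explicitly
    unpacked = list(executor.map(lambda pair: add(*pair), [(1, 4), (3, 4)]))
finally:
    # A persistent executor must be shut down explicitly
    executor.shutdown()
```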

In [29]:
# Dask
from distributed import Client
from distributed.cfexecutor import ClientExecutor

# Using an existing executor
client = Client(silence_logs=logging.ERROR)
executor = client.get_executor()

# create type
NormalDaskExecutor =  NormalExecutor[type(executor)]

dask_executor = NormalDaskExecutor(executor=executor)
dask_executor.submit(fn=test_function, x=1, y=8)

<Future at 0x7fe9561b4e20 state=pending>

In [30]:
dask_executor_json = dask_executor.json()
dask_executor_json

'{"loader": {"object": null, "loader": {"callable": "distributed.cfexecutor.ClientExecutor", "signature": {"args": [], "client": "inspect._empty"}}, "object_type": "distributed.cfexecutor.ClientExecutor"}, "submit_callable": "submit", "map_callable": "map", "shutdown_callable": "shutdown", "executor": "distributed.cfexecutor.ClientExecutor"}'

In [31]:
dask_executor.shutdown()

In [32]:
# This raises an error because the client is not serialized (it appears as "inspect._empty" in the JSON above)
# dask_executor_from_json = NormalDaskExecutor.parse_raw(dask_executor_json)

Context managers handle shutdown for us:

In [33]:
# ContextExecutor: creates a fresh executor for each call inside a context manager; no executor persistence
class ContextExecutor(
    BaseExecutor[ObjType],
    Generic[ObjType],
    arbitrary_types_allowed=True,
    json_encoders=JSON_ENCODERS,
):
    @contextlib.contextmanager
    def context(self):
        # load a new executor instance; if loading fails there is nothing to shut down
        self.executor = self.loader.load()
        try:
            yield self.executor

        finally:
            # always shut down and drop the executor, even if the call fails
            self.shutdown()
            self.executor = None

    def submit(self, fn, **kwargs) -> Future:
        with self.context() as ctxt:
            submit_fn = getattr(ctxt, self.submit_callable)
            return submit_fn(fn, **kwargs)

    def map(self, fn, iterable: Iterable) -> Iterable:
        # returns an iterator over results (not futures), as Executor.map does
        with self.context() as ctxt:
            map_fn = getattr(ctxt, self.map_callable)
            return map_fn(fn, iterable)
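Calling `shutdown()` in the `finally` block has one consequence. Assume `BaseExecutor.shutdown()` forwards to `ThreadPoolExecutor.shutdown()` with its default `wait=True`. Then each `submit` blocks until its work is done, which is why the futures below already report `state=finished`. Here is a stdlib sketch of the pattern, where `fresh_executor`, `slow_add`, and `submit_once` are hypothetical names:

```python
import contextlib
import time
from concurrent.futures import ThreadPoolExecutor


@contextlib.contextmanager
def fresh_executor(**kwargs):
    """Create an executor for one block of work and always shut it down."""
    executor = ThreadPoolExecutor(**kwargs)
    try:
        yield executor
    finally:
        # wait=True (the default) blocks until all submitted work has run
        executor.shutdown(wait=True)


def slow_add(x, y=5):
    time.sleep(0.05)
    return x + y


def submit_once(fn, *args, **kwargs):
    with fresh_executor(max_workers=1) as executor:
        future = executor.submit(fn, *args, **kwargs)
    # Leaving the with-block ran shutdown(), which waited for the future
    return future


future = submit_once(slow_add, 1, y=8)
```

The trade-off is isolation over throughput: separate `submit` calls never run concurrently with each other.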


Create some `ContextExecutor`s:

In [34]:
# ThreadPoolExecutor
# create type

ContextTPExecutor = ContextExecutor[ThreadPoolExecutor]

context_exec = ContextTPExecutor(kwargs={"max_workers":1})
context_exec.submit(fn=test_function, x=1, y=8)

<Future at 0x7fe9541cc1f0 state=finished returned int>

In [35]:
# Executor.map passes each item as a single argument, so unpack the (x, y) pairs
context_exec.map(lambda xy: test_function(*xy), ((1, 4), (3, 4)))

<generator object Executor.map.<locals>.result_iterator at 0x7fe9541bd4a0>

In [36]:
context_exec_json = context_exec.json()
context_exec_json

'{"loader": {"object": null, "loader": {"callable": "concurrent.futures.thread.ThreadPoolExecutor", "signature": {"args": [], "initializer": null, "initargs": null, "max_workers": 1, "thread_name_prefix": ""}}, "object_type": "concurrent.futures.thread.ThreadPoolExecutor"}, "submit_callable": "submit", "map_callable": "map", "shutdown_callable": "shutdown", "executor": null}'

In [37]:
context_exec_from_json = ContextTPExecutor.parse_raw(context_exec_json)
context_exec_from_json.submit(fn=test_function, x=1, y=8)

<Future at 0x7fe9541cc9d0 state=finished returned int>

In [38]:
# Executor.map passes each item as a single argument, so unpack the (x, y) pairs
context_exec_from_json.map(lambda xy: test_function(*xy), ((1, 4), (3, 4)))

<generator object Executor.map.<locals>.result_iterator at 0x7fe9541bd890>

Some executors are created by clients that manage sessions, such as the Dask `Client` above. These require gathering results before shutdown.
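A generic way to honor that requirement is to collect every result while the executor is still alive, and only then shut it down. The sketch below uses `ThreadPoolExecutor` as a stand-in for a session-backed executor. `gather_then_shutdown` is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def gather_then_shutdown(executor, futures):
    """Hypothetical: collect every result while the executor (session) is still alive."""
    wait(futures)  # explicitly block until all futures have finished
    results = [f.result() for f in futures]  # re-raises any task exception
    executor.shutdown()
    return results


executor = ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(pow, base, 2) for base in range(4)]
results = gather_then_shutdown(executor, futures)
```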

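Finally, `CallableModel` handles functions whose required arguments are supplied at call time. Below, `inputs` is left unset (serialized as `inspect._empty`) and is passed when the loaded callable is invoked:

In [39]: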
import yaml

def evaluate(inputs, y=5, z=None):
    return {'result': inputs['x'] + y }

fn = CallableModel(callable=evaluate, kwargs={"y":100})
fn_json = fn.json(exclude_none=True)


print(yaml.dump(yaml.safe_load(fn_json)))

callable: __main__.evaluate
signature:
  args: []
  inputs: inspect._empty
  y: 100



In [40]:
fn_from_json = CallableModel.parse_raw(fn_json)
fn_from_json

CallableModel(callable=<function evaluate at 0x7fe954214ca0>, signature=Kwargs_evaluate(args=[], z=None, kwarg_order=['inputs', 'y', 'z'], inputs='inspect._empty', y=100))

In [41]:
fn_from_json({"x":5}, z=2)

{'result': 105}