Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Timing Fed Components #530

Merged
merged 8 commits into from
Dec 14, 2022
Merged
Binary file added docs/images/timeout_design.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
131 changes: 131 additions & 0 deletions docs/source/utilities/timeouts.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
.. # Copyright (C) 2020-2022 Intel Corporation
.. # SPDX-License-Identifier: Apache-2.0

*******************************************************
|productName| Component Timeouts
*******************************************************

.. _comp_timeout_overview:

Overview
========

This feature allows decorating any arbitrary synchronous and/or asynchronous functions using :code:`@fedtiming(timeout=<seconds>)`.
The decorated functions is then monitored and gets terminated right after the execution time exceeds the user specified or default timeout value.

`openfl.utilities.fed_timer.py`

.. note::

The `fedtiming` class, `SyncAsyncTaskDecoFactory` factory class, custom synchronous and asynchronous execution of decorated function is in-place. The end to end implementation of |productName| Component timeouts feature is still in beta mode and would undergo design and implementation changes before the complete feature is made available. Appreciate any feedbacks or issues.


.. _comp_timeout_design:

Class Diagram
===========================

An overview of this workflow is shown below.

.. figure:: /images/timeout_design.png

.. class:: center
Overview of the component timeout class diagram



.. _comp_timeout_flow_of_execution:

Flow of execution
===================

#. [Step A] Decorate any sync or async function :code:`@fedtiming(timeout=<seconds>)` to monitor its execution time and terminate after `timeout=<seconds>` value.


.. code-block:: console

@fedtiming(timeout=5)
def some_sync_function():
pass

| This decorated function execution gets terminated after `5 seconds`.

.. code-block:: console

@fedtiming(timeout=10)
async def some_async_function():
await some_long_running_operation()

| This decorated function execution gets terminated after `10 seconds`.

#. [Step B] Concrete `fedtiming` class:

**During Compile time:** Decorated functions are evaluated like below.

**Synchronous Example:**

.. code-block:: console

some_sync_function = fedtiming(timeout=5)(some_sync_function)

then

some_sync_function() *is equivalent to* sync_wrapper().

inside the sync_wrapper: the decorated function `some_sync_function` and `timeout` variables are stored as a closure variable.

**Aynchronous Example:**

.. code-block:: console

some_async_function = fedtiming(timeout=5)(some_async_function)

then

some_async_function() *is equivalent to* async_wrapper().

inside the async_wrapper: the decorated function `some_async_function` and `timeout` variables are stored as a closure variable.


#. [Step C] `SyncAsyncTaskDecoFactory` class

`fedtiming(some_sync_function)` internally calls the parent class `SyncAsyncTaskDecoFactory` :code:`__call__(some_sync_function)` method.

The :code:`__call__()` method immediately returns either the `sync_wrapper` or `async_wrapper` depending on whether the sync or async method was decorated.


**During Runtime:**

The prepared `some_sync_function` or `some_async_function` when called internally with its respective parameters.

.. code-block:: console

some_sync_function(*args, **kwargs) -> sync_wrapper(*args, **kwargs)
some_async_function(*args, **kwargs) -> async_wrapper(*args, **kwargs)


#. [Step D] `PrepareTask` class

Delegates the decorated sync or async function to be executed synchronously or asynchronously using `CustomThread` or `asyncio`.

Contains the defination for the function `sync_execute` and `async_execute`.

#. [Step E] Execution of delegated methods:

The delegated function is executed synchronously or asynchronously and the result is returned back in the call chain.
The final output from the `thread` or `asyncio` task is returned as a result of a decorated function execution.

In this `CustomThread` or `asyncio.wait_for()` execution, the timeout is enforced which terminates the running function after a set period of time and an exception is called that tracebacks to the caller.

.. _comp_timeout_upcoming_feature:

Upcoming Changes
===================

**Above design reflects current implementation.**

Upcoming changes include:

1. Dynamic timeout parameters updates for all decorated functions during runtime. Removal of `timeout` parameter `@fedtiming(timeout=<?>)`.

2. Add a callback parameter that defines a post timeout teardown logic and a way gracefully terminate executing function.
6 changes: 5 additions & 1 deletion docs/source/utilities/utilities.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ The following are utilities available in Open Federated Learning (|productName|)
:doc:`splitters_data`
Split your data to run your federation from a single dataset.

:doc:`timeouts`
Decorate methods to enforce timeout on it's execution.

.. toctree::
:maxdepth: 1
:hidden:

pki
splitters_data
splitters_data
timeouts
191 changes: 191 additions & 0 deletions openfl/utilities/fed_timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
# Copyright (C) 2020-2022 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""Components Timeout Configuration Module"""

import asyncio
import logging
import os
import time

from contextlib import contextmanager
from functools import wraps
from threading import Thread

logger = logging.getLogger(__name__)


class CustomThread(Thread):
'''
The CustomThread object implements `threading.Thread` class.
Allows extensibility and stores the returned result from threaded execution.

Attributes:
target (function): decorated function
name (str): Name of the decorated function
*args (tuple): Arguments passed as a parameter to decorated function.
**kwargs (dict): Keyword arguments passed as a parameter to decorated function.

'''
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
Thread.__init__(self, group, target, name, args, kwargs)
aleksandr-mokrov marked this conversation as resolved.
Show resolved Hide resolved
self._result = None

def run(self):
'''
`run()` Invoked by `thread.start()`
'''
if self._target is not None:
self._result = self._target(*self._args, **self._kwargs)

def result(self):
return self._result


class PrepareTask():
'''
`PrepareTask` class stores the decorated function metadata and instantiates
either the `asyncio` or `thread` tasks to handle asynchronous
and synchronous execution of the decorated function respectively.

Attributes:
target (function): decorated function
timeout (int): Timeout duration in second(s).
*args (tuple): Arguments passed as a parameter to decorated function.
**kwargs (dict): Keyword arguments passed as a parameter to decorated function.
'''
def __init__(self, target_fn, timeout, args, kwargs) -> None:
self._target_fn = target_fn
self._fn_name = target_fn.__name__
self._max_timeout = timeout
self._args = args
self._kwargs = kwargs

async def async_execute(self):
'''Handles asynchronous execution of the
decorated function referenced by `self._target_fn`.

Raises:
asyncio.TimeoutError: If the async execution exceeds permitted time limit.
Exception: Captures generic exceptions.

Returns:
Any: The returned value from `task.results()` depends on the decorated function.
'''
task = asyncio.create_task(
name=self._fn_name,
coro=self._target_fn(*self._args, **self._kwargs)
)

try:
await asyncio.wait_for(task, timeout=self._max_timeout)
except asyncio.TimeoutError:
raise asyncio.TimeoutError(f"Timeout after {self._max_timeout} second(s), "
f"Exception method: ({self._fn_name})")
except Exception:
raise Exception(f"Generic Exception: {self._fn_name}")

return task.result()

def sync_execute(self):
'''Handles synchronous execution of the
decorated function referenced by `self._target_fn`.

Raises:
TimeoutError: If the synchronous execution exceeds permitted time limit.

Returns:
Any: The returned value from `task.results()` depends on the decorated function.
'''
task = CustomThread(target=self._target_fn,
name=self._fn_name,
args=self._args,
kwargs=self._kwargs)
task.start()
# Execution continues if the decorated function completes within the timelimit.
# If the execution exceeds time limit then
# the spawned thread is force joined to current/main thread.
task.join(self._max_timeout)

# If the control is back to current/main thread
# and the spawned thread is still alive then timeout and raise exception.
if task.is_alive():
raise TimeoutError(f"Timeout after {self._max_timeout} second(s), "
f"Exception method: ({self._fn_name})")

return task.result()


class SyncAsyncTaskDecoFactory:
'''
`Sync` and `Async` Task decorator factory allows creation of
concrete implementation of `wrapper` interface and `contextmanager` to
setup a common functionality/resources shared by `async_wrapper` and `sync_wrapper`.

'''

@contextmanager
def wrapper(self, func, *args, **kwargs):
yield

def __call__(self, func):
'''
Call to `@fedtiming()` executes `__call__()` method
delegated from the derived class `fedtiming` implementing `SyncAsyncTaskDecoFactory`.
'''

# Closures
self.is_coroutine = asyncio.iscoroutinefunction(func)
str_fmt = "{} Method ({}); Co-routine {}"

@wraps(func)
def sync_wrapper(*args, **kwargs):
'''
Wrapper for synchronous execution of decorated function.
'''
logger.debug(str_fmt.format("sync", func.__name__, self.is_coroutine))
with self.wrapper(func, *args, **kwargs):
return self.task.sync_execute()

@wraps(func)
async def async_wrapper(*args, **kwargs):
'''
Wrapper for asynchronous execution of decorated function.
'''
logger.debug(str_fmt.format("async", func.__name__, self.is_coroutine))
with self.wrapper(func, *args, **kwargs):
return await self.task.async_execute()

# Identify if the decorated function is `async` or `sync` and return appropriate wrapper.
if self.is_coroutine:
return async_wrapper
return sync_wrapper


class fedtiming(SyncAsyncTaskDecoFactory):
def __init__(self, timeout):
self.timeout = timeout

@contextmanager
def wrapper(self, func, *args, **kwargs):
'''
Concrete implementation of setup and teardown logic, yields the control back to
`async_wrapper` or `sync_wrapper` function call.

Raises:
Exception: Captures the exception raised by `async_wrapper`
or `sync_wrapper` and terminates the execution.
'''
self.task = PrepareTask(
target_fn=func,
timeout=self.timeout,
args=args,
kwargs=kwargs
)
try:
start = time.perf_counter()
yield
logger.info(f"({self.task._fn_name}) Elapsed Time: {time.perf_counter() - start}")
except Exception as e:
logger.exception(f"An exception of type {type(e).__name__} occurred. "
f"Arguments:\n{e.args[0]!r}")
os._exit(status=os.EX_TEMPFAIL)