Skip to content

Commit

Permalink
Updated docstrings and comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sunil Acharya (sunil.acharya@intel.com) <sunil.acharya@intel.com>
  • Loading branch information
sun1lach committed Dec 10, 2022
1 parent 78f7c6a commit 1d61ff5
Showing 1 changed file with 76 additions and 2 deletions.
78 changes: 76 additions & 2 deletions openfl/utilities/fed_timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,25 @@


class CustomThread(Thread):
'''
The CustomThread object implements `threading.Thread` class.
Allows extensibility and stores the returned result from threaded execution.
Attributes:
target (function): decorated function
name (str): Name of the decorated function
*args (tuple): Arguments passed as a parameter to decorated function.
**kwargs (dict): Keyword arguments passed as a parameter to decorated function.
'''
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
Thread.__init__(self, group, target, name, args, kwargs)
self._result = None

def run(self):
'''
`run()` Invoked by `thread.start()`
'''
if self._target is not None:
self._result = self._target(*self._args, **self._kwargs)

Expand All @@ -24,7 +38,17 @@ def result(self):


class PrepareTask():

'''
`PrepareTask` class stores the decorated function metadata and instantiates
either the `asyncio` or `thread` tasks to handle asynchronous
and synchronous execution of the decorated function respectively.
Attributes:
target (function): decorated function
timeout (int): Timeout duration in second(s).
*args (tuple): Arguments passed as a parameter to decorated function.
**kwargs (dict): Keyword arguments passed as a parameter to decorated function.
'''
def __init__(self, target_fn, timeout, args, kwargs) -> None:
self._target_fn = target_fn
self._fn_name = target_fn.__name__
Expand All @@ -33,7 +57,16 @@ def __init__(self, target_fn, timeout, args, kwargs) -> None:
self._kwargs = kwargs

async def async_execute(self):
'''Handles asynchronous execution of the
decorated function referenced by `self._target_fn`.
Raises:
asyncio.TimeoutError: If the async execution exceeds permitted time limit.
Exception: Captures generic exceptions.
Returns:
Any: The returned value from `task.results()` depends on the decorated function.
'''
task = asyncio.create_task(
name=self._fn_name,
coro=self._target_fn(*self._args, **self._kwargs)
Expand All @@ -42,48 +75,81 @@ async def async_execute(self):
try:
await asyncio.wait_for(task, timeout=self._max_timeout)
except asyncio.TimeoutError:
raise asyncio.TimeoutError(f"Timeout after {self._max_timeout} second(s), Exception method: ({self._fn_name})") # noqa
raise asyncio.TimeoutError(f"Timeout after {self._max_timeout} second(s), Exception method: ({self._fn_name})") # noqa
except Exception:
raise Exception(f"Generic Exception: {self._fn_name}")

return task.result()

def sync_execute(self):
'''Handles synchronous execution of the
decorated function referenced by `self._target_fn`.
Raises:
TimeoutError: If the synchronous execution exceeds permitted time limit.
Returns:
Any: The returned value from `task.results()` depends on the decorated function.
'''
task = CustomThread(target=self._target_fn,
name=self._fn_name,
args=self._args,
kwargs=self._kwargs)
task.start()
# Execution continues if the decorated function completes within the timelimit.
# If the execution exceeds time limit then
# the spawned thread is force joined to current/main thread.
task.join(self._max_timeout)

# If the control is back to current/main thread
# and the spawned thread is still alive then timeout and raise exception.
if task.is_alive():
raise TimeoutError(f"Timeout after {self._max_timeout} second(s), Exception method: ({self._fn_name})") # noqa

return task.result()


class SyncAsyncTaskDecoFactory:
'''
`Sync` and `Async` Task decorator factory allows creation of
concrete implementation of `wrapper` interface and `contextmanager` to
setup a common functionality/resources shared by `async_wrapper` and `sync_wrapper`.
'''

@contextmanager
def wrapper(self, func, *args, **kwargs):
yield

def __call__(self, func):
'''
Call to `@fedtiming()` executes `__call__()` method
delegated from the derived class `fedtiming` implementing `SyncAsyncTaskDecoFactory`.
'''

# Closures
self.is_coroutine = asyncio.iscoroutinefunction(func)
str_fmt = "{} Method ({}); Co-routine {}"

@wraps(func)
def sync_wrapper(*args, **kwargs):
'''
Wrapper for synchronous execution of decorated function.
'''
logger.debug(str_fmt.format("sync", func.__name__, self.is_coroutine))
with self.wrapper(func, *args, **kwargs):
return self.task.sync_execute()

@wraps(func)
async def async_wrapper(*args, **kwargs):
'''
Wrapper for asynchronous execution of decorated function.
'''
logger.debug(str_fmt.format("async", func.__name__, self.is_coroutine))
with self.wrapper(func, *args, **kwargs):
return await self.task.async_execute()

# Identify if the decorated function is `async` or `sync` and return appropriate wrapper.
if self.is_coroutine:
return async_wrapper
return sync_wrapper
Expand All @@ -95,6 +161,14 @@ def __init__(self, timeout):

@contextmanager
def wrapper(self, func, *args, **kwargs):
'''
Concrete implementation of setup and teardown logic, yields the control back to
`async_wrapper` or `sync_wrapper` function call.
Raises:
Exception: Captures the exception raised by `async_wrapper`
or `sync_wrapper` and terminates the execution.
'''
self.task = PrepareTask(
target_fn=func,
timeout=self.timeout,
Expand Down

0 comments on commit 1d61ff5

Please sign in to comment.