Skip to content

Commit

Permalink
fix(lib): implement concurrent executor task
Browse files Browse the repository at this point in the history
Add a proper implementation for the `app.lib.concurrent.ConcurrentExecutor` task. This will allow the app to run multiple tasks concurrently.
  • Loading branch information
kennedykori committed Sep 21, 2022
1 parent 5da2221 commit bdebffb
Show file tree
Hide file tree
Showing 7 changed files with 428 additions and 60 deletions.
3 changes: 2 additions & 1 deletion app/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
TransportClosedError,
TransportError,
)
from .mixins import InitFromMapping, ToMapping, ToTask
from .mixins import Disposable, InitFromMapping, ToMapping, ToTask
from .task import Task
from .transport import Transport, TransportOptions

__all__ = [
"AbstractDomainObject",
"DataSource",
"DataSourceType",
"Disposable",
"ExtractMetadata",
"IDRClientException",
"IdentifiableDomainObject",
Expand Down
3 changes: 2 additions & 1 deletion app/lib/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .common import Chainable, Consumer, Pipeline
from .concurrent import ConcurrentExecutor
from .concurrent import ConcurrentExecutor, completed_successfully
from .pandas import ChunkDataFrame
from .sql import SimpleSQLSelect, SQLTask

Expand All @@ -11,4 +11,5 @@
"Pipeline",
"SQLTask",
"SimpleSQLSelect",
"completed_successfully",
]
170 changes: 146 additions & 24 deletions app/lib/tasks/concurrent.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import sys
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from functools import reduce
from logging import getLogger
from typing import (
Any,
Callable,
Generic,
List,
MutableSequence,
Optional,
Sequence,
TypeVar,
cast,
)

from app.core import Task
from app.core import Disposable, IDRClientException, Task

# =============================================================================
# TYPES
Expand All @@ -21,46 +22,167 @@

_RT = TypeVar("_RT")

if sys.version_info >= (3, 9): # pragma: no branch
Accumulator = Callable[
[MutableSequence[Future[_RT]], Future[_RT]],
MutableSequence[Future[_RT]],
]
else: # pragma: no branch
Accumulator = Callable[
[MutableSequence[Future], Future], MutableSequence[Future]
]


# =============================================================================
# CONSTANTS
# =============================================================================


_LOGGER = getLogger(__name__)


# =============================================================================
# HELPERS
# =============================================================================


def completed_successfully(future: Future) -> bool:
"""
Checks if a :class:`future <Future>` completed successfully and returns
``True`` if so and ``False`` otherwise. In this context a *future* is
considered to have completed successfully if it wasn't cancelled and no
exception was raised on it's callee.
:param future: A future instance to check for successful completion.
:return: ``True` if the future completed successfully, ``False`` otherwise.
"""
return bool(
future.done() and not future.cancelled() and future.exception() is None
)


class ConcurrentExecutorDisposedError(IDRClientException):
"""
An exception indicating that an erroneous usage of a disposed
:class:`concurrent executor <ConcurrentExecutor>` was made.
"""

def __init__(
self, message: Optional[str] = "ConcurrentExecutor disposed."
):
super().__init__(message=message)


# =============================================================================
# TASK
# =============================================================================


class ConcurrentExecutor(Generic[_IN, _RT], Task[_IN, _RT]):
class ConcurrentExecutor(
Generic[_IN, _RT], Task[_IN, MutableSequence["Future[_RT]"]], Disposable
):
"""
A :class:`task <Task>` that takes multiple tasks with a common input and
executes them concurrently. The output of the task is a ``MutableSequence``
of :class:`futures <Future>` representing the execution of each of those
tasks.
.. note::
By default, this task uses a :class:`ThreadPoolExecutor` to run the
tasks concurrently which is more suitable for `I/O-bound` tasks but
poorly suited for computation intensive tasks. To work with the later
kind of tasks, a :class:`ProcessPoolExecutor` is recommended. This
can be passed to the constructor of this class when initializing it
using the `executor` parameter.
For more details, see the official python
`threading docs <threading_docs>`_.
.. _threading_docs: https://docs.python.org/3/library/threading.html
"""

def __init__(
self,
*tasks: Task[Any, Any],
accumulator: Optional[Callable[[_RT, Any], _RT]] = None,
initial_value: _RT = cast(_RT, list()),
*tasks: Task[_IN, _RT],
accumulator: Optional[Accumulator] = None,
initial_value: Optional[MutableSequence["Future[_RT]"]] = None,
executor: Optional[Executor] = None,
):
self._tasks: Sequence[Task[Any, Any]] = tuple(tasks) # TODO: queue??
self._accumulator: Callable[[_RT, Any], _RT] = (
"""
Initialize a new `ConcurrentExecutor` instance with the given
arguments.
:param tasks: The tasks to be executed concurrently.
:param accumulator: An optional callable to collect the results
(futures) of executing the given tasks concurrently. If one is not
provided, then a default is used that simply appends each result to
a ``MutableSequence``.
:param initial_value: An optional ``MutableSequence`` instance to house
the results(futures) of executing the tasks concurrently. Defaults
to an empty list when one isn't given.
:param executor: The :class:`executor <concurrent.futures.Executor>`
instance to use when executing the tasks. A ``ThreadPoolExecutor``
is used by default when one isn't provided.
"""
self._tasks: Sequence[Task[_IN, _RT]] = tuple(tasks)
self._accumulator: Accumulator = (
accumulator or self._default_accumulator
)
self._initial_value: _RT = initial_value
self._initial_value: MutableSequence["Future[_RT]"]
self._initial_value = initial_value or list()
self._executor: Executor = executor or ThreadPoolExecutor()
self._is_disposed: bool = False

def __enter__(self) -> "ConcurrentExecutor":
self._ensure_not_disposed()
return self

@property
def is_disposed(self) -> bool:
return self._is_disposed

@property
def tasks(self) -> Sequence[Task[Any, Any]]:
def tasks(self) -> Sequence[Task[_IN, _RT]]:
return self._tasks

def execute(self, an_input: _IN) -> _RT:
# TODO: Add a proper implementation that executes the tasks
# concurrently.
def dispose(self) -> None:
self._executor.shutdown(wait=True)
self._is_disposed = True

def execute(self, an_input: _IN) -> MutableSequence["Future[_RT]"]:
self._ensure_not_disposed()
return reduce(
lambda _partial, _tsk: self._accumulator(
_partial, _tsk.execute(an_input)
_partial,
self._executor.submit(self._do_execute_task, _tsk, an_input),
),
self.tasks,
self._initial_value,
)

def _ensure_not_disposed(self) -> None:
if self._is_disposed:
raise ConcurrentExecutorDisposedError()

@staticmethod
def _default_accumulator(partial_result: _RT, task_output: Any) -> _RT:
_partial: List[Any] = list(
# An assumption is made here that the default initial value will
# always be a sequence of some kind.
cast(Sequence, partial_result)
)
_partial.append(task_output)
return cast(_RT, _partial)
def _default_accumulator(
partial_results: MutableSequence["Future[_RT]"],
task_output: "Future[_RT]",
) -> MutableSequence["Future[_RT]"]:
partial_results.append(task_output)
return partial_results

@staticmethod
def _do_execute_task(task: Task[_IN, _RT], an_input: _IN) -> _RT:
try:
result: _RT = task.execute(an_input)
except Exception as exp:
_LOGGER.error(
'Error while executing task of type="%s.%s".',
task.__module__,
task.__class__.__name__,
exc_info=exp,
)
raise exp
return result
57 changes: 40 additions & 17 deletions app/use_cases/fetch_metadata.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from concurrent.futures import Future, as_completed
from itertools import chain
from logging import getLogger
from typing import Iterable, Mapping, Sequence
Expand All @@ -9,7 +10,7 @@
Task,
Transport,
)
from app.lib import ConcurrentExecutor
from app.lib import ConcurrentExecutor, completed_successfully

# =============================================================================
# CONSTANTS
Expand All @@ -24,7 +25,7 @@
# =============================================================================


class DoFetchDataSources(Task[DataSourceType, Sequence[DataSource]]):
class DoFetchDataSources(Task[Transport, Sequence[DataSource]]):
"""Fetches all the data sources of a given data source type."""

def __init__(self, data_source_type: DataSourceType):
Expand All @@ -45,7 +46,7 @@ def execute(self, an_input: Transport) -> Sequence[DataSource]:
return data_sources


class DoFetchExtractMetadata(Task[DataSource, Sequence[ExtractMetadata]]):
class DoFetchExtractMetadata(Task[Transport, Sequence[ExtractMetadata]]):
"""Fetch all the extract metadata of a given data source."""

def __init__(self, data_source: DataSource):
Expand Down Expand Up @@ -86,14 +87,26 @@ def execute(
self, an_input: Sequence[DataSourceType]
) -> Sequence[DataSource]:
_LOGGER.info("Fetching data sources.")
executor: ConcurrentExecutor[
Transport, Sequence[Sequence[DataSource]]
] = ConcurrentExecutor(
*self._data_source_types_to_tasks(an_input), initial_value=list()
executor: ConcurrentExecutor[Transport, Sequence[DataSource]]
executor = ConcurrentExecutor(
*self._data_source_types_to_tasks(an_input)
)
with executor:
futures: Sequence[Future[Sequence[DataSource]]]
futures = executor(self._transport) # noqa
# Focus on completed tasks and ignore the ones that failed.
completed_futures = as_completed(futures)
return tuple(
chain.from_iterable(
map(
lambda _f: _f.result(),
filter(
lambda _f: completed_successfully(_f),
completed_futures,
),
)
)
)
data_sources: Sequence[Sequence[DataSource]]
data_sources = executor(self._transport) # noqa
return tuple(chain.from_iterable(data_sources))

@staticmethod
def _data_source_types_to_tasks(
Expand Down Expand Up @@ -121,14 +134,24 @@ def execute(
self, an_input: Sequence[DataSource]
) -> Sequence[ExtractMetadata]:
_LOGGER.info("Fetching extract metadata.")
executor: ConcurrentExecutor[
Transport, Sequence[Sequence[ExtractMetadata]]
] = ConcurrentExecutor(
*self._data_sources_to_tasks(an_input), initial_value=list()
executor: ConcurrentExecutor[Transport, Sequence[ExtractMetadata]]
executor = ConcurrentExecutor(*self._data_sources_to_tasks(an_input))
with executor:
futures: Sequence[Future[Sequence[ExtractMetadata]]]
futures = executor(self._transport) # noqa
# Focus on completed tasks and ignore the ones that failed.
completed_futures = as_completed(futures)
return tuple(
chain.from_iterable(
map(
lambda _f: _f.result(),
filter(
lambda _f: completed_successfully(_f),
completed_futures,
),
)
)
)
extracts: Sequence[Sequence[ExtractMetadata]]
extracts = executor(self._transport) # noqa
return tuple(chain.from_iterable(extracts))

@staticmethod
def _data_sources_to_tasks(
Expand Down
23 changes: 17 additions & 6 deletions app/use_cases/run_extraction.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from concurrent.futures import Future, as_completed
from itertools import chain, groupby
from logging import getLogger
from typing import Any, Sequence, Tuple

from app.core import DataSource, ExtractMetadata, Task
from app.lib import ConcurrentExecutor
from app.lib import ConcurrentExecutor, completed_successfully

from .types import RunExtractionResult

Expand Down Expand Up @@ -106,14 +107,24 @@ def run_data_source_extracts(
'Running extraction for data source="%s"', str(data_source)
)
with data_source:
executor: ConcurrentExecutor[
DataSource, Sequence[RunExtractionResult]
]
executor: ConcurrentExecutor[DataSource, RunExtractionResult]
executor = ConcurrentExecutor(
*(DoExtract(_extract) for _extract in extracts),
initial_value=list(),
)
return executor(data_source) # noqa
with executor:
futures: Sequence[Future[RunExtractionResult]]
futures = executor(data_source) # noqa
# Focus on completed tasks and ignore the ones that failed.
completed_futures = as_completed(futures)
return tuple(
map(
lambda _f: _f.result(),
filter(
lambda _f: completed_successfully(_f),
completed_futures,
),
)
)


# TODO: Add more tasks here to post process extraction results. E.g, handle
Expand Down
Loading

0 comments on commit bdebffb

Please sign in to comment.