Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/package/concurrent/executor/.pages
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
nav:
- pycommons.base.concurrent.executor: executor.md

title: executor
364 changes: 178 additions & 186 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pycommons/base/concurrent/executor/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .direct import DirectExecutor
from .executors import Executors

__all__ = ["DirectExecutor"]
__all__ = ["DirectExecutor", "Executors"]
55 changes: 55 additions & 0 deletions pycommons/base/concurrent/executor/executors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from concurrent.futures import ThreadPoolExecutor
from typing import Any

from .direct import DirectExecutor
from ...utils import UtilityClass


class Executors(UtilityClass):
"""
The Executors Utility class that contains methods to create different executors.
"""

@classmethod
def get_direct_executor(cls) -> DirectExecutor:
"""
Get the singleton instance of "DirectExecutor" that runs the callable
in the same thread as the caller.

Returns:
The singleton instance of `DirectExecutor`
"""
return DirectExecutor.get_instance()

@classmethod
def new_single_thread_executor(cls, *args: Any, **kwargs: Any) -> ThreadPoolExecutor:
"""
A special threadpool executor where the max number of worker
threads is 1. If multiple tasks are submitted to this executor, they are queued
until the thread becomes idle.

Args:
*args: Arguments for threadpool executor
**kwargs: Keyword Arguments for threadpool executor

Returns:
a new instance of `ThreadPoolExecutor` with number of threads set to 1
"""
return cls.new_fixed_thread_pool_executor(1, *args, **kwargs)

@classmethod
def new_fixed_thread_pool_executor(
cls, n_threads: int, *args: Any, **kwargs: Any
) -> ThreadPoolExecutor:
"""
A fixed threadpool with number of threads set. Can be used within a context

Args:
n_threads: Number of worker threads
*args: Arguments for threadpool executor
**kwargs: Keyword Arguments for threadpool executor

Returns:
A new instance of threadpool executor
"""
return ThreadPoolExecutor(n_threads, *args, **kwargs)
Empty file.
18 changes: 7 additions & 11 deletions tests/pycommons/base/concurrent/executors/test_direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,14 @@ def test_direct_executor_executes_runnable_on_the_same_thread(self):
def runnable():
return threading.current_thread()

executor = DirectExecutor.get_instance()

future = executor.submit(runnable)

self.assertEqual(threading.current_thread(), future.result())
with DirectExecutor.get_instance() as executor:
future = executor.submit(runnable)
self.assertEqual(threading.current_thread(), future.result())

def test_direct_executor_executes_runnable_and_throws_exception_on_the_same_thread(self):
def runnable():
raise Exception(threading.current_thread())

executor = DirectExecutor.get_instance()

future = executor.submit(runnable)
raise RuntimeError(threading.current_thread())

self.assertEqual(threading.current_thread(), future.exception().args[0])
with DirectExecutor.get_instance() as executor:
future = executor.submit(runnable)
self.assertEqual(threading.current_thread(), future.exception().args[0])
33 changes: 33 additions & 0 deletions tests/pycommons/base/concurrent/executors/test_executors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import threading
from typing import List
from unittest import TestCase

from pycommons.base.concurrent.executor import Executors


class TestExecutors(TestCase):
def test_single_thread_executor(self):
def runnable(*args):
assert args[0] is ...
return threading.current_thread()

with Executors.new_single_thread_executor() as executor:
threads: List[threading.Thread] = list(executor.map(runnable, (..., ...)))
self.assertEqual(threads[0].ident, threads[1].ident)

def test_direct_executor_executes_runnable_on_the_same_thread(self):
def runnable():
return threading.current_thread()

with Executors.get_direct_executor() as executor:
future = executor.submit(runnable)
self.assertEqual(threading.current_thread(), future.result())

def test_fixed_threadpool_executor(self):
def runnable(*args):
assert args[0] is ...
return threading.current_thread()

with Executors.new_fixed_thread_pool_executor(1) as executor:
threads: List[threading.Thread] = list(executor.map(runnable, (..., ...)))
self.assertEqual(threads[0].ident, threads[1].ident)