Skip to content

Commit

Permalink
Added initial rework of the concurrent.futures module (#5646)
Browse files Browse the repository at this point in the history
* Added initial rework of the concurrent.futures module

* Minor fixes

* Fixed some generics & changed to collections over typing for some types

* Switched thread to use queues instead of multiprocessing.queues

* More fixes

* More fixes following results from running tests locally

* Tmp commit of changes

* Minor flake8 fix

* Fixing some issues

* Fixed a weakref.ref issue

* Fixed one more weakref issue

* Fixed some issues with required version

* Fixed more python min version requirements

* More min version fixes

* Fixed misc error in workflow regarded outdated pip

* Replaced any usage of Optional and Union with proper form as described in the contributions guide

* Fixed issue with using Callable definition

* Fixed last seen issues as per review

* Fixed some basic issues & more proper import calls

* Update stdlib/concurrent/futures/process.pyi

Co-authored-by: Sebastian Rittau <srittau@rittau.biz>

* Update stdlib/concurrent/futures/process.pyi

Co-authored-by: Sebastian Rittau <srittau@rittau.biz>

* Minor fixes

* More minor fixes

* Fixed up some issues & cleaned up imports

* Removed usage of Union

* Changed wait method to use Set of Future to work with mypy-primer for Optuna repo

* Reverted change to wait method and DoneAndNotDoneFutures class

* Fixed DoneAndNotDoneFutures again

Co-authored-by: Akuli <akuviljanen17@gmail.com>
Co-authored-by: Sebastian Rittau <srittau@rittau.biz>
  • Loading branch information
3 people committed Sep 3, 2021
1 parent 1af6810 commit ffaa0ea
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 26 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ jobs:
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Update pip
run: python -m pip install -U pip
- name: Install dependencies
run: pip install $(grep mypy== requirements-tests-py3.txt)
- name: Run stubtest
Expand Down
5 changes: 4 additions & 1 deletion stdlib/concurrent/futures/_base.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import sys
import threading
from _typeshed import Self
from abc import abstractmethod
from collections.abc import Container, Iterable, Iterator, Sequence, Set
from logging import Logger
from typing import Any, Callable, Container, Generic, Iterable, Iterator, Protocol, Sequence, Set, TypeVar, overload
from typing import Any, Callable, Generic, Protocol, TypeVar, overload

if sys.version_info >= (3, 9):
from types import GenericAlias
Expand All @@ -16,6 +17,8 @@ RUNNING: str
CANCELLED: str
CANCELLED_AND_NOTIFIED: str
FINISHED: str
_FUTURE_STATES: list[str]
_STATE_TO_DESCRIPTION_MAP: dict[str, str]
LOGGER: Logger

class Error(Exception): ...
Expand Down
157 changes: 148 additions & 9 deletions stdlib/concurrent/futures/process.pyi
Original file line number Diff line number Diff line change
@@ -1,9 +1,133 @@
import sys
from typing import Any, Callable, Tuple
from collections.abc import Generator, Iterable, Mapping, MutableMapping, MutableSequence
from multiprocessing.connection import Connection
from multiprocessing.context import BaseContext, Process
from multiprocessing.queues import Queue, SimpleQueue
from threading import Lock, Semaphore, Thread
from types import TracebackType
from typing import Any, Callable, Generic, Tuple, TypeVar
from weakref import ref

from ._base import Executor
from ._base import Executor, Future

EXTRA_QUEUED_CALLS: Any
_threads_wakeups: MutableMapping[Any, Any]
_global_shutdown: bool

class _ThreadWakeup:
_closed: bool
_reader: Connection
_writer: Connection
def __init__(self) -> None: ...
def close(self) -> None: ...
def wakeup(self) -> None: ...
def clear(self) -> None: ...

def _python_exit() -> None: ...

EXTRA_QUEUED_CALLS: int

_MAX_WINDOWS_WORKERS: int

class _RemoteTraceback(Exception):
tb: str
def __init__(self, tb: TracebackType) -> None: ...
def __str__(self) -> str: ...

class _ExceptionWithTraceback:
exc: BaseException
tb: TracebackType
def __init__(self, exc: BaseException, tb: TracebackType) -> None: ...
def __reduce__(self) -> str | Tuple[Any, ...]: ...

def _rebuild_exc(exc: Exception, tb: str) -> Exception: ...

_S = TypeVar("_S")

class _WorkItem(Generic[_S]):
future: Future[_S]
fn: Callable[..., _S]
args: Iterable[Any]
kwargs: Mapping[str, Any]
def __init__(self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ...

class _ResultItem:
work_id: int
exception: Exception
result: Any
def __init__(self, work_id: int, exception: Exception | None = ..., result: Any | None = ...) -> None: ...

class _CallItem:
work_id: int
fn: Callable[..., Any]
args: Iterable[Any]
kwargs: Mapping[str, Any]
def __init__(self, work_id: int, fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ...

if sys.version_info >= (3, 7):
class _SafeQueue(Queue[Future[Any]]):
pending_work_items: dict[int, _WorkItem[Any]]
shutdown_lock: Lock
thread_wakeup: _ThreadWakeup
if sys.version_info >= (3, 9):
def __init__(
self,
max_size: int | None = ...,
*,
ctx: BaseContext,
pending_work_items: dict[int, _WorkItem[Any]],
shutdown_lock: Lock,
thread_wakeup: _ThreadWakeup,
) -> None: ...
else:
def __init__(
self, max_size: int | None = ..., *, ctx: BaseContext, pending_work_items: dict[int, _WorkItem[Any]]
) -> None: ...
def _on_queue_feeder_error(self, e: Exception, obj: _CallItem) -> None: ...

def _get_chunks(*iterables: Any, chunksize: int) -> Generator[Tuple[Any, ...], None, None]: ...
def _process_chunk(fn: Callable[..., Any], chunk: Tuple[Any, None, None]) -> Generator[Any, None, None]: ...
def _sendback_result(
result_queue: SimpleQueue[_WorkItem[Any]], work_id: int, result: Any | None = ..., exception: Exception | None = ...
) -> None: ...

if sys.version_info >= (3, 7):
def _process_worker(
call_queue: Queue[_CallItem],
result_queue: SimpleQueue[_ResultItem],
initializer: Callable[..., None] | None,
initargs: Tuple[Any, ...],
) -> None: ...

else:
def _process_worker(call_queue: Queue[_CallItem], result_queue: SimpleQueue[_ResultItem]) -> None: ...

if sys.version_info >= (3, 9):
class _ExecutorManagerThread(Thread):
thread_wakeup: _ThreadWakeup
shutdown_lock: Lock
executor_reference: ref[Any]
processes: MutableMapping[int, Process]
call_queue: Queue[_CallItem]
result_queue: SimpleQueue[_ResultItem]
work_ids_queue: Queue[int]
pending_work_items: dict[int, _WorkItem[Any]]
def __init__(self, executor: ProcessPoolExecutor) -> None: ...
def run(self) -> None: ...
def add_call_item_to_queue(self) -> None: ...
def wait_result_broken_or_wakeup(self) -> tuple[Any, bool, str]: ...
def process_result_item(self, result_item: int | _ResultItem) -> None: ...
def is_shutting_down(self) -> bool: ...
def terminate_broken(self, cause: str) -> None: ...
def flag_executor_shutting_down(self) -> None: ...
def shutdown_workers(self) -> None: ...
def join_executor_internals(self) -> None: ...
def get_n_children_alive(self) -> int: ...

_system_limits_checked: bool
_system_limited: bool | None

def _check_system_limits() -> None: ...
def _chain_from_iterable_of_lists(iterable: Iterable[MutableSequence[Any]]) -> Any: ...

if sys.version_info >= (3, 7):
from ._base import BrokenExecutor
Expand All @@ -12,17 +136,32 @@ if sys.version_info >= (3, 7):
else:
class BrokenProcessPool(RuntimeError): ...

if sys.version_info >= (3, 7):
from multiprocessing.context import BaseContext
class ProcessPoolExecutor(Executor):
class ProcessPoolExecutor(Executor):
_mp_context: BaseContext | None = ...
_initializer: Callable[..., None] | None = ...
_initargs: Tuple[Any, ...] = ...
_executor_manager_thread: _ThreadWakeup
_processes: MutableMapping[int, Process]
_shutdown_thread: bool
_shutdown_lock: Lock
_idle_worker_semaphore: Semaphore
_broken: bool
_queue_count: int
_pending_work_items: dict[int, _WorkItem[Any]]
_cancel_pending_futures: bool
_executor_manager_thread_wakeup: _ThreadWakeup
_result_queue: SimpleQueue[Any]
_work_ids: Queue[Any]
if sys.version_info >= (3, 7):
def __init__(
self,
max_workers: int | None = ...,
mp_context: BaseContext | None = ...,
initializer: Callable[..., None] | None = ...,
initargs: Tuple[Any, ...] = ...,
) -> None: ...

else:
class ProcessPoolExecutor(Executor):
else:
def __init__(self, max_workers: int | None = ...) -> None: ...
if sys.version_info >= (3, 9):
def _start_executor_manager_thread(self) -> None: ...
def _adjust_process_count(self) -> None: ...
64 changes: 48 additions & 16 deletions stdlib/concurrent/futures/thread.pyi
Original file line number Diff line number Diff line change
@@ -1,23 +1,62 @@
import queue
import sys
from typing import Any, Callable, Generic, Iterable, Mapping, Tuple, TypeVar
from collections.abc import Iterable, Mapping, Set
from threading import Lock, Semaphore, Thread
from typing import Any, Callable, Generic, Tuple, TypeVar
from weakref import ref

from ._base import Executor, Future

if sys.version_info >= (3, 7):
from ._base import BrokenExecutor
class BrokenThreadPool(BrokenExecutor): ...
_threads_queues: Mapping[Any, Any]
_shutdown: bool
_global_shutdown_lock: Lock

def _python_exit() -> None: ...

if sys.version_info >= (3, 9):
from types import GenericAlias

_S = TypeVar("_S")

class _WorkItem(Generic[_S]):
future: Future[_S]
fn: Callable[..., _S]
args: Iterable[Any]
kwargs: Mapping[str, Any]
def __init__(self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ...
def run(self) -> None: ...
if sys.version_info >= (3, 9):
def __class_getitem__(cls, item: Any) -> GenericAlias: ...

if sys.version_info >= (3, 7):
def _worker(
executor_reference: ref[Any],
work_queue: queue.SimpleQueue[Any],
initializer: Callable[..., None],
initargs: Tuple[Any, ...],
) -> None: ...

else:
def _worker(executor_reference: ref[Any], work_queue: queue.Queue[Any]) -> None: ...

if sys.version_info >= (3, 7):
from ._base import BrokenExecutor
class BrokenThreadPool(BrokenExecutor): ...

class ThreadPoolExecutor(Executor):
_max_workers: int
_idle_semaphore: Semaphore
_threads: Set[Thread]
_broken: bool
_shutdown: bool
_shutdown_lock: Lock
_thread_name_prefix: str | None = ...
_initializer: Callable[..., None] | None = ...
_initargs: Tuple[Any, ...] = ...
if sys.version_info >= (3, 7):
_work_queue: queue.SimpleQueue[Any]
_work_queue: queue.SimpleQueue[_WorkItem[Any]]
else:
_work_queue: queue.Queue[Any]
_work_queue: queue.Queue[_WorkItem[Any]]
if sys.version_info >= (3, 7):
def __init__(
self,
Expand All @@ -28,13 +67,6 @@ class ThreadPoolExecutor(Executor):
) -> None: ...
else:
def __init__(self, max_workers: int | None = ..., thread_name_prefix: str = ...) -> None: ...

class _WorkItem(Generic[_S]):
future: Future[_S]
fn: Callable[..., _S]
args: Iterable[Any]
kwargs: Mapping[str, Any]
def __init__(self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ...
def run(self) -> None: ...
if sys.version_info >= (3, 9):
def __class_getitem__(cls, item: Any) -> GenericAlias: ...
def _adjust_thread_count(self) -> None: ...
if sys.version_info >= (3, 7):
def _initializer_failed(self) -> None: ...

0 comments on commit ffaa0ea

Please sign in to comment.