Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added initial rework of the concurrent.futures module #5646

Merged
merged 33 commits into from
Sep 3, 2021
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
5285d39
Added initial rework of the concurrent.futures module
HunterAP23 Jun 16, 2021
3d8d8ab
Minor fixes
HunterAP23 Jun 16, 2021
69a5a03
Fixed some generics & changed to collections over typing for some types
HunterAP23 Jun 16, 2021
c408496
Switched thread to use queues instead of multiprocessing.queues
HunterAP23 Jun 16, 2021
cf486b1
More fixes
HunterAP23 Jun 16, 2021
e63acf4
More fixes following results from running tests locally
HunterAP23 Jun 17, 2021
8452717
Merge remote-tracking branch 'origin/master' into hunter
Akuli Aug 8, 2021
49150a6
Tmp commit of changes
HunterAP23 Aug 27, 2021
bf01949
Merge branch 'master' of https://github.com/python/typeshed
HunterAP23 Aug 27, 2021
a7b8a87
Minor flake8 fix
HunterAP23 Aug 27, 2021
6c5b37b
Fixing some issues
HunterAP23 Aug 27, 2021
8fe6f7d
Fixed a weakref.ref issue
HunterAP23 Aug 27, 2021
cb7843b
Fixed one more weakref issue
HunterAP23 Aug 27, 2021
6c82496
Fixed some issues with required version
HunterAP23 Aug 28, 2021
bfdbc03
Fixed more python min version requirements
HunterAP23 Aug 28, 2021
c5ec620
More min version fixes
HunterAP23 Aug 28, 2021
4fb3b5c
Fixed misc error in workflow regarded outdated pip
HunterAP23 Aug 28, 2021
6dc70cc
Replaced any usage of Optional and Union with proper form as describe…
HunterAP23 Aug 28, 2021
261beda
Fixed issue with using Callable definition
HunterAP23 Aug 28, 2021
580cdf8
Fixed last seen issues as per review
HunterAP23 Sep 2, 2021
563b89b
Fixed some basic issues & more proper import calls
HunterAP23 Sep 2, 2021
50bbdbc
Merge branch 'master' of https://github.com/python/typeshed
HunterAP23 Sep 2, 2021
c7fec25
Update stdlib/concurrent/futures/process.pyi
HunterAP23 Sep 2, 2021
d83538c
Update stdlib/concurrent/futures/process.pyi
HunterAP23 Sep 2, 2021
26186c0
Minor fixes
HunterAP23 Sep 2, 2021
aa500af
Merge branch 'master' of https://github.com/HunterAP23/typeshed
HunterAP23 Sep 2, 2021
1db3c67
Merge branch 'master' of https://github.com/python/typeshed
HunterAP23 Sep 2, 2021
bc4139d
More minor fixes
HunterAP23 Sep 2, 2021
7bf4226
Fixed up some issues & cleaned up imports
HunterAP23 Sep 2, 2021
9bae25f
Removed usage of Union
HunterAP23 Sep 2, 2021
b350059
Changed wait method to use Set of Future to work with mypy-primer for…
HunterAP23 Sep 2, 2021
001c56e
Reverted change to wait method and DoneAndNotDoneFutures class
HunterAP23 Sep 2, 2021
d7020fe
Fixed DoneAndNotDoneFutures again
HunterAP23 Sep 2, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ jobs:
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Update pip
run: python -m pip install -U pip
- name: Install dependencies
run: pip install $(grep mypy== requirements-tests-py3.txt)
- name: Run stubtest
Expand Down
5 changes: 4 additions & 1 deletion stdlib/concurrent/futures/_base.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import sys
import threading
from _typeshed import Self
from abc import abstractmethod
from collections.abc import Container, Iterable, Iterator, Sequence, Set
from logging import Logger
from typing import Any, Callable, Container, Generic, Iterable, Iterator, Protocol, Sequence, Set, TypeVar, overload
from typing import Any, Callable, Generic, Protocol, TypeVar, overload

if sys.version_info >= (3, 9):
from types import GenericAlias
Expand All @@ -16,6 +17,8 @@ RUNNING: str
CANCELLED: str
CANCELLED_AND_NOTIFIED: str
FINISHED: str
_FUTURE_STATES: list[str]
_STATE_TO_DESCRIPTION_MAP: dict[str, str]
LOGGER: Logger

class Error(Exception): ...
Expand Down
154 changes: 145 additions & 9 deletions stdlib/concurrent/futures/process.pyi
Original file line number Diff line number Diff line change
@@ -1,28 +1,164 @@
import sys
import threading
import weakref
from collections.abc import Generator, Iterable, Mapping, MutableMapping, MutableSequence
from concurrent.futures._base import Executor, Future
from multiprocessing.connection import Connection
from multiprocessing.context import BaseContext, Process
from multiprocessing.queues import Queue, SimpleQueue
from types import TracebackType
from typing import Any, Callable, Tuple

from ._base import Executor
_threads_wakeups: MutableMapping[Any, Any]
_global_shutdown: bool

EXTRA_QUEUED_CALLS: Any
class _ThreadWakeup:
_closed: bool
_reader: Connection
_writer: Connection
def __init__(self) -> None: ...
def close(self) -> None: ...
def wakeup(self) -> None: ...
def clear(self) -> None: ...

def _python_exit() -> None: ...

EXTRA_QUEUED_CALLS: int

_MAX_WINDOWS_WORKERS: int

class _RemoteTraceback(Exception):
tb: str
def __init__(self, tb: TracebackType) -> None: ...
def __str__(self) -> str: ...

class _ExceptionWithTraceback:
exc: BaseException
tb: TracebackType
def __init__(self, exc: BaseException, tb: TracebackType) -> None: ...
def __reduce__(self) -> str | Tuple[Any, ...]: ...

def _rebuild_exc(exc: Exception, tb: str) -> Exception: ...

class _WorkItem(object):
HunterAP23 marked this conversation as resolved.
Show resolved Hide resolved
future: Future[Any]
fn: Callable[..., Any]
args: Iterable[Any]
kwargs: Mapping[str, Any]
def __init__(self, future: Future[Any], fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ...

class _ResultItem(object):
work_id: int
exception: Exception
result: Any
def __init__(self, work_id: int, exception: Exception | None = ..., result: Any | None = ...) -> None: ...

class _CallItem(object):
work_id: int
fn: Callable[..., Any]
args: Iterable[Any]
kwargs: Mapping[str, Any]
def __init__(self, work_id: int, fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ...

if sys.version_info >= (3, 7):
class _SafeQueue(Queue[Future[Any]]):
pending_work_items: MutableMapping[int, _WorkItem]
shutdown_lock: threading.Lock
thread_wakeup: _ThreadWakeup
if sys.version_info >= (3, 9):
def __init__(
self,
max_size: int | None = ...,
*,
ctx: BaseContext,
pending_work_items: MutableMapping[int, _WorkItem],
shutdown_lock: threading.Lock,
thread_wakeup: _ThreadWakeup,
) -> None: ...
else:
def __init__(
self, max_size: int | None = ..., *, ctx: BaseContext, pending_work_items: MutableMapping[int, _WorkItem]
) -> None: ...
def _on_queue_feeder_error(self, e: Exception, obj: _CallItem) -> None: ...

def _get_chunks(*iterables: Any, chunksize: int) -> Generator[Tuple[Any], None, None]: ...
HunterAP23 marked this conversation as resolved.
Show resolved Hide resolved
def _process_chunk(fn: Callable[..., Any], chunk: Tuple[Any, None, None]) -> Generator[Any, None, None]: ...
def _sendback_result(
result_queue: SimpleQueue[_WorkItem], work_id: int, result: Any | None = ..., exception: Exception | None = ...
) -> None: ...

if sys.version_info >= (3, 7):
from ._base import BrokenExecutor
def _process_worker(
call_queue: Queue[_CallItem],
result_queue: SimpleQueue[_ResultItem],
initializer: Callable[..., None] | None,
initargs: Tuple[Any, ...],
) -> None: ...

else:
def _process_worker(call_queue: Queue[_CallItem], result_queue: SimpleQueue[_ResultItem]) -> None: ...

if sys.version_info >= (3, 9):
class _ExecutorManagerThread(threading.Thread):
thread_wakeup: _ThreadWakeup
shutdown_lock: threading.Lock
executor_reference: weakref.ref[Any]
processes: MutableMapping[int, Process]
call_queue: Queue[_CallItem]
result_queue: SimpleQueue[_ResultItem]
work_ids_queue: Queue[int]
pending_work_items: MutableMapping[int, _WorkItem]
def __init__(self, executor: ProcessPoolExecutor) -> None: ...
def run(self) -> None: ...
def add_call_item_to_queue(self) -> None: ...
def wait_result_broken_or_wakeup(self) -> tuple[Any, bool, str]: ...
def process_result_item(self, result_item: Any) -> None: ...
HunterAP23 marked this conversation as resolved.
Show resolved Hide resolved
def is_shutting_down(self) -> bool: ...
def terminate_broken(self, cause: str) -> None: ...
def flag_executor_shutting_down(self) -> None: ...
def shutdown_workers(self) -> None: ...
def join_executor_internals(self) -> None: ...
def get_n_children_alive(self) -> int: ...

_system_limits_checked: bool
_system_limited: bool | None

def _check_system_limits() -> None: ...
def _chain_from_iterable_of_lists(iterable: Iterable[MutableSequence[Any]]) -> Any: ...

if sys.version_info >= (3, 7):
from concurrent.futures._base import BrokenExecutor
class BrokenProcessPool(BrokenExecutor): ...

else:
class BrokenProcessPool(RuntimeError): ...

if sys.version_info >= (3, 7):
from multiprocessing.context import BaseContext
class ProcessPoolExecutor(Executor):
class ProcessPoolExecutor(Executor):
_mp_context: BaseContext | None = ...
_initializer: Callable[..., None] | None = ...
_initargs: Tuple[Any, ...] = ...
_executor_manager_thread: _ThreadWakeup
_processes: MutableMapping[int, Process]
_shutdown_thread: bool
_shutdown_lock: threading.Lock
_idle_worker_semaphore: threading.Semaphore
_broken: bool
_queue_count: int
_pending_work_items: MutableMapping[int, _WorkItem]
_cancel_pending_futures: bool
_executor_manager_thread_wakeup: _ThreadWakeup
_result_queue: SimpleQueue[Any]
_work_ids: Queue[Any]
if sys.version_info >= (3, 7):
def __init__(
self,
max_workers: int | None = ...,
mp_context: BaseContext | None = ...,
initializer: Callable[..., None] | None = ...,
initargs: Tuple[Any, ...] = ...,
) -> None: ...

else:
class ProcessPoolExecutor(Executor):
else:
def __init__(self, max_workers: int | None = ...) -> None: ...
if sys.version_info >= (3, 9):
def _start_executor_manager_thread(self) -> None: ...
def _adjust_process_count(self) -> None: ...
69 changes: 50 additions & 19 deletions stdlib/concurrent/futures/thread.pyi
Original file line number Diff line number Diff line change
@@ -1,23 +1,61 @@
import queue
import sys
from typing import Any, Callable, Generic, Iterable, Mapping, Tuple, TypeVar
import threading
import weakref
from collections.abc import Iterable, Mapping, Set
from concurrent.futures import _base
from typing import Any, Callable, Tuple

from ._base import Executor, Future
_threads_queues: Mapping[Any, Any]
_shutdown: bool
_global_shutdown_lock: threading.Lock

if sys.version_info >= (3, 7):
from ._base import BrokenExecutor
class BrokenThreadPool(BrokenExecutor): ...
def _python_exit() -> None: ...

if sys.version_info >= (3, 9):
from types import GenericAlias

_S = TypeVar("_S")
class _WorkItem(object):
HunterAP23 marked this conversation as resolved.
Show resolved Hide resolved
future: _base.Future[Any]
fn: Callable[..., Any]
args: Iterable[Any]
kwargs: Mapping[str, Any]
def __init__(
self, future: _base.Future[Any], fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any]
) -> None: ...
def run(self) -> None: ...
if sys.version_info >= (3, 9):
def __class_getitem__(cls, item: Any) -> GenericAlias: ...

if sys.version_info >= (3, 7):
def _worker(
executor_reference: weakref.ref[Any],
work_queue: queue.SimpleQueue[Any],
initializer: Callable[..., None],
initargs: Tuple[Any, ...],
) -> None: ...

else:
def _worker(executor_reference: weakref.ref[Any], work_queue: queue.Queue[Any]) -> None: ...

class ThreadPoolExecutor(Executor):
if sys.version_info >= (3, 7):
from ._base import BrokenExecutor
class BrokenThreadPool(BrokenExecutor): ...

class ThreadPoolExecutor(_base.Executor):
_max_workers: int
_idle_semaphore: threading.Semaphore
_threads: Set[threading.Thread]
_broken: bool
_shutdown: bool
_shutdown_lock: threading.Lock
_thread_name_prefix: str | None = ...
_initializer: Callable[..., None] | None = ...
_initargs: Tuple[Any, ...] = ...
if sys.version_info >= (3, 7):
_work_queue: queue.SimpleQueue[Any]
_work_queue: queue.SimpleQueue[_WorkItem]
else:
_work_queue: queue.Queue[Any]
_work_queue: queue.Queue[_WorkItem]
if sys.version_info >= (3, 7):
def __init__(
self,
Expand All @@ -28,13 +66,6 @@ class ThreadPoolExecutor(Executor):
) -> None: ...
else:
def __init__(self, max_workers: int | None = ..., thread_name_prefix: str = ...) -> None: ...

class _WorkItem(Generic[_S]):
future: Future[_S]
fn: Callable[..., _S]
args: Iterable[Any]
kwargs: Mapping[str, Any]
def __init__(self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ...
def run(self) -> None: ...
if sys.version_info >= (3, 9):
def __class_getitem__(cls, item: Any) -> GenericAlias: ...
def _adjust_thread_count(self) -> None: ...
if sys.version_info >= (3, 7):
def _initializer_failed(self) -> None: ...