Skip to content

Commit

Permalink
type annotate _threads._pool and ._team to fix no-return-any in t.p.t…
Browse files Browse the repository at this point in the history
…hreadpool
  • Loading branch information
graingert committed Aug 29, 2023
1 parent d120676 commit 97f26fc
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 22 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,8 @@ module = [
'twisted.runner.test.test_procmontap',
'twisted.runner.procmontap',
'twisted.python.threadpool',
'twisted._threads._pool',
'twisted._threads._team',
'twisted.python.test.test_zippath',
'twisted.python.test.test_win32',
'twisted.python.test.test_versions',
Expand Down
17 changes: 14 additions & 3 deletions src/twisted/_threads/_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,24 @@

from queue import Queue
from threading import Lock, Thread, local as LocalStorage
from typing import Callable, Optional

from typing_extensions import Protocol

from twisted.python.log import err
from ._ithreads import IWorker
from ._team import Team
from ._threadworker import LockWorker, ThreadWorker


def pool(currentLimit, threadFactory=Thread):
class _ThreadFactory(Protocol):
def __call__(self, *, target: Callable[..., object]) -> Thread:
pass

Check warning on line 25 in src/twisted/_threads/_pool.py

View check run for this annotation

Codecov / codecov/patch

src/twisted/_threads/_pool.py#L25

Added line #L25 was not covered by tests


def pool(
currentLimit: Callable[[], int], threadFactory: _ThreadFactory = Thread
) -> Team:
"""
Construct a L{Team} that spawns threads as a thread pool, with the given
limiting function.
Expand Down Expand Up @@ -45,10 +56,10 @@ def pool(currentLimit, threadFactory=Thread):
@return: a new L{Team}.
"""

def startThread(target):
def startThread(target: Callable[..., object]) -> None:
return threadFactory(target=target).start()

def limitedWorkerCreator():
def limitedWorkerCreator() -> Optional[IWorker]:
stats = team.statistics()
if stats.busyWorkerCount + stats.idleWorkerCount >= currentLimit():
return None
Expand Down
44 changes: 27 additions & 17 deletions src/twisted/_threads/_team.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@


from collections import deque
from typing import Callable, Optional, Set

from zope.interface import implementer

from . import IWorker
from ._convenience import Quit
from ._ithreads import IExclusiveWorker


class Statistics:
Expand All @@ -32,7 +34,9 @@ class Statistics:
@type backloggedWorkCount: L{int}
"""

def __init__(self, idleWorkerCount, busyWorkerCount, backloggedWorkCount):
def __init__(
self, idleWorkerCount: int, busyWorkerCount: int, backloggedWorkCount: int
) -> None:
self.idleWorkerCount = idleWorkerCount
self.busyWorkerCount = busyWorkerCount
self.backloggedWorkCount = backloggedWorkCount
Expand Down Expand Up @@ -70,7 +74,12 @@ class Team:
next available opportunity; set in the coordinator.
"""

def __init__(self, coordinator, createWorker, logException):
def __init__(
self,
coordinator: IExclusiveWorker,
createWorker: Callable[[], Optional[IWorker]],
logException: Callable[[], None],
):
"""
@param coordinator: an L{IExclusiveWorker} which will coordinate access
to resources on this L{Team}; that is to say, an
Expand All @@ -91,21 +100,21 @@ def __init__(self, coordinator, createWorker, logException):
self._logException = logException

# Don't touch these except from the coordinator.
self._idle = set()
self._idle: Set[IWorker] = set()
self._busyCount = 0
self._pending = deque()
self._pending: "deque[Callable[..., object]]" = deque()
self._shouldQuitCoordinator = False
self._toShrink = 0

def statistics(self):
def statistics(self) -> Statistics:
"""
Gather information on the current status of this L{Team}.
@return: a L{Statistics} describing the current state of this L{Team}.
"""
return Statistics(len(self._idle), self._busyCount, len(self._pending))

def grow(self, n):
def grow(self, n: int) -> None:
"""
Increase the the number of idle workers by C{n}.
Expand All @@ -115,14 +124,14 @@ def grow(self, n):
self._quit.check()

@self._coordinator.do
def createOneWorker():
def createOneWorker() -> None:
for x in range(n):
worker = self._createWorker()
if worker is None:
return
self._recycleWorker(worker)

def shrink(self, n=None):
def shrink(self, n: Optional[int] = None) -> None:
"""
Decrease the number of idle workers by C{n}.
Expand All @@ -133,7 +142,7 @@ def shrink(self, n=None):
self._quit.check()
self._coordinator.do(lambda: self._quitIdlers(n))

def _quitIdlers(self, n=None):
def _quitIdlers(self, n: Optional[int] = None) -> None:
"""
The implmentation of C{shrink}, performed by the coordinator worker.
Expand All @@ -149,7 +158,7 @@ def _quitIdlers(self, n=None):
if self._shouldQuitCoordinator and self._busyCount == 0:
self._coordinator.quit()

def do(self, task):
def do(self, task: Callable[..., object]) -> None:
"""
Perform some work in a worker created by C{createWorker}.
Expand All @@ -158,7 +167,7 @@ def do(self, task):
self._quit.check()
self._coordinator.do(lambda: self._coordinateThisTask(task))

def _coordinateThisTask(self, task):
def _coordinateThisTask(self, task: Callable[..., object]) -> None:
"""
Select a worker to dispatch to, either an idle one or a new one, and
perform it.
Expand All @@ -174,21 +183,22 @@ def _coordinateThisTask(self, task):
# to create workers.
self._pending.append(task)
return
not_none_worker = worker
self._busyCount += 1

@worker.do
def doWork():
def doWork() -> None:
try:
task()
except BaseException:
self._logException()

@self._coordinator.do
def idleAndPending():
def idleAndPending() -> None:
self._busyCount -= 1
self._recycleWorker(worker)
self._recycleWorker(not_none_worker)

def _recycleWorker(self, worker):
def _recycleWorker(self, worker: IWorker) -> None:
"""
Called only from coordinator.
Expand All @@ -209,14 +219,14 @@ def _recycleWorker(self, worker):
self._idle.remove(worker)
worker.quit()

def quit(self):
def quit(self) -> None:
"""
Stop doing work and shut down all idle workers.
"""
self._quit.set()
# In case all the workers are idle when we do this.

@self._coordinator.do
def startFinishing():
def startFinishing() -> None:
self._shouldQuitCoordinator = True
self._quitIdlers()
4 changes: 2 additions & 2 deletions src/twisted/python/threadpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def workers(self) -> int:
@rtype: L{int}
"""
stats = self._team.statistics()
return stats.idleWorkerCount + stats.busyWorkerCount # type: ignore[no-any-return]
return stats.idleWorkerCount + stats.busyWorkerCount

@property
def working(self) -> list[None]:
Expand Down Expand Up @@ -161,7 +161,7 @@ def qsize(q) -> int:
worker.
@rtype: L{int}
"""
return self._team.statistics().backloggedWorkCount # type: ignore[no-any-return]
return self._team.statistics().backloggedWorkCount

return NotAQueue()

Expand Down

0 comments on commit 97f26fc

Please sign in to comment.