Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Customized concurrent handling #3654

Merged
merged 33 commits into from Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
46f5355
add customized concurrent handling
Apr 9, 2023
e3245c1
Merge branch 'master' of github.com:python-telegram-bot/python-telegr…
Apr 9, 2023
1ccfeae
Merge branch 'master' of github.com:python-telegram-bot/python-telegr…
Apr 10, 2023
a8bf427
add some requested changes
Apr 11, 2023
ecf2721
add docstrings
Apr 13, 2023
f9d8f30
some more improvements
Apr 14, 2023
ab95f79
exclude making new tasks when concurrent_updates is 0
Apr 17, 2023
2be64f2
requested changes
Apr 19, 2023
249caa4
Merge branch 'master' of github.com:python-telegram-bot/python-telegr…
Apr 19, 2023
88d11ba
add tests and requested changes
Apr 25, 2023
99e556e
fix tests and docstrings
Apr 25, 2023
19d47fe
add slots
Apr 25, 2023
c922b9c
extend test coverage
Apr 27, 2023
5f4ab1d
fix tests
May 2, 2023
f450e20
Merge branch 'master' into customized-concurrent-handling
clot27 May 2, 2023
a2bf83d
Merge branch 'master' into customized-concurrent-handling
clot27 May 7, 2023
3c8f4e5
add tests and some requested changes
May 7, 2023
9c0fa9a
fix tests
May 13, 2023
5b7c4a0
really fix them this time
May 13, 2023
f3a6585
Pimp my tests :)
Bibo-Joshi May 17, 2023
7a30965
Merge branch 'master' into customized-concurrent-handling
Bibo-Joshi May 17, 2023
d535c46
Fix failing tests
Bibo-Joshi May 17, 2023
bb7caf8
fix docs
May 18, 2023
b60c119
add test for SUP with 'is'
May 19, 2023
39b1adb
add final requested changes
May 20, 2023
6cc2102
Fine tune tests
Bibo-Joshi May 20, 2023
764a5d0
Add some versioning directives
Bibo-Joshi May 20, 2023
59db8d3
Merge branch 'master' into customized-concurrent-handling
Bibo-Joshi May 20, 2023
9aeeb08
typo
Bibo-Joshi May 20, 2023
168eac4
address final review
May 30, 2023
550431c
fix pre-commit
May 30, 2023
d03f81e
Update docs/source/telegram.ext.simpleupdateprocessor.rst
clot27 Jun 2, 2023
429c023
Merge branch 'master' into customized-concurrent-handling
clot27 Jun 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/source/telegram.ext.BaseProcessor.rst
clot27 marked this conversation as resolved.
Show resolved Hide resolved
@@ -0,0 +1,6 @@
BaseProcessor
======================
clot27 marked this conversation as resolved.
Show resolved Hide resolved

.. autoclass:: telegram.ext.BaseProcessor
:members:
:show-inheritance:
3 changes: 2 additions & 1 deletion telegram/ext/__init__.py
Expand Up @@ -25,6 +25,7 @@
"ApplicationHandlerStop",
"BaseHandler",
"BasePersistence",
"BaseProcessor",
clot27 marked this conversation as resolved.
Show resolved Hide resolved
"BaseRateLimiter",
"CallbackContext",
"CallbackDataCache",
Expand Down Expand Up @@ -59,7 +60,7 @@

from . import filters
from ._aioratelimiter import AIORateLimiter
from ._application import Application, ApplicationHandlerStop
from ._application import Application, ApplicationHandlerStop, BaseProcessor
from ._applicationbuilder import ApplicationBuilder
from ._basepersistence import BasePersistence, PersistenceInput
from ._baseratelimiter import BaseRateLimiter
Expand Down
97 changes: 87 additions & 10 deletions telegram/ext/_application.py
Expand Up @@ -24,6 +24,8 @@
import logging
import platform
import signal
from abc import abstractmethod
from asyncio import BoundedSemaphore
from collections import defaultdict
from copy import deepcopy
from pathlib import Path
Expand Down Expand Up @@ -259,7 +261,7 @@ def __init__(
update_queue: "asyncio.Queue[object]",
updater: Optional[Updater],
job_queue: JQ,
concurrent_updates: Union[bool, int],
concurrent_updates: Union[bool, int, "BaseProcessor"],
persistence: Optional[BasePersistence[UD, CD, BD]],
context_types: ContextTypes[CCT, UD, CD, BD],
post_init: Optional[
Expand Down Expand Up @@ -297,14 +299,16 @@ def __init__(
self.post_stop: Optional[
Callable[["Application[BT, CCT, UD, CD, BD, JQ]"], Coroutine[Any, Any, None]]
] = post_stop

if isinstance(concurrent_updates, int) and concurrent_updates < 0:
raise ValueError("`concurrent_updates` must be a non-negative integer!")
if concurrent_updates is True:
concurrent_updates = 256
self._concurrent_updates_sem = asyncio.BoundedSemaphore(concurrent_updates or 1)
self._concurrent_updates: int = concurrent_updates or 0

if isinstance(concurrent_updates, int):
if concurrent_updates < 0:
raise ValueError("`concurrent_updates` must be a non-negative integer!")
concurrent_updates = SimpleProcessor(concurrent_updates)
self._concurrent_updates_sem = asyncio.BoundedSemaphore(
concurrent_updates.max_concurrent_updates or 1
)
self._concurrent_updates: BaseProcessor = concurrent_updates
clot27 marked this conversation as resolved.
Show resolved Hide resolved
clot27 marked this conversation as resolved.
Show resolved Hide resolved
clot27 marked this conversation as resolved.
Show resolved Hide resolved
self.bot_data: BD = self.context_types.bot_data()
self._user_data: DefaultDict[int, UD] = defaultdict(self.context_types.user_data)
self._chat_data: DefaultDict[int, CD] = defaultdict(self.context_types.chat_data)
Expand Down Expand Up @@ -361,7 +365,7 @@ def concurrent_updates(self) -> int:

.. seealso:: :wiki:`Concurrency`
"""
return self._concurrent_updates
return self._concurrent_updates.max_concurrent_updates
clot27 marked this conversation as resolved.
Show resolved Hide resolved

@property
def job_queue(self) -> Optional["JobQueue[CCT]"]:
Expand All @@ -379,6 +383,14 @@ def job_queue(self) -> Optional["JobQueue[CCT]"]:
)
return self._job_queue

@property
def processor(self) -> Optional["BaseProcessor"]:
""":class:`telegram.ext.BaseProcessor`: The processor used by this application.

.. seealso:: :wiki:`Concurrency`
"""
return self._concurrent_updates

async def initialize(self) -> None:
"""Initializes the Application by initializing:

Expand Down Expand Up @@ -1047,7 +1059,6 @@ async def _update_fetcher(self) -> None:
# Continuously fetch updates from the queue. Exit only once the signal object is found.
while True:
update = await self.update_queue.get()

if update is _STOP_SIGNAL:
_logger.debug("Dropping pending updates")
while not self.update_queue.empty():
Expand All @@ -1061,7 +1072,12 @@ async def _update_fetcher(self) -> None:

if self._concurrent_updates:
clot27 marked this conversation as resolved.
Show resolved Hide resolved
# We don't await the below because it has to be run concurrently
self.create_task(self.__process_update_wrapper(update), update=update)
clot27 marked this conversation as resolved.
Show resolved Hide resolved
self.create_task(
self._concurrent_updates.do_process_update(
update, self.process_update(update)
),
update=update,
)
else:
await self.__process_update_wrapper(update)

Expand Down Expand Up @@ -1672,3 +1688,64 @@ async def process_error(

_logger.exception("No error handlers are registered, logging exception.", exc_info=error)
return False


class BaseProcessor:
clot27 marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self, max_concurrent_updates: int):
self.max_concurrent_updates = max_concurrent_updates
clot27 marked this conversation as resolved.
Show resolved Hide resolved
self.semaphore = BoundedSemaphore(self.max_concurrent_updates or 1)
clot27 marked this conversation as resolved.
Show resolved Hide resolved

@abstractmethod
async def process_update(
self,
update: object,
coroutine: "Awaitable[Any]",
) -> None:
pass

@abstractmethod
async def initialize(self) -> None:
pass

@abstractmethod
async def shutdown(self) -> None:
pass

async def do_process_update(
self,
update: object,
coroutine: "Awaitable[Any]",
) -> None:
async with self.semaphore:
await self.process_update(update, coroutine)
clot27 marked this conversation as resolved.
Show resolved Hide resolved

async def __aenter__(self) -> "BaseProcessor":
try:
await self.initialize()
return self
except Exception as exc:
await self.shutdown()
raise exc

async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
await self.shutdown()


class SimpleProcessor(BaseProcessor):
async def process_update(
self,
update: object,
coroutine: "Awaitable[Any]",
) -> None:
await coroutine

async def initialize(self) -> None:
pass

async def shutdown(self) -> None:
pass
16 changes: 10 additions & 6 deletions telegram/ext/_applicationbuilder.py
Expand Up @@ -35,7 +35,7 @@
from telegram._bot import Bot
from telegram._utils.defaultvalue import DEFAULT_FALSE, DEFAULT_NONE, DefaultValue
from telegram._utils.types import DVInput, DVType, FilePathInput, ODVInput
from telegram.ext._application import Application
from telegram.ext._application import Application, BaseProcessor
from telegram.ext._contexttypes import ContextTypes
from telegram.ext._extbot import ExtBot
from telegram.ext._jobqueue import JobQueue
Expand Down Expand Up @@ -198,7 +198,7 @@ def __init__(self: "InitApplicationBuilder"):
self._context_types: DVType[ContextTypes] = DefaultValue(ContextTypes())
self._application_class: DVType[Type[Application]] = DefaultValue(Application)
self._application_kwargs: Dict[str, object] = {}
self._concurrent_updates: Union[int, DefaultValue[bool]] = DEFAULT_FALSE
self._concurrent_updates: Union[int, DefaultValue[bool], "BaseProcessor"] = DEFAULT_FALSE
clot27 marked this conversation as resolved.
Show resolved Hide resolved
self._updater: ODVInput[Updater] = DEFAULT_NONE
self._post_init: Optional[Callable[[Application], Coroutine[Any, Any, None]]] = None
self._post_shutdown: Optional[Callable[[Application], Coroutine[Any, Any, None]]] = None
Expand Down Expand Up @@ -901,7 +901,9 @@ def update_queue(self: BuilderType, update_queue: "Queue[object]") -> BuilderTyp
self._update_queue = update_queue
return self

def concurrent_updates(self: BuilderType, concurrent_updates: Union[bool, int]) -> BuilderType:
def concurrent_updates(
self: BuilderType, concurrent_updates: Union[bool, int, "BaseProcessor"]
) -> BuilderType:
"""Specifies if and how many updates may be processed concurrently instead of one by one.
If not called, updates will be processed one by one.

Expand All @@ -916,9 +918,11 @@ def concurrent_updates(self: BuilderType, concurrent_updates: Union[bool, int])
.. seealso:: :attr:`telegram.ext.Application.concurrent_updates`

Args:
concurrent_updates (:obj:`bool` | :obj:`int`): Passing :obj:`True` will allow for
``256`` updates to be processed concurrently. Pass an integer to specify a
different number of updates that may be processed concurrently.
concurrent_updates (:obj:`bool` | :obj:`int` | :class:`BaseProcessor`): Passing
:obj:`True` will allow for `256`` updates to be processed concurrently. Pass an
clot27 marked this conversation as resolved.
Show resolved Hide resolved
integer to specify a different number of updates that may be processed
concurrently. Pass an instance of `BaseProcessor` to use that instance
clot27 marked this conversation as resolved.
Show resolved Hide resolved
for handling updates concurrently.

Returns:
:class:`ApplicationBuilder`: The same builder with the updated argument.
Expand Down