Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Customized concurrent handling #3654

Merged
merged 33 commits into from Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
46f5355
add customized concurrent handling
Apr 9, 2023
e3245c1
Merge branch 'master' of github.com:python-telegram-bot/python-telegr…
Apr 9, 2023
1ccfeae
Merge branch 'master' of github.com:python-telegram-bot/python-telegr…
Apr 10, 2023
a8bf427
add some requested changes
Apr 11, 2023
ecf2721
add docstrings
Apr 13, 2023
f9d8f30
some more improvements
Apr 14, 2023
ab95f79
exclude making new tasks when concurrent_updates is 0
Apr 17, 2023
2be64f2
requested changes
Apr 19, 2023
249caa4
Merge branch 'master' of github.com:python-telegram-bot/python-telegr…
Apr 19, 2023
88d11ba
add tests and requested changes
Apr 25, 2023
99e556e
fix tests and docstrings
Apr 25, 2023
19d47fe
add slots
Apr 25, 2023
c922b9c
extend test coverage
Apr 27, 2023
5f4ab1d
fix tests
May 2, 2023
f450e20
Merge branch 'master' into customized-concurrent-handling
clot27 May 2, 2023
a2bf83d
Merge branch 'master' into customized-concurrent-handling
clot27 May 7, 2023
3c8f4e5
add tests and some requested changes
May 7, 2023
9c0fa9a
fix tests
May 13, 2023
5b7c4a0
really fix them this time
May 13, 2023
f3a6585
Pimp my tests :)
Bibo-Joshi May 17, 2023
7a30965
Merge branch 'master' into customized-concurrent-handling
Bibo-Joshi May 17, 2023
d535c46
Fix failing tests
Bibo-Joshi May 17, 2023
bb7caf8
fix docs
May 18, 2023
b60c119
add test for SUP with 'is'
May 19, 2023
39b1adb
add final requested changes
May 20, 2023
6cc2102
Fine tune tests
Bibo-Joshi May 20, 2023
764a5d0
Add some versioning directives
Bibo-Joshi May 20, 2023
59db8d3
Merge branch 'master' into customized-concurrent-handling
Bibo-Joshi May 20, 2023
9aeeb08
typo
Bibo-Joshi May 20, 2023
168eac4
address final review
May 30, 2023
550431c
fix pre-commit
May 30, 2023
d03f81e
Update docs/source/telegram.ext.simpleupdateprocessor.rst
clot27 Jun 2, 2023
429c023
Merge branch 'master' into customized-concurrent-handling
clot27 Jun 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/source/telegram.ext.baseupdateprocessor.rst
@@ -0,0 +1,6 @@
BaseUpdateProcessor
===================

.. autoclass:: telegram.ext.BaseUpdateProcessor
:members:
:show-inheritance:
2 changes: 2 additions & 0 deletions docs/source/telegram.ext.rst
Expand Up @@ -9,12 +9,14 @@ telegram.ext package
telegram.ext.application
telegram.ext.applicationbuilder
telegram.ext.applicationhandlerstop
telegram.ext.baseupdateprocessor
telegram.ext.callbackcontext
telegram.ext.contexttypes
telegram.ext.defaults
telegram.ext.extbot
telegram.ext.job
telegram.ext.jobqueue
telegram.ext.simpleupdateprocessor
telegram.ext.updater
telegram.ext.handlers-tree.rst
telegram.ext.persistence-tree.rst
Expand Down
7 changes: 7 additions & 0 deletions docs/source/telegram.ext.simpleupdateprocessor.rst
@@ -0,0 +1,7 @@
SimpleUpdateProcessor
=====================

.. autoclass:: telegram.ext.SimpleUpdateProcessor
:inherited-members: telegram.ext.BaseUpdateProcessor
clot27 marked this conversation as resolved.
Show resolved Hide resolved
:members:
:show-inheritance:
3 changes: 3 additions & 0 deletions telegram/ext/__init__.py
Expand Up @@ -26,6 +26,7 @@
"BaseHandler",
"BasePersistence",
"BaseRateLimiter",
"BaseUpdateProcessor",
"CallbackContext",
"CallbackDataCache",
"CallbackQueryHandler",
Expand All @@ -51,6 +52,7 @@
"PreCheckoutQueryHandler",
"PrefixHandler",
"ShippingQueryHandler",
"SimpleUpdateProcessor",
"StringCommandHandler",
"StringRegexHandler",
"TypeHandler",
Expand All @@ -63,6 +65,7 @@
from ._applicationbuilder import ApplicationBuilder
from ._basepersistence import BasePersistence, PersistenceInput
from ._baseratelimiter import BaseRateLimiter
from ._baseupdateprocessor import BaseUpdateProcessor, SimpleUpdateProcessor
from ._callbackcontext import CallbackContext
from ._callbackdatacache import CallbackDataCache, InvalidCallbackData
from ._callbackqueryhandler import CallbackQueryHandler
Expand Down
52 changes: 35 additions & 17 deletions telegram/ext/_application.py
Expand Up @@ -57,6 +57,7 @@
from telegram._utils.warnings import warn
from telegram.error import TelegramError
from telegram.ext._basepersistence import BasePersistence
from telegram.ext._baseupdateprocessor import BaseUpdateProcessor
from telegram.ext._contexttypes import ContextTypes
from telegram.ext._extbot import ExtBot
from telegram.ext._handler import BaseHandler
Expand Down Expand Up @@ -228,12 +229,11 @@ class Application(Generic[BT, CCT, UD, CD, BD, JQ], AsyncContextManager["Applica
"_chat_data",
"_chat_ids_to_be_deleted_in_persistence",
"_chat_ids_to_be_updated_in_persistence",
"_concurrent_updates",
"_concurrent_updates_sem",
"_conversation_handler_conversations",
"_initialized",
"_job_queue",
"_running",
"_update_processor",
"_user_data",
"_user_ids_to_be_deleted_in_persistence",
"_user_ids_to_be_updated_in_persistence",
Expand All @@ -259,7 +259,7 @@ def __init__(
update_queue: "asyncio.Queue[object]",
updater: Optional[Updater],
job_queue: JQ,
concurrent_updates: Union[bool, int],
update_processor: "BaseUpdateProcessor",
persistence: Optional[BasePersistence[UD, CD, BD]],
context_types: ContextTypes[CCT, UD, CD, BD],
post_init: Optional[
Expand Down Expand Up @@ -297,14 +297,7 @@ def __init__(
self.post_stop: Optional[
Callable[["Application[BT, CCT, UD, CD, BD, JQ]"], Coroutine[Any, Any, None]]
] = post_stop

if isinstance(concurrent_updates, int) and concurrent_updates < 0:
raise ValueError("`concurrent_updates` must be a non-negative integer!")
if concurrent_updates is True:
concurrent_updates = 256
self._concurrent_updates_sem = asyncio.BoundedSemaphore(concurrent_updates or 1)
self._concurrent_updates: int = concurrent_updates or 0

self._update_processor = update_processor
clot27 marked this conversation as resolved.
Show resolved Hide resolved
self.bot_data: BD = self.context_types.bot_data()
self._user_data: DefaultDict[int, UD] = defaultdict(self.context_types.user_data)
self._chat_data: DefaultDict[int, CD] = defaultdict(self.context_types.chat_data)
Expand Down Expand Up @@ -359,9 +352,13 @@ def concurrent_updates(self) -> int:
""":obj:`int`: The number of concurrent updates that will be processed in parallel. A
value of ``0`` indicates updates are *not* being processed concurrently.

.. versionchanged:: NEXT.VERSION
This is now just a shortcut to :attr:`update_processor.max_concurrent_updates
<telegram.ext.BaseUpdateProcessor.max_concurrent_updates>`.

.. seealso:: :wiki:`Concurrency`
"""
return self._concurrent_updates
return self._update_processor.max_concurrent_updates

@property
def job_queue(self) -> Optional["JobQueue[CCT]"]:
Expand All @@ -379,12 +376,25 @@ def job_queue(self) -> Optional["JobQueue[CCT]"]:
)
return self._job_queue

@property
def update_processor(self) -> "BaseUpdateProcessor":
""":class:`telegram.ext.BaseUpdateProcessor`: The update processor used by this
application.

.. seealso:: :wiki:`Concurrency`

.. versionadded:: NEXT.VERSION
"""
return self._update_processor

async def initialize(self) -> None:
"""Initializes the Application by initializing:

* The :attr:`bot`, by calling :meth:`telegram.Bot.initialize`.
* The :attr:`updater`, by calling :meth:`telegram.ext.Updater.initialize`.
* The :attr:`persistence`, by loading persistent conversations and data.
* The :attr:`update_processor` by calling
:meth:`telegram.ext.BaseUpdateProcessor.initialize`.

Does *not* call :attr:`post_init` - that is only done by :meth:`run_polling` and
:meth:`run_webhook`.
Expand All @@ -397,6 +407,8 @@ async def initialize(self) -> None:
return

await self.bot.initialize()
await self._update_processor.initialize()

if self.updater:
await self.updater.initialize()

Expand Down Expand Up @@ -429,6 +441,7 @@ async def shutdown(self) -> None:
* :attr:`updater` by calling :meth:`telegram.ext.Updater.shutdown`
* :attr:`persistence` by calling :meth:`update_persistence` and
:meth:`BasePersistence.flush`
* :attr:`update_processor` by calling :meth:`telegram.ext.BaseUpdateProcessor.shutdown`

Does *not* call :attr:`post_shutdown` - that is only done by :meth:`run_polling` and
:meth:`run_webhook`.
Expand All @@ -447,6 +460,8 @@ async def shutdown(self) -> None:
return

await self.bot.shutdown()
await self._update_processor.shutdown()

if self.updater:
await self.updater.shutdown()

Expand Down Expand Up @@ -1060,11 +1075,15 @@ async def _update_fetcher(self) -> None:

_LOGGER.debug("Processing update %s", update)

if self._concurrent_updates:
if self._update_processor.max_concurrent_updates > 1:
# We don't await the below because it has to be run concurrently
self.create_task(self.__process_update_wrapper(update), update=update)
self.create_task(
self.__process_update_wrapper(update),
update=update,
)
else:
await self.__process_update_wrapper(update)

except asyncio.CancelledError:
# This may happen if the application is manually run via application.start() and
# then a KeyboardInterrupt is sent. We must prevent this loop to die since
Expand All @@ -1075,9 +1094,8 @@ async def _update_fetcher(self) -> None:
)

async def __process_update_wrapper(self, update: object) -> None:
async with self._concurrent_updates_sem:
await self.process_update(update)
self.update_queue.task_done()
await self._update_processor.process_update(update, self.process_update(update))
self.update_queue.task_done()

async def process_update(self, update: object) -> None:
"""Processes a single update and marks the update to be updated by the persistence later.
Expand Down
41 changes: 33 additions & 8 deletions telegram/ext/_applicationbuilder.py
Expand Up @@ -36,6 +36,7 @@
from telegram._utils.defaultvalue import DEFAULT_FALSE, DEFAULT_NONE, DefaultValue
from telegram._utils.types import DVInput, DVType, FilePathInput, ODVInput
from telegram.ext._application import Application
from telegram.ext._baseupdateprocessor import BaseUpdateProcessor, SimpleUpdateProcessor
from telegram.ext._contexttypes import ContextTypes
from telegram.ext._extbot import ExtBot
from telegram.ext._jobqueue import JobQueue
Expand Down Expand Up @@ -127,7 +128,7 @@ class ApplicationBuilder(Generic[BT, CCT, UD, CD, BD, JQ]):
"_base_file_url",
"_base_url",
"_bot",
"_concurrent_updates",
"_update_processor",
"_connect_timeout",
"_connection_pool_size",
"_context_types",
Expand Down Expand Up @@ -198,7 +199,9 @@ def __init__(self: "InitApplicationBuilder"):
self._context_types: DVType[ContextTypes] = DefaultValue(ContextTypes())
self._application_class: DVType[Type[Application]] = DefaultValue(Application)
self._application_kwargs: Dict[str, object] = {}
self._concurrent_updates: Union[int, DefaultValue[bool]] = DEFAULT_FALSE
self._update_processor: "BaseUpdateProcessor" = SimpleUpdateProcessor(
max_concurrent_updates=1
)
self._updater: ODVInput[Updater] = DEFAULT_NONE
self._post_init: Optional[Callable[[Application], Coroutine[Any, Any, None]]] = None
self._post_shutdown: Optional[Callable[[Application], Coroutine[Any, Any, None]]] = None
Expand Down Expand Up @@ -306,7 +309,7 @@ def build(
bot=bot,
update_queue=update_queue,
updater=updater,
concurrent_updates=DefaultValue.get_value(self._concurrent_updates),
update_processor=self._update_processor,
job_queue=job_queue,
persistence=persistence,
context_types=DefaultValue.get_value(self._context_types),
Expand Down Expand Up @@ -902,7 +905,9 @@ def update_queue(self: BuilderType, update_queue: "Queue[object]") -> BuilderTyp
self._update_queue = update_queue
return self

def concurrent_updates(self: BuilderType, concurrent_updates: Union[bool, int]) -> BuilderType:
def concurrent_updates(
self: BuilderType, concurrent_updates: Union[bool, int, "BaseUpdateProcessor"]
) -> BuilderType:
"""Specifies if and how many updates may be processed concurrently instead of one by one.
If not called, updates will be processed one by one.

Expand All @@ -917,14 +922,34 @@ def concurrent_updates(self: BuilderType, concurrent_updates: Union[bool, int])
.. seealso:: :attr:`telegram.ext.Application.concurrent_updates`

Args:
concurrent_updates (:obj:`bool` | :obj:`int`): Passing :obj:`True` will allow for
``256`` updates to be processed concurrently. Pass an integer to specify a
different number of updates that may be processed concurrently.
concurrent_updates (:obj:`bool` | :obj:`int` | :class:`BaseUpdateProcessor`): Passing
:obj:`True` will allow for ``256`` updates to be processed concurrently using
:class:`telegram.ext.SimpleUpdateProcessor`. Pass an integer to specify a different
number of updates that may be processed concurrently. Pass an instance of
:class:`telegram.ext.BaseUpdateProcessor` to use that instance for handling updates
concurrently.

.. versionchanged:: NEXT.VERSION
Now accepts :class:`BaseUpdateProcessor` instances.

Returns:
:class:`ApplicationBuilder`: The same builder with the updated argument.
"""
self._concurrent_updates = concurrent_updates
# Check if concurrent updates is bool and convert to integer
if concurrent_updates is True:
concurrent_updates = 256
elif concurrent_updates is False:
concurrent_updates = 1

# If `concurrent_updates` is an integer, create a `SimpleUpdateProcessor`
# instance with that integer value; otherwise, raise an error if the value
# is negative
if isinstance(concurrent_updates, int):
concurrent_updates = SimpleUpdateProcessor(concurrent_updates)

# Assign default value of concurrent_updates if it is instance of
# `BaseUpdateProcessor`
self._update_processor: BaseUpdateProcessor = concurrent_updates # type: ignore[no-redef]
return self

def job_queue(
Expand Down