Skip to content

Commit

Permalink
Introduce BaseUpdateProcessor for Customized Concurrent Handling of…
Browse files Browse the repository at this point in the history
… Updates (#3654)

Co-authored-by: Hinrich Mahler <22366557+Bibo-Joshi@users.noreply.github.com>
  • Loading branch information
clot27 and Bibo-Joshi committed Jun 2, 2023
1 parent 4c8d733 commit bf54599
Show file tree
Hide file tree
Showing 10 changed files with 496 additions and 72 deletions.
6 changes: 6 additions & 0 deletions docs/source/telegram.ext.baseupdateprocessor.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
BaseUpdateProcessor
===================

.. autoclass:: telegram.ext.BaseUpdateProcessor
:members:
:show-inheritance:
2 changes: 2 additions & 0 deletions docs/source/telegram.ext.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ telegram.ext package
telegram.ext.application
telegram.ext.applicationbuilder
telegram.ext.applicationhandlerstop
telegram.ext.baseupdateprocessor
telegram.ext.callbackcontext
telegram.ext.contexttypes
telegram.ext.defaults
telegram.ext.extbot
telegram.ext.job
telegram.ext.jobqueue
telegram.ext.simpleupdateprocessor
telegram.ext.updater
telegram.ext.handlers-tree.rst
telegram.ext.persistence-tree.rst
Expand Down
6 changes: 6 additions & 0 deletions docs/source/telegram.ext.simpleupdateprocessor.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
SimpleUpdateProcessor
=====================

.. autoclass:: telegram.ext.SimpleUpdateProcessor
:members:
:show-inheritance:
3 changes: 3 additions & 0 deletions telegram/ext/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"BaseHandler",
"BasePersistence",
"BaseRateLimiter",
"BaseUpdateProcessor",
"CallbackContext",
"CallbackDataCache",
"CallbackQueryHandler",
Expand All @@ -51,6 +52,7 @@
"PreCheckoutQueryHandler",
"PrefixHandler",
"ShippingQueryHandler",
"SimpleUpdateProcessor",
"StringCommandHandler",
"StringRegexHandler",
"TypeHandler",
Expand All @@ -63,6 +65,7 @@
from ._applicationbuilder import ApplicationBuilder
from ._basepersistence import BasePersistence, PersistenceInput
from ._baseratelimiter import BaseRateLimiter
from ._baseupdateprocessor import BaseUpdateProcessor, SimpleUpdateProcessor
from ._callbackcontext import CallbackContext
from ._callbackdatacache import CallbackDataCache, InvalidCallbackData
from ._callbackqueryhandler import CallbackQueryHandler
Expand Down
52 changes: 35 additions & 17 deletions telegram/ext/_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from telegram._utils.warnings import warn
from telegram.error import TelegramError
from telegram.ext._basepersistence import BasePersistence
from telegram.ext._baseupdateprocessor import BaseUpdateProcessor
from telegram.ext._contexttypes import ContextTypes
from telegram.ext._extbot import ExtBot
from telegram.ext._handler import BaseHandler
Expand Down Expand Up @@ -228,12 +229,11 @@ class Application(Generic[BT, CCT, UD, CD, BD, JQ], AsyncContextManager["Applica
"_chat_data",
"_chat_ids_to_be_deleted_in_persistence",
"_chat_ids_to_be_updated_in_persistence",
"_concurrent_updates",
"_concurrent_updates_sem",
"_conversation_handler_conversations",
"_initialized",
"_job_queue",
"_running",
"_update_processor",
"_user_data",
"_user_ids_to_be_deleted_in_persistence",
"_user_ids_to_be_updated_in_persistence",
Expand All @@ -259,7 +259,7 @@ def __init__(
update_queue: "asyncio.Queue[object]",
updater: Optional[Updater],
job_queue: JQ,
concurrent_updates: Union[bool, int],
update_processor: "BaseUpdateProcessor",
persistence: Optional[BasePersistence[UD, CD, BD]],
context_types: ContextTypes[CCT, UD, CD, BD],
post_init: Optional[
Expand Down Expand Up @@ -297,14 +297,7 @@ def __init__(
self.post_stop: Optional[
Callable[["Application[BT, CCT, UD, CD, BD, JQ]"], Coroutine[Any, Any, None]]
] = post_stop

if isinstance(concurrent_updates, int) and concurrent_updates < 0:
raise ValueError("`concurrent_updates` must be a non-negative integer!")
if concurrent_updates is True:
concurrent_updates = 256
self._concurrent_updates_sem = asyncio.BoundedSemaphore(concurrent_updates or 1)
self._concurrent_updates: int = concurrent_updates or 0

self._update_processor = update_processor
self.bot_data: BD = self.context_types.bot_data()
self._user_data: DefaultDict[int, UD] = defaultdict(self.context_types.user_data)
self._chat_data: DefaultDict[int, CD] = defaultdict(self.context_types.chat_data)
Expand Down Expand Up @@ -359,9 +352,13 @@ def concurrent_updates(self) -> int:
""":obj:`int`: The number of concurrent updates that will be processed in parallel. A
value of ``0`` indicates updates are *not* being processed concurrently.
.. versionchanged:: NEXT.VERSION
This is now just a shortcut to :attr:`update_processor.max_concurrent_updates
<telegram.ext.BaseUpdateProcessor.max_concurrent_updates>`.
.. seealso:: :wiki:`Concurrency`
"""
return self._concurrent_updates
return self._update_processor.max_concurrent_updates

@property
def job_queue(self) -> Optional["JobQueue[CCT]"]:
Expand All @@ -379,12 +376,25 @@ def job_queue(self) -> Optional["JobQueue[CCT]"]:
)
return self._job_queue

@property
def update_processor(self) -> "BaseUpdateProcessor":
""":class:`telegram.ext.BaseUpdateProcessor`: The update processor used by this
application.
.. seealso:: :wiki:`Concurrency`
.. versionadded:: NEXT.VERSION
"""
return self._update_processor

async def initialize(self) -> None:
"""Initializes the Application by initializing:
* The :attr:`bot`, by calling :meth:`telegram.Bot.initialize`.
* The :attr:`updater`, by calling :meth:`telegram.ext.Updater.initialize`.
* The :attr:`persistence`, by loading persistent conversations and data.
* The :attr:`update_processor` by calling
:meth:`telegram.ext.BaseUpdateProcessor.initialize`.
Does *not* call :attr:`post_init` - that is only done by :meth:`run_polling` and
:meth:`run_webhook`.
Expand All @@ -397,6 +407,8 @@ async def initialize(self) -> None:
return

await self.bot.initialize()
await self._update_processor.initialize()

if self.updater:
await self.updater.initialize()

Expand Down Expand Up @@ -429,6 +441,7 @@ async def shutdown(self) -> None:
* :attr:`updater` by calling :meth:`telegram.ext.Updater.shutdown`
* :attr:`persistence` by calling :meth:`update_persistence` and
:meth:`BasePersistence.flush`
* :attr:`update_processor` by calling :meth:`telegram.ext.BaseUpdateProcessor.shutdown`
Does *not* call :attr:`post_shutdown` - that is only done by :meth:`run_polling` and
:meth:`run_webhook`.
Expand All @@ -447,6 +460,8 @@ async def shutdown(self) -> None:
return

await self.bot.shutdown()
await self._update_processor.shutdown()

if self.updater:
await self.updater.shutdown()

Expand Down Expand Up @@ -1060,11 +1075,15 @@ async def _update_fetcher(self) -> None:

_LOGGER.debug("Processing update %s", update)

if self._concurrent_updates:
if self._update_processor.max_concurrent_updates > 1:
# We don't await the below because it has to be run concurrently
self.create_task(self.__process_update_wrapper(update), update=update)
self.create_task(
self.__process_update_wrapper(update),
update=update,
)
else:
await self.__process_update_wrapper(update)

except asyncio.CancelledError:
# This may happen if the application is manually run via application.start() and
# then a KeyboardInterrupt is sent. We must prevent this loop to die since
Expand All @@ -1075,9 +1094,8 @@ async def _update_fetcher(self) -> None:
)

async def __process_update_wrapper(self, update: object) -> None:
async with self._concurrent_updates_sem:
await self.process_update(update)
self.update_queue.task_done()
await self._update_processor.process_update(update, self.process_update(update))
self.update_queue.task_done()

async def process_update(self, update: object) -> None:
"""Processes a single update and marks the update to be updated by the persistence later.
Expand Down
41 changes: 33 additions & 8 deletions telegram/ext/_applicationbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from telegram._utils.defaultvalue import DEFAULT_FALSE, DEFAULT_NONE, DefaultValue
from telegram._utils.types import DVInput, DVType, FilePathInput, ODVInput
from telegram.ext._application import Application
from telegram.ext._baseupdateprocessor import BaseUpdateProcessor, SimpleUpdateProcessor
from telegram.ext._contexttypes import ContextTypes
from telegram.ext._extbot import ExtBot
from telegram.ext._jobqueue import JobQueue
Expand Down Expand Up @@ -127,7 +128,7 @@ class ApplicationBuilder(Generic[BT, CCT, UD, CD, BD, JQ]):
"_base_file_url",
"_base_url",
"_bot",
"_concurrent_updates",
"_update_processor",
"_connect_timeout",
"_connection_pool_size",
"_context_types",
Expand Down Expand Up @@ -198,7 +199,9 @@ def __init__(self: "InitApplicationBuilder"):
self._context_types: DVType[ContextTypes] = DefaultValue(ContextTypes())
self._application_class: DVType[Type[Application]] = DefaultValue(Application)
self._application_kwargs: Dict[str, object] = {}
self._concurrent_updates: Union[int, DefaultValue[bool]] = DEFAULT_FALSE
self._update_processor: "BaseUpdateProcessor" = SimpleUpdateProcessor(
max_concurrent_updates=1
)
self._updater: ODVInput[Updater] = DEFAULT_NONE
self._post_init: Optional[Callable[[Application], Coroutine[Any, Any, None]]] = None
self._post_shutdown: Optional[Callable[[Application], Coroutine[Any, Any, None]]] = None
Expand Down Expand Up @@ -306,7 +309,7 @@ def build(
bot=bot,
update_queue=update_queue,
updater=updater,
concurrent_updates=DefaultValue.get_value(self._concurrent_updates),
update_processor=self._update_processor,
job_queue=job_queue,
persistence=persistence,
context_types=DefaultValue.get_value(self._context_types),
Expand Down Expand Up @@ -902,7 +905,9 @@ def update_queue(self: BuilderType, update_queue: "Queue[object]") -> BuilderTyp
self._update_queue = update_queue
return self

def concurrent_updates(self: BuilderType, concurrent_updates: Union[bool, int]) -> BuilderType:
def concurrent_updates(
self: BuilderType, concurrent_updates: Union[bool, int, "BaseUpdateProcessor"]
) -> BuilderType:
"""Specifies if and how many updates may be processed concurrently instead of one by one.
If not called, updates will be processed one by one.
Expand All @@ -917,14 +922,34 @@ def concurrent_updates(self: BuilderType, concurrent_updates: Union[bool, int])
.. seealso:: :attr:`telegram.ext.Application.concurrent_updates`
Args:
concurrent_updates (:obj:`bool` | :obj:`int`): Passing :obj:`True` will allow for
``256`` updates to be processed concurrently. Pass an integer to specify a
different number of updates that may be processed concurrently.
concurrent_updates (:obj:`bool` | :obj:`int` | :class:`BaseUpdateProcessor`): Passing
:obj:`True` will allow for ``256`` updates to be processed concurrently using
:class:`telegram.ext.SimpleUpdateProcessor`. Pass an integer to specify a different
number of updates that may be processed concurrently. Pass an instance of
:class:`telegram.ext.BaseUpdateProcessor` to use that instance for handling updates
concurrently.
.. versionchanged:: NEXT.VERSION
Now accepts :class:`BaseUpdateProcessor` instances.
Returns:
:class:`ApplicationBuilder`: The same builder with the updated argument.
"""
self._concurrent_updates = concurrent_updates
# Check if concurrent updates is bool and convert to integer
if concurrent_updates is True:
concurrent_updates = 256
elif concurrent_updates is False:
concurrent_updates = 1

# If `concurrent_updates` is an integer, create a `SimpleUpdateProcessor`
# instance with that integer value; otherwise, raise an error if the value
# is negative
if isinstance(concurrent_updates, int):
concurrent_updates = SimpleUpdateProcessor(concurrent_updates)

# Assign default value of concurrent_updates if it is instance of
# `BaseUpdateProcessor`
self._update_processor: BaseUpdateProcessor = concurrent_updates # type: ignore[no-redef]
return self

def job_queue(
Expand Down

0 comments on commit bf54599

Please sign in to comment.