Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Customized concurrent handling #3654

Merged
merged 33 commits into from Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
46f5355
add customized concurrent handling
Apr 9, 2023
e3245c1
Merge branch 'master' of github.com:python-telegram-bot/python-telegr…
Apr 9, 2023
1ccfeae
Merge branch 'master' of github.com:python-telegram-bot/python-telegr…
Apr 10, 2023
a8bf427
add some requested changes
Apr 11, 2023
ecf2721
add docstrings
Apr 13, 2023
f9d8f30
some more improvements
Apr 14, 2023
ab95f79
exclude making new tasks when concurrent_updates is 0
Apr 17, 2023
2be64f2
requested changes
Apr 19, 2023
249caa4
Merge branch 'master' of github.com:python-telegram-bot/python-telegr…
Apr 19, 2023
88d11ba
add tests and requested changes
Apr 25, 2023
99e556e
fix tests and docstrings
Apr 25, 2023
19d47fe
add slots
Apr 25, 2023
c922b9c
extend test coverage
Apr 27, 2023
5f4ab1d
fix tests
May 2, 2023
f450e20
Merge branch 'master' into customized-concurrent-handling
clot27 May 2, 2023
a2bf83d
Merge branch 'master' into customized-concurrent-handling
clot27 May 7, 2023
3c8f4e5
add tests and some requested changes
May 7, 2023
9c0fa9a
fix tests
May 13, 2023
5b7c4a0
really fix them this time
May 13, 2023
f3a6585
Pimp my tests :)
Bibo-Joshi May 17, 2023
7a30965
Merge branch 'master' into customized-concurrent-handling
Bibo-Joshi May 17, 2023
d535c46
Fix failing tests
Bibo-Joshi May 17, 2023
bb7caf8
fix docs
May 18, 2023
b60c119
add test for SUP with 'is'
May 19, 2023
39b1adb
add final requested changes
May 20, 2023
6cc2102
Fine tune tests
Bibo-Joshi May 20, 2023
764a5d0
Add some versioning directives
Bibo-Joshi May 20, 2023
59db8d3
Merge branch 'master' into customized-concurrent-handling
Bibo-Joshi May 20, 2023
9aeeb08
typo
Bibo-Joshi May 20, 2023
168eac4
address final review
May 30, 2023
550431c
fix pre-commit
May 30, 2023
d03f81e
Update docs/source/telegram.ext.simpleupdateprocessor.rst
clot27 Jun 2, 2023
429c023
Merge branch 'master' into customized-concurrent-handling
clot27 Jun 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/source/telegram.ext.baseupdateprocessor.rst
@@ -0,0 +1,6 @@
BaseUpdateProcessor
===================

.. autoclass:: telegram.ext.BaseUpdateProcessor
:members:
:show-inheritance:
6 changes: 6 additions & 0 deletions docs/source/telegram.ext.simpleupdateprocessor.rst
@@ -0,0 +1,6 @@
SimpleUpdateProcessor
=====================

.. autoclass:: telegram.ext.SimpleUpdateProcessor
:members:
:show-inheritance:
3 changes: 3 additions & 0 deletions telegram/ext/__init__.py
Expand Up @@ -25,6 +25,7 @@
"ApplicationHandlerStop",
"BaseHandler",
"BasePersistence",
"BaseUpdateProcessor",
"BaseRateLimiter",
clot27 marked this conversation as resolved.
Show resolved Hide resolved
"CallbackContext",
"CallbackDataCache",
Expand All @@ -51,6 +52,7 @@
"PreCheckoutQueryHandler",
"PrefixHandler",
"ShippingQueryHandler",
"SimpleUpdateProcessor",
"StringCommandHandler",
"StringRegexHandler",
"TypeHandler",
Expand All @@ -63,6 +65,7 @@
from ._applicationbuilder import ApplicationBuilder
from ._basepersistence import BasePersistence, PersistenceInput
from ._baseratelimiter import BaseRateLimiter
from ._baseupdateprocessor import BaseUpdateProcessor, SimpleUpdateProcessor
from ._callbackcontext import CallbackContext
from ._callbackdatacache import CallbackDataCache, InvalidCallbackData
from ._callbackqueryhandler import CallbackQueryHandler
Expand Down
37 changes: 25 additions & 12 deletions telegram/ext/_application.py
Expand Up @@ -57,6 +57,7 @@
from telegram._utils.warnings import warn
from telegram.error import TelegramError
from telegram.ext._basepersistence import BasePersistence
from telegram.ext._baseupdateprocessor import BaseUpdateProcessor
from telegram.ext._contexttypes import ContextTypes
from telegram.ext._extbot import ExtBot
from telegram.ext._handler import BaseHandler
Expand Down Expand Up @@ -259,7 +260,7 @@ def __init__(
update_queue: "asyncio.Queue[object]",
updater: Optional[Updater],
job_queue: JQ,
concurrent_updates: Union[bool, int],
update_processor: "BaseUpdateProcessor",
persistence: Optional[BasePersistence[UD, CD, BD]],
context_types: ContextTypes[CCT, UD, CD, BD],
post_init: Optional[
Expand Down Expand Up @@ -297,14 +298,10 @@ def __init__(
self.post_stop: Optional[
Callable[["Application[BT, CCT, UD, CD, BD, JQ]"], Coroutine[Any, Any, None]]
] = post_stop

if isinstance(concurrent_updates, int) and concurrent_updates < 0:
raise ValueError("`concurrent_updates` must be a non-negative integer!")
if concurrent_updates is True:
concurrent_updates = 256
self._concurrent_updates_sem = asyncio.BoundedSemaphore(concurrent_updates or 1)
self._concurrent_updates: int = concurrent_updates or 0

self._concurrent_updates_sem = asyncio.BoundedSemaphore(
update_processor.max_concurrent_updates or 1
)
clot27 marked this conversation as resolved.
Show resolved Hide resolved
self._update_processor = update_processor
clot27 marked this conversation as resolved.
Show resolved Hide resolved
self.bot_data: BD = self.context_types.bot_data()
self._user_data: DefaultDict[int, UD] = defaultdict(self.context_types.user_data)
self._chat_data: DefaultDict[int, CD] = defaultdict(self.context_types.chat_data)
Expand Down Expand Up @@ -359,9 +356,12 @@ def concurrent_updates(self) -> int:
""":obj:`int`: The number of concurrent updates that will be processed in parallel. A
value of ``0`` indicates updates are *not* being processed concurrently.

Note:
This is now just a shortcut to `processor.max_concurrent_updates`.
clot27 marked this conversation as resolved.
Show resolved Hide resolved

.. seealso:: :wiki:`Concurrency`
"""
return self._concurrent_updates
return self._update_processor.max_concurrent_updates

@property
def job_queue(self) -> Optional["JobQueue[CCT]"]:
Expand All @@ -379,6 +379,14 @@ def job_queue(self) -> Optional["JobQueue[CCT]"]:
)
return self._job_queue

@property
def processor(self) -> Optional["BaseUpdateProcessor"]:
""":class:`telegram.ext.BaseProcessor`: The processor used by this application.
clot27 marked this conversation as resolved.
Show resolved Hide resolved

.. seealso:: :wiki:`Concurrency`
"""
return self._update_processor

async def initialize(self) -> None:
"""Initializes the Application by initializing:

Expand Down Expand Up @@ -1059,9 +1067,14 @@ async def _update_fetcher(self) -> None:

_LOGGER.debug("Processing update %s", update)

if self._concurrent_updates:
if self._update_processor and self._update_processor.max_concurrent_updates > 0:
clot27 marked this conversation as resolved.
Show resolved Hide resolved
# We don't await the below because it has to be run concurrently
self.create_task(self.__process_update_wrapper(update), update=update)
clot27 marked this conversation as resolved.
Show resolved Hide resolved
self.create_task(
self._update_processor.do_process_update(
update, self.process_update(update), self
),
update=update,
)
else:
await self.__process_update_wrapper(update)

Expand Down
38 changes: 31 additions & 7 deletions telegram/ext/_applicationbuilder.py
Expand Up @@ -36,6 +36,7 @@
from telegram._utils.defaultvalue import DEFAULT_FALSE, DEFAULT_NONE, DefaultValue
from telegram._utils.types import DVInput, DVType, FilePathInput, ODVInput
from telegram.ext._application import Application
from telegram.ext._baseupdateprocessor import BaseUpdateProcessor, SimpleUpdateProcessor
from telegram.ext._contexttypes import ContextTypes
from telegram.ext._extbot import ExtBot
from telegram.ext._jobqueue import JobQueue
Expand Down Expand Up @@ -198,7 +199,9 @@ def __init__(self: "InitApplicationBuilder"):
self._context_types: DVType[ContextTypes] = DefaultValue(ContextTypes())
self._application_class: DVType[Type[Application]] = DefaultValue(Application)
self._application_kwargs: Dict[str, object] = {}
self._concurrent_updates: Union[int, DefaultValue[bool]] = DEFAULT_FALSE
self._concurrent_updates: Union[
int, DefaultValue[bool], "BaseUpdateProcessor"
] = DEFAULT_FALSE
clot27 marked this conversation as resolved.
Show resolved Hide resolved
self._updater: ODVInput[Updater] = DEFAULT_NONE
self._post_init: Optional[Callable[[Application], Coroutine[Any, Any, None]]] = None
self._post_shutdown: Optional[Callable[[Application], Coroutine[Any, Any, None]]] = None
Expand Down Expand Up @@ -306,7 +309,7 @@ def build(
bot=bot,
update_queue=update_queue,
updater=updater,
concurrent_updates=DefaultValue.get_value(self._concurrent_updates),
update_processor=DefaultValue.get_value(self._concurrent_updates),
clot27 marked this conversation as resolved.
Show resolved Hide resolved
job_queue=job_queue,
persistence=persistence,
context_types=DefaultValue.get_value(self._context_types),
Expand Down Expand Up @@ -901,7 +904,9 @@ def update_queue(self: BuilderType, update_queue: "Queue[object]") -> BuilderTyp
self._update_queue = update_queue
return self

def concurrent_updates(self: BuilderType, concurrent_updates: Union[bool, int]) -> BuilderType:
def concurrent_updates(
self: BuilderType, concurrent_updates: Union[bool, int, "BaseUpdateProcessor"]
) -> BuilderType:
"""Specifies if and how many updates may be processed concurrently instead of one by one.
If not called, updates will be processed one by one.

Expand All @@ -916,14 +921,33 @@ def concurrent_updates(self: BuilderType, concurrent_updates: Union[bool, int])
.. seealso:: :attr:`telegram.ext.Application.concurrent_updates`

Args:
concurrent_updates (:obj:`bool` | :obj:`int`): Passing :obj:`True` will allow for
``256`` updates to be processed concurrently. Pass an integer to specify a
different number of updates that may be processed concurrently.
concurrent_updates (:obj:`bool` | :obj:`int` | :class:`BaseUpdateProcessor`): Passing
:obj:`True` will allow for ``256`` updates to be processed concurrently. Pass an
integer to specify a different number of updates that may be processed concurrently,
in that case :class:`telegram.ext.SimpleUpdateProcessor` is used to handle updates.
Pass an instance of :class:`telegram.ext.BaseUpdateProcessor` to use that instance
for handling updates concurrently.
clot27 marked this conversation as resolved.
Show resolved Hide resolved

Returns:
:class:`ApplicationBuilder`: The same builder with the updated argument.
"""
self._concurrent_updates = concurrent_updates
# Check if concurrent updates is bool and convert to integer
if concurrent_updates is True:
concurrent_updates = 256
elif concurrent_updates is False:
concurrent_updates = 0
clot27 marked this conversation as resolved.
Show resolved Hide resolved

# If `concurrent_updates` is an integer, create a `SimpleUpdateProcessor`
# instance with that integer value; otherwise, raise an error if the value
# is negative
if isinstance(concurrent_updates, int):
if concurrent_updates < 0:
raise ValueError("`concurrent_updates` must be a non-negative integer!")
clot27 marked this conversation as resolved.
Show resolved Hide resolved
concurrent_updates = SimpleUpdateProcessor(concurrent_updates)

# Assign default value of concurrent_updates if it is instance of
# `BaseUpdateProcessor`
self._concurrent_updates: BaseUpdateProcessor = concurrent_updates
return self

def job_queue(
Expand Down
140 changes: 140 additions & 0 deletions telegram/ext/_baseupdateprocessor.py
@@ -0,0 +1,140 @@
#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015-2023
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
"""This module contains the BaseProcessor class."""
from abc import abstractmethod
from asyncio import BoundedSemaphore
from types import TracebackType
from typing import Any, Awaitable, Optional, Type

from telegram.ext import Application


class BaseUpdateProcessor:
clot27 marked this conversation as resolved.
Show resolved Hide resolved
"""An abstract base class for update processors. You can use this class to implement
your own update processor.

.. seealso:: :wiki:`Concurrency`

.. versionadded:: NEXT.VERSION

Args:
max_concurrent_updates (:obj:`int`): The maximum number of concurrent updates to be
processed. If this number is exceeded, new updates will be queued until the number of
currently processed updates decreases.
clot27 marked this conversation as resolved.
Show resolved Hide resolved

Raises:
:exc:`ValueError`: If `max_concurrent_updates` is a negative integer.
clot27 marked this conversation as resolved.
Show resolved Hide resolved
"""

def __init__(self, max_concurrent_updates: int):
self._max_concurrent_updates = max_concurrent_updates
if self.max_concurrent_updates < 0:
clot27 marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError("`max_concurrent_updates` must be a non-negative integer!")
self._semaphore = BoundedSemaphore(self.max_concurrent_updates or 1)
clot27 marked this conversation as resolved.
Show resolved Hide resolved

@property
def max_concurrent_updates(self) -> int:
""":obj:`int`: The maximum number of updates that can be processed concurrently."""
return self._max_concurrent_updates

@abstractmethod
async def process_update(
self,
update: object,
coroutine: "Awaitable[Any]",
) -> None:
"""Custom implementation of how to process an update.
clot27 marked this conversation as resolved.
Show resolved Hide resolved

Args:
update (:obj:`object`): The update to be processed.
coroutine (:obj:`Awaitable`): The coroutine that will be awaited to process the update.
clot27 marked this conversation as resolved.
Show resolved Hide resolved
"""

@abstractmethod
async def initialize(self) -> None:
"""Initializes the Processor so resources can be allocated.
clot27 marked this conversation as resolved.
Show resolved Hide resolved

.. seealso::
:meth:`shutdown`
"""

@abstractmethod
async def shutdown(self) -> None:
"""Shutdown the Processor so resources can be freed.
clot27 marked this conversation as resolved.
Show resolved Hide resolved

.. seealso::
:meth:`initialize`
"""

async def do_process_update(
self,
update: object,
coroutine: "Awaitable[Any]",
application: Application,
) -> None:
"""Calls :meth:`process_update` with a semaphore to limit the number of concurrent
updates.

Args:
update (:obj:`object`): The update to be processed.
coroutine (:obj:`Awaitable`): The coroutine that will be awaited to process the update.
application (:class:`telegram.ext.Application`): The application instance.
"""
async with self._semaphore:
await self.process_update(update, coroutine)
application.update_queue.task_done()
clot27 marked this conversation as resolved.
Show resolved Hide resolved

async def __aenter__(self) -> "BaseUpdateProcessor":
"""Simple context manager which initializes the Processor."""
try:
await self.initialize()
return self
except Exception as exc:
await self.shutdown()
raise exc
clot27 marked this conversation as resolved.
Show resolved Hide resolved

async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
"""Shutdown the Processor from the context manager."""
await self.shutdown()


class SimpleUpdateProcessor(BaseUpdateProcessor):
clot27 marked this conversation as resolved.
Show resolved Hide resolved
async def process_update(
self,
update: object,
coroutine: "Awaitable[Any]",
) -> None:
"""Immediately awaits the coroutine, i.e. does not apply any additional processing.

Args:
update (:obj:`object`): The update to be processed.
coroutine (:obj:`Awaitable`): The coroutine that will be awaited to process the update.
"""
await coroutine

async def initialize(self) -> None:
"""Does nothing."""

async def shutdown(self) -> None:
"""Does nothing."""