diff --git a/telegram/ext/_application.py b/telegram/ext/_application.py index 8c657d57b34..0c2884dc910 100644 --- a/telegram/ext/_application.py +++ b/telegram/ext/_application.py @@ -1251,30 +1251,43 @@ async def process_update(self, update: object) -> None: try: for handler in handlers: check = handler.check_update(update) # Should the handler handle this update? - if not (check is None or check is False): # if yes, - if not context: # build a context if not already built + if check is None or check is False: + continue + + if not context: # build a context if not already built + try: context = self.context_types.context.from_update(update, self) - await context.refresh_data() - coroutine: Coroutine = handler.handle_update(update, self, check, context) - - if not handler.block or ( # if handler is running with block=False, - handler.block is DEFAULT_TRUE - and isinstance(self.bot, ExtBot) - and self.bot.defaults - and not self.bot.defaults.block - ): - self.create_task( - coroutine, - update=update, - name=( - f"Application:{self.bot.id}:process_update_non_blocking" - f":{handler}" + except Exception as exc: + _LOGGER.critical( + ( + "Error while building CallbackContext for update %s. " + "Update will not be processed." ), + update, + exc_info=exc, ) - else: - any_blocking = True - await coroutine - break # Only a max of 1 handler per group is handled + return + await context.refresh_data() + coroutine: Coroutine = handler.handle_update(update, self, check, context) + + if not handler.block or ( # if handler is running with block=False, + handler.block is DEFAULT_TRUE + and isinstance(self.bot, ExtBot) + and self.bot.defaults + and not self.bot.defaults.block + ): + self.create_task( + coroutine, + update=update, + name=( + f"Application:{self.bot.id}:process_update_non_blocking" + f":{handler}" + ), + ) + else: + any_blocking = True + await coroutine + break # Only a max of 1 handler per group is handled # Stop processing with any other handler. except ApplicationHandlerStop: @@ -1808,13 +1821,25 @@ async def process_error( callback, block, ) in self.error_handlers.items(): - context = self.context_types.context.from_error( - update=update, - error=error, - application=self, - job=job, - coroutine=coroutine, - ) + try: + context = self.context_types.context.from_error( + update=update, + error=error, + application=self, + job=job, + coroutine=coroutine, + ) + except Exception as exc: + _LOGGER.critical( + ( + "Error while building CallbackContext for exception %s. " + "Exception will not be processed by error handlers." + ), + error, + exc_info=exc, + ) + return False + if not block or ( # If error handler has `block=False`, create a Task to run cb block is DEFAULT_TRUE and isinstance(self.bot, ExtBot) diff --git a/telegram/ext/_jobqueue.py b/telegram/ext/_jobqueue.py index 1229659f6f9..6edd5a892ea 100644 --- a/telegram/ext/_jobqueue.py +++ b/telegram/ext/_jobqueue.py @@ -31,6 +31,7 @@ except ImportError: APS_AVAILABLE = False +from telegram._utils.logging import get_logger from telegram._utils.repr import build_repr_with_selected_attrs from telegram._utils.types import JSONDict from telegram.ext._extbot import ExtBot @@ -44,6 +45,7 @@ _ALL_DAYS = tuple(range(7)) +_LOGGER = get_logger(__name__, class_name="JobQueue") class JobQueue(Generic[CCT]): @@ -953,7 +955,16 @@ async def _run( self, application: "Application[Any, CCT, Any, Any, Any, JobQueue[CCT]]" ) -> None: try: - context = application.context_types.context.from_job(self, application) + try: + context = application.context_types.context.from_job(self, application) + except Exception as exc: + _LOGGER.critical( + "Error while building CallbackContext for job %s. Job will not be run.", + self._job, + exc_info=exc, + ) + return + await context.refresh_data() await self.callback(context) except Exception as exc: diff --git a/tests/ext/test_application.py b/tests/ext/test_application.py index 9057dcecaca..78997b2c5de 100644 --- a/tests/ext/test_application.py +++ b/tests/ext/test_application.py @@ -2421,3 +2421,83 @@ async def callback(update, context): assert len(assertions) == 5 for key, value in assertions.items(): assert value, f"assertion '{key}' failed!" + + async def test_process_update_exception_in_building_context(self, monkeypatch, caplog, app): + # Makes sure that exceptions in building the context don't stop the application + exception = ValueError("TestException") + original_from_update = CallbackContext.from_update + + def raise_exception(update, application): + if update == 1: + raise exception + return original_from_update(update, application) + + monkeypatch.setattr(CallbackContext, "from_update", raise_exception) + + received_updates = set() + + async def callback(update, context): + received_updates.add(update) + + app.add_handler(TypeHandler(int, callback)) + + async with app: + with caplog.at_level(logging.CRITICAL): + await app.process_update(1) + + assert received_updates == set() + assert len(caplog.records) == 1 + record = caplog.records[0] + assert record.name == "telegram.ext.Application" + assert record.getMessage().startswith( + "Error while building CallbackContext for update 1" + ) + assert record.levelno == logging.CRITICAL + + # Let's also check that no critical log is produced when the exception is not raised + caplog.clear() + with caplog.at_level(logging.CRITICAL): + await app.process_update(2) + + assert received_updates == {2} + assert len(caplog.records) == 0 + + async def test_process_error_exception_in_building_context(self, monkeypatch, caplog, app): + # Makes sure that exceptions in building the context don't stop the application + exception = ValueError("TestException") + original_from_error = CallbackContext.from_error + + def raise_exception(update, error, application, *args, **kwargs): + if error == 1: + raise exception + return original_from_error(update, error, application, *args, **kwargs) + + monkeypatch.setattr(CallbackContext, "from_error", raise_exception) + + received_errors = set() + + async def callback(update, context): + received_errors.add(context.error) + + app.add_error_handler(callback) + + async with app: + with caplog.at_level(logging.CRITICAL): + await app.process_error(update=None, error=1) + + assert received_errors == set() + assert len(caplog.records) == 1 + record = caplog.records[0] + assert record.name == "telegram.ext.Application" + assert record.getMessage().startswith( + "Error while building CallbackContext for exception 1" + ) + assert record.levelno == logging.CRITICAL + + # Let's also check that no critical log is produced when the exception is not raised + caplog.clear() + with caplog.at_level(logging.CRITICAL): + await app.process_error(update=None, error=2) + + assert received_errors == {2} + assert len(caplog.records) == 0 diff --git a/tests/ext/test_jobqueue.py b/tests/ext/test_jobqueue.py index 0a3723763d9..929591d38b9 100644 --- a/tests/ext/test_jobqueue.py +++ b/tests/ext/test_jobqueue.py @@ -646,3 +646,44 @@ async def test_from_aps_job_missing_reference(self, job_queue): tg_job = Job.from_aps_job(aps_job) assert tg_job is job assert tg_job.job is aps_job + + async def test_run_job_exception_in_building_context( + self, monkeypatch, job_queue, caplog, app + ): + # Makes sure that exceptions in building the context don't stop the application + exception = ValueError("TestException") + original_from_job = CallbackContext.from_job + + def raise_exception(job, application): + if job.data == 1: + raise exception + return original_from_job(job, application) + + monkeypatch.setattr(CallbackContext, "from_job", raise_exception) + + received_jobs = set() + + async def job_callback(context): + received_jobs.add(context.job.data) + + with caplog.at_level(logging.CRITICAL): + job_queue.run_once(job_callback, 0.1, data=1) + await asyncio.sleep(0.2) + + assert received_jobs == set() + assert len(caplog.records) == 1 + record = caplog.records[0] + assert record.name == "telegram.ext.JobQueue" + assert record.getMessage().startswith( + "Error while building CallbackContext for job job_callback" + ) + assert record.levelno == logging.CRITICAL + + # Let's also check that no critical log is produced when the exception is not raised + caplog.clear() + with caplog.at_level(logging.CRITICAL): + job_queue.run_once(job_callback, 0.1, data=2) + await asyncio.sleep(0.2) + + assert received_jobs == {2} + assert len(caplog.records) == 0