From 9c8d6efe7a6a1fd925ef47271372feeccc0a7e3b Mon Sep 17 00:00:00 2001 From: Bibo-Joshi <22366557+Bibo-Joshi@users.noreply.github.com> Date: Fri, 2 Jun 2023 22:17:46 +0200 Subject: [PATCH] Make Integration of `APScheduler` into `JobQueue` More Explicit (#3695) --- telegram/ext/_jobqueue.py | 68 +++++++++++++++++++++++++++++--------- tests/ext/test_jobqueue.py | 26 +++++++++++++++ 2 files changed, 79 insertions(+), 15 deletions(-) diff --git a/telegram/ext/_jobqueue.py b/telegram/ext/_jobqueue.py index 2b4b3d2a7ba..654eeb9d1f4 100644 --- a/telegram/ext/_jobqueue.py +++ b/telegram/ext/_jobqueue.py @@ -158,6 +158,30 @@ def application(self) -> "Application[Any, CCT, Any, Any, Any, JobQueue[CCT]]": return application raise RuntimeError("The application instance is no longer alive.") + @staticmethod + async def job_callback(job_queue: "JobQueue[CCT]", job: "Job[CCT]") -> None: + """This method is used as a callback for the APScheduler jobs. + + More precisely, the ``func`` argument of :class:`apscheduler.job.Job` is set to this method + and the ``arg`` argument (representing positional arguments to ``func``) is set to a tuple + containing the :class:`JobQueue` itself and the :class:`~telegram.ext.Job` instance. + + Tip: + This method is a static method rather than a bound method. This makes the arguments + more transparent and allows for easier handling of PTBs integration of APScheduler + when utilizing advanced features of APScheduler. + + Hint: + This method is effectively a wrapper for :meth:`telegram.ext.Job.run`. + + .. versionadded:: NEXT.VERSION + + Args: + job_queue (:class:`JobQueue`): The job queue that created the job. + job (:class:`~telegram.ext.Job`): The job to run. + """ + await job.run(job_queue.application) + def run_once( self, callback: JobCallback[CCT], @@ -230,11 +254,11 @@ async def callback(context: CallbackContext) date_time = self._parse_time_input(when, shift_day=True) j = self.scheduler.add_job( - job.run, + self.job_callback, name=name, trigger="date", run_date=date_time, - args=(self.application,), + args=(self, job), timezone=date_time.tzinfo or self.scheduler.timezone, **job_kwargs, ) @@ -356,9 +380,9 @@ async def callback(context: CallbackContext) interval = interval.total_seconds() j = self.scheduler.add_job( - job.run, + self.job_callback, trigger="interval", - args=(self.application,), + args=(self, job), start_date=dt_first, end_date=dt_last, seconds=interval, @@ -433,9 +457,9 @@ async def callback(context: CallbackContext) job = Job(callback=callback, data=data, name=name, chat_id=chat_id, user_id=user_id) j = self.scheduler.add_job( - job.run, + self.job_callback, trigger="cron", - args=(self.application,), + args=(self, job), name=name, day="last" if day == -1 else day, hour=when.hour, @@ -523,9 +547,9 @@ async def callback(context: CallbackContext) job = Job(callback=callback, data=data, name=name, chat_id=chat_id, user_id=user_id) j = self.scheduler.add_job( - job.run, + self.job_callback, name=name, - args=(self.application,), + args=(self, job), trigger="cron", day_of_week=",".join([self._CRON_MAPPING[d] for d in days]), hour=time.hour, @@ -585,7 +609,7 @@ async def callback(context: CallbackContext) name = name or callback.__name__ job = Job(callback=callback, data=data, name=name, chat_id=chat_id, user_id=user_id) - j = self.scheduler.add_job(job.run, args=(self.application,), name=name, **job_kwargs) + j = self.scheduler.add_job(self.job_callback, args=(self, job), name=name, **job_kwargs) job._job = j # pylint: disable=protected-access return job @@ -625,10 +649,7 @@ def jobs(self) -> Tuple["Job[CCT]", ...]: Returns: Tuple[:class:`Job`]: Tuple of all *scheduled* jobs. """ - return tuple( - Job._from_aps_job(job) # pylint: disable=protected-access - for job in self.scheduler.get_jobs() - ) + return tuple(Job.from_aps_job(job) for job in self.scheduler.get_jobs()) def get_jobs_by_name(self, name: str) -> Tuple["Job[CCT]", ...]: """Returns a tuple of all *pending/scheduled* jobs with the given name that are currently @@ -821,8 +842,25 @@ def next_t(self) -> Optional[datetime.datetime]: return self.job.next_run_time @classmethod - def _from_aps_job(cls, job: "APSJob") -> "Job[CCT]": - return job.func.__self__ + def from_aps_job(cls, aps_job: "APSJob") -> "Job[CCT]": + """Provides the :class:`telegram.ext.Job` that is associated with the given APScheduler + job. + + Tip: + This method can be useful when using advanced APScheduler features along with + :class:`telegram.ext.JobQueue`. + + .. versionadded:: NEXT.VERSION + + Args: + aps_job (:class:`apscheduler.job.Job`): The APScheduler job + + Returns: + :class:`telegram.ext.Job` + """ + ext_job = aps_job.args[1] + ext_job._job = aps_job # pylint: disable=protected-access + return ext_job def __getattr__(self, item: str) -> object: try: diff --git a/tests/ext/test_jobqueue.py b/tests/ext/test_jobqueue.py index b77d8b087ab..e95feaf3286 100644 --- a/tests/ext/test_jobqueue.py +++ b/tests/ext/test_jobqueue.py @@ -615,3 +615,29 @@ async def callback(_): else: await asyncio.sleep(0.1) # unfortunately we will get a CancelledError here assert task.done() + + async def test_from_aps_job(self, job_queue): + job = job_queue.run_once(self.job_run_once, 0.1, name="test_job") + aps_job = job_queue.scheduler.get_job(job.id) + + tg_job = Job.from_aps_job(aps_job) + assert tg_job is job + assert tg_job.job is aps_job + + async def test_from_aps_job_missing_reference(self, job_queue): + """We manually create a ext.Job and an aps job such that the former has no reference to the + latter. Then we test that Job.from_aps_job() still sets the reference correctly. + """ + job = Job(self.job_run_once) + aps_job = job_queue.scheduler.add_job( + func=job_queue.job_callback, + args=(job_queue, job), + trigger="interval", + seconds=2, + id="test_id", + ) + + assert job.job is None + tg_job = Job.from_aps_job(aps_job) + assert tg_job is job + assert tg_job.job is aps_job