Skip to content

Commit

Permalink
Make Integration of APScheduler into JobQueue More Explicit (#3695)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bibo-Joshi committed Jun 2, 2023
1 parent bf54599 commit 9c8d6ef
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 15 deletions.
68 changes: 53 additions & 15 deletions telegram/ext/_jobqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,30 @@ def application(self) -> "Application[Any, CCT, Any, Any, Any, JobQueue[CCT]]":
return application
raise RuntimeError("The application instance is no longer alive.")

@staticmethod
async def job_callback(job_queue: "JobQueue[CCT]", job: "Job[CCT]") -> None:
"""This method is used as a callback for the APScheduler jobs.
More precisely, the ``func`` argument of :class:`apscheduler.job.Job` is set to this method
and the ``arg`` argument (representing positional arguments to ``func``) is set to a tuple
containing the :class:`JobQueue` itself and the :class:`~telegram.ext.Job` instance.
Tip:
This method is a static method rather than a bound method. This makes the arguments
more transparent and allows for easier handling of PTBs integration of APScheduler
when utilizing advanced features of APScheduler.
Hint:
This method is effectively a wrapper for :meth:`telegram.ext.Job.run`.
.. versionadded:: NEXT.VERSION
Args:
job_queue (:class:`JobQueue`): The job queue that created the job.
job (:class:`~telegram.ext.Job`): The job to run.
"""
await job.run(job_queue.application)

def run_once(
self,
callback: JobCallback[CCT],
Expand Down Expand Up @@ -230,11 +254,11 @@ async def callback(context: CallbackContext)
date_time = self._parse_time_input(when, shift_day=True)

j = self.scheduler.add_job(
job.run,
self.job_callback,
name=name,
trigger="date",
run_date=date_time,
args=(self.application,),
args=(self, job),
timezone=date_time.tzinfo or self.scheduler.timezone,
**job_kwargs,
)
Expand Down Expand Up @@ -356,9 +380,9 @@ async def callback(context: CallbackContext)
interval = interval.total_seconds()

j = self.scheduler.add_job(
job.run,
self.job_callback,
trigger="interval",
args=(self.application,),
args=(self, job),
start_date=dt_first,
end_date=dt_last,
seconds=interval,
Expand Down Expand Up @@ -433,9 +457,9 @@ async def callback(context: CallbackContext)
job = Job(callback=callback, data=data, name=name, chat_id=chat_id, user_id=user_id)

j = self.scheduler.add_job(
job.run,
self.job_callback,
trigger="cron",
args=(self.application,),
args=(self, job),
name=name,
day="last" if day == -1 else day,
hour=when.hour,
Expand Down Expand Up @@ -523,9 +547,9 @@ async def callback(context: CallbackContext)
job = Job(callback=callback, data=data, name=name, chat_id=chat_id, user_id=user_id)

j = self.scheduler.add_job(
job.run,
self.job_callback,
name=name,
args=(self.application,),
args=(self, job),
trigger="cron",
day_of_week=",".join([self._CRON_MAPPING[d] for d in days]),
hour=time.hour,
Expand Down Expand Up @@ -585,7 +609,7 @@ async def callback(context: CallbackContext)
name = name or callback.__name__
job = Job(callback=callback, data=data, name=name, chat_id=chat_id, user_id=user_id)

j = self.scheduler.add_job(job.run, args=(self.application,), name=name, **job_kwargs)
j = self.scheduler.add_job(self.job_callback, args=(self, job), name=name, **job_kwargs)

job._job = j # pylint: disable=protected-access
return job
Expand Down Expand Up @@ -625,10 +649,7 @@ def jobs(self) -> Tuple["Job[CCT]", ...]:
Returns:
Tuple[:class:`Job`]: Tuple of all *scheduled* jobs.
"""
return tuple(
Job._from_aps_job(job) # pylint: disable=protected-access
for job in self.scheduler.get_jobs()
)
return tuple(Job.from_aps_job(job) for job in self.scheduler.get_jobs())

def get_jobs_by_name(self, name: str) -> Tuple["Job[CCT]", ...]:
"""Returns a tuple of all *pending/scheduled* jobs with the given name that are currently
Expand Down Expand Up @@ -821,8 +842,25 @@ def next_t(self) -> Optional[datetime.datetime]:
return self.job.next_run_time

@classmethod
def _from_aps_job(cls, job: "APSJob") -> "Job[CCT]":
return job.func.__self__
def from_aps_job(cls, aps_job: "APSJob") -> "Job[CCT]":
"""Provides the :class:`telegram.ext.Job` that is associated with the given APScheduler
job.
Tip:
This method can be useful when using advanced APScheduler features along with
:class:`telegram.ext.JobQueue`.
.. versionadded:: NEXT.VERSION
Args:
aps_job (:class:`apscheduler.job.Job`): The APScheduler job
Returns:
:class:`telegram.ext.Job`
"""
ext_job = aps_job.args[1]
ext_job._job = aps_job # pylint: disable=protected-access
return ext_job

def __getattr__(self, item: str) -> object:
try:
Expand Down
26 changes: 26 additions & 0 deletions tests/ext/test_jobqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,3 +615,29 @@ async def callback(_):
else:
await asyncio.sleep(0.1) # unfortunately we will get a CancelledError here
assert task.done()

async def test_from_aps_job(self, job_queue):
job = job_queue.run_once(self.job_run_once, 0.1, name="test_job")
aps_job = job_queue.scheduler.get_job(job.id)

tg_job = Job.from_aps_job(aps_job)
assert tg_job is job
assert tg_job.job is aps_job

async def test_from_aps_job_missing_reference(self, job_queue):
"""We manually create a ext.Job and an aps job such that the former has no reference to the
latter. Then we test that Job.from_aps_job() still sets the reference correctly.
"""
job = Job(self.job_run_once)
aps_job = job_queue.scheduler.add_job(
func=job_queue.job_callback,
args=(job_queue, job),
trigger="interval",
seconds=2,
id="test_id",
)

assert job.job is None
tg_job = Job.from_aps_job(aps_job)
assert tg_job is job
assert tg_job.job is aps_job

0 comments on commit 9c8d6ef

Please sign in to comment.