RQ Scheduler is a small package that adds job scheduling capabilities to RQ, a Redis based Python queuing library.
You can install RQ Scheduler via pip:
pip install rq-scheduler
Or you can download the latest stable package from PyPI.
Schedule a job involves doing two different things:
- Putting a job in the scheduler
- Running a scheduler that will move scheduled jobs into queues when the time comes
There are two ways you can schedule a job. The first is using RQ Scheduler's enqueue_at
:
from redis import Redis from rq_scheduler import Scheduler from datetime import datetime scheduler = Scheduler(connection=Redis()) # Get a scheduler for the "default" queue # Puts a job into the scheduler. The API is similar to RQ except that it # takes a datetime object as first argument. So for example to schedule a # job to run on Jan 1st 2020 we do: scheduler.enqueue_at(datetime(2020, 1, 1), func) # Date time should be in UTC # Here's another example scheduling a job to run at a specific date and time (in UTC), # complete with args and kwargs. scheduler.enqueue_at(datetime(2020, 1, 1, 3, 4), func, foo, bar=baz)
The second way is using enqueue_in
. Instead of taking a datetime
object,
this method expects a timedelta
and schedules the job to run at
X seconds/minutes/hours/days/weeks later. For example, if we want to monitor how
popular a tweet is a few times during the course of the day, we could do something like:
from datetime import timedelta # Schedule a job to run 10 minutes, 1 hour and 1 day later scheduler.enqueue_in(timedelta(minutes=10), count_retweets, tweet_id) scheduler.enqueue_in(timedelta(hours=1), count_retweets, tweet_id) scheduler.enqueue_in(timedelta(days=1), count_retweets, tweet_id)
IMPORTANT: You should always use UTC datetime when working with RQ Scheduler.
As of version 0.3, RQ Scheduler also supports creating periodic and repeated jobs.
You can do this via the schedule
method. Note that this feature needs
RQ >= 0.3.1.
This is how you do it:
scheduler.schedule( scheduled_time=datetime.utcnow(), # Time for first execution, in UTC timezone func=func, # Function to be queued args=[arg1, arg2], # Arguments passed into function when executed kwargs={'foo': 'bar'}, # Keyword arguments passed into function when executed interval=60, # Time before the function is called again, in seconds repeat=10, # Repeat this number of times (None means repeat forever) uniq=False # If True - do not queue job if it is already present in queue )
IMPORTANT NOTE: If you set up a repeated job, you must make sure that you either do not set a result_ttl value or you set a value larger than the interval. Otherwise, the entry with the job details will expire and the job will not get re-scheduled.
As of version 0.6.0, RQ Scheduler also supports creating Cron Jobs, which you can use for
repeated jobs to run periodically at fixed times, dates or intervals, for more info check
https://en.wikipedia.org/wiki/Cron. You can do this via the cron
method.
This is how you do it:
scheduler.cron( cron_string, # A cron string (e.g. "0 0 * * 0") func=func, # Function to be queued args=[arg1, arg2], # Arguments passed into function when executed kwargs={'foo': 'bar'}, # Keyword arguments passed into function when executed repeat=10 # Repeat this number of times (None means repeat forever) queue_name=queue_name # In which queue the job should be put in )
Sometimes you need to know which jobs have already been scheduled. You can get a
list of enqueued jobs with the get_jobs
method:
list_of_job_instances = scheduler.get_jobs()
In it's simplest form (as seen in the above example) this method returns a list of all job instances that are currently scheduled for execution.
Additionally the method takes two optional keyword arguments until
and
with_times
. The first one specifies up to which point in time scheduled jobs
should be returned. It can be given as either a datetime / timedelta instance
or an integer denoting the number of seconds since epoch (1970-01-01 00:00:00).
The second argument is a boolen that determines whether the scheduled execution
time should be returned along with the job instances.
Example:
# get all jobs until 2012-11-30 10:00:00 list_of_job_instances = scheduler.get_jobs(until=datetime(2012, 10, 30, 10)) # get all jobs for the next hour list_of_job_instances = scheduler.get_jobs(until=timedelta(hours=1)) # get all jobs with execution times jobs_and_times = scheduler.get_jobs(with_times=True) # returns a list of tuples: # [(<rq.job.Job object at 0x123456789>, datetime.datetime(2012, 11, 25, 12, 30)), ...]
You can check whether a specific job instance or job id is scheduled for
execution using the familiar python in
operator:
if job_instance in scheduler: # Do something # or if job_id in scheduler: # Do something
To cancel a job, simply pass a Job
or a job id to scheduler.cancel
:
scheduler.cancel(job)
Note that this method returns None
whether the specified job was found or not.
RQ Scheduler comes with a script rqscheduler
that runs a scheduler
process that polls Redis once every minute and move scheduled jobs to the
relevant queues when they need to be executed:
# This runs a scheduler process using the default Redis connection rqscheduler
If you want to use a different Redis server you could also do:
rqscheduler --host localhost --port 6379 --db 0
The script accepts these arguments:
-H
or--host
: Redis server to connect to-p
or--port
: port to connect to-d
or--db
: Redis db to use-P
or--password
: password to connect to Redis-b
or--burst
: runs in burst mode (enqueue scheduled jobs whose execution time is in the past and quit)-i INTERVAL
or--interval INTERVAL
: How often the scheduler checks for new jobs to add to the queue (in seconds, can be floating-point for more precision).
The arguments pull default values from environment variables with the
same names but with a prefix of RQ_REDIS_
.
- Added scheduler.count(). Thanks @smaccona!
- scheduler.get_jobs() now supports pagination. Thanks @smaccona!
- Better ttl and result_ttl defaults for jobs created by scheduler.cron. Thanks @csaba-stylight and @lechup!
- Added scheduler.cron() capability. Thanks @petervtzand!
- scheduler.schedule() now accepts id and ttl kwargs. Thanks @mbodock!
- Travis CI fixes. Thanks Steven Kryskalla!
- Modified default logging configuration. You can pass in the
-v
or--verbose
argument torqscheduler
script for more verbose logging. - RQ Scheduler now registers Queue name when a new job is scheduled. Thanks @alejandrodob !
- You can now schedule jobs with string references like
scheduler.schedule(scheduled_time=now, func='foo.bar')
. Thanks @SirScott ! rqscheduler
script now accepts floating point intervals. Thanks Alexander Pikovsky!
- IMPORTANT! Job timestamps are now stored and interpreted in UTC format. If you have existing scheduled jobs, you should probably change their timestamp to UTC before upgrading to 0.5.0. Thanks @michaelbrooks!
- You can now configure Redis connection via environment variables. Thanks @malthe!
rqscheduler
script now accepts--pid
argument. Thanks @jsoncorwin!
- Supports Python 3!
Scheduler.schedule
now allows jobtimeout
to be specifiedrqscheduler
allows Redis connection to be specified via--url
argumentrqscheduler
now accepts--path
argument
- Scheduler key is not set to expire a few seconds after the next scheduling
operation. This solves the issue of
rqscheduler
refusing to start after an unexpected shut down.
- Support
StrictRedis
- Scheduler related job attributes (
interval
andrepeat
) are now stored injob.meta
introduced in RQ 0.3.4
- You can now check whether a job is scheduled for execution using
job in scheduler
syntax - Added
scheduler.get_jobs
method scheduler.enqueue
andscheduler.enqueue_periodic
will now raise a DeprecationWarning, please usescheduler.schedule
instead
- Periodic jobs now require RQ >= 0.3.1
- Added the capability to create periodic (cron) and repeated job using
scheduler.enqueue