Skip to content

Commit

Permalink
Merge pull request #14 from mikemill/repeat_job
Browse files Browse the repository at this point in the history
Add repeating jobs
  • Loading branch information
mikemill committed Sep 25, 2016
2 parents 9c82e19 + d60d58b commit a116f2b
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 6 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ You can check if a job or job id is currently scheduled. Example: `job in queue

You can unschedule jobs with `Queue.remove_job`. This removes the job from the scheduler queue but does not remove it from RQ.

### Repeating jobs

Once a job has been enqueued, you can set it to repeat with `Queue.repeat_job`, which takes the job and a `datetime.timedelta` interval. Additionally, you can
pass a `max_runs` value to limit the number of times it will repeat.

If the job is not already in the scheduler queue, it is scheduled to run for the first time after the given interval.

To ensure that job result cleanup doesn't remove the job (thus breaking the repetition), each run is executed as a new copy of the job.
For these repeated jobs, the original job can be accessed via `Job.parent`.

Due to how the jobs are put into the work queues, the maximum frequency is controlled by the scheduler's interval.

## Scheduler

In order to move jobs from the schedule queue into the proper RQ queue, a scheduler needs to be run.
Expand Down
14 changes: 14 additions & 0 deletions rq_retry_scheduler/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@ def enqueue_job(self, job, pipeline=None, at_front=False):

return job

def repeat_job(self, job, interval, max_runs=None):
    """
    Mark ``job`` as repeating every ``interval``.

    The repeat settings are written into ``job.meta`` under the keys
    ``interval``, ``max_runs`` and ``run_count`` (``run_count`` is always
    reset to 0). A job that is not yet in this queue is scheduled with
    ``enqueue_job_in(interval, job)``; a job that is already in the queue
    is saved with ``job.save()`` instead.

    :param job: the job to repeat
    :param interval: delay between runs (a ``datetime.timedelta`` per the
        README)
    :param max_runs: optional cap on the number of runs; ``None`` is
        stored unchanged
    :returns: the same ``job`` object that was passed in
    """
    repeat_settings = dict(interval=interval, max_runs=max_runs, run_count=0)
    job.meta.update(repeat_settings)

    already_scheduled = job in self
    if already_scheduled:
        job.save()
    else:
        self.enqueue_job_in(interval, job)

    return job

def scheduled_jobs(self):
num_jobs = self.connection.zcard(self.scheduler_jobs_key)
job_ids = self.connection.zrange(
Expand Down
72 changes: 68 additions & 4 deletions rq_retry_scheduler/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from copy import deepcopy
from datetime import datetime
import logging
import time
Expand Down Expand Up @@ -56,17 +57,80 @@ def get_jobs_to_queue(self, score):
except NoSuchJobError:
self.remove_job(job_id)

def delay_job(self, job, time_delta):
    """
    Push ``job`` later in the scheduler's sorted set by ``time_delta``.

    The delta is truncated to whole seconds and added to the job's current
    score with ``zincrby``; nothing is returned.
    """
    seconds = int(time_delta.total_seconds())
    redis = self.connection
    # NOTE(review): (name, value, amount) is the redis-py 2.x argument
    # order; confirm which client version this project targets.
    redis.zincrby(self.scheduler_jobs_key, job.id, seconds)

def should_repeat_job(self, job):
    """
    Tell whether ``job`` is still allowed to run again.

    Reads ``max_runs`` and ``run_count`` from ``job.meta``. A job repeats
    forever when ``max_runs`` is ``None``; otherwise it repeats while
    ``run_count`` is still below ``max_runs``.

    Missing keys are tolerated: a job is recognised as repeating by its
    ``interval`` key alone, so an absent ``max_runs`` means "no limit" and
    an absent ``run_count`` means 0 (the same default ``repeat_job_id``
    uses).

    :returns: ``True`` if another run should be scheduled, else ``False``
    """
    max_runs = job.meta.get('max_runs')
    run_count = job.meta.get('run_count', 0)

    return max_runs is None or max_runs > run_count

def repeat_job_id(self, job):
    """
    Build the id for the next copy of a repeating job.

    The id has the form ``repeat_<job id>_<n>`` where ``n`` is the job's
    ``run_count`` meta value (0 when absent) plus one.
    """
    next_run = job.meta.get('run_count', 0) + 1
    return 'repeat_{}_{}'.format(job.id, next_run)

def make_repeat_job(self, job):
    """
    Create and save a one-off copy of the repeating ``job``.

    The copy gets the same callable, arguments, connection, TTLs and
    origin as ``job``, an id from ``repeat_job_id`` and a deep copy of
    ``job.meta`` without the scheduler bookkeeping keys (``interval``,
    ``run_count``, ``max_runs``). The parent's ``timeout`` and
    ``description`` are forwarded as well so the copy does not fall back
    to defaults for them.

    :param job: the repeating (parent) job
    :returns: the newly created job, with ``parent`` set to ``job``
    """
    meta = deepcopy(job.meta)
    # The repeat bookkeeping belongs to the parent only; without these
    # keys the copy is not treated as a repeating job by ``is_repeat``.
    for scheduler_key in ('interval', 'run_count', 'max_runs'):
        meta.pop(scheduler_key, None)

    params = {
        'func': job.func,
        'args': job.args,
        'kwargs': job.kwargs,
        'connection': job.connection,
        'result_ttl': job.result_ttl,
        'ttl': job.ttl,
        # NOTE(review): relies on rq's Job.create accepting ``timeout``
        # and ``description`` keywords; confirm for the pinned rq version.
        'timeout': job.timeout,
        'description': job.description,
        'id': self.repeat_job_id(job),
        'origin': job.origin,
        'meta': meta,
    }

    repeat_job = Job.create(**params)
    # ``parent`` is set on the in-memory object before saving.
    repeat_job.parent = job
    repeat_job.save()

    return repeat_job

def handle_job_repeat(self, job, queue):
    """
    Run one cycle of a repeating job.

    A copy of ``job`` is made with ``make_repeat_job`` and put on
    ``queue``, then the parent's ``run_count`` is incremented. If the
    parent should run again it is delayed by its ``interval`` and saved;
    otherwise it is removed from the scheduler.

    :param job: the repeating (parent) job
    :param queue: the queue the copy is enqueued on
    :returns: the copy that was enqueued
    """
    clone = self.make_repeat_job(job)
    self.log.info(
        "Enqueuing job {} to queue {}".format(clone.id, clone.origin))
    queue.enqueue_job(clone)

    job.meta['run_count'] += 1

    # Run budget exhausted: drop the parent from the scheduler and stop.
    if not self.should_repeat_job(job):
        self.log.info("Removing job {} from scheduler".format(job.id))
        self.remove_job(job.id)
        return clone

    self.log.info("Scheduling job {} to repeat in {}".format(
        job.id, job.meta['interval']))
    self.delay_job(job, job.meta['interval'])
    job.save()
    return clone

def is_repeat(self, job):
    """
    Return ``True`` when ``job`` is a repeating job.

    A job counts as repeating when its meta contains an ``interval`` key.
    """
    meta = job.meta
    return 'interval' in meta

def enqueue_jobs(self):
    """
    Move every due job from the scheduler into its work queue.

    Jobs whose scheduled time is at or before ``current_time()`` are
    fetched with ``get_jobs_to_queue``. A repeating job is handed to
    ``handle_job_repeat``, which enqueues a copy and decides whether the
    parent stays scheduled. Any other job is enqueued once on the queue
    for its ``origin`` and then removed from the scheduler.
    """
    self.log.info('Checking for scheduled jobs...')

    jobs = self.get_jobs_to_queue(to_unix(self.current_time()))

    for job in jobs:
        queue = self.get_queue(job.origin)

        if self.is_repeat(job):
            # The parent itself is never put on the work queue.
            self.handle_job_repeat(job, queue)
        else:
            self.log.info(
                "Enqueuing job {} to queue {}".format(job.id, job.origin))
            queue.enqueue_job(job)
            self.remove_job(job.id)

def run(self, burst=False):
self.log.info('Starting RQ Retry Scheduler..')
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def long_description():

setup(
name='rq-retry-scheduler',
version='0.1.0b4',
version='0.1.0b5',
url='https://github.com/mikemill/rq_retry_scheduler',
description='RQ Retry and Scheduler',
long_description=long_description(),
Expand Down
23 changes: 23 additions & 0 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,26 @@ def test_unschedule_job(queue):
queue.unschedule_job(job2.id)

assert job2 not in queue


def test_repeat_job(queue):
    """repeat_job on an already scheduled job stores the repeat meta."""
    interval = timedelta(seconds=1)
    job = queue.enqueue_in(interval, target_function)

    returned = queue.repeat_job(job, interval)
    assert returned is job

    # Re-read the job so the assertions see what was actually saved.
    stored = Job.fetch(job.id, connection=job.connection)
    assert job == stored
    assert stored.meta['interval'] == interval
    assert stored.meta['max_runs'] is None
    assert stored.meta['run_count'] == 0


def test_repeat_job_not_in_queue(queue, mock):
    """repeat_job schedules a job that is not yet in the queue."""
    interval = timedelta(seconds=1)
    unscheduled = Job.create(target_function, connection=queue.connection)
    enqueue_spy = mock.spy(queue, 'enqueue_job_in')

    queue.repeat_job(unscheduled, interval)

    assert unscheduled in queue
    enqueue_spy.assert_called_with(interval, unscheduled)
114 changes: 113 additions & 1 deletion tests/test_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, timedelta
import pytest
from rq.exceptions import NoSuchJobError
from rq.job import Job
Expand Down Expand Up @@ -127,6 +127,18 @@ def test_enqueue_jobs(scheduler, mock):
assert enqueue.call_count == len(jobs)


def test_enqueue_jobs_with_repeat(scheduler, mock):
    """enqueue_jobs hands repeating jobs to handle_job_repeat."""
    job = Job.create(target_function, connection=scheduler.connection)

    handler = mock.patch.object(scheduler, 'handle_job_repeat')
    # Force the repeat branch for the single job the scheduler will see.
    for attr, value in (('is_repeat', True), ('get_jobs_to_queue', [job])):
        mock.patch.object(scheduler, attr, return_value=value)

    scheduler.enqueue_jobs()

    assert handler.called


def test_run(scheduler, mock):
side_effects = [None, Exception('End')]

Expand All @@ -147,3 +159,103 @@ def test_run_burst(scheduler, mock):
scheduler.run(True)

assert not sleep.called


def test_is_repeat(scheduler):
    """is_repeat flips to True once the job meta has an interval."""
    job = Job.create(
        target_function,
        origin='unittest',
        connection=scheduler.connection,
    )
    assert scheduler.is_repeat(job) is False

    job.meta.update(interval=10)
    assert scheduler.is_repeat(job) is True


def test_delay_job(scheduler, queue):
    """delay_job moves the scheduled time forward by the given delta."""
    delta = timedelta(seconds=3)
    now = datetime.utcnow()

    # Freeze the queue's clock so the expected score is predictable.
    queue.current_time = lambda: now

    job = queue.enqueue_in(delta, target_function)

    def first_score():
        entries = queue.connection.zrange(
            queue.scheduler_jobs_key, 0, -1,
            withscores=True, score_cast_func=int)
        _, score = entries[0]
        return score

    before = first_score()
    assert to_unix(now + delta) == before

    scheduler.delay_job(job, delta)

    after = first_score()
    assert to_unix(now + delta + delta) == after


def test_should_repeat_job(scheduler):
    """should_repeat_job compares run_count against max_runs."""
    job = Job.create(target_function, connection=scheduler.connection)
    job.meta.update(interval=1, max_runs=None, run_count=99999)

    # (max_runs, run_count) -> expected result
    expectations = {
        (None, 0): True,
        (None, 1): True,
        (None, 999999999): True,
        (1, 0): True,
        (1, 1): False,
        (1, 2): False,
    }

    for (max_runs, run_count), expected in expectations.items():
        job.meta.update(max_runs=max_runs, run_count=run_count)
        assert scheduler.should_repeat_job(job) is expected


def test_handle_job_repeat(scheduler, queue):
    """A job with max_runs=3 is copied three times, then unscheduled."""
    interval = timedelta(seconds=3)
    frozen_now = datetime.utcnow()

    # Both objects read the clock; pin them to the same instant.
    scheduler.current_time = lambda: frozen_now
    queue.current_time = lambda: frozen_now

    max_runs = 3

    job = queue.enqueue_in(interval, target_function)
    queue.repeat_job(job, interval, max_runs=max_runs)

    work_queue = Queue(job.origin, connection=queue.connection)

    assert job in queue
    assert job.id not in work_queue.get_job_ids()

    expected_count = 0
    while expected_count < max_runs:
        expected_count += 1
        copy = scheduler.handle_job_repeat(job, work_queue)
        assert copy not in queue
        assert copy.id in work_queue.get_job_ids()
        assert job.meta['run_count'] == expected_count

    assert job not in queue


def test_make_repat_job(scheduler):
    """make_repeat_job yields a distinct job mirroring the parent's call."""
    parent = Job.create(
        target_function,
        args=(1, 2, 3),
        kwargs={'unit': 'test'},
        connection=scheduler.connection,
    )

    copy = scheduler.make_repeat_job(parent)

    assert copy is not parent
    assert copy.id != parent.id
    assert copy.id == scheduler.repeat_job_id(parent)
    assert copy.parent is parent

    # The callable and its arguments must carry over unchanged.
    assert copy.func == parent.func
    assert copy.args == parent.args
    assert copy.kwargs == parent.kwargs

0 comments on commit a116f2b

Please sign in to comment.