Skip to content

Commit

Permalink
move from_unix and to_unix to utils file
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbrooks committed Feb 20, 2014
1 parent 52198e8 commit 1ea0930
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 47 deletions.
34 changes: 7 additions & 27 deletions rq_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import time
import warnings

import calendar
from datetime import datetime, timedelta
from itertools import repeat

Expand All @@ -13,30 +12,11 @@

from redis import WatchError

from .utils import from_unix, to_unix

logger = logging.getLogger(__name__)


# utcnow, utcformat, and utcparse from rq.utils
def utcnow():
return datetime.utcnow()


def utcformat(dt):
return dt.strftime(u"%Y-%m-%dT%H:%M:%SZ")

# from_unix from times.from_unix()
def from_unix(string):
return datetime.utcfromtimestamp(float(string))


# to_unix from times.to_unix()
def to_unix(dt):
"""Converts a datetime object to unixtime"""
return calendar.timegm(dt.utctimetuple())



class Scheduler(object):
scheduler_key = 'rq:scheduler'
scheduled_jobs_key = 'rq:scheduler:scheduled_jobs'
Expand Down Expand Up @@ -134,11 +114,11 @@ def enqueue_in(self, time_delta, func, *args, **kwargs):
"""
Similar to ``enqueue_at``, but accepts a timedelta instead of datetime object.
The job's scheduled execution time will be calculated by adding the timedelta
to utcnow().
to datetime.utcnow().
"""
job = self._create_job(func, args=args, kwargs=kwargs)
self.connection._zadd(self.scheduled_jobs_key,
to_unix(utcnow() + time_delta),
to_unix(datetime.utcnow() + time_delta),
job.id)
return job

Expand Down Expand Up @@ -243,7 +223,7 @@ def epoch_to_datetime(epoch):
elif isinstance(until, datetime):
until = to_unix(until)
elif isinstance(until, timedelta):
until = to_unix((utcnow() + until))
until = to_unix((datetime.utcnow() + until))
job_ids = self.connection.zrangebyscore(self.scheduled_jobs_key, 0,
until, withscores=with_times,
score_cast_func=epoch_to_datetime)
Expand All @@ -270,7 +250,7 @@ def get_jobs_to_queue(self, with_times=False):
If with_times is True a list of tuples consisting of the job instance and
it's scheduled execution time is returned.
"""
return self.get_jobs(to_unix(utcnow()), with_times=with_times)
return self.get_jobs(to_unix(datetime.utcnow()), with_times=with_times)

def get_queue_for_job(self, job):
"""
Expand All @@ -292,7 +272,7 @@ def enqueue_job(self, job):
# If job is a repeated job, decrement counter
if repeat:
job.meta['repeat'] = int(repeat) - 1
job.enqueued_at = utcnow()
job.enqueued_at = datetime.utcnow()
job.save()

queue = self.get_queue_for_job(job)
Expand All @@ -305,7 +285,7 @@ def enqueue_job(self, job):
if job.meta['repeat'] == 0:
return
self.connection._zadd(self.scheduled_jobs_key,
to_unix(utcnow()) + int(interval),
to_unix(datetime.utcnow()) + int(interval),
job.id)

def enqueue_jobs(self):
Expand Down
13 changes: 13 additions & 0 deletions rq_scheduler/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import calendar
from datetime import datetime

# from_unix from times.from_unix()
def from_unix(string):
"""Convert a unix timestamp into a utc datetime"""
return datetime.utcfromtimestamp(float(string))


# to_unix from times.to_unix()
def to_unix(dt):
"""Converts a datetime object to unixtime"""
return calendar.timegm(dt.utctimetuple())
40 changes: 20 additions & 20 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from rq.job import Job
import warnings
from rq_scheduler import Scheduler
from rq_scheduler.scheduler import utcnow, utcformat, to_unix
from rq_scheduler.utils import to_unix

from tests import RQTestCase

Expand Down Expand Up @@ -72,7 +72,7 @@ def test_create_scheduled_job(self):
"""
Ensure that scheduled jobs are put in the scheduler queue with the right score
"""
scheduled_time = utcnow()
scheduled_time = datetime.utcnow()
job = self.scheduler.enqueue_at(scheduled_time, say_hello)
self.assertEqual(job, Job.fetch(job.id, connection=self.testconn))
self.assertIn(job.id,
Expand All @@ -84,7 +84,7 @@ def test_enqueue_in(self):
"""
Ensure that jobs have the right scheduled time.
"""
right_now = utcnow()
right_now = datetime.utcnow()
time_delta = timedelta(minutes=1)
job = self.scheduler.enqueue_in(time_delta, say_hello)
self.assertIn(job.id,
Expand All @@ -100,7 +100,7 @@ def test_get_jobs(self):
"""
Ensure get_jobs() returns all jobs until the specified time.
"""
now = utcnow()
now = datetime.utcnow()
job = self.scheduler.enqueue_at(now, say_hello)
self.assertIn(job, self.scheduler.get_jobs(now))
future_time = now + timedelta(hours=1)
Expand All @@ -114,7 +114,7 @@ def test_get_jobs_to_queue(self):
"""
Ensure that jobs scheduled the future are not queued.
"""
now = utcnow()
now = datetime.utcnow()
job = self.scheduler.enqueue_at(now, say_hello)
self.assertIn(job, self.scheduler.get_jobs_to_queue())
future_time = now + timedelta(hours=1)
Expand All @@ -128,7 +128,7 @@ def test_enqueue_job(self):
- "enqueued_at" attribute is properly set
- Job appears in the right queue
"""
now = utcnow()
now = datetime.utcnow()
queue_name = 'foo'
scheduler = Scheduler(connection=self.testconn, queue_name=queue_name)

Expand All @@ -143,7 +143,7 @@ def test_enqueue_job(self):
self.assertIn(job, queue.jobs)

def test_job_membership(self):
now = utcnow()
now = datetime.utcnow()
job = self.scheduler.enqueue_at(now, say_hello)
self.assertIn(job, self.scheduler)
self.assertIn(job.id, self.scheduler)
Expand All @@ -166,7 +166,7 @@ def test_change_execution_time(self):
"""
Ensure ``change_execution_time`` is called, ensure that job's score is updated
"""
job = self.scheduler.enqueue_at(utcnow(), say_hello)
job = self.scheduler.enqueue_at(datetime.utcnow(), say_hello)
new_date = datetime(2010, 1, 1)
self.scheduler.change_execution_time(job, new_date)
self.assertEqual(to_unix(new_date),
Expand All @@ -178,11 +178,11 @@ def test_args_kwargs_are_passed_correctly(self):
"""
Ensure that arguments and keyword arguments are properly saved to jobs.
"""
job = self.scheduler.enqueue_at(utcnow(), simple_addition, 1, 1, 1)
job = self.scheduler.enqueue_at(datetime.utcnow(), simple_addition, 1, 1, 1)
self.assertEqual(job.args, (1, 1, 1))
job = self.scheduler.enqueue_at(utcnow(), simple_addition, z=1, y=1, x=1)
job = self.scheduler.enqueue_at(datetime.utcnow(), simple_addition, z=1, y=1, x=1)
self.assertEqual(job.kwargs, {'x': 1, 'y': 1, 'z': 1})
job = self.scheduler.enqueue_at(utcnow(), simple_addition, 1, z=1, y=1)
job = self.scheduler.enqueue_at(datetime.utcnow(), simple_addition, 1, z=1, y=1)
self.assertEqual(job.kwargs, {'y': 1, 'z': 1})
self.assertEqual(job.args, (1,))

Expand All @@ -202,7 +202,7 @@ def test_enqueue_is_deprecated(self):
with warnings.catch_warnings(record=True) as w:
# Enable all warnings
warnings.simplefilter("always")
job = self.scheduler.enqueue(utcnow(), say_hello)
job = self.scheduler.enqueue(datetime.utcnow(), say_hello)
self.assertEqual(1, len(w))
self.assertEqual(w[0].category, DeprecationWarning)

Expand All @@ -213,30 +213,30 @@ def test_enqueue_periodic(self):
with warnings.catch_warnings(record=True) as w:
# Enable all warnings
warnings.simplefilter("always")
job = self.scheduler.enqueue_periodic(utcnow(), 1, None, say_hello)
job = self.scheduler.enqueue_periodic(datetime.utcnow(), 1, None, say_hello)
self.assertEqual(1, len(w))
self.assertEqual(w[0].category, DeprecationWarning)

def test_interval_and_repeat_persisted_correctly(self):
"""
Ensure that interval and repeat attributes get correctly saved in Redis.
"""
job = self.scheduler.schedule(utcnow(), say_hello, interval=10, repeat=11)
job = self.scheduler.schedule(datetime.utcnow(), say_hello, interval=10, repeat=11)
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job_from_queue.meta['interval'], 10)
self.assertEqual(job_from_queue.meta['repeat'], 11)

def test_repeat_without_interval_raises_error(self):
# Ensure that an error is raised if repeat is specified without interval
def create_job():
self.scheduler.schedule(utcnow(), say_hello, repeat=11)
self.scheduler.schedule(datetime.utcnow(), say_hello, repeat=11)
self.assertRaises(ValueError, create_job)

def test_job_with_intervals_get_rescheduled(self):
"""
Ensure jobs with interval attribute are put back in the scheduler
"""
time_now = utcnow()
time_now = datetime.utcnow()
interval = 10
job = self.scheduler.schedule(time_now, say_hello, interval=interval)
self.scheduler.enqueue_job(job)
Expand All @@ -258,7 +258,7 @@ def test_job_with_repeat(self):
Ensure jobs with repeat attribute are put back in the scheduler
X (repeat) number of times
"""
time_now = utcnow()
time_now = datetime.utcnow()
interval = 10
# If job is repeated once, the job shouldn't be put back in the queue
job = self.scheduler.schedule(time_now, say_hello, interval=interval, repeat=1)
Expand All @@ -275,7 +275,7 @@ def test_job_with_repeat(self):
self.assertNotIn(job.id,
tl(self.testconn.zrange(self.scheduler.scheduled_jobs_key, 0, 1)))

time_now = utcnow()
time_now = datetime.utcnow()
# Now the same thing using enqueue_periodic
job = self.scheduler.enqueue_periodic(time_now, interval, 1, say_hello)
self.scheduler.enqueue_job(job)
Expand All @@ -295,7 +295,7 @@ def test_missing_jobs_removed_from_scheduler(self):
"""
Ensure jobs that don't exist when queued are removed from the scheduler.
"""
job = self.scheduler.schedule(utcnow(), say_hello)
job = self.scheduler.schedule(datetime.utcnow(), say_hello)
job.cancel()
self.scheduler.get_jobs_to_queue()
self.assertNotIn(job.id, tl(self.testconn.zrange(
Expand All @@ -305,7 +305,7 @@ def test_periodic_jobs_sets_ttl(self):
"""
Ensure periodic jobs set result_ttl to infinite.
"""
job = self.scheduler.schedule(utcnow(), say_hello, interval=5)
job = self.scheduler.schedule(datetime.utcnow(), say_hello, interval=5)
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.result_ttl, -1)

Expand Down

0 comments on commit 1ea0930

Please sign in to comment.