From 058183b477a4643bea947e963fbd7c2eaab1e997 Mon Sep 17 00:00:00 2001 From: Dan Lecocq Date: Sun, 29 Apr 2012 14:50:01 -0700 Subject: [PATCH] Renaming client.complete, etc. -> client.jobs.complete; client.job(id) -> client.jobs[id] --- qless/__init__.py | 71 ++++++++------- test.py | 220 +++++++++++++++++++++++----------------------- 2 files changed, 148 insertions(+), 143 deletions(-) diff --git a/qless/__init__.py b/qless/__init__.py index 8ad6cdb..19a3c7b 100755 --- a/qless/__init__.py +++ b/qless/__init__.py @@ -14,6 +14,43 @@ logger.addHandler(handler) logger.setLevel(logging.FATAL) +class Jobs(object): + def __init__(self, client): + self.client = client + + def complete(self, offset=0, count=25): + return self.client._jobs([], ['complete', offset, count]) + + def tracked(self): + results = json.loads(self.client._track([], [])) + results['jobs'] = [Job(self, **j) for j in results['jobs']] + return results + + def tagged(self, tag, offset=0, count=25): + return json.loads(self.client._tag([], ['get', tag, offset, count])) + + def failed(self, group=None, start=0, limit=25): + '''Failed(0, [group, [start, [limit]]]) + --------------------------------------- + If no type is provided, this returns a JSON blob of the counts of the various + types of failures known. If a type is provided, it will report up to `limit` + from `start` of the jobs affected by that issue. __Returns__ a JSON blob.''' + if not group: + return json.loads(self.client._failed([], [])) + else: + results = json.loads(self.client._failed([], [group, start, limit])) + results['jobs'] = [Job(self.client, **j) for j in results['jobs']] + return results + + def __getitem__(self, id): + '''Get(0, id) + ---------- + Get the data associated with a job''' + results = self.client._get([], [id]) + if not results: + return None + return Job(self.client, **json.loads(results)) + class client(object): def __init__(self, host='localhost', port=6379, hostname = None, **kwargs): import os @@ -25,6 +62,7 @@ def __init__(self, host='localhost', port=6379, hostname = None, **kwargs): # instances simultaneously. self.redis = redis.Redis(host, port, **kwargs) self.config = Config(self) + self.jobs = Jobs(self) # Client's lua scripts for cmd in [ 'cancel', 'complete', 'depends', 'fail', 'failed', 'get', 'getconfig', 'heartbeat', 'jobs', 'peek', @@ -39,46 +77,13 @@ def queues(self, queue=None): return json.loads(self._queues([], [time.time(), queue])) return json.loads(self._queues([], [time.time()])) - def tracked(self): - results = json.loads(self._track([], [])) - results['jobs'] = [Job(self, **j) for j in results['jobs']] - return results - - def tagged(self, tag, offset=0, count=25): - return json.loads(self._tag([], ['get', tag, offset, count])) - def tags(self, offset=0, count=100): return json.loads(self._tag([], ['top', offset, count])) - def complete(self, offset=0, count=25): - return self._jobs([], ['complete', offset, count]) - - def failed(self, group=None, start=0, limit=25): - '''Failed(0, [group, [start, [limit]]]) - --------------------------------------- - If no type is provided, this returns a JSON blob of the counts of the various - types of failures known. If a type is provided, it will report up to `limit` - from `start` of the jobs affected by that issue. __Returns__ a JSON blob.''' - if not group: - return json.loads(self._failed([], [])) - else: - results = json.loads(self._failed([], [group, start, limit])) - results['jobs'] = [Job(self, **j) for j in results['jobs']] - return results - def workers(self, worker=None): if worker: return json.loads(self._workers([], [time.time(), worker])) return json.loads(self._workers([], [time.time()])) - - def job(self, id): - '''Get(0, id) - ---------- - Get the data associated with a job''' - results = self._get([], [id]) - if not results: - return None - return Job(self, **json.loads(results)) from lua import lua from job import Job diff --git a/test.py b/test.py index d7e3ffb..1c3b196 100755 --- a/test.py +++ b/test.py @@ -73,9 +73,9 @@ def test_depends_put(self): job = self.q.pop() jid = self.q.put(qless.Job, {'test': 'depends_put'}, depends=[job.jid]) self.assertEqual(self.q.pop(), None) - self.assertEqual(self.client.job(jid).state, 'depends') + self.assertEqual(self.client.jobs[jid].state, 'depends') job.complete() - self.assertEqual(self.client.job(jid).state, 'waiting') + self.assertEqual(self.client.jobs[jid].state, 'waiting') self.assertEqual(self.q.pop().jid, jid) # Let's try this dance again, but with more job dependencies @@ -102,11 +102,11 @@ def test_depends_complete(self): b = self.q.put(qless.Job, {'test': 'depends_complete'}) job = self.q.pop() job.complete('testing', depends=[b]) - self.assertEqual(self.client.job(a).state, 'depends') + self.assertEqual(self.client.jobs[a].state, 'depends') jobs = self.q.pop(20) self.assertEqual(len(jobs), 1) jobs[0].complete() - self.assertEqual(self.client.job(a).state, 'waiting') + self.assertEqual(self.client.jobs[a].state, 'waiting') job = self.q.pop() self.assertEqual(job.jid, a) @@ -149,7 +149,7 @@ def test_depends_canceled(self): # a job that others depend on, you should have an exception thrown a = self.q.put(qless.Job, {'test': 'test_depends_canceled'}) b = self.q.put(qless.Job, {'test': 'test_depends_canceled'}, depends=[a]) - self.client.job(b).cancel() + self.client.jobs[b].cancel() job = self.q.pop() self.assertEqual(job.jid, a) self.assertEqual(job.complete(), 'complete') @@ -157,7 +157,7 @@ def test_depends_canceled(self): a = self.q.put(qless.Job, {'test': 'cancel_dependency'}) b = self.q.put(qless.Job, {'test': 'cancel_dependency'}, depends=[a]) - self.assertRaises(Exception, self.client.job(a).cancel) + self.assertRaises(Exception, self.client.jobs[a].cancel) def test_depends_complete_advance(self): # If we make B depend on A, and then move A through several @@ -196,11 +196,11 @@ def test_move_dependency(self): # popped before we can describe its dependencies a = self.q.put(qless.Job, {'test': 'move_dependency'}) b = self.q.put(qless.Job, {'test': 'move_dependency'}, depends=[a]) - self.client.job(b).move('other') - self.assertEqual(self.client.job(b).state, 'depends') + self.client.jobs[b].move('other') + self.assertEqual(self.client.jobs[b].state, 'depends') self.assertEqual(self.other.pop(), None) self.q.pop().complete() - self.assertEqual(self.client.job(b).state, 'waiting') + self.assertEqual(self.client.jobs[b].state, 'waiting') self.assertEqual(self.other.pop().jid, b) def test_add_dependency(self): @@ -208,7 +208,7 @@ def test_add_dependency(self): # we should be able to add more dependencies. If it's not, then # we can't a = self.q.put(qless.Job, {'test': 'add_dependency'}) - b = self.client.job(self.q.put(qless.Job, {'test': 'add_dependency'}, depends=[a])) + b = self.client.jobs[self.q.put(qless.Job, {'test': 'add_dependency'}, depends=[a])] c = self.q.put(qless.Job, {'test': 'add_dependency'}) self.assertEqual(b.depend(c), True) @@ -224,42 +224,42 @@ def test_add_dependency(self): # If the job's put, but waiting, we can't add dependencies a = self.q.put(qless.Job, {'test': 'add_dependency'}) b = self.q.put(qless.Job, {'test': 'add_dependency'}) - self.assertEqual(self.client.job(a).depend(b), False) + self.assertEqual(self.client.jobs[a].depend(b), False) job = self.q.pop() self.assertEqual(job.depend(b), False) job.fail('what', 'something') - self.assertEqual(self.client.job(job.jid).depend(b), False) + self.assertEqual(self.client.jobs[job.jid].depend(b), False) def test_remove_dependency(self): # If we have a job that already depends on others, then we should # we able to remove them. If it's not dependent on any, then we can't. a = self.q.put(qless.Job, {'test': 'remove_dependency'}) - b = self.client.job(self.q.put(qless.Job, {'test': 'remove_dependency'}, depends=[a])) + b = self.client.jobs[self.q.put(qless.Job, {'test': 'remove_dependency'}, depends=[a])] self.assertEqual(len(self.q.pop(20)), 1) b.undepend(a) self.assertEqual(self.q.pop().jid, b.jid) # Make sure we removed the dependents from the first one, as well - self.assertEqual(self.client.job(a).dependents, []) + self.assertEqual(self.client.jobs[a].dependents, []) # Let's try removing /all/ dependencies jids = [self.q.put(qless.Job, {'test': 'remove_dependency'}) for i in range(10)] - b = self.client.job(self.q.put(qless.Job, {'test': 'remove_dependency'}, depends=jids)) + b = self.client.jobs[self.q.put(qless.Job, {'test': 'remove_dependency'}, depends=jids)] self.assertEqual(len(self.q.pop(20)), 10) b.undepend(all=True) - self.assertEqual(self.client.job(b.jid).state, 'waiting') + self.assertEqual(self.client.jobs[b.jid].state, 'waiting') # Let's make sure that each of the jobs we removed as dependencies also go their # dependencies removed, too. for jid in jids: - self.assertEqual(self.client.job(jid).dependents, []) + self.assertEqual(self.client.jobs[jid].dependents, []) # If the job's put, but waiting, we can't add dependencies a = self.q.put(qless.Job, {'test': 'add_dependency'}) b = self.q.put(qless.Job, {'test': 'add_dependency'}) - self.assertEqual(self.client.job(a).undepend(b), False) + self.assertEqual(self.client.jobs[a].undepend(b), False) job = self.q.pop() self.assertEqual(job.undepend(b), False) job.fail('what', 'something') - self.assertEqual(self.client.job(job.jid).undepend(b), False) + self.assertEqual(self.client.jobs[job.jid].undepend(b), False) def test_jobs_depends(self): # When we have jobs that have dependencies, we should be able to @@ -283,7 +283,7 @@ def test_retry(self): job.retry() # Pop it off again self.assertEqual(self.q.jobs.scheduled(), []) - self.assertEqual(self.client.job(job.jid).state, 'waiting') + self.assertEqual(self.client.jobs[job.jid].state, 'waiting') job = self.q.pop() self.assertNotEqual(job, None) self.assertEqual(job.original_retries, job.retries_left + 1) @@ -291,28 +291,28 @@ def test_retry(self): job.retry(60) self.assertEqual(self.q.pop(), None) self.assertEqual(self.q.jobs.scheduled(), [jid]) - job = self.client.job(jid) + job = self.client.jobs[jid] self.assertEqual(job.original_retries, job.retries_left + 2) self.assertEqual(job.state, 'scheduled') def test_retry_fail(self): # Make sure that if we exhaust a job's retries, that it fails jid = self.q.put(qless.Job, {'test': 'test_retry_fail'}, retries=2) - self.assertEqual(self.client.failed(), {}) + self.assertEqual(self.client.jobs.failed(), {}) self.assertEqual(self.q.pop().retry(), 1) self.assertEqual(self.q.pop().retry(), 0) self.assertEqual(self.q.pop().retry(), -1) - self.assertEqual(self.client.failed(), { + self.assertEqual(self.client.jobs.failed(), { 'failed-retries-testing': 1 }) def test_retry_error(self): # These are some of the conditions under which we cannot retry a job - job = self.client.job(self.q.put(qless.Job, {'test': 'test_retry_error'})) + job = self.client.jobs[self.q.put(qless.Job, {'test': 'test_retry_error'})] self.assertEqual(job.retry(), False) self.q.pop().fail('foo', 'bar') - self.assertEqual(self.client.job(job.jid).retry(), False) - self.client.job(job.jid).move('testing') + self.assertEqual(self.client.jobs[job.jid].retry(), False) + self.client.jobs[job.jid].move('testing') job = self.q.pop(); job.worker = 'foobar' self.assertEqual(job.retry(), False) job.worker = self.client.worker @@ -341,7 +341,7 @@ def test_priority(self): a = self.q.put(qless.Job, {'test': 'test_priority'}, priority = -10) b = self.q.put(qless.Job, {'test': 'test_priority'}) self.assertEqual(self.q.peek().jid, a) - job = self.client.job(b) + job = self.client.jobs[b] job.priority = -20 self.assertEqual(len(self.q), 2) self.assertEqual(self.q.peek().jid, b) @@ -364,42 +364,42 @@ class TestTag(TestQless): # 3) When a job expires or is canceled, it should be removed from the # set of jobs with that tag def test_tag(self): - job = self.client.job(self.q.put(qless.Job, {'test': 'tag'})) - self.assertEqual(self.client.tagged('foo'), {'total': 0, 'jobs': {}}) - self.assertEqual(self.client.tagged('bar'), {'total': 0, 'jobs': {}}) + job = self.client.jobs[self.q.put(qless.Job, {'test': 'tag'})] + self.assertEqual(self.client.jobs.tagged('foo'), {'total': 0, 'jobs': {}}) + self.assertEqual(self.client.jobs.tagged('bar'), {'total': 0, 'jobs': {}}) job.tag('foo') - self.assertEqual(self.client.tagged('foo'), {'total': 1, 'jobs': [job.jid]}) - self.assertEqual(self.client.tagged('bar'), {'total': 0, 'jobs': {}}) + self.assertEqual(self.client.jobs.tagged('foo'), {'total': 1, 'jobs': [job.jid]}) + self.assertEqual(self.client.jobs.tagged('bar'), {'total': 0, 'jobs': {}}) job.tag('bar') - self.assertEqual(self.client.tagged('foo'), {'total': 1, 'jobs': [job.jid]}) - self.assertEqual(self.client.tagged('bar'), {'total': 1, 'jobs': [job.jid]}) + self.assertEqual(self.client.jobs.tagged('foo'), {'total': 1, 'jobs': [job.jid]}) + self.assertEqual(self.client.jobs.tagged('bar'), {'total': 1, 'jobs': [job.jid]}) job.untag('foo') - self.assertEqual(self.client.tagged('foo'), {'total': 0, 'jobs': {}}) - self.assertEqual(self.client.tagged('bar'), {'total': 1, 'jobs': [job.jid]}) + self.assertEqual(self.client.jobs.tagged('foo'), {'total': 0, 'jobs': {}}) + self.assertEqual(self.client.jobs.tagged('bar'), {'total': 1, 'jobs': [job.jid]}) job.untag('bar') - self.assertEqual(self.client.tagged('foo'), {'total': 0, 'jobs': {}}) - self.assertEqual(self.client.tagged('bar'), {'total': 0, 'jobs': {}}) + self.assertEqual(self.client.jobs.tagged('foo'), {'total': 0, 'jobs': {}}) + self.assertEqual(self.client.jobs.tagged('bar'), {'total': 0, 'jobs': {}}) def test_preserve_order(self): - job = self.client.job(self.q.put(qless.Job, {'test': 'preserve_order'})) + job = self.client.jobs[self.q.put(qless.Job, {'test': 'preserve_order'})] tags = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'] for i in range(len(tags)): job.tag(tags[i]) - self.assertEqual(self.client.job(job.jid).tags, tags[0:i+1]) + self.assertEqual(self.client.jobs[job.jid].tags, tags[0:i+1]) # Now let's take a select few out job.untag('a', 'c', 'e', 'g') - self.assertEqual(self.client.job(job.jid).tags, ['b', 'd', 'f', 'h']) + self.assertEqual(self.client.jobs[job.jid].tags, ['b', 'd', 'f', 'h']) def test_cancel_expire(self): # First, we'll cancel a job - job = self.client.job(self.q.put(qless.Job, {'test': 'cancel_expire'})) + job = self.client.jobs[self.q.put(qless.Job, {'test': 'cancel_expire'})] job.tag('foo', 'bar') - self.assertEqual(self.client.tagged('foo'), {'total': 1, 'jobs': [job.jid]}) - self.assertEqual(self.client.tagged('bar'), {'total': 1, 'jobs': [job.jid]}) + self.assertEqual(self.client.jobs.tagged('foo'), {'total': 1, 'jobs': [job.jid]}) + self.assertEqual(self.client.jobs.tagged('bar'), {'total': 1, 'jobs': [job.jid]}) job.cancel() - self.assertEqual(self.client.tagged('foo'), {'total': 0, 'jobs': {}}) - self.assertEqual(self.client.tagged('bar'), {'total': 0, 'jobs': {}}) + self.assertEqual(self.client.jobs.tagged('foo'), {'total': 0, 'jobs': {}}) + self.assertEqual(self.client.jobs.tagged('bar'), {'total': 0, 'jobs': {}}) # Now, we'll have a job expire from completion self.client.config.set('jobs-history-count', 0) @@ -407,25 +407,25 @@ def test_cancel_expire(self): job = self.q.pop() self.assertNotEqual(job, None) job.tag('foo', 'bar') - self.assertEqual(self.client.tagged('foo'), {'total': 1, 'jobs': [job.jid]}) - self.assertEqual(self.client.tagged('bar'), {'total': 1, 'jobs': [job.jid]}) + self.assertEqual(self.client.jobs.tagged('foo'), {'total': 1, 'jobs': [job.jid]}) + self.assertEqual(self.client.jobs.tagged('bar'), {'total': 1, 'jobs': [job.jid]}) job.complete() - self.assertEqual(self.client.job(job.jid), None) - self.assertEqual(self.client.tagged('foo'), {'total': 0, 'jobs': {}}) - self.assertEqual(self.client.tagged('bar'), {'total': 0, 'jobs': {}}) + self.assertEqual(self.client.jobs[job.jid], None) + self.assertEqual(self.client.jobs.tagged('foo'), {'total': 0, 'jobs': {}}) + self.assertEqual(self.client.jobs.tagged('bar'), {'total': 0, 'jobs': {}}) # If the job no longer exists, attempts to tag it should not add to the set job.tag('foo', 'bar') - self.assertEqual(self.client.tagged('foo'), {'total': 0, 'jobs': {}}) - self.assertEqual(self.client.tagged('bar'), {'total': 0, 'jobs': {}}) + self.assertEqual(self.client.jobs.tagged('foo'), {'total': 0, 'jobs': {}}) + self.assertEqual(self.client.jobs.tagged('bar'), {'total': 0, 'jobs': {}}) def test_tag_put(self): # We should make sure that we can tag a job when we initially put it, too - self.assertEqual(self.client.tagged('foo'), {'total': 0, 'jobs': {}}) - self.assertEqual(self.client.tagged('bar'), {'total': 0, 'jobs': {}}) + self.assertEqual(self.client.jobs.tagged('foo'), {'total': 0, 'jobs': {}}) + self.assertEqual(self.client.jobs.tagged('bar'), {'total': 0, 'jobs': {}}) jid = self.q.put(qless.Job, {'test': 'tag_put'}, tags=['foo', 'bar']) - self.assertEqual(self.client.tagged('foo'), {'total': 1, 'jobs': [jid]}) - self.assertEqual(self.client.tagged('bar'), {'total': 1, 'jobs': [jid]}) + self.assertEqual(self.client.jobs.tagged('foo'), {'total': 1, 'jobs': [jid]}) + self.assertEqual(self.client.jobs.tagged('bar'), {'total': 1, 'jobs': [jid]}) def test_tag_top(self): # 1) Make sure that it only includes tags with more than one job associated with it @@ -437,13 +437,13 @@ def test_tag_top(self): self.assertEqual(self.client.tags(), {}) jids = [self.q.put(qless.Job, {}, tags=['foo']) for i in range(10)] self.assertEqual(self.client.tags(), ['foo']) - jobs = [self.client.job(jid).cancel() for jid in jids] + jobs = [self.client.jobs[jid].cancel() for jid in jids] self.assertEqual(self.client.tags(), {}) # Add only one back a = self.q.put(qless.Job, {}, tags=['foo']) self.assertEqual(self.client.tags(), {}) # Add a second, and then tag it - b = self.client.job(self.q.put(qless.Job, {})) + b = self.client.jobs[self.q.put(qless.Job, {})] b.tag('foo') self.assertEqual(self.client.tags(), ['foo']) b.untag('foo') @@ -465,15 +465,15 @@ def test_fail_failed(self): # in the failed endpoint # 4) Ensure that the job still has its original queue self.assertEqual(len(self.q), 0, 'Start with an empty queue') - self.assertEqual(len(self.client.failed()), 0) + self.assertEqual(len(self.client.jobs.failed()), 0) jid = self.q.put(qless.Job, {'test': 'fail_failed'}) job = self.q.pop() job.fail('foo', 'Some sort of message') self.assertEqual(self.q.pop(), None) - self.assertEqual(self.client.failed(), { + self.assertEqual(self.client.jobs.failed(), { 'foo': 1 }) - results = self.client.failed('foo') + results = self.client.jobs.failed('foo') self.assertEqual(results['total'], 1) job = results['jobs'][0] self.assertEqual(job.jid , jid) @@ -497,7 +497,7 @@ def test_pop_fail(self): # 3) Heartbeat to job fails # 4) Complete job fails self.assertEqual(len(self.q), 0, 'Start with an empty queue') - self.assertEqual(len(self.client.failed()), 0) + self.assertEqual(len(self.client.jobs.failed()), 0) jid = self.q.put(qless.Job, {'test': 'pop_fail'}) job = self.q.pop() self.assertNotEqual(job, None) @@ -505,28 +505,28 @@ def test_pop_fail(self): self.assertEqual(len(self.q), 0) self.assertEqual(job.heartbeat(), False) self.assertEqual(job.complete() , False) - self.assertEqual(self.client.failed(), { + self.assertEqual(self.client.jobs.failed(), { 'foo': 1 }) - results = self.client.failed('foo') + results = self.client.jobs.failed('foo') self.assertEqual(results['total'], 1) self.assertEqual(results['jobs'][0].jid, jid) def test_fail_state(self): # We shouldn't be able to fail a job that's in any state but # running - self.assertEqual(len(self.client.failed()), 0) - job = self.client.job(self.q.put(qless.Job, {'test': 'fail_state'})) + self.assertEqual(len(self.client.jobs.failed()), 0) + job = self.client.jobs[self.q.put(qless.Job, {'test': 'fail_state'})] self.assertEqual(job.fail('foo', 'Some sort of message'), False) - self.assertEqual(len(self.client.failed()), 0) - job = self.client.job(self.q.put(qless.Job, {'test': 'fail_state'}, delay=60)) + self.assertEqual(len(self.client.jobs.failed()), 0) + job = self.client.jobs[self.q.put(qless.Job, {'test': 'fail_state'}, delay=60)] self.assertEqual(job.fail('foo', 'Some sort of message'), False) - self.assertEqual(len(self.client.failed()), 0) + self.assertEqual(len(self.client.jobs.failed()), 0) jid = self.q.put(qless.Job, {'test': 'fail_complete'}) job = self.q.pop() job.complete() self.assertEqual(job.fail('foo', 'Some sort of message'), False) - self.assertEqual(len(self.client.failed()), 0) + self.assertEqual(len(self.client.jobs.failed()), 0) def test_put_failed(self): # In this test, we want to make sure that if we put a job @@ -541,10 +541,10 @@ def test_put_failed(self): jid = self.q.put(qless.Job, {'test': 'put_failed'}) job = self.q.pop() job.fail('foo', 'some message') - self.assertEqual(self.client.failed(), {'foo':1}) + self.assertEqual(self.client.jobs.failed(), {'foo':1}) job.move('testing') self.assertEqual(len(self.q), 1) - self.assertEqual(self.client.failed(), {}) + self.assertEqual(self.client.jobs.failed(), {}) def test_complete_failed(self): # No matter if a job has been failed before or not, then we @@ -556,7 +556,7 @@ def test_complete_failed(self): job.move('testing') job = self.q.pop() self.assertEqual(job.complete(), 'complete') - self.assertEqual(self.client.job(jid).failure, {}) + self.assertEqual(self.client.jobs[jid].failure, {}) class TestEverything(TestQless): def test_config(self): @@ -580,7 +580,7 @@ def test_put_get(self): # 2) get job # 3) delete job jid = self.q.put(qless.Job, {'test': 'put_get'}) - job = self.client.job(jid) + job = self.client.jobs[jid] self.assertEqual(job.priority , 0) self.assertEqual(job.data , {'test': 'put_get'}) self.assertEqual(job.tags , []) @@ -648,7 +648,7 @@ def test_data_access(self): # 4) Pop a job, check job['test'] self.assertEqual(len(self.q), 0, 'Start with an empty queue') jid = self.q.put(qless.Job, {'test': 'data_access'}) - job = self.client.job(jid) + job = self.client.jobs[jid] self.assertEqual(job['test'], 'data_access') job = self.q.peek() self.assertEqual(job['test'], 'data_access') @@ -719,11 +719,11 @@ def test_put_pop_complete_history(self): # 4) Complete job, check history self.assertEqual(len(self.q), 0, 'Start with an empty queue') jid = self.q.put(qless.Job, {'test': 'put_history'}) - job = self.client.job(jid) + job = self.client.jobs[jid] self.assertEqual(math.floor(job.history[0]['put']), math.floor(time.time())) # Now pop it job = self.q.pop() - job = self.client.job(jid) + job = self.client.jobs[jid] self.assertEqual(math.floor(job.history[0]['popped']), math.floor(time.time())) def test_move_queue(self): @@ -737,7 +737,7 @@ def test_move_queue(self): self.assertEqual(len(self.other), 0, 'Start with an empty queue "other"') jid = self.q.put(qless.Job, {'test': 'move_queues'}) self.assertEqual(len(self.q), 1, 'Put failed') - job = self.client.job(jid) + job = self.client.jobs[jid] job.move('other') self.assertEqual(len(self.q ), 0, 'Move failed') self.assertEqual(len(self.other), 1, 'Move failed') @@ -774,9 +774,9 @@ def test_move_non_destructive(self): self.assertEqual(len(self.q ), 0, 'Start with an empty queue') self.assertEqual(len(self.other), 0, 'Start with an empty queue "other"') jid = self.q.put(qless.Job, {'test': 'move_non_destructive'}, tags=['foo', 'bar'], priority=5) - before = self.client.job(jid) + before = self.client.jobs[jid] before.move('other') - after = self.client.job(jid) + after = self.client.jobs[jid] self.assertEqual(before.tags , ['foo', 'bar']) self.assertEqual(before.priority, 5) self.assertEqual(before.tags , after.tags) @@ -831,7 +831,7 @@ def test_heartbeat_state(self): # 3) Ensure we cannot heartbeat that job self.assertEqual(len(self.q), 0, 'Start with an empty queue') jid = self.q.put(qless.Job, {'test': 'heartbeat_state'}) - job = self.client.job(jid) + job = self.client.jobs[jid] self.assertEqual(job.heartbeat(), False) def test_peek_pop_empty(self): @@ -901,11 +901,11 @@ def test_cancel(self): # 4) Ensure that we can't get data for it self.assertEqual(len(self.q), 0, 'Start with an empty queue') jid = self.q.put(qless.Job, {'test': 'cancel'}) - job = self.client.job(jid) + job = self.client.jobs[jid] self.assertEqual(len(self.q), 1) job.cancel() self.assertEqual(len(self.q), 0) - self.assertEqual(self.client.job(jid), None) + self.assertEqual(self.client.jobs[jid], None) def test_cancel_heartbeat(self): # In this test, we want to make sure that when we cancel @@ -923,7 +923,7 @@ def test_cancel_heartbeat(self): self.assertEqual(len(self.q), 0) self.assertEqual(job.heartbeat(), False) self.assertEqual(job.complete() , False) - self.assertEqual(self.client.job(jid), None) + self.assertEqual(self.client.jobs[jid], None) def test_cancel_fail(self): # In this test, we want to make sure that if we fail a job @@ -937,9 +937,9 @@ def test_cancel_fail(self): jid = self.q.put(qless.Job, {'test': 'cancel_fail'}) job = self.q.pop() job.fail('foo', 'some message') - self.assertEqual(self.client.failed(), {'foo': 1}) + self.assertEqual(self.client.jobs.failed(), {'foo': 1}) job.cancel() - self.assertEqual(self.client.failed(), {}) + self.assertEqual(self.client.jobs.failed(), {}) def test_complete(self): # In this test, we want to make sure that a job that has been @@ -955,18 +955,18 @@ def test_complete(self): job = self.q.pop() self.assertNotEqual(job, None) self.assertEqual(job.complete(), 'complete') - job = self.client.job(jid) + job = self.client.jobs[jid] self.assertEqual(math.floor(job.history[-1]['done']), math.floor(time.time())) self.assertEqual(job.state , 'complete') self.assertEqual(job.worker , '') self.assertEqual(job.queue_name, '') self.assertEqual(len(self.q), 0) - self.assertEqual(self.client.complete(), [jid]) + self.assertEqual(self.client.jobs.complete(), [jid]) # Now, if we move job back into a queue, we shouldn't see any # completed jobs anymore job.move('testing') - self.assertEqual(self.client.complete(), []) + self.assertEqual(self.client.jobs.complete(), []) def test_complete_advance(self): # In this test, we want to make sure that a job that has been @@ -983,7 +983,7 @@ def test_complete_advance(self): job = self.q.pop() self.assertNotEqual(job, None) self.assertEqual(job.complete('testing'), 'waiting') - job = self.client.job(jid) + job = self.client.jobs[jid] self.assertEqual(len(job.history), 2) self.assertEqual(math.floor(job.history[-2]['done']), math.floor(time.time())) self.assertEqual(math.floor(job.history[-1]['put' ]), math.floor(time.time())) @@ -1010,7 +1010,7 @@ def test_complete_fail(self): self.assertNotEqual(bjob, None) self.assertEqual(ajob.complete(), False) self.assertEqual(bjob.complete(), 'complete') - job = self.client.job(jid) + job = self.client.jobs[jid] self.assertEqual(math.floor(job.history[-1]['done']), math.floor(time.time())) self.assertEqual(job.state , 'complete') self.assertEqual(job.worker , '') @@ -1025,7 +1025,7 @@ def test_complete_state(self): # 3) Attempt to complete the job, ensure it fails self.assertEqual(len(self.q), 0, 'Start with an empty queue') jid = self.q.put(qless.Job, {'test': 'complete_fail'}) - job = self.client.job(jid) + job = self.client.jobs[jid] self.assertEqual(job.complete('testing'), False) def test_complete_queues(self): @@ -1188,15 +1188,15 @@ def test_track(self): # 2) Put, Track a job, check # 3) Untrack job, check # 4) Track job, cancel, check - self.assertEqual(self.client.tracked(), {'expired':{}, 'jobs':[]}) - job = self.client.job(self.q.put(qless.Job, {'test':'track'})) + self.assertEqual(self.client.jobs.tracked(), {'expired':{}, 'jobs':[]}) + job = self.client.jobs[self.q.put(qless.Job, {'test':'track'})] job.track() - self.assertEqual(len(self.client.tracked()['jobs']), 1) + self.assertEqual(len(self.client.jobs.tracked()['jobs']), 1) job.untrack() - self.assertEqual(len(self.client.tracked()['jobs']), 0) + self.assertEqual(len(self.client.jobs.tracked()['jobs']), 0) job.track() job.cancel() - self.assertEqual(len(self.client.tracked()['expired']), 1) + self.assertEqual(len(self.client.jobs.tracked()['expired']), 1) def test_track_tracked(self): # When peeked, popped, failed, etc., qless should know when a @@ -1205,13 +1205,13 @@ def test_track_tracked(self): # => 2) Peek, ensure tracked # => 3) Pop, ensure tracked # => 4) Fail, check failed, ensure tracked - job = self.client.job(self.q.put(qless.Job, {'test': 'track_tracked'})) + job = self.client.jobs[self.q.put(qless.Job, {'test': 'track_tracked'})] job.track() self.assertEqual(self.q.peek().tracked, True) job = self.q.pop() self.assertEqual(job.tracked, True) job.fail('foo', 'bar') - self.assertEqual(self.client.failed('foo')['jobs'][0].tracked, True) + self.assertEqual(self.client.jobs.failed('foo')['jobs'][0].tracked, True) def test_track_untracked(self): # When peeked, popped, failed, etc., qless should know when a @@ -1220,12 +1220,12 @@ def test_track_untracked(self): # => 2) Peek, ensure tracked # => 3) Pop, ensure tracked # => 4) Fail, check failed, ensure tracked - job = self.client.job(self.q.put(qless.Job, {'test': 'track_tracked'})) + job = self.client.jobs[self.q.put(qless.Job, {'test': 'track_tracked'})] self.assertEqual(self.q.peek().tracked, False) job = self.q.pop() self.assertEqual(job.tracked, False) job.fail('foo', 'bar') - self.assertEqual(self.client.failed('foo')['jobs'][0].tracked, False) + self.assertEqual(self.client.jobs.failed('foo')['jobs'][0].tracked, False) def test_retries(self): # In this test, we want to make sure that jobs are given a @@ -1236,19 +1236,19 @@ def test_retries(self): # 3) Lose the heartbeat as many times # 4) Verify there are failures # 5) Verify the queue is empty - self.assertEqual(self.client.failed(), {}) + self.assertEqual(self.client.jobs.failed(), {}) self.q.put(qless.Job, {'test':'retries'}, retries=2) # Easier to lose the heartbeat lock self.client.config.set('heartbeat', -10) self.assertNotEqual(self.q.pop(), None) - self.assertEqual(self.client.failed(), {}) + self.assertEqual(self.client.jobs.failed(), {}) self.assertNotEqual(self.q.pop(), None) - self.assertEqual(self.client.failed(), {}) + self.assertEqual(self.client.jobs.failed(), {}) self.assertNotEqual(self.q.pop(), None) - self.assertEqual(self.client.failed(), {}) + self.assertEqual(self.client.jobs.failed(), {}) # This one should do it self.assertEqual(self.q.pop(), None) - self.assertEqual(self.client.failed(), {'failed-retries-testing':1}) + self.assertEqual(self.client.jobs.failed(), {'failed-retries-testing':1}) def test_retries_complete(self): # In this test, we want to make sure that jobs have their number @@ -1265,7 +1265,7 @@ def test_retries_complete(self): job = self.q.pop() self.assertEqual(job.retries_left, 1) job.complete() - job = self.client.job(jid) + job = self.client.jobs[jid] self.assertEqual(job.retries_left, 2) def test_retries_put(self): @@ -1283,7 +1283,7 @@ def test_retries_put(self): job = self.q.pop() self.assertEqual(job.retries_left, 1) job.move('testing') - job = self.client.job(jid) + job = self.client.jobs[jid] self.assertEqual(job.retries_left, 2) def test_stats_failed(self): @@ -1557,7 +1557,7 @@ def test_running_stalled_scheduled_depends(self): r = [job.complete() for job in jobs] jids = [job.jid for job in jobs] jids.reverse() - self.assertEqual(self.client.complete(0, 10) + self.client.complete(10, 10), jids) + self.assertEqual(self.client.jobs.complete(0, 10) + self.client.jobs.complete(10, 10), jids) jids = [self.q.put(qless.Job, {'test': 'rssd'}, delay=60) for i in range(20)] self.assertEqual(set(self.q.jobs.scheduled(0, 10) + self.q.jobs.scheduled(10, 10)), set(jids))