Skip to content

Commit

Permalink
Expose a spawned_from_jid attribute of a job.
Browse files Browse the repository at this point in the history
This returns the jid of the recurring job that
spawned a particular job, if there is one.
  • Loading branch information
myronmarston committed May 19, 2014
1 parent 591b6d3 commit 75fbdd9
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 40 deletions.
35 changes: 18 additions & 17 deletions job.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,33 @@ function QlessJob:data(...)
local job = redis.call(
'hmget', QlessJob.ns .. self.jid, 'jid', 'klass', 'state', 'queue',
'worker', 'priority', 'expires', 'retries', 'remaining', 'data',
'tags', 'failure')
'tags', 'failure', 'spawned_from_jid')

-- Return nil if we haven't found it
if not job[1] then
return nil
end

local data = {
jid = job[1],
klass = job[2],
state = job[3],
queue = job[4],
worker = job[5] or '',
tracked = redis.call(
jid = job[1],
klass = job[2],
state = job[3],
queue = job[4],
worker = job[5] or '',
tracked = redis.call(
'zscore', 'ql:tracked', self.jid) ~= false,
priority = tonumber(job[6]),
expires = tonumber(job[7]) or 0,
retries = tonumber(job[8]),
remaining = math.floor(tonumber(job[9])),
data = job[10],
tags = cjson.decode(job[11]),
history = self:history(),
failure = cjson.decode(job[12] or '{}'),
dependents = redis.call(
priority = tonumber(job[6]),
expires = tonumber(job[7]) or 0,
retries = tonumber(job[8]),
remaining = math.floor(tonumber(job[9])),
data = job[10],
tags = cjson.decode(job[11]),
history = self:history(),
failure = cjson.decode(job[12] or '{}'),
spawned_from_jid = job[13],
dependents = redis.call(
'smembers', QlessJob.ns .. self.jid .. '-dependents'),
dependencies = redis.call(
dependencies = redis.call(
'smembers', QlessJob.ns .. self.jid .. '-dependencies')
}

Expand Down
25 changes: 13 additions & 12 deletions queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -764,18 +764,19 @@ function QlessQueue:check_recurring(now, count)

-- First, let's save its data
redis.call('hmset', QlessJob.ns .. child_jid,
'jid' , child_jid,
'klass' , klass,
'data' , data,
'priority' , priority,
'tags' , tags,
'state' , 'waiting',
'worker' , '',
'expires' , 0,
'queue' , self.name,
'retries' , retries,
'remaining', retries,
'time' , string.format("%.20f", score))
'jid' , child_jid,
'klass' , klass,
'data' , data,
'priority' , priority,
'tags' , tags,
'state' , 'waiting',
'worker' , '',
'expires' , 0,
'queue' , self.name,
'retries' , retries,
'remaining' , retries,
'time' , string.format("%.20f", score),
'spawned_from_jid', jid)
Qless.job(child_jid):history(score, 'put', {q = self.name})

-- Now, if a delay was provided, and if it's in the future,
Expand Down
3 changes: 2 additions & 1 deletion test/test_fail.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ def test_basic(self):
'state': 'failed',
'tags': {},
'tracked': False,
'worker': u''})
'worker': u'',
'spawned_from_jid': False})

def test_put(self):
'''Can put a job that has been failed'''
Expand Down
3 changes: 2 additions & 1 deletion test/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ def test_basic(self):
'state': 'complete',
'tags': {},
'tracked': False,
'worker': u''})
'worker': u'',
'spawned_from_jid': False})

def test_advance(self):
'''Can complete and advance a job in one fell swooop'''
Expand Down
9 changes: 6 additions & 3 deletions test/test_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ def test_lose_lock(self):
'state': 'running',
'tags': {},
'tracked': False,
'worker': 'another'}])
'worker': 'another',
'spawned_from_jid': False}])
# When we try to heartbeat, it should raise an exception
self.assertRaisesRegexp(redis.ResponseError, r'given out to another',
self.lua, 'heartbeat', 1000, 'jid', 'worker', {})
Expand Down Expand Up @@ -274,7 +275,8 @@ def test_retry_group_message(self):
'state': 'failed',
'tags': {},
'tracked': False,
'worker': u''})
'worker': u'',
'spawned_from_jid': False})

def test_retry_delay(self):
'''Can retry a job with a delay and then it's considered scheduled'''
Expand Down Expand Up @@ -323,7 +325,8 @@ def test_retry_failed_retries(self):
'state': 'failed',
'tags': {},
'tracked': False,
'worker': u''
'worker': u'',
'spawned_from_jid': False
})


Expand Down
12 changes: 8 additions & 4 deletions test/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,8 @@ def test_basic(self):
'state': 'waiting',
'tags': {},
'tracked': False,
'worker': u''
'worker': u'',
'spawned_from_jid': False
})

def test_data_as_array(self):
Expand Down Expand Up @@ -395,7 +396,8 @@ def test_move(self):
'state': 'waiting',
'tags': {},
'tracked': False,
'worker': u''})
'worker': u'',
'spawned_from_jid': False})

def test_move_update(self):
'''When moving, ensure data's only changed when overridden'''
Expand Down Expand Up @@ -463,7 +465,8 @@ def test_basic(self):
'state': 'waiting',
'tags': {},
'tracked': False,
'worker': u''
'worker': u'',
'spawned_from_jid': False
}])
# With several jobs in the queue, we should be able to see more
self.lua('put', 2, 'worker', 'foo', 'jid2', 'klass', {}, 0)
Expand Down Expand Up @@ -549,7 +552,8 @@ def test_basic(self):
'state': 'running',
'tags': {},
'tracked': False,
'worker': 'worker'}])
'worker': 'worker',
'spawned_from_jid': False}])

def test_pop_many(self):
'''We should be able to pop off many jobs'''
Expand Down
8 changes: 6 additions & 2 deletions test/test_recurring.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@ def test_basic(self):
popped = self.lua('pop', 0, 'queue', 'worker', 10)
self.assertEqual(len(popped), 1)
self.assertEqual(popped[0]['jid'], 'jid-1')
self.assertEqual(popped[0]['spawned_from_jid'], 'jid')

# If we wait 59 seconds, there won't be a job, but at 60, yes
popped = self.lua('pop', 59, 'queue', 'worker', 10)
self.assertEqual(len(popped), 0)
popped = self.lua('pop', 61, 'queue', 'worker', 10)
self.assertEqual(len(popped), 1)
self.assertEqual(popped[0]['jid'], 'jid-2')
self.assertEqual(popped[0]['spawned_from_jid'], 'jid')

def test_offset(self):
'''We can set an offset from now for jobs to recur on'''
Expand Down Expand Up @@ -260,7 +262,8 @@ def test_rerecur_attributes(self):
'state': 'running',
'tags': ['foo'],
'tracked': False,
'worker': 'worker'})
'worker': 'worker',
'spawned_from_jid': 'jid'})
self.lua('recur', 60, 'queue', 'jid', 'class', {'foo': 'bar'},
'interval', 10, 0, 'priority', 5, 'tags', ['bar'], 'retries', 5)
self.assertEqual(self.lua('pop', 60, 'queue', 'worker', 10)[0], {
Expand All @@ -280,7 +283,8 @@ def test_rerecur_attributes(self):
'state': 'running',
'tags': ['bar'],
'tracked': False,
'worker': 'worker'})
'worker': 'worker',
'spawned_from_jid': 'jid'})

def test_rerecur_move(self):
'''Re-recurring a job in a new queue works like a move'''
Expand Down
1 change: 1 addition & 0 deletions test/test_track.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def test_track(self):
'dependents': {},
'data': '{}',
'remaining': 5,
'spawned_from_jid': False,
'history': [{
'q': 'queue', 'what': 'put', 'when': 0
}]
Expand Down

0 comments on commit 75fbdd9

Please sign in to comment.