Skip to content

Commit

Permalink
Merge pull request #60 from seomoz/cbf/include-jid-in-error-message
Browse files Browse the repository at this point in the history
Include relevant jid in all error messages related to job invalid state.
  • Loading branch information
b4hand committed Jul 31, 2015
2 parents 6a1ab6b + e1de3b1 commit ee1ca9e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 16 deletions.
37 changes: 22 additions & 15 deletions job.lua
Expand Up @@ -102,14 +102,15 @@ function QlessJob:complete(now, worker, queue, raw_data, ...)
'priority', 'retries', 'queue'))

if lastworker == false then
error('Complete(): Job does not exist')
error('Complete(): Job ' .. self.jid .. ' does not exist')
elseif (state ~= 'running') then
error('Complete(): Job is not currently running: ' .. state)
error('Complete(): Job ' .. self.jid .. ' is not currently running: ' ..
state)
elseif lastworker ~= worker then
error('Complete(): Job has been handed out to another worker: ' ..
tostring(lastworker))
error('Complete(): Job ' .. self.jid ..
' has been handed out to another worker: ' .. tostring(lastworker))
elseif queue ~= current_queue then
error('Complete(): Job running in another queue: ' ..
error('Complete(): Job ' .. self.jid .. ' running in another queue: ' ..
tostring(current_queue))
end

Expand Down Expand Up @@ -342,11 +343,12 @@ function QlessJob:fail(now, worker, group, message, data)

-- If the job has been completed, we cannot fail it
if not state then
error('Fail(): Job does not exist')
error('Fail(): Job ' .. self.jid .. 'does not exist')
elseif state ~= 'running' then
error('Fail(): Job not currently running: ' .. state)
error('Fail(): Job ' .. self.jid .. 'not currently running: ' .. state)
elseif worker ~= oldworker then
error('Fail(): Job running with another worker: ' .. oldworker)
error('Fail(): Job ' .. self.jid .. ' running with another worker: ' ..
oldworker)
end

-- Send out a log message
Expand Down Expand Up @@ -439,11 +441,13 @@ function QlessJob:retry(now, queue, worker, delay, group, message)

-- If this isn't the worker that owns
if oldworker == false then
error('Retry(): Job does not exist')
error('Retry(): Job ' .. self.jid .. ' does not exist')
elseif state ~= 'running' then
error('Retry(): Job is not currently running: ' .. state)
error('Retry(): Job ' .. self.jid .. ' is not currently running: ' ..
state)
elseif oldworker ~= worker then
error('Retry(): Job has been given to another worker: ' .. oldworker)
error('Retry(): Job ' .. self.jid ..
' has been given to another worker: ' .. oldworker)
end

-- For each of these, decrement their retries. If any of them
Expand Down Expand Up @@ -624,11 +628,14 @@ function QlessJob:heartbeat(now, worker, data)
redis.call('hmget', QlessJob.ns .. self.jid, 'worker', 'state'))
if job_worker == false then
-- This means the job doesn't exist
error('Heartbeat(): Job does not exist')
error('Heartbeat(): Job ' .. self.jid .. ' does not exist')
elseif state ~= 'running' then
error('Heartbeat(): Job not currently running: ' .. state)
error(
'Heartbeat(): Job ' .. self.jid .. ' not currently running: ' .. state)
elseif job_worker ~= worker or #job_worker == 0 then
error('Heartbeat(): Job given out to another worker: ' .. job_worker)
error(
'Heartbeat(): Job ' .. self.jid ..
' given out to another worker: ' .. job_worker)
else
-- Otherwise, optionally update the user data, and the heartbeat
if data then
Expand Down Expand Up @@ -699,7 +706,7 @@ function QlessJob:timeout(now)
local queue_name, state, worker = unpack(redis.call('hmget',
QlessJob.ns .. self.jid, 'queue', 'state', 'worker'))
if queue_name == nil then
error('Timeout(): Job does not exist')
error('Timeout(): Job ' .. self.jid .. ' does not exist')
elseif state ~= 'running' then
error('Timeout(): Job ' .. self.jid .. ' not running')
else
Expand Down
2 changes: 1 addition & 1 deletion test/test_job.py
Expand Up @@ -252,7 +252,7 @@ def test_cancel_running(self):
self.lua('pop', 1, 'queue', 'worker', 10)
self.lua('heartbeat', 2, 'jid', 'worker', {})
self.lua('cancel', 3, 'jid')
self.assertRaisesRegexp(redis.ResponseError, r'Job does not exist',
self.assertRaisesRegexp(redis.ResponseError, r'Job jid does not exist',
self.lua, 'heartbeat', 4, 'jid', 'worker', {})

def test_cancel_retries(self):
Expand Down

0 comments on commit ee1ca9e

Please sign in to comment.