Skip to content

Commit

Permalink
Merge pull request #189 from seomoz/spawned_from
Browse files Browse the repository at this point in the history
Spawned from
  • Loading branch information
myronmarston committed May 19, 2014
2 parents 258f283 + cd64bff commit a05faff
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 90 deletions.
12 changes: 9 additions & 3 deletions lib/qless/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def hash

# A Qless job
class Job < BaseJob
attr_reader :jid, :expires_at, :state, :queue_name, :worker_name, :failure
attr_reader :jid, :expires_at, :state, :queue_name, :worker_name, :failure, :spawned_from_jid
attr_reader :klass_name, :tracked, :dependencies, :dependents
attr_reader :original_retries, :retries_left, :raw_queue_history
attr_reader :state_changed
Expand Down Expand Up @@ -84,6 +84,7 @@ def perform
def self.build(client, klass, attributes = {})
defaults = {
'jid' => Qless.generate_jid,
'spawned_from_jid' => nil,
'data' => {},
'klass' => klass.to_s,
'priority' => 0,
Expand Down Expand Up @@ -115,8 +116,8 @@ def self.middlewares_on(job_klass)
def initialize(client, atts)
super(client, atts.fetch('jid'))
%w{jid data priority tags state tracked
failure dependencies dependents}.each do |att|
instance_variable_set("@#{att}".to_sym, atts.fetch(att))
failure dependencies dependents spawned_from_jid}.each do |att|
instance_variable_set(:"@#{att}", atts.fetch(att))
end

# Parse the data string
Expand Down Expand Up @@ -195,9 +196,14 @@ def initially_put_at
@initially_put_at ||= history_timestamp('put', :min)
end

def spawned_from
@spawned_from ||= @client.jobs[@spawned_from_jid]
end

def to_hash
{
jid: jid,
spawned_from_jid: spawned_from_jid,
expires_at: expires_at,
state: state,
queue_name: queue_name,
Expand Down
84 changes: 44 additions & 40 deletions lib/qless/lua/qless-lib.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-- Current SHA: f7ef735105ade320fef8f621bf264851f246924a
-- Current SHA: a9ba61342d31e1671e268c2a2d3bac1269795e87
-- This is a generated file
-------------------------------------------------------------------------------
-- Forward declarations to make everything happy
Expand Down Expand Up @@ -266,8 +266,7 @@ function Qless.tag(now, command, ...)
redis.call('zincrby', 'ql:tags', 1, tag)
end

tags = cjson.encode(tags)
redis.call('hset', QlessJob.ns .. jid, 'tags', tags)
redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(tags))
return tags
else
error('Tag(): Job ' .. jid .. ' does not exist')
Expand All @@ -293,8 +292,7 @@ function Qless.tag(now, command, ...)
local results = {}
for i,tag in ipairs(tags) do if _tags[tag] then table.insert(results, tag) end end

tags = cjson.encode(results)
redis.call('hset', QlessJob.ns .. jid, 'tags', tags)
redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(results))
return results
else
error('Tag(): Job ' .. jid .. ' does not exist')
Expand Down Expand Up @@ -492,32 +490,33 @@ function QlessJob:data(...)
local job = redis.call(
'hmget', QlessJob.ns .. self.jid, 'jid', 'klass', 'state', 'queue',
'worker', 'priority', 'expires', 'retries', 'remaining', 'data',
'tags', 'failure')
'tags', 'failure', 'spawned_from_jid')

-- Return nil if we haven't found it
if not job[1] then
return nil
end

local data = {
jid = job[1],
klass = job[2],
state = job[3],
queue = job[4],
worker = job[5] or '',
tracked = redis.call(
jid = job[1],
klass = job[2],
state = job[3],
queue = job[4],
worker = job[5] or '',
tracked = redis.call(
'zscore', 'ql:tracked', self.jid) ~= false,
priority = tonumber(job[6]),
expires = tonumber(job[7]) or 0,
retries = tonumber(job[8]),
remaining = math.floor(tonumber(job[9])),
data = job[10],
tags = cjson.decode(job[11]),
history = self:history(),
failure = cjson.decode(job[12] or '{}'),
dependents = redis.call(
priority = tonumber(job[6]),
expires = tonumber(job[7]) or 0,
retries = tonumber(job[8]),
remaining = math.floor(tonumber(job[9])),
data = job[10],
tags = cjson.decode(job[11]),
history = self:history(),
failure = cjson.decode(job[12] or '{}'),
spawned_from_jid = job[13],
dependents = redis.call(
'smembers', QlessJob.ns .. self.jid .. '-dependents'),
dependencies = redis.call(
dependencies = redis.call(
'smembers', QlessJob.ns .. self.jid .. '-dependencies')
}

Expand Down Expand Up @@ -577,9 +576,9 @@ function QlessJob:complete(now, worker, queue, data, ...)
local bin = now - (now % 86400)

-- First things first, we should see if the worker still owns this job
local lastworker, state, priority, retries = unpack(
local lastworker, state, priority, retries, current_queue = unpack(
redis.call('hmget', QlessJob.ns .. self.jid, 'worker', 'state',
'priority', 'retries', 'dependents'))
'priority', 'retries', 'queue'))

if lastworker == false then
error('Complete(): Job does not exist')
Expand All @@ -588,6 +587,9 @@ function QlessJob:complete(now, worker, queue, data, ...)
elseif lastworker ~= worker then
error('Complete(): Job has been handed out to another worker: ' ..
tostring(lastworker))
elseif queue ~= current_queue then
error('Complete(): Job running in another queue: ' ..
tostring(current_queue))
end

-- Now we can assume that the worker does own the job. We need to
Expand Down Expand Up @@ -1911,7 +1913,7 @@ function QlessQueue:recur(now, jid, klass, raw_data, spec, ...)
local offset = assert(tonumber(arg[2]),
'Recur(): Arg "offset" not a number: ' .. tostring(arg[2]))
if interval <= 0 then
error('Recur(): Arg "interval" must be greater than or equal to 0')
error('Recur(): Arg "interval" must be greater than 0')
end

-- Read in all the optional parameters. All of these must come in
Expand Down Expand Up @@ -2021,35 +2023,37 @@ function QlessQueue:check_recurring(now, count)
while (score <= now) and (moved < count) do
local count = redis.call('hincrby', 'ql:r:' .. jid, 'count', 1)
moved = moved + 1

local child_jid = jid .. '-' .. count

-- Add this job to the list of jobs tagged with whatever tags were
-- supplied
for i, tag in ipairs(_tags) do
redis.call('zadd', 'ql:t:' .. tag, now, jid .. '-' .. count)
redis.call('zadd', 'ql:t:' .. tag, now, child_jid)
redis.call('zincrby', 'ql:tags', 1, tag)
end

-- First, let's save its data
local child_jid = jid .. '-' .. count
redis.call('hmset', QlessJob.ns .. child_jid,
'jid' , jid .. '-' .. count,
'klass' , klass,
'data' , data,
'priority' , priority,
'tags' , tags,
'state' , 'waiting',
'worker' , '',
'expires' , 0,
'queue' , self.name,
'retries' , retries,
'remaining', retries,
'time' , string.format("%.20f", score))
'jid' , child_jid,
'klass' , klass,
'data' , data,
'priority' , priority,
'tags' , tags,
'state' , 'waiting',
'worker' , '',
'expires' , 0,
'queue' , self.name,
'retries' , retries,
'remaining' , retries,
'time' , string.format("%.20f", score),
'spawned_from_jid', jid)
Qless.job(child_jid):history(score, 'put', {q = self.name})

-- Now, if a delay was provided, and if it's in the future,
-- then we'll have to schedule it. Otherwise, we're just
-- going to add it to the work queue.
self.work.add(score, priority, jid .. '-' .. count)
self.work.add(score, priority, child_jid)

score = score + interval
self.recurring.add(score, jid)
Expand Down
84 changes: 44 additions & 40 deletions lib/qless/lua/qless.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-- Current SHA: f7ef735105ade320fef8f621bf264851f246924a
-- Current SHA: a9ba61342d31e1671e268c2a2d3bac1269795e87
-- This is a generated file
local Qless = {
ns = 'ql:'
Expand Down Expand Up @@ -158,8 +158,7 @@ function Qless.tag(now, command, ...)
redis.call('zincrby', 'ql:tags', 1, tag)
end

tags = cjson.encode(tags)
redis.call('hset', QlessJob.ns .. jid, 'tags', tags)
redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(tags))
return tags
else
error('Tag(): Job ' .. jid .. ' does not exist')
Expand All @@ -182,8 +181,7 @@ function Qless.tag(now, command, ...)
local results = {}
for i,tag in ipairs(tags) do if _tags[tag] then table.insert(results, tag) end end

tags = cjson.encode(results)
redis.call('hset', QlessJob.ns .. jid, 'tags', tags)
redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(results))
return results
else
error('Tag(): Job ' .. jid .. ' does not exist')
Expand Down Expand Up @@ -338,31 +336,32 @@ function QlessJob:data(...)
local job = redis.call(
'hmget', QlessJob.ns .. self.jid, 'jid', 'klass', 'state', 'queue',
'worker', 'priority', 'expires', 'retries', 'remaining', 'data',
'tags', 'failure')
'tags', 'failure', 'spawned_from_jid')

if not job[1] then
return nil
end

local data = {
jid = job[1],
klass = job[2],
state = job[3],
queue = job[4],
worker = job[5] or '',
tracked = redis.call(
jid = job[1],
klass = job[2],
state = job[3],
queue = job[4],
worker = job[5] or '',
tracked = redis.call(
'zscore', 'ql:tracked', self.jid) ~= false,
priority = tonumber(job[6]),
expires = tonumber(job[7]) or 0,
retries = tonumber(job[8]),
remaining = math.floor(tonumber(job[9])),
data = job[10],
tags = cjson.decode(job[11]),
history = self:history(),
failure = cjson.decode(job[12] or '{}'),
dependents = redis.call(
priority = tonumber(job[6]),
expires = tonumber(job[7]) or 0,
retries = tonumber(job[8]),
remaining = math.floor(tonumber(job[9])),
data = job[10],
tags = cjson.decode(job[11]),
history = self:history(),
failure = cjson.decode(job[12] or '{}'),
spawned_from_jid = job[13],
dependents = redis.call(
'smembers', QlessJob.ns .. self.jid .. '-dependents'),
dependencies = redis.call(
dependencies = redis.call(
'smembers', QlessJob.ns .. self.jid .. '-dependencies')
}

Expand Down Expand Up @@ -401,9 +400,9 @@ function QlessJob:complete(now, worker, queue, data, ...)

local bin = now - (now % 86400)

local lastworker, state, priority, retries = unpack(
local lastworker, state, priority, retries, current_queue = unpack(
redis.call('hmget', QlessJob.ns .. self.jid, 'worker', 'state',
'priority', 'retries', 'dependents'))
'priority', 'retries', 'queue'))

if lastworker == false then
error('Complete(): Job does not exist')
Expand All @@ -412,6 +411,9 @@ function QlessJob:complete(now, worker, queue, data, ...)
elseif lastworker ~= worker then
error('Complete(): Job has been handed out to another worker: ' ..
tostring(lastworker))
elseif queue ~= current_queue then
error('Complete(): Job running in another queue: ' ..
tostring(current_queue))
end

self:history(now, 'done')
Expand Down Expand Up @@ -1413,7 +1415,7 @@ function QlessQueue:recur(now, jid, klass, raw_data, spec, ...)
local offset = assert(tonumber(arg[2]),
'Recur(): Arg "offset" not a number: ' .. tostring(arg[2]))
if interval <= 0 then
error('Recur(): Arg "interval" must be greater than or equal to 0')
error('Recur(): Arg "interval" must be greater than 0')
end

if #arg % 2 == 1 then
Expand Down Expand Up @@ -1495,29 +1497,31 @@ function QlessQueue:check_recurring(now, count)
while (score <= now) and (moved < count) do
local count = redis.call('hincrby', 'ql:r:' .. jid, 'count', 1)
moved = moved + 1

local child_jid = jid .. '-' .. count

for i, tag in ipairs(_tags) do
redis.call('zadd', 'ql:t:' .. tag, now, jid .. '-' .. count)
redis.call('zadd', 'ql:t:' .. tag, now, child_jid)
redis.call('zincrby', 'ql:tags', 1, tag)
end

local child_jid = jid .. '-' .. count
redis.call('hmset', QlessJob.ns .. child_jid,
'jid' , jid .. '-' .. count,
'klass' , klass,
'data' , data,
'priority' , priority,
'tags' , tags,
'state' , 'waiting',
'worker' , '',
'expires' , 0,
'queue' , self.name,
'retries' , retries,
'remaining', retries,
'time' , string.format("%.20f", score))
'jid' , child_jid,
'klass' , klass,
'data' , data,
'priority' , priority,
'tags' , tags,
'state' , 'waiting',
'worker' , '',
'expires' , 0,
'queue' , self.name,
'retries' , retries,
'remaining' , retries,
'time' , string.format("%.20f", score),
'spawned_from_jid', jid)
Qless.job(child_jid):history(score, 'put', {q = self.name})

self.work.add(score, priority, jid .. '-' .. count)
self.work.add(score, priority, child_jid)

score = score + interval
self.recurring.add(score, jid)
Expand Down
2 changes: 1 addition & 1 deletion lib/qless/qless-core
12 changes: 12 additions & 0 deletions spec/integration/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,18 @@ class NoPerformJob; end
expect(history[1]['what']).to eq('hello')
expect(history[2]['foo']).to eq('bar')
end

it 'returns the source recurring job from `spawned_from`' do
queue.recur('Foo', {}, 1, jid: 'recurring-jid', offset: -1)
recurring_job = client.jobs['recurring-jid']
expect(recurring_job).to be_a(RecurringJob)
expect(queue.pop.spawned_from).to eq(recurring_job)
end

it 'returns nil from `spawned_from` when it is not a recurring job' do
queue.put('Foo', {}, jid: 'jid')
expect(client.jobs['jid'].spawned_from).to be_nil
end
end

describe RecurringJob, :integration do
Expand Down
Loading

0 comments on commit a05faff

Please sign in to comment.