Permalink
Browse files

Initial support for recurring jobs.

  • Loading branch information...
1 parent 701e1cc commit 3cafc675c4561851dff1b5410888836f1d6ab869 Dan Lecocq committed May 11, 2012
Showing with 256 additions and 8 deletions.
  1. +3 −1 jobs.lua
  2. +53 −1 peek.lua
  3. +54 −3 pop.lua
  4. +6 −3 queues.lua
  5. +140 −0 recur.lua
View
@@ -1,4 +1,4 @@
--- Jobs(0, 'complete' | (('stalled' | 'running' | 'scheduled' | 'depends'), now, queue) [offset, [count]])
+-- Jobs(0, 'complete' | (('stalled' | 'running' | 'scheduled' | 'depends', 'recurring'), now, queue) [offset, [count]])
-- -------------------------------------------------------------------------------------------------------
--
-- Return all the job ids currently considered to be in the provided state
@@ -33,6 +33,8 @@ else
return redis.call('zrange', 'ql:q:' .. queue .. '-scheduled', offset, offset + count - 1)
elseif t == 'depends' then
return redis.call('zrange', 'ql:q:' .. queue .. '-depends', offset, offset + count - 1)
+ elseif t == 'recurring' then
+ return redis.call('zrange', 'ql:q:' .. queue .. '-recur', offset, offset + count - 1)
else
error('Jobs(): Unknown type "' .. t .. '"')
end
View
@@ -17,7 +17,8 @@ if #KEYS ~= 1 then
end
end
-local key = assert('ql:q:' .. KEYS[1], 'Peek(): Key "queue" missing')
+local queue = assert(KEYS[1] , 'Peek(): Key "queue" missing')
+local key = 'ql:q:' .. queue
local count = assert(tonumber(ARGV[1]) , 'Peek(): Arg "count" missing or not a number: ' .. (ARGV[2] or 'nil'))
local now = assert(tonumber(ARGV[2]) , 'Peek(): Arg "now" missing or not a number: ' .. (ARGV[3] or 'nil'))
@@ -30,6 +31,57 @@ for index, jid in ipairs(redis.call('zrangebyscore', key .. '-locks', 0, now, 'L
table.insert(keys, jid)
end
+-- If we still need jobs in order to meet demand, then we should
+-- look for all the recurring jobs that need jobs run
+-- NOTE(review): this spawn loop is duplicated verbatim in pop.lua; server-side
+-- Lua scripts have no include mechanism, so keep the two copies in sync.
+if #keys < count then
+ local r = redis.call('zrangebyscore', key .. '-recur', 0, now)
+ for index, jid in ipairs(r) do
+ -- For each of the jids that need jobs scheduled, first
+ -- get the last time each of them was run, and then increment
+ -- it by its interval. While this time is less than now,
+ -- we need to keep putting jobs on the queue
+ local klass, data, priority, tags, retries, interval, last_ran = unpack(redis.call('hmget', 'ql:r:' .. jid, 'klass', 'data', 'priority', 'tags', 'retries', 'interval', 'last-ran'))
+ local _tags = cjson.decode(tags)
+ last_ran = tonumber(last_ran)
+
+ while last_ran < now do
+ -- NOTE(review): this 'count' is the per-recurrence spawn counter used to
+ -- build unique child jids ('<jid>-<count>'); it intentionally shadows the
+ -- outer 'count' (number of jobs requested) inside the while body.
+ local count = redis.call('hincrby', 'ql:r:' .. jid, 'count', 1)
+
+ -- Add this job to the list of jobs tagged with whatever tags were supplied
+ for i, tag in ipairs(_tags) do
+ redis.call('zadd', 'ql:t:' .. tag, now, jid .. '-' .. count)
+ redis.call('zincrby', 'ql:tags', 1, tag)
+ end
+
+ -- First, let's save its data
+ redis.call('hmset', 'ql:j:' .. jid .. '-' .. count,
+ 'jid' , jid .. '-' .. count,
+ 'klass' , klass,
+ 'data' , data,
+ 'priority' , priority,
+ 'tags' , tags,
+ 'state' , 'waiting',
+ 'worker' , '',
+ 'expires' , 0,
+ 'queue' , queue,
+ 'retries' , retries,
+ 'remaining', retries,
+ 'history' , cjson.encode({{
+ q = queue,
+ put = math.floor(now)
+ }}))
+
+ -- Make the spawned job immediately available: add it to the work queue,
+ -- with 'now' folded into the score as a small priority tiebreaker.
+ redis.call('zadd', 'ql:q:' .. queue .. '-work', priority + (now / 10000000000), jid .. '-' .. count)
+
+ -- Advance 'last-ran' by one interval; loop until it catches up to 'now'
+ last_ran = redis.call('hincrby', 'ql:r:' .. jid, 'last-ran', interval)
+ end
+ -- Reschedule this recurring jid for its next spawn time
+ redis.call('zadd', 'ql:q:' .. queue .. '-recur', last_ran, jid)
+ end
+end
+
-- Now we've checked __all__ the locks for this queue the could
-- have expired, and are no more than the number requested. If
-- we still need values in order to meet the demand, then we
View
57 pop.lua
@@ -79,14 +79,65 @@ for index, jid in ipairs(redis.call('zrangebyscore', key .. '-locks', 0, now, 'L
local w = redis.call('hget', 'ql:j:' .. jid, 'worker')
redis.call('zrem', 'ql:w:' .. w .. ':jobs', jid)
end
+-- Now we've checked __all__ the locks for this queue that could
+-- have expired, and are no more than the number requested.
-- If we got any expired locks, then we should increment the
-- number of retries for this stage for this bin
redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. queue, 'retries', #keys)
--- Now we've checked __all__ the locks for this queue the could
--- have expired, and are no more than the number requested. If
--- we still need values in order to meet the demand, then we
+-- If we still need jobs in order to meet demand, then we should
+-- look for all the recurring jobs that need jobs run
+-- NOTE(review): this spawn loop is duplicated verbatim in peek.lua; server-side
+-- Lua scripts have no include mechanism, so keep the two copies in sync.
+if #keys < count then
+ local r = redis.call('zrangebyscore', key .. '-recur', 0, now)
+ for index, jid in ipairs(r) do
+ -- For each of the jids that need jobs scheduled, first
+ -- get the last time each of them was run, and then increment
+ -- it by its interval. While this time is less than now,
+ -- we need to keep putting jobs on the queue
+ local klass, data, priority, tags, retries, interval, last_ran = unpack(redis.call('hmget', 'ql:r:' .. jid, 'klass', 'data', 'priority', 'tags', 'retries', 'interval', 'last-ran'))
+ local _tags = cjson.decode(tags)
+ last_ran = tonumber(last_ran)
+
+ while last_ran < now do
+ -- NOTE(review): this 'count' is the per-recurrence spawn counter used to
+ -- build unique child jids ('<jid>-<count>'); it intentionally shadows the
+ -- outer 'count' (number of jobs requested) inside the while body.
+ local count = redis.call('hincrby', 'ql:r:' .. jid, 'count', 1)
+
+ -- Add this job to the list of jobs tagged with whatever tags were supplied
+ for i, tag in ipairs(_tags) do
+ redis.call('zadd', 'ql:t:' .. tag, now, jid .. '-' .. count)
+ redis.call('zincrby', 'ql:tags', 1, tag)
+ end
+
+ -- First, let's save its data
+ redis.call('hmset', 'ql:j:' .. jid .. '-' .. count,
+ 'jid' , jid .. '-' .. count,
+ 'klass' , klass,
+ 'data' , data,
+ 'priority' , priority,
+ 'tags' , tags,
+ 'state' , 'waiting',
+ 'worker' , '',
+ 'expires' , 0,
+ 'queue' , queue,
+ 'retries' , retries,
+ 'remaining', retries,
+ 'history' , cjson.encode({{
+ q = queue,
+ put = math.floor(now)
+ }}))
+
+ -- Make the spawned job immediately available: add it to the work queue,
+ -- with 'now' folded into the score as a small priority tiebreaker.
+ redis.call('zadd', 'ql:q:' .. queue .. '-work', priority + (now / 10000000000), jid .. '-' .. count)
+
+ -- Advance 'last-ran' by one interval; loop until it catches up to 'now'
+ last_ran = redis.call('hincrby', 'ql:r:' .. jid, 'last-ran', interval)
+ end
+ -- Reschedule this recurring jid for its next spawn time
+ redis.call('zadd', 'ql:q:' .. queue .. '-recur', last_ran, jid)
+ end
+end
+
+-- If we still need values in order to meet the demand, then we
-- should check if any scheduled items, and if so, we should
-- insert them to ensure correctness when pulling off the next
-- unit of work.
View
@@ -12,7 +12,8 @@
-- 'waiting': 5,
-- 'running': 5,
-- 'scheduled': 10,
--- 'depends': 5
+-- 'depends': 5,
+-- 'recurring': 0
-- }, {
-- ...
-- }
@@ -36,7 +37,8 @@ if queue then
stalled = stalled,
running = redis.call('zcard', 'ql:q:' .. queue .. '-locks') - stalled,
scheduled = redis.call('zcard', 'ql:q:' .. queue .. '-scheduled'),
- depends = redis.call('zcard', 'ql:q:' .. queue .. '-depends')
+ depends = redis.call('zcard', 'ql:q:' .. queue .. '-depends'),
+ recurring = redis.call('zcard', 'ql:q:' .. queue .. '-recur')
}
else
for index, qname in ipairs(queuenames) do
@@ -47,7 +49,8 @@ else
stalled = stalled,
running = redis.call('zcard', 'ql:q:' .. qname .. '-locks') - stalled,
scheduled = redis.call('zcard', 'ql:q:' .. qname .. '-scheduled'),
- depends = redis.call('zcard', 'ql:q:' .. qname .. '-depends')
+ depends = redis.call('zcard', 'ql:q:' .. qname .. '-depends'),
+ recurring = redis.call('zcard', 'ql:q:' .. qname .. '-recur')
})
end
end
View
140 recur.lua
@@ -0,0 +1,140 @@
+-- Recur(0, 'on', queue, jid, klass, data, now, 'interval', second, offset, [priority p], [tags t], [retries r])
+-- Recur(0, 'off', jid)
+-- Recur(0, 'get', jid)
+-- Recur(0, 'update', jid, ['priority', priority], ['interval', interval], ['retries', retries], ['data', data], ['klass', klass], ['queue', queue])
+-- Recur(0, 'tag', jid, tag, [tag, [...]])
+-- Recur(0, 'untag', jid, tag, [tag, [...]])
+-- -------------------------------------------------------------------------------------------------------
+-- This script takes the name of a queue and the info about a work item,
+-- and makes sure that jobs matching its criteria are regularly made
+-- available. Recurring job metadata lives in the 'ql:r:<jid>' hash; the
+-- next spawn time of each recurring jid is tracked in the
+-- 'ql:q:<queue>-recur' sorted set (consumed by peek.lua / pop.lua).
+
+if #KEYS ~= 0 then
+    error('Recur(): Got ' .. #KEYS .. ', expected 0 KEYS arguments')
+end
+
+local command = assert(ARGV[1], 'Recur(): Missing first argument')
+
+if command == 'on' then
+    local queue = assert(ARGV[2], 'Recur(): Arg "queue" missing')
+    local jid   = assert(ARGV[3], 'Recur(): Arg "jid" missing')
+    local klass = assert(ARGV[4], 'Recur(): Arg "klass" missing')
+    local data  = assert(cjson.decode(ARGV[5]),
+        'Recur(): Arg "data" missing or not JSON: ' .. tostring(ARGV[5]))
+    local now   = assert(tonumber(ARGV[6]),
+        'Recur(): Arg "now" missing or not a number: ' .. tostring(ARGV[6]))
+    local spec  = assert(ARGV[7], 'Recur(): Arg "schedule type" missing')
+    if spec == 'interval' then
+        local interval = assert(tonumber(ARGV[8]),
+            'Recur(): Arg "interval" must be a number: ' .. tostring(ARGV[8]))
+        local offset = assert(tonumber(ARGV[9]),
+            'Recur(): Arg "offset" must be a number: ' .. tostring(ARGV[9]))
+        -- Read in all the optional parameters (supplied as key, value pairs)
+        local options = {}
+        for i = 10, #ARGV, 2 do options[ARGV[i]] = ARGV[i + 1] end
+        -- BUGFIX: the default for missing tags must be the JSON string '[]',
+        -- not a Lua table -- cjson.decode() errors on non-string input.
+        options.tags = assert(cjson.decode(options.tags or '[]'),
+            'Recur(): Arg "tags" must be JSON-encoded array of string. Got: ' .. tostring(options.tags))
+        options.priority = assert(tonumber(options.priority or 0),
+            'Recur(): Arg "priority" must be a number. Got: ' .. tostring(options.priority))
+        options.retries = assert(tonumber(options.retries or 0),
+            'Recur(): Arg "retries" must be a number. Got: ' .. tostring(options.retries))
+
+        -- Save the recurring job's template data
+        redis.call('hmset', 'ql:r:' .. jid,
+            'jid'     , jid,
+            'klass'   , klass,
+            'data'    , cjson.encode(data),
+            'priority', options.priority,
+            'tags'    , cjson.encode(options.tags),
+            'state'   , 'recur',
+            'queue'   , queue,
+            -- NOTE(review): despite the name, 'last-ran' is initialized to the
+            -- *next* run time; peek/pop advance it by 'interval' as they spawn
+            'last-ran', math.floor(now + offset + interval),
+            'type'    , 'interval',
+            -- How many jobs we've spawned from this recurring job so far
+            'count'   , 0,
+            'interval', interval,
+            'retries' , options.retries)
+        -- Now, we should schedule the next run of the job
+        redis.call('zadd', 'ql:q:' .. queue .. '-recur', now + offset + interval, jid)
+
+        -- Lastly, we're going to make sure that this item is in the
+        -- set of known queues. We should keep this sorted by the
+        -- order in which we saw each of these queues
+        if redis.call('zscore', 'ql:queues', queue) == false then
+            redis.call('zadd', 'ql:queues', now, queue)
+        end
+
+        return jid
+    else
+        error('Recur(): schedule type "' .. tostring(spec) .. '" unknown')
+    end
+elseif command == 'off' then
+    local jid = assert(ARGV[2], 'Recur(): Arg "jid" missing')
+    -- First, find out what queue it was attached to (false if unknown jid)
+    local queue = redis.call('hget', 'ql:r:' .. jid, 'queue')
+    if queue then
+        -- Remove it from the queue it was attached to, and delete its data.
+        -- (Dropped the leftover debug 'echo' calls from the original.)
+        redis.call('zrem', 'ql:q:' .. queue .. '-recur', jid)
+        redis.call('del', 'ql:r:' .. jid)
+    end
+    return true
+elseif command == 'get' then
+    local jid = assert(ARGV[2], 'Recur(): Arg "jid" missing')
+    local job = redis.call(
+        'hmget', 'ql:r:' .. jid, 'jid', 'klass', 'state', 'queue',
+        'priority', 'interval', 'retries', 'count', 'data', 'tags')
+
+    -- A missing hash yields all-false fields; report "not found"
+    if not job[1] then
+        return false
+    end
+
+    return cjson.encode({
+        jid      = job[1],
+        klass    = job[2],
+        state    = job[3],
+        queue    = job[4],
+        priority = tonumber(job[5]),
+        interval = tonumber(job[6]),
+        retries  = tonumber(job[7]),
+        count    = tonumber(job[8]),
+        data     = cjson.decode(job[9]),
+        tags     = cjson.decode(job[10])
+    })
+elseif command == 'update' then
+    local jid = assert(ARGV[2], 'Recur(): Arg "jid" missing')
+
+    -- Make sure that the job exists.
+    -- BUGFIX: EXISTS returns an integer, and 0 is truthy in Lua, so the
+    -- original bare 'if redis.call(...)' branch was always taken.
+    if redis.call('exists', 'ql:r:' .. jid) == 1 then
+        for i = 3, #ARGV, 2 do
+            local key = ARGV[i]
+            local value = ARGV[i + 1]
+            if key == 'priority' or key == 'interval' or key == 'retries' then
+                value = assert(tonumber(value),
+                    'Recur(): Arg "' .. key .. '" must be a number: ' .. tostring(value))
+                -- If the command is 'interval', then we need to update the
+                -- time when it should next be scheduled
+                if key == 'interval' then
+                    -- BUGFIX: use hmget (HGET takes exactly one field), and
+                    -- move the next-run score by (new - old), not (old - new)
+                    local queue, interval = unpack(redis.call('hmget', 'ql:r:' .. jid, 'queue', 'interval'))
+                    redis.call('zincrby', 'ql:q:' .. queue .. '-recur', value - tonumber(interval), jid)
+                end
+                redis.call('hset', 'ql:r:' .. jid, key, value)
+            elseif key == 'data' then
+                value = assert(cjson.decode(value),
+                    'Recur(): Arg "data" is not JSON-encoded: ' .. tostring(value))
+                redis.call('hset', 'ql:r:' .. jid, 'data', cjson.encode(value))
+            elseif key == 'klass' then
+                redis.call('hset', 'ql:r:' .. jid, 'klass', value)
+            elseif key == 'queue' then
+                -- Move the recurring entry to the new queue, keeping its score
+                local queue = redis.call('hget', 'ql:r:' .. jid, 'queue')
+                local score = redis.call('zscore', 'ql:q:' .. queue .. '-recur', jid)
+                redis.call('zrem', 'ql:q:' .. queue .. '-recur', jid)
+                redis.call('zadd', 'ql:q:' .. value .. '-recur', score, jid)
+                -- BUGFIX: also record the new queue in the job hash, so that
+                -- later 'off'/'get' calls see the queue it actually lives in
+                redis.call('hset', 'ql:r:' .. jid, 'queue', value)
+            else
+                error('Recur(): Unrecognized option "' .. key .. '"')
+            end
+        end
+        return true
+    else
+        return false
+    end
+elseif command == 'tag' then
+    -- TODO(review): not yet implemented in this commit
+elseif command == 'untag' then
+    -- TODO(review): not yet implemented in this commit
+else
+    error('Recur(): First argument must be one of [on, off, get, update, tag, untag]. Got ' .. tostring(ARGV[1]))
+end

0 comments on commit 3cafc67

Please sign in to comment.