Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

When retrying a job we should note the state change. Since a retry is…

… essentially a move + delay and move note's state changes, retry should note state change too. This affects how/when a job is marked as complete by a worker.
  • Loading branch information...
commit e0991e29c41b01525eca837e69199ece6ee6f8b2 1 parent 1376638
Patrick Roby authored
Showing with 40 additions and 38 deletions.
  1. +40 −38 lib/qless/job.rb
View
78 lib/qless/job.rb
@@ -10,21 +10,21 @@ def initialize(client, jid)
@client = client
@jid = jid
end
-
+
def klass
@klass ||= @klass_name.split('::').inject(Kernel) { |context, name| context.const_get(name) }
end
-
+
def queue
@queue ||= Queue.new(@queue_name, @client)
end
end
-
+
class Job < BaseJob
attr_reader :jid, :expires_at, :state, :queue_name, :history, :worker_name, :failure, :klass_name, :tracked, :dependencies, :dependents
attr_reader :original_retries, :retries_left
attr_accessor :data, :priority, :tags
-
+
def perform
klass.perform(self)
end
@@ -52,14 +52,14 @@ def self.build(client, klass, attributes = {})
attributes["data"] = JSON.load(JSON.dump attributes["data"])
new(client, attributes)
end
-
+
def initialize(client, atts)
super(client, atts.fetch('jid'))
%w{jid data priority tags state tracked
failure history dependencies dependents}.each do |att|
self.instance_variable_set("@#{att}".to_sym, atts.fetch(att))
end
-
+
@expires_at = atts.fetch('expires')
@klass_name = atts.fetch('klass')
@queue_name = atts.fetch('queue')
@@ -73,21 +73,21 @@ def initialize(client, atts)
@dependencies = [] if @dependencies == {}
@state_changed = false
end
-
+
def priority=(priority)
if @client._priority.call([], [@jid, priority])
@priority = priority
end
end
-
+
def [](key)
@data[key]
end
-
+
def []=(key, val)
@data[key] = val
end
-
+
def to_s
inspect
end
@@ -95,15 +95,15 @@ def to_s
def description
"#{@jid} (#{@klass_name} / #{@queue_name})"
end
-
+
def inspect
"<Qless::Job #{description}>"
end
-
+
def ttl
@expires_at - Time.now.to_f
end
-
+
# Move this from it's current queue into another
def move(queue)
note_state_change do
@@ -112,7 +112,7 @@ def move(queue)
])
end
end
-
+
# Fail a job
def fail(group, message)
note_state_change do
@@ -124,7 +124,7 @@ def fail(group, message)
JSON.generate(@data)]) || false
end
end
-
+
# Heartbeat a job
def heartbeat()
@client._heartbeat.call([], [
@@ -133,7 +133,7 @@ def heartbeat()
Time.now.to_f,
JSON.generate(@data)]) || false
end
-
+
# Complete a job
# Options include
# => next (String) the next queue
@@ -155,38 +155,40 @@ def complete(nxt=nil, options={})
def state_changed?
@state_changed
end
-
+
def cancel
note_state_change do
@client._cancel.call([], [@jid])
end
end
-
+
def track()
@client._track.call([], ['track', @jid, Time.now.to_f])
end
-
+
def untrack
@client._track.call([], ['untrack', @jid, Time.now.to_f])
end
-
+
def tag(*tags)
@client._tag.call([], ['add', @jid, Time.now.to_f] + tags)
end
-
+
def untag(*tags)
@client._tag.call([], ['remove', @jid, Time.now.to_f] + tags)
end
-
+
def retry(delay=0)
- results = @client._retry.call([], [@jid, @queue_name, @worker_name, Time.now.to_f, delay])
- results.nil? ? false : results
+ note_state_change do
+ results = @client._retry.call([], [@jid, @queue_name, @worker_name, Time.now.to_f, delay])
+ results.nil? ? false : results
+ end
end
-
+
def depend(*jids)
!!@client._depends.call([], [@jid, 'on'] + jids)
end
-
+
def undepend(*jids)
!!@client._depends.call([], [@jid, 'off'] + jids)
end
@@ -199,59 +201,59 @@ def note_state_change
result
end
end
-
+
class RecurringJob < BaseJob
attr_reader :jid, :data, :priority, :tags, :retries, :interval, :count, :queue_name, :klass_name
-
+
def initialize(client, atts)
super(client, atts.fetch('jid'))
%w{jid data priority tags retries interval count}.each do |att|
self.instance_variable_set("@#{att}".to_sym, atts.fetch(att))
end
-
+
@klass_name = atts.fetch('klass')
@queue_name = atts.fetch('queue')
@tags = [] if @tags == {}
end
-
+
def priority=(value)
@client._recur.call([], ['update', @jid, 'priority', value])
@priority = value
end
-
+
def retries=(value)
@client._recur.call([], ['update', @jid, 'retries', value])
@retries = value
end
-
+
def interval=(value)
@client._recur.call([], ['update', @jid, 'interval', value])
@interval = value
end
-
+
def data=(value)
@client._recur.call([], ['update', @jid, 'data', JSON.generate(value)])
@data = value
end
-
+
def klass=(value)
@client._recur.call([], ['update', @jid, 'klass', value.to_s])
@klass_name = value.to_s
end
-
+
def move(queue)
@client._recur.call([], ['update', @jid, 'queue', queue])
@queue_name = queue
end
-
+
def cancel
@client._recur.call([], ['off', @jid])
end
-
+
def tag(*tags)
@client._recur.call([], ['tag', @jid] + tags)
end
-
+
def untag(*tags)
@client._recur.call([], ['untag', @jid] + tags)
end

2 comments on commit e0991e2

@myronmarston

It'd be nice to add a spec for this. I think you can just add :retry to the array of methods here:

https://github.com/seomoz/qless/blob/master/spec/unit/job_spec.rb#L46-72

@proby

Doh, thanks. Tests coming shortly.

Please sign in to comment.
Something went wrong with that request. Please try again.