Permalink
Browse files

Merge pull request #88 from seomoz/deserialize-history

Deserialize history
  • Loading branch information...
2 parents f562ee0 + 14de65a commit 8e2917be640cfa31cef70acbdd45abf73a5fe1ff @myronmarston myronmarston committed Apr 4, 2013
Showing with 115 additions and 30 deletions.
  1. +38 −10 lib/qless/job.rb
  2. +3 −3 lib/qless/middleware/sentry.rb
  3. +2 −2 lib/qless/server.rb
  4. +5 −5 lib/qless/server/views/_job.erb
  5. +10 −10 spec/integration/qless_spec.rb
  6. +57 −0 spec/unit/job_spec.rb
View
48 lib/qless/job.rb
@@ -22,8 +22,8 @@ def queue
end
class Job < BaseJob
- attr_reader :jid, :expires_at, :state, :queue_name, :history, :worker_name, :failure, :klass_name, :tracked, :dependencies, :dependents
- attr_reader :original_retries, :retries_left
+ attr_reader :jid, :expires_at, :state, :queue_name, :worker_name, :failure, :klass_name, :tracked, :dependencies, :dependents
+ attr_reader :original_retries, :retries_left, :raw_queue_history
attr_accessor :data, :priority, :tags
MiddlewareMisconfiguredError = Class.new(StandardError)
@@ -81,16 +81,17 @@ def self.middlewares_on(job_klass)
def initialize(client, atts)
super(client, atts.fetch('jid'))
%w{jid data priority tags state tracked
- failure history dependencies dependents}.each do |att|
+ failure dependencies dependents}.each do |att|
self.instance_variable_set("@#{att}".to_sym, atts.fetch(att))
end
- @expires_at = atts.fetch('expires')
- @klass_name = atts.fetch('klass')
- @queue_name = atts.fetch('queue')
- @worker_name = atts.fetch('worker')
- @original_retries = atts.fetch('retries')
- @retries_left = atts.fetch('remaining')
+ @expires_at = atts.fetch('expires')
+ @klass_name = atts.fetch('klass')
+ @queue_name = atts.fetch('queue')
+ @worker_name = atts.fetch('worker')
+ @original_retries = atts.fetch('retries')
+ @retries_left = atts.fetch('remaining')
+ @raw_queue_history = atts.fetch('history')
# This is a silly side-effect of Lua doing JSON parsing
@tags = [] if @tags == {}
@@ -133,13 +134,36 @@ def reconnect_to_redis
@client.redis.client.reconnect
end
+ def history
+ warn "WARNING: Qless::Job#history is deprecated; use Qless::Job#raw_queue_history instead" +
+ "; called from:\n#{caller.first}\n"
+ raw_queue_history
+ end
+
+ def queue_history
+ @queue_history ||= @raw_queue_history.map do |history_event|
+ history_event.each_with_object({}) do |(key, value), hash|
+ # The only integer values we get in the history are timestamps
+ hash[key] = if value.is_a?(Integer)
+ Time.at(value).utc
+ else
+ value
+ end
+ end
+ end
+ end
+
+ def initially_put_at
+ @initially_put_at ||= history_timestamp('put', :min)
+ end
+
def to_hash
{
jid: jid,
expires_at: expires_at,
state: state,
queue_name: queue_name,
- history: history,
+ history: raw_queue_history,
worker_name: worker_name,
failure: failure,
klass_name: klass_name,
@@ -261,6 +285,10 @@ def note_state_change
@state_changed = true
result
end
+
+ def history_timestamp(name, selector)
+ queue_history.map { |q| q[name] }.compact.send(selector)
+ end
end
class RecurringJob < BaseJob
View
6 lib/qless/middleware/sentry.rb
@@ -53,10 +53,10 @@ def job_metadata
# We want to log formatted timestamps rather than integer timestamps
def job_history
- @job.history.map do |history_event|
+ @job.queue_history.map do |history_event|
history_event.each_with_object({}) do |(key, value), hash|
- hash[key] = if value.is_a?(Integer)
- Time.at(value).getutc.iso8601
+ hash[key] = if value.is_a?(Time)
+ value.iso8601
else
value
end
View
4 lib/qless/server.rb
@@ -384,7 +384,7 @@ def strftime(t)
if job.nil?
halt 404, "Could not find job"
else
- queue = job.history[-1]["q"]
+ queue = job.raw_queue_history[-1]["q"]
job.move(queue)
return json({ :id => data["id"], :queue => queue})
end
@@ -399,7 +399,7 @@ def strftime(t)
halt 400, "Neet type"
else
return json(client.jobs.failed(data["type"], 0, 500)['jobs'].map do |job|
- queue = job.history[-1]["q"]
+ queue = job.raw_queue_history[-1]["q"]
job.move(queue)
{ :id => job.jid, :queue => queue}
end)
View
10 lib/qless/server/views/_job.erb
@@ -112,12 +112,12 @@
<div class="span6">
<h3><small>History</small></h3>
<div style="overflow-y:scroll; height: 200px">
- <% job.history.reverse.each do |h| %>
+ <% job.queue_history.reverse.each do |h| %>
<pre><strong><%= h['q'] %></strong>
- Put: <%= strftime(Time.at(h['put'])) %><% if not h['popped'].nil? %>
- Pop: <%= strftime(Time.at(h['popped'])) %> by <%= h['worker'] %><% end %><% if not h['completed'].nil? %>
- Completed: <%= strftime(Time.at(h['completed'])) %><% end %><% if not h['failed'].nil? %>
- Failed: <%= strftime(Time.at(h['failed'])) %><% end %></pre>
+ Put: <%= strftime(h['put']) %><% if not h['popped'].nil? %>
+ Pop: <%= strftime(h['popped']) %> by <%= h['worker'] %><% end %><% if not h['completed'].nil? %>
+ Completed: <%= strftime(h['completed']) %><% end %><% if not h['failed'].nil? %>
+ Failed: <%= strftime(h['failed']) %><% end %></pre>
<% end %>
</div>
</div>
View
20 spec/integration/qless_spec.rb
@@ -463,7 +463,7 @@ def should_only_have_tracked_jid_for(type)
jobs = q.pop(100)
jobs.length.should eq(6)
6.times do |i|
- jobs[i].history[0]['put'].should eq(start + i * 10)
+ jobs[i].raw_queue_history[0]['put'].should eq(start + i * 10)
end
# Cancel the original rcurring job, complete these jobs, start for peek
client.jobs[jid].cancel
@@ -476,7 +476,7 @@ def should_only_have_tracked_jid_for(type)
jobs = q.peek(100)
jobs.length.should eq(6)
6.times do |i|
- jobs[i].history[0]['put'].should eq(start + i * 10)
+ jobs[i].raw_queue_history[0]['put'].should eq(start + i * 10)
end
end
@@ -546,8 +546,8 @@ def should_only_have_tracked_jid_for(type)
job.tags.should eq([])
job.worker_name.should eq("")
job.state.should eq("waiting")
- job.history.length.should eq(1)
- job.history[0]['q'].should eq("testing")
+ job.raw_queue_history.length.should eq(1)
+ job.raw_queue_history[0]['q'].should eq("testing")
end
it "can put, peek, and pop many" do
@@ -678,10 +678,10 @@ def should_only_have_tracked_jid_for(type)
# 4) Complete job, check history
jid = q.put(Qless::Job, {"test" => "put_history"})
job = client.jobs[jid]
- (job.history[0]["put"] - Time.now.to_i).abs.should < 1
+ (job.raw_queue_history[0]["put"] - Time.now.to_i).abs.should < 1
job = q.pop
job = client.jobs[jid]
- (job.history[0]["popped"] - Time.now.to_i).abs.should < 1
+ (job.raw_queue_history[0]["popped"] - Time.now.to_i).abs.should < 1
end
it "peeks and pops empty queues with nil" do
@@ -746,7 +746,7 @@ def should_only_have_tracked_jid_for(type)
before.tags.should eq(after.tags)
before.data.should eq(after.data)
before.priority.should eq(after.priority)
- after.history.length.should eq(2)
+ after.raw_queue_history.length.should eq(2)
end
end
@@ -1018,7 +1018,7 @@ def should_only_have_tracked_jid_for(type)
job = q.pop
job.complete.should eq("complete")
job = client.jobs[jid]
- job.history.length.should eq(1)
+ job.raw_queue_history.length.should eq(1)
job.state.should eq("complete")
job.worker_name.should eq("")
job.queue_name.should eq("")
@@ -1044,7 +1044,7 @@ def should_only_have_tracked_jid_for(type)
job = q.pop
job.complete("testing").should eq("waiting")
job = client.jobs[jid]
- job.history.length.should eq(2)
+ job.raw_queue_history.length.should eq(2)
job.state.should eq("waiting")
job.worker_name.should eq("")
job.queue_name.should eq("testing")
@@ -1072,7 +1072,7 @@ def should_only_have_tracked_jid_for(type)
expect { bjob.complete }.not_to raise_error
job = client.jobs[jid]
- job.history.length.should eq(1)
+ job.raw_queue_history.length.should eq(1)
job.state.should eq("complete")
job.worker_name.should eq("")
job.queue_name.should eq("")
View
57 spec/unit/job_spec.rb
@@ -154,6 +154,63 @@ class MyCustomError < StandardError; end
job.inspect.should include(job.queue_name)
end
end
+
+ describe "history methods" do
+ let(:time_1) { Time.utc(2012, 8, 1, 12, 30) }
+ let(:time_2) { Time.utc(2012, 8, 1, 12, 31) }
+
+ let(:history_event) do
+ {'popped' => time_2.to_i,
+ 'put' => time_1.to_i,
+ 'q' => 'test_error',
+ 'worker' => 'Myrons-Macbook-Pro.local-44396'}
+ end
+
+ let(:job) do
+ Qless::Job.build(client, JobClass, history: [history_event])
+ end
+
+ it 'returns the raw history from `raw_queue_history`' do
+ expect(job.raw_queue_history).to eq([history_event])
+ end
+
+ it 'returns the raw history from `history` as well' do
+ job.stub(:warn)
+ expect(job.history).to eq([history_event])
+ end
+
+ it 'prints a deprecation warning from `history`' do
+ job.should_receive(:warn).with(/deprecated/i)
+ job.history
+ end
+
+ it 'converts timestamps to Time objects for `queue_history`' do
+ converted = history_event.merge('popped' => time_2, 'put' => time_1)
+ expect(job.queue_history).to eq([converted])
+ end
+ end
+
+ describe "#initially_put_at" do
+ let(:time_1) { Time.utc(2012, 8, 1, 12, 30) }
+ let(:time_2) { Time.utc(2012, 8, 1, 12, 31) }
+
+ let(:queue_1) { { 'put' => time_1 } }
+ let(:queue_2) { { 'put' => time_2 } }
+
+ def build_job(*events)
+ Qless::Job.build(client, JobClass, history: events)
+ end
+
+ it 'returns the earliest `put` timestamp' do
+ job = build_job(queue_2, queue_1)
+ expect(job.initially_put_at).to eq(time_1)
+ end
+
+ it 'tolerates queues that lack a `put` time' do
+ job = build_job({}, queue_1)
+ expect(job.initially_put_at).to eq(time_1)
+ end
+ end
end
end

0 comments on commit 8e2917b

Please sign in to comment.