Permalink
Browse files

Merge pull request #77 from seomoz/middlewares

Middlewares
  • Loading branch information...
2 parents 34faabc + 204c65d commit 4cfcbc6a4b1026149d8e035a43aff2005ae4e560 @myronmarston myronmarston committed Feb 1, 2013
@@ -0,0 +1,22 @@
+module Qless
+ module Middleware
+ module RedisReconnect
+ def self.new(*redis_connections)
+ Module.new do
+ define_singleton_method :to_s do
+ "Qless::Middleware::RedisReconnect(#{redis_connections.map(&:id).join(', ')})"
+ end
+
+ define_method :around_perform do |job|
+ redis_connections.each do |redis|
+ redis.client.reconnect
+ end
+
+ super(job)
+ end
+ end
+ end
+ end
+ end
+end
+
@@ -0,0 +1,70 @@
+require 'raven'
+
+module Qless
+ module Middleware
+ # This middleware logs errors to the sentry exception notification service:
+ # http://getsentry.com/
+ module Sentry
+ def around_perform(job)
+ super
+ rescue Exception => e
+ SentryLogger.new(e, job).log
+ raise
+ end
+
+ # Logs a single exception to Sentry, adding pertinent job info.
+ class SentryLogger
+ def initialize(exception, job)
+ @exception, @job = exception, job
+ end
+
+ def log
+ event = ::Raven::Event.capture_exception(@exception) do |evt|
+ evt.extra = { job: job_metadata }
+ end
+
+ safely_send event
+ end
+
+ private
+
+ def safely_send(event)
+ return unless event
+ ::Raven.send(event)
+ rescue
+ # We don't want to silence our errors when the Sentry server
+ # responds with an error. We'll still see the errors on the
+ # Qless Web UI.
+ end
+
+
+ def job_metadata
+ {
+ jid: @job.jid,
+ klass: @job.klass_name,
+ history: job_history,
+ data: @job.data,
+ queue: @job.queue_name,
+ worker: @job.worker_name,
+ tags: @job.tags,
+ priority: @job.priority
+ }
+ end
+
+ # We want to log formatted timestamps rather than integer timestamps
+ def job_history
+ @job.history.map do |history_event|
+ history_event.each_with_object({}) do |(key, value), hash|
+ hash[key] = if value.is_a?(Integer)
+ Time.at(value).getutc.iso8601
+ else
+ value
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+end
+
View
@@ -8,7 +8,7 @@ module Qless
# https://github.com/defunkt/resque/blob/v1.20.0/lib/resque/worker.rb
class Worker
def initialize(job_reserver, options = {})
- @job_reserver = job_reserver
+ self.job_reserver = job_reserver
@shutdown = @paused = false
self.very_verbose = options[:very_verbose]
@@ -35,6 +35,10 @@ def initialize(job_reserver, options = {})
# Defaults to $stdout.
attr_accessor :output
+ # The object responsible for reserving jobs from the Qless server,
+ # using some reasonable strategy (e.g. round robin or ordered)
+ attr_accessor :job_reserver
+
# Starts a worker based on ENV vars. Supported ENV vars:
# - REDIS_URL=redis://host:port/db-num (the redis gem uses this automatically)
# - QUEUES=high,medium,low or QUEUE=blah
View
@@ -46,4 +46,5 @@ Gem::Specification.new do |s|
s.add_development_dependency "poltergeist" , "~> 1.0"
s.add_development_dependency "launchy" , "~> 2.1.0"
s.add_development_dependency "simplecov" , "~> 0.6.2"
+ s.add_development_dependency 'sentry-raven', "~> 0.4"
end
@@ -0,0 +1,45 @@
+require 'spec_helper'
+require 'qless/middleware/redis_reconnect'
+require 'redis'
+require 'qless/worker'
+
+module Qless
+ module Middleware
+ describe RedisReconnect do
+ let(:url_1) { "redis://localhost:1234/2" }
+ let(:url_2) { "redis://localhost:4321/3" }
+
+ it 'includes the redis connection strings in its description' do
+ redis_1 = Redis.new(url: url_1)
+ redis_2 = Redis.new(url: url_2)
+ middleware = Qless::Middleware::RedisReconnect.new(redis_1, redis_2)
+
+ expect(middleware.inspect).to include(url_1, url_2)
+ expect(middleware.to_s).to include(url_1, url_2)
+ end
+
+ it 'reconnects to the given redis clients before performing the job' do
+ events = []
+
+ stub_const("MyJob", Class.new {
+ define_singleton_method :perform do |job|
+ events << :performed
+ end
+ })
+
+ redis_connections = 1.upto(2).map do |i|
+ client = fire_double("Redis::Client")
+ client.stub(:reconnect) { events << :"reconnect_#{i}" }
+ fire_double("Redis", client: client)
+ end
+
+ worker = Qless::Worker.new(stub)
+ worker.extend Qless::Middleware::RedisReconnect.new(*redis_connections)
+ worker.perform(Qless::Job.build(stub.as_null_object, MyJob))
+
+ expect(events).to eq([:reconnect_1, :reconnect_2, :performed])
+ end
+ end
+ end
+end
+
@@ -0,0 +1,82 @@
+require 'spec_helper'
+require 'qless/middleware/sentry'
+require 'qless'
+require 'qless/worker'
+
+module Qless
+ module Middleware
+ describe Sentry do
+ let(:client) { fire_double("Qless::Client").as_null_object }
+
+ let(:klass) do
+ Class.new do
+ def self.perform(job)
+ raise "job failure"
+ end
+ end
+ end
+
+ let(:time_1) { Time.utc(2012, 8, 1, 12, 30) }
+ let(:time_2) { Time.utc(2012, 8, 1, 12, 31) }
+
+ let(:history_event) do
+ {'popped' => time_2.to_i,
+ 'put' => time_1.to_i,
+ 'q' => 'test_error',
+ 'worker' => 'Myrons-Macbook-Pro.local-44396'}
+ end
+
+ let(:job) do
+ stub_const("MyJob", klass)
+ Qless::Job.build(client, MyJob,
+ data: { "some" => "data" },
+ worker: 'w1', queue: 'q1',
+ jid: 'abc', history: [history_event],
+ tags: ['x', 'y'], priority: 10)
+
+ end
+
+ def perform_job
+ worker = Qless::Worker.new(stub)
+ worker.extend Qless::Middleware::Sentry
+ worker.perform(job)
+ end
+
+ it 'logs jobs with errors to sentry' do
+ sent_event = nil
+ ::Raven.stub(:send) { |e| sent_event = e }
+
+ # it's important the job still fails normally
+ job.should_receive(:fail)
+
+ perform_job
+
+ expect(sent_event.message).to include("job failure")
+ expect(sent_event.extra[:job]).to include(
+ jid: 'abc',
+ klass: 'MyJob',
+ data: { "some" => "data" },
+ queue: 'q1',
+ worker: 'w1',
+ tags: ['x', 'y'],
+ priority: 10
+ )
+
+ expect(sent_event.extra[:job][:history].first).to include(
+ 'put' => time_1.iso8601, 'popped' => time_2.iso8601
+ )
+ end
+
+ it 'does not silence the original error when sentry responds with an error' do
+ ::Raven.stub(:send) { raise ::Raven::Error, "sentry failure" }
+ job.should_receive(:fail) do |_, message|
+ expect(message).to include("job failure")
+ expect(message).not_to include("sentry failure")
+ end
+
+ perform_job
+ end
+ end
+ end
+end
+

0 comments on commit 4cfcbc6

Please sign in to comment.