Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #84 from seomoz/per_job_middleware

Per job middleware
  • Loading branch information...
commit b7615fb9a184e0f24839056cfc3137d12c8f943a 2 parents 1dd4cd1 + fd6a66b
@myronmarston myronmarston authored
View
3  .travis.yml
@@ -1,8 +1,5 @@
before_script:
# From: https://github.com/jonleighton/poltergeist/blob/v1.0.0/.travis.yml#L5-7
- - sudo exe/install_phantomjs
- - "export PATH=phantomjs/bin:$PATH"
- - phantomjs --version
- git submodule init
- git submodule update
services:
View
33 README.md
@@ -228,6 +228,39 @@ Qless::Worker.class_eval do
end
```
+Per-Job Middlewares
+===================
+
+Qless also supports middleware on a per-job basis, when you have some
+orthogonal logic to run in the context of some (but not all) jobs.
+
+Per-job middlewares are defined the same as worker middlewares:
+
+``` ruby
+module ReEstablishDBConnection
+ def around_perform(job)
+ MyORM.establish_connection
+ super
+ end
+end
+```
+
+To add them to a job class, you first have to make your job class
+middleware-enabled by extending it with
+`Qless::Job::SupportsMiddleware`, then extend your middleware
+modules:
+
+``` ruby
+class MyJobClass
+ extend Qless::Job::SupportsMiddleware
+ extend ReEstablishDBConnection
+ extend MyOtherAwesomeMiddleware
+
+ def self.perform(job)
+ end
+end
+```
+
Web Interface
=============
View
26 lib/qless/job.rb
@@ -26,8 +26,26 @@ class Job < BaseJob
attr_reader :original_retries, :retries_left
attr_accessor :data, :priority, :tags
+ MiddlewareMisconfiguredError = Class.new(StandardError)
+
+ module SupportsMiddleware
+ def around_perform(job)
+ perform(job)
+ end
+ end
+
def perform
- klass.perform(self)
+ middlewares = Job.middlewares_on(klass)
+
+ if middlewares.last == SupportsMiddleware
+ klass.around_perform(self)
+ elsif middlewares.any?
+ raise MiddlewareMisconfiguredError, "The middleware chain for #{klass} " +
+ "(#{middlewares.inspect}) is misconfigured. Qless::Job::SupportsMiddleware " +
+ "must be extended onto your job class first if you want to use any middleware."
+ else
+ klass.perform(self)
+ end
end
def self.build(client, klass, attributes = {})
@@ -54,6 +72,12 @@ def self.build(client, klass, attributes = {})
new(client, attributes)
end
+ def self.middlewares_on(job_klass)
+ job_klass.singleton_class.ancestors.select do |ancestor|
+ ancestor.method_defined?(:around_perform)
+ end
+ end
+
def initialize(client, atts)
super(client, atts.fetch('jid'))
%w{jid data priority tags state tracked
View
8 lib/qless/middleware/redis_reconnect.rb
@@ -1,14 +1,16 @@
module Qless
module Middleware
module RedisReconnect
- def self.new(*redis_connections)
+ def self.new(*redis_connections, &block)
Module.new do
define_singleton_method :to_s do
- "Qless::Middleware::RedisReconnect(#{redis_connections.map(&:id).join(', ')})"
+ "Qless::Middleware::RedisReconnect"
end
+ block ||= lambda { |job| redis_connections }
+
define_method :around_perform do |job|
- redis_connections.each do |redis|
+ Array(block.call(job)).each do |redis|
redis.client.reconnect
end
View
23 lib/qless/middleware/retry_exceptions.rb
@@ -0,0 +1,23 @@
+module Qless
+ module Middleware
+ module RetryExceptions
+ def around_perform(job)
+ super
+ rescue *retryable_exception_classes => e
+ job.retry
+ end
+
+ def retryable_exception_classes
+ @retryable_exception_classes ||= []
+ end
+
+ def retry_on(*exception_classes)
+ retryable_exception_classes.push(*exception_classes)
+ end
+ end
+ end
+
+ # For backwards compatibility
+ RetryExceptions = Middleware::RetryExceptions
+end
+
View
11 lib/qless/retry_exceptions.rb
@@ -1,11 +0,0 @@
-module Qless
- module RetryExceptions
- def retryable_exception_classes
- @retryable_exception_classes ||= []
- end
-
- def retry_on(*exception_classes)
- self.retryable_exception_classes.push(*exception_classes)
- end
- end
-end
View
9 lib/qless/worker.rb
@@ -111,8 +111,6 @@ def work(interval = 5.0)
def perform(job)
around_perform(job)
- rescue *retryable_exception_classes(job)
- job.retry
rescue Exception => error
fail_job(job, error)
else
@@ -159,13 +157,6 @@ def uniq_clients
@uniq_clients ||= @job_reserver.queues.map(&:client).uniq
end
- def retryable_exception_classes(job)
- return [] unless job.klass.respond_to?(:retryable_exception_classes)
- job.klass.retryable_exception_classes
- rescue NameError => exn
- []
- end
-
def try_complete(job)
job.complete unless job.state_changed?
rescue Job::CantCompleteError => e
View
5 spec/integration/worker_spec.rb
@@ -3,7 +3,7 @@
require 'yaml'
require 'qless/worker'
require 'qless'
-require 'qless/retry_exceptions'
+require 'qless/middleware/retry_exceptions'
class WorkerIntegrationJob
def self.perform(job)
@@ -12,7 +12,8 @@ def self.perform(job)
end
class RetryIntegrationJob
- extend Qless::RetryExceptions
+ extend Qless::Job::SupportsMiddleware
+ extend Qless::Middleware::RetryExceptions
Kaboom = Class.new(StandardError)
retry_on Kaboom
View
42 spec/unit/job_spec.rb
@@ -8,6 +8,12 @@ class Nested
end
end
+ module SomeJobMiddleware
+ def around_perform(job)
+ super
+ end
+ end
+
let(:client) { stub.as_null_object }
describe ".build" do
@@ -55,6 +61,42 @@ class Nested
JobClass::Nested.should_receive(:perform).with(job).once
job.perform
end
+
+ context 'when the job class is a Qless::Job::SupportsMiddleware' do
+ it 'calls #around_perform on the job in order to run the middleware chain' do
+ klass = Class.new { extend Qless::Job::SupportsMiddleware }
+ stub_const("MyJobClass", klass)
+
+ job = Job.build(client, klass)
+ klass.should_receive(:around_perform).with(job).once
+ job.perform
+ end
+ end
+
+ context 'when the job mixes in a middleware but has forgotten Qless::Job::SupportsMiddleware' do
+ it 'raises an error to alert the user to the fact they need Qless::Job::SupportsMiddleware' do
+ klass = Class.new { extend SomeJobMiddleware }
+ stub_const('MyJobClass', klass)
+ job = Job.build(client, klass)
+
+ expect {
+ job.perform
+ }.to raise_error(Qless::Job::MiddlewareMisconfiguredError)
+ end
+ end
+ end
+
+ describe "#middlewares_on" do
+ it 'returns the list of middleware mixed into the job' do
+ klass = Class.new do
+ extend Qless::Job::SupportsMiddleware
+ extend SomeJobMiddleware
+ end
+
+ expect(Qless::Job.middlewares_on(klass)).to eq([
+ SomeJobMiddleware, Qless::Job::SupportsMiddleware
+ ])
+ end
end
[
View
59 spec/unit/middleware/redis_reconnect_spec.rb
@@ -9,35 +9,78 @@ module Middleware
let(:url_1) { "redis://localhost:1234/2" }
let(:url_2) { "redis://localhost:4321/3" }
- it 'includes the redis connection strings in its description' do
+ it 'has a human readable description' do
redis_1 = Redis.new(url: url_1)
redis_2 = Redis.new(url: url_2)
middleware = Qless::Middleware::RedisReconnect.new(redis_1, redis_2)
- expect(middleware.inspect).to include(url_1, url_2)
- expect(middleware.to_s).to include(url_1, url_2)
+ expect(middleware.inspect).to include("Qless::Middleware::RedisReconnect")
+ expect(middleware.to_s).to include("Qless::Middleware::RedisReconnect")
end
- it 'reconnects to the given redis clients before performing the job' do
- events = []
-
+ def define_job_class(events)
stub_const("MyJob", Class.new {
define_singleton_method :perform do |job|
events << :performed
end
})
+ end
- redis_connections = 1.upto(2).map do |i|
+ def create_redis_connections(number, events)
+ number.times.map do |i|
client = fire_double("Redis::Client")
client.stub(:reconnect) { events << :"reconnect_#{i}" }
fire_double("Redis", client: client)
end
+ end
+
+ it 'reconnects to the given redis clients before performing the job' do
+ define_job_class(events = [])
+
+ redis_connections = create_redis_connections(2, events)
worker = Qless::Worker.new(stub)
worker.extend Qless::Middleware::RedisReconnect.new(*redis_connections)
worker.perform(Qless::Job.build(stub.as_null_object, MyJob))
- expect(events).to eq([:reconnect_1, :reconnect_2, :performed])
+ expect(events).to eq([:reconnect_0, :reconnect_1, :performed])
+ end
+
+ it 'allows the redis connections to be picked based on job data' do
+ define_job_class(events = [])
+ worker = Qless::Worker.new(stub)
+ redis_connections = create_redis_connections(4, events)
+
+ worker.extend Qless::Middleware::RedisReconnect.new { |job|
+ if job.data["type"] == "evens"
+ [redis_connections[0], redis_connections[2]]
+ else
+ [redis_connections[1], redis_connections[3]]
+ end
+ }
+
+ even_job = Qless::Job.build(stub.as_null_object, MyJob, data: { "type" => "evens" })
+ odd_job = Qless::Job.build(stub.as_null_object, MyJob, data: { "type" => "odds" })
+
+ worker.perform(even_job)
+ expect(events).to eq([:reconnect_0, :reconnect_2, :performed])
+
+ worker.perform(odd_job)
+ expect(events).to eq([:reconnect_0, :reconnect_2, :performed,
+ :reconnect_1, :reconnect_3, :performed])
+ end
+
+ it 'allows the block to return a single redis connection' do
+ pending "waiting on https://github.com/rspec/rspec-mocks/issues/246", if: (RUBY_VERSION == '1.9.2') do
+ define_job_class(events = [])
+ worker = Qless::Worker.new(stub)
+ redis_connections = create_redis_connections(1, events)
+
+ worker.extend Qless::Middleware::RedisReconnect.new { |job| redis_connections.first }
+ worker.perform(Qless::Job.build(stub.as_null_object, MyJob))
+
+ expect(events).to eq([:reconnect_0, :performed])
+ end
end
end
end
View
24 spec/unit/middleware/retry_exceptions_spec.rb
@@ -0,0 +1,24 @@
+require 'qless/middleware/retry_exceptions'
+
+module Qless
+ module Middleware
+ describe RetryExceptions do
+ let(:job_class) { Class.new }
+ let(:exception_class) { Class.new(StandardError) }
+
+ before do
+ job_class.extend(RetryExceptions)
+ end
+
+ it 'defines a retryable_exceptions method that returns an empty array by default' do
+ job_class.retryable_exception_classes.should be_empty
+ end
+
+ it 'defines a retry_on method that makes exception types retryable' do
+ job_class.retry_on(exception_class)
+
+ job_class.retryable_exception_classes.should eq([exception_class])
+ end
+ end
+ end
+end
View
22 spec/unit/retry_exceptions_spec.rb
@@ -1,22 +0,0 @@
-require 'qless/retry_exceptions'
-
-module Qless
- describe RetryExceptions do
- let(:job_class) { Class.new }
- let(:exception_class) { Class.new(StandardError) }
-
- before do
- job_class.extend(RetryExceptions)
- end
-
- it 'defines a retryable_exceptions method that returns an empty array by default' do
- job_class.retryable_exception_classes.should be_empty
- end
-
- it 'defines a retry_on method that makes exception types retryable' do
- job_class.retry_on(exception_class)
-
- job_class.retryable_exception_classes.should eq([exception_class])
- end
- end
-end
View
9 spec/unit/worker_spec.rb
@@ -78,15 +78,6 @@ class MyJobClass; end
worker.perform(job)
end
- it 'retries the job if performing it raises a retryable error' do
- MyJobClass.stub(:retryable_exception_classes).and_return([ArgumentError])
- MyJobClass.stub(:perform) { raise ArgumentError.new("boom") }
-
- job.should_receive(:retry).with(no_args)
-
- worker.perform(job)
- end
-
it 'completes the job if it finishes with no errors' do
MyJobClass.stub(:perform)
job.should respond_to(:complete).with(0).arguments
Please sign in to comment.
Something went wrong with that request. Please try again.