Middleware
Sidekiq has a notion of middleware similar to Rack's: small bits of code that wrap an operation and can add functionality to it. Sidekiq splits middleware into client-side and server-side. Client-side middleware runs around the pushing of a message to Redis and lets you modify or stop the message before it is pushed.
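To make the "around" idea concrete, here is a minimal plain-Ruby sketch rather than Sidekiq's actual implementation. `DropEmpty` and `push_with` are hypothetical names, and an Array stands in for Redis. The only point is that a client-side middleware receives the message and a block, and the push happens only if the middleware yields.

```ruby
# Toy illustration only: DropEmpty and push_with are hypothetical names,
# not part of Sidekiq.

# A client-side style middleware: it may modify the message, then either
# yields (the push continues) or returns without yielding (the push stops).
class DropEmpty
  def call(msg, queue)
    return if msg['args'].empty? # stop: the block is never run
    msg['tagged'] = true         # modify the message before it is pushed
    yield
  end
end

# Stand-in for "push to Redis": appends to an in-memory Array instead.
def push_with(middleware, msg, queue, pushed)
  middleware.call(msg, queue) { pushed << msg }
end

pushed = []
push_with(DropEmpty.new, { 'class' => 'HardWorker', 'args' => [1] }, 'default', pushed)
push_with(DropEmpty.new, { 'class' => 'HardWorker', 'args' => [] }, 'default', pushed)
```

Only the first message ends up in `pushed`; the second is dropped because the middleware returns before reaching `yield`.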
Sidekiq provides a "UniqueJobs" client-side middleware that checks whether an identical message was already enqueued within the last 30 minutes and is still awaiting processing. If one exists, the middleware stops the new message from going to Redis.

    require 'digest/md5'
    require 'multi_json'

    class Sidekiq::Middleware::Client::UniqueJobs
      HASH_KEY_EXPIRATION = 30 * 60 # seconds

      def call(msg, queue)
        # Fingerprint the message by hashing its JSON encoding.
        payload_hash = Digest::MD5.hexdigest(MultiJson.encode(msg))
        Sidekiq.redis do |redis|
          # Duplicate found: return without yielding, so nothing is pushed.
          return if redis.get(payload_hash)
          redis.setex(payload_hash, HASH_KEY_EXPIRATION, 1)
        end
        yield
      end
    end

Server-side middleware runs 'around' message processing. The error notification feature is implemented as a simple middleware. Writing your own middleware is easy; this is the server-side middleware which ensures that ActiveRecord connections are closed after each message is processed:

    class Sidekiq::Middleware::Server::ActiveRecord
      def call(worker, msg, queue)
        yield
      ensure
        ::ActiveRecord::Base.clear_active_connections! if defined?(::ActiveRecord)
      end
    end

Your middleware is called with three arguments: the worker instance that will process the message, the full Hash that represents the message, and the name of the queue it was pulled from.
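As a hedged illustration of those three arguments, the sketch below defines a hypothetical `TimingMiddleware` and exercises it by hand with a stand-in `FakeWorker`, so it runs without Sidekiq. Only the `call(worker, msg, queue)` signature and the `yield`/`ensure` shape come from the text above; everything else is assumed for the example.

```ruby
# Hypothetical middleware: records how long each message took and where
# it came from. Not part of Sidekiq.
class TimingMiddleware
  attr_reader :records

  def initialize
    @records = []
  end

  def call(worker, msg, queue)
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield # the worker processes the message here
  ensure
    # Runs whether the worker succeeded or raised, like the cleanup in the
    # ActiveRecord middleware.
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    @records << { worker: worker.class.name, queue: queue,
                  args: msg['args'], seconds: elapsed }
  end
end

# Stand-in worker so the sketch runs on its own.
class FakeWorker
  def perform(n)
    raise ArgumentError, 'negative' if n < 0
    n * 2
  end
end

timing = TimingMiddleware.new
worker = FakeWorker.new

ok_msg = { 'class' => 'FakeWorker', 'args' => [21] }
result = timing.call(worker, ok_msg, 'default') { worker.perform(*ok_msg['args']) }

bad_msg = { 'class' => 'FakeWorker', 'args' => [-1] }
error = begin
  timing.call(worker, bad_msg, 'default') { worker.perform(*bad_msg['args']) }
  nil
rescue ArgumentError => e
  e
end
```

Because the bookkeeping sits in `ensure`, both the successful and the failing message leave a record, and the exception still propagates to the caller.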
You then register your middleware as part of the chain:

    class AcmeCo::MyMiddleware
      def initialize(options=nil)
        # options == { :foo => 1, :bar => 2 }
      end

      def call(worker, msg, queue)
        yield
      end
    end

    Sidekiq.configure_server do |config|
      config.server_middleware do |chain|
        chain.add AcmeCo::MyMiddleware, :foo => 1, :bar => 2
      end
    end

I'd suggest putting this code in config/initializers/sidekiq.rb in your Rails app.
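One way to picture how the options given to `chain.add` could reach `initialize` is the toy chain below. `ToyChain` and `Tagger` are hypothetical stand-ins written for this sketch, not Sidekiq's classes. The sketch assumes the chain stores each class with its arguments, instantiates it when a message is processed, and nests the instances so the first one added is outermost.

```ruby
# ToyChain is a hypothetical stand-in for Sidekiq's chain.
class ToyChain
  def initialize
    @entries = []
  end

  # Remember the class and its arguments; instantiate later.
  def add(klass, *args)
    @entries << [klass, args]
  end

  # Build fresh instances and nest them so the first one added is outermost.
  def invoke(*call_args, &final)
    instances = @entries.map { |klass, args| klass.new(*args) }
    runner = instances.reverse.inject(final) do |inner, middleware|
      -> { middleware.call(*call_args, &inner) }
    end
    runner.call
  end
end

# Hypothetical middleware that logs when it is entered and left.
class Tagger
  def initialize(options = nil)
    options ||= {}
    @label = options[:label]
    @log = options[:log]
  end

  def call(worker, msg, queue)
    @log << "enter #{@label}"
    yield
  ensure
    @log << "leave #{@label}"
  end
end

log = []
chain = ToyChain.new
chain.add Tagger, :label => 'outer', :log => log
chain.add Tagger, :label => 'inner', :log => log
chain.invoke(:worker, { 'args' => [] }, 'default') { log << 'perform' }
```

After the call, `log` reads enter outer, enter inner, perform, leave inner, leave outer, which is the nesting that "around" implies.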
By default, Sidekiq's server middleware includes the following:

    def self.default_middleware
      Sidekiq::Middleware::Chain.new do |m|
        m.add Sidekiq::Middleware::Server::ExceptionHandler
        m.add Sidekiq::Middleware::Server::Logging
        m.add Sidekiq::Middleware::Server::UniqueJobs
        m.add Sidekiq::Middleware::Server::RetryJobs
        m.add Sidekiq::Middleware::Server::ActiveRecord
      end
    end

- ExceptionHandler - integrates with Airbrake and other popular exception notification services so you are notified if your workers raise an exception.
- Logging - logs the start and finish of message processing.
- UniqueJobs - removes the message fingerprint from Redis so an identical message can be enqueued and processed again.
- RetryJobs - puts the message in the retry queue if it raises an exception.
- ActiveRecord - closes active connections associated with the worker thread.
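The client-side and server-side UniqueJobs middleware work as a pair: one records a fingerprint, the other removes it. The sketch below shows that round trip under several assumptions: `ToyUniqueJobs` is a hypothetical class, a Hash stands in for Redis (so there is no 30-minute expiry), the standard library's JSON replaces MultiJson, and the fingerprint is cleared after the job runs.

```ruby
require 'digest/md5'
require 'json'

# Hypothetical in-memory model of the two UniqueJobs halves.
class ToyUniqueJobs
  def initialize
    @fingerprints = {} # stand-in for Redis keys; no expiry in this sketch
  end

  def fingerprint(msg)
    Digest::MD5.hexdigest(JSON.generate(msg))
  end

  # Client side: skip the push when the fingerprint is already recorded.
  # Returns true if the message was pushed, false if it was a duplicate.
  def client_call(msg, queue)
    key = fingerprint(msg)
    return false if @fingerprints.key?(key)
    @fingerprints[key] = 1
    yield
    true
  end

  # Server side: let the job run, then clear the fingerprint so an
  # identical message can be enqueued again (timing is an assumption).
  def server_call(worker, msg, queue)
    yield
  ensure
    @fingerprints.delete(fingerprint(msg))
  end
end

uniq = ToyUniqueJobs.new
queue = []
msg = { 'class' => 'HardWorker', 'args' => [1, 2] }

first  = uniq.client_call(msg, 'default') { queue << msg } # accepted
second = uniq.client_call(msg, 'default') { queue << msg } # duplicate, dropped

uniq.server_call(:worker, queue.shift, 'default') { :done } # job runs
third = uniq.client_call(msg, 'default') { queue << msg }   # accepted again
```

Note that the check-then-set in `client_call` is not atomic; with a real shared store, two clients could both pass the check at the same moment.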