Skip to content

Commit

Permalink
One redis accessor to rule them all.
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Feb 17, 2012
1 parent 3dafe00 commit 0050103
Show file tree
Hide file tree
Showing 14 changed files with 35 additions and 55 deletions.
6 changes: 6 additions & 0 deletions lib/sidekiq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,9 @@
require 'sidekiq/client'
require 'sidekiq/worker'
require 'sidekiq/rails' if defined?(::Rails)

module Sidekiq
class << self
attr_accessor :redis
end
end
2 changes: 1 addition & 1 deletion lib/sidekiq/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def parse(args=ARGV)
end

def run
Sidekiq::Manager.redis = RedisConnection.create(:url => @options[:server], :namespace => @options[:namespace])
Sidekiq.redis = RedisConnection.create(:url => @options[:server], :namespace => @options[:namespace])
manager = Sidekiq::Manager.new(@options)
begin
logger.info 'Starting processing, hit Ctrl-C to stop'
Expand Down
18 changes: 5 additions & 13 deletions lib/sidekiq/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,25 @@ def self.middleware
@middleware ||= begin
m = Middleware::Chain.new
m.register do
use Middleware::Client::UniqueJobs, Client.redis
use Middleware::Client::ResqueWebCompatibility, Client.redis
use Middleware::Client::UniqueJobs
use Middleware::Client::ResqueWebCompatibility
end
m
end
end

def self.registered_workers
redis.smembers('workers')
Sidekiq.redis.smembers('workers')
end

def self.registered_queues
redis.smembers('queues')
Sidekiq.redis.smembers('queues')
end

def self.queue_mappings
@queue_mappings ||= {}
end

def self.redis
@redis ||= RedisConnection.create
end

def self.redis=(redis)
@redis = redis
end

# Example usage:
# Sidekiq::Client.push('my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])
def self.push(queue=nil, item)
Expand All @@ -52,7 +44,7 @@ def self.push(queue=nil, item)

pushed = false
middleware.invoke(item, queue) do
redis.rpush("queue:#{queue}", MultiJson.encode(item))
Sidekiq.redis.rpush("queue:#{queue}", MultiJson.encode(item))
pushed = true
end
pushed
Expand Down
4 changes: 0 additions & 4 deletions lib/sidekiq/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ class Manager

trap_exit :processor_died

class << self
attr_accessor :redis
end

def initialize(options={})
logger.info "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{redis.client.location}"
logger.debug { options.inspect }
Expand Down
5 changes: 1 addition & 4 deletions lib/sidekiq/middleware/client/resque_web_compatibility.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ module Sidekiq
module Middleware
module Client
class ResqueWebCompatibility
def initialize(redis)
@redis = redis
end

def call(item, queue)
@redis.sadd('queues', queue)
Sidekiq.redis.sadd('queues', queue)
yield
end

Expand Down
17 changes: 4 additions & 13 deletions lib/sidekiq/middleware/client/unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,15 @@ module Client
class UniqueJobs
HASH_KEY_EXPIRATION = 30 * 60

def initialize(redis)
@redis = redis
end

def call(item, queue)
payload_hash = Digest::MD5.hexdigest(MultiJson.encode(item))
return if already_scheduled?(payload_hash)

@redis.setex(payload_hash, HASH_KEY_EXPIRATION, 1)
Sidekiq.redis.with_connection do |redis|
return if redis.get(payload_hash)
redis.setex(payload_hash, HASH_KEY_EXPIRATION, 1)
end

yield
end

private

def already_scheduled?(payload_hash)
!!@redis.get(payload_hash)
end
end
end
end
Expand Down
6 changes: 1 addition & 5 deletions lib/sidekiq/middleware/server/unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@ module Sidekiq
module Middleware
module Server
class UniqueJobs
def initialize(redis)
@redis = redis
end

def call(*args)
yield
ensure
@redis.del(Digest::MD5.hexdigest(MultiJson.encode(args[1])))
Sidekiq.redis.del(Digest::MD5.hexdigest(MultiJson.encode(args[1])))
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq/processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def self.middleware
# default middleware
chain.register do
use Middleware::Server::Airbrake
use Middleware::Server::UniqueJobs, Sidekiq::Manager.redis
use Middleware::Server::UniqueJobs
use Middleware::Server::ActiveRecord
end
chain
Expand Down
5 changes: 4 additions & 1 deletion lib/sidekiq/util.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
require 'logger'

module Sidekiq
##
# This module is part of Sidekiq core and not intended for extensions.
#
module Util

def self.logger
Expand Down Expand Up @@ -39,7 +42,7 @@ def logger
end

def redis
Sidekiq::Manager.redis
Sidekiq.redis
end

def process_id
Expand Down
2 changes: 1 addition & 1 deletion myapp/config/initializers/sidekiq.rb
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Sidekiq::Client.redis = Sidekiq::RedisConnection.create(:namespace => 'resque')
Sidekiq.redis = Sidekiq::RedisConnection.create(:namespace => 'resque')
15 changes: 8 additions & 7 deletions test/test_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,24 @@
class TestClient < MiniTest::Unit::TestCase
describe 'with real redis' do
before do
Sidekiq::Client.redis = Redis.connect(:url => 'redis://localhost/sidekiq_test')
Sidekiq::Client.redis.flushdb
Sidekiq.redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test')
Sidekiq.redis.flushdb
end

it 'does not push duplicate messages when configured for unique only' do
Sidekiq::Client.middleware.entries.clear
Sidekiq::Client.middleware.register do
use Sidekiq::Middleware::Client::UniqueJobs, Sidekiq::Client.redis
use Sidekiq::Middleware::Client::ResqueWebCompatibility, Sidekiq::Client.redis
use Sidekiq::Middleware::Client::UniqueJobs
use Sidekiq::Middleware::Client::ResqueWebCompatibility
end
10.times { Sidekiq::Client.push('customqueue', 'class' => 'Foo', 'args' => [1, 2]) }
assert_equal 1, Sidekiq::Client.redis.llen("queue:customqueue")
assert_equal 1, Sidekiq.redis.llen("queue:customqueue")
end

it 'does push duplicate messages when not configured for unique only' do
Sidekiq::Client.middleware.unregister(Sidekiq::Middleware::Client::UniqueJobs)
10.times { Sidekiq::Client.push('customqueue2', 'class' => 'Foo', 'args' => [1, 2]) }
assert_equal 10, Sidekiq::Client.redis.llen("queue:customqueue2")
assert_equal 10, Sidekiq.redis.llen("queue:customqueue2")
end
end

Expand All @@ -38,7 +38,8 @@ def @redis.del(*); nil; end
def @redis.incrby(*); nil; end
def @redis.setex(*); nil; end
def @redis.expire(*); true; end
Sidekiq::Client.redis = @redis
def @redis.with_connection; yield self; end
Sidekiq.redis = @redis
end

it 'raises ArgumentError with invalid params' do
Expand Down
2 changes: 1 addition & 1 deletion test/test_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
class TestManager < MiniTest::Unit::TestCase
describe 'with redis' do
before do
Sidekiq::Manager.redis = Sidekiq::Client.redis = @redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test')
Sidekiq.redis = @redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test')
@redis.flushdb
$processed = 0
$mutex = Mutex.new
Expand Down
3 changes: 1 addition & 2 deletions test/test_middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
class TestMiddleware < MiniTest::Unit::TestCase
describe 'middleware chain' do
before do
Sidekiq::Manager.redis = @redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test')
Sidekiq::Client.redis = nil
Sidekiq.redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test')
end

class CustomMiddleware
Expand Down
3 changes: 1 addition & 2 deletions test/test_stats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
class TestStats < MiniTest::Unit::TestCase
describe 'with redis' do
before do
Sidekiq::Manager.redis = @redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test')
Sidekiq::Client.redis = nil
Sidekiq.redis = @redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test')
@redis.flushdb
end

Expand Down

0 comments on commit 0050103

Please sign in to comment.