Skip to content

Commit

Permalink
Initial pass at API
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Mar 25, 2014
1 parent eda5627 commit 30198da
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 3 deletions.
43 changes: 41 additions & 2 deletions lib/sidekiq/client.rb
Expand Up @@ -32,8 +32,8 @@ def middleware(&block)
#
# Sidekiq::Client.new(ConnectionPool.new { Redis.new })
#
def initialize(redis_pool = Sidekiq.redis_pool)
@redis_pool = redis_pool
def initialize(redis_pool = nil)
@redis_pool = redis_pool || Sidekiq.redis_pool
end

##
Expand All @@ -54,12 +54,15 @@ def initialize(redis_pool = Sidekiq.redis_pool)
# push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])
#
def push(item)
Thread.current[:current_pool] = @redis_pool
normed = normalize_item(item)
payload = process_single(item['class'], normed)

pushed = false
pushed = raw_push([payload]) if payload
pushed ? payload['jid'] : nil
ensure
Thread.current[:current_pool] = nil
end

##
Expand All @@ -77,6 +80,7 @@ def push(item)
# pushed can be less than the number given if the middleware stopped processing for one
# or more jobs.
def push_bulk(items)
Thread.current[:current_pool] = @redis_pool
normed = normalize_item(items)
payloads = items['args'].map do |args|
raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" if !args.is_a?(Array)
Expand All @@ -86,9 +90,44 @@ def push_bulk(items)
pushed = false
pushed = raw_push(payloads) if !payloads.empty?
pushed ? payloads.collect { |payload| payload['jid'] } : nil
ensure
Thread.current[:current_pool] = nil
end

class << self

# Allows sharding of jobs across any number of Redis instances. All jobs
# defined within the block will use the given Redis connection pool.
#
# pool = ConnectionPool.new { Redis.new }
# Sidekiq::Client.via(pool) do
# SomeWorker.perform_async(1,2,3)
# SomeOtherWorker.perform_async(1,2,3)
# end
#
# Generally this is only needed for very large Sidekiq installs processing
# more than thousands jobs per second.
def via(pool)
raise ArgumentError, "No pool given" if pool.nil?
Thread.current[:sidekiq_via_pool] = pool
yield
ensure
Thread.current[:sidekiq_via_pool] = nil
end

#
# Returns the Redis pool being used for the current client operation.
# Client operations should use +Sidekiq::Client.redis_pool+ whereas server
# operations should use +Sidekiq.redis_pool+.
#
# For example, in client-side middleware, you must use this method.
# In server-side middleware, you use +Sidekiq.redis_pool+.
#
# This complexity is necessary to support Redis sharding.
def redis_pool
Thread.current[:current_pool] || Sidekiq.redis_pool
end

def default
@default ||= new
end
Expand Down
23 changes: 22 additions & 1 deletion lib/sidekiq/worker.rb
Expand Up @@ -34,6 +34,25 @@ def logger
Sidekiq.logger
end

# Allows sharding of jobs across any number of Redis instances. All jobs
# defined within the block will use the given Redis connection pool.
#
# pool = ConnectionPool.new { Redis.new }
# Sidekiq::Worker.via(pool) do
# SomeWorker.perform_async(1,2,3)
# SomeOtherWorker.perform_async(1,2,3)
# end
#
# Generally this is only needed for very large Sidekiq installs processing
# more than thousands jobs per second.
def self.via(pool)
raise ArgumentError, "No pool given" if pool.nil?
Thread.current[:sidekiq_via_pool] = pool
yield
ensure
Thread.current[:sidekiq_via_pool] = nil
end

module ClassMethods

def perform_async(*args)
Expand Down Expand Up @@ -62,6 +81,7 @@ def perform_in(interval, *args)
# :retry - enable the RetryJobs middleware for this Worker, default *true*
# :backtrace - whether to save any error backtrace in the retry payload to display in web UI,
# can be true, false or an integer number of lines to save, default *false*
# :pool - use the given Redis connection pool to push this type of job to a given shard.
def sidekiq_options(opts={})
self.sidekiq_options_hash = get_sidekiq_options.merge((opts || {}).stringify_keys)
::Sidekiq.logger.warn("#{self.name} - :timeout is unsafe and support has been removed from Sidekiq, see http://bit.ly/OtYpK for details") if opts.include? :timeout
Expand All @@ -80,7 +100,8 @@ def get_sidekiq_options # :nodoc:
end

def client_push(item) # :nodoc:
Sidekiq::Client.push(item.stringify_keys)
pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
Sidekiq::Client.new(pool).push(item.stringify_keys)
end

end
Expand Down
29 changes: 29 additions & 0 deletions test/test_client.rb
Expand Up @@ -235,4 +235,33 @@ def call(worker_class, message, queue)
assert_equal 2, Sidekiq::Client.new.__send__(:normalize_item, 'class' => CWorker, 'args' => [])['retry']
end
end

describe 'sharding' do
class DWorker < BaseWorker
end
it 'allows sidekiq_options to point to different Redi' do
conn = MiniTest::Mock.new
conn.expect(:multi, [0, 1])
DWorker.sidekiq_options('pool' => ConnectionPool.new(size: 1) { conn })
DWorker.perform_async(1,2,3)
conn.verify
end
it 'allows #via to point to different Redi' do
conn = MiniTest::Mock.new
conn.expect(:multi, [0, 1])
default = Sidekiq::Client.redis_pool
Sidekiq::Client.via(ConnectionPool.new(size: 1) { conn }) do
CWorker.perform_async(1,2,3)
end
assert_equal default, Sidekiq::Client.redis_pool
conn.verify
end
it 'allows Resque helpers to point to different Redi' do
conn = MiniTest::Mock.new
conn.expect(:zadd, 1, [String, Array])
DWorker.sidekiq_options('pool' => ConnectionPool.new(size: 1) { conn })
Sidekiq::Client.enqueue_in(10, DWorker, 3)
conn.verify
end
end
end

0 comments on commit 30198da

Please sign in to comment.