Skip to content

Commit

Permalink
Add full multithreaded integration test for manager
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Feb 3, 2012
1 parent b77e879 commit 92c51c5
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Gemfile.lock
Expand Up @@ -10,7 +10,7 @@ PATH
GEM
remote: http://rubygems.org/
specs:
celluloid (0.7.2)
celluloid (0.8.0)
connection_pool (0.1.0)
minitest (2.10.0)
multi_json (1.0.4)
Expand Down
10 changes: 5 additions & 5 deletions lib/sidekiq/cli.rb
Expand Up @@ -2,7 +2,7 @@
require 'sidekiq/version'
require 'sidekiq/util'
require 'sidekiq/client'
require 'sidekiq/server'
require 'sidekiq/manager'
require 'connection_pool'

module Sidekiq
Expand All @@ -19,15 +19,15 @@ def initialize

def run
::Sidekiq::Client.redis = ConnectionPool.new { Redis.connect(:url => @options[:server]) }
server = Sidekiq::Server.new(@options[:server], @options)
manager = Sidekiq::Manager.new(@options[:server], @options)
begin
log 'Starting processing, hit Ctrl-C to stop'
server.start!
manager.start!
sleep FOREVER
rescue Interrupt
log 'Shutting down...'
server.stop!
server.wait(:shutdown)
manager.stop!
manager.wait(:shutdown)
end
end

Expand Down
18 changes: 13 additions & 5 deletions lib/sidekiq/server.rb → lib/sidekiq/manager.rb
Expand Up @@ -8,11 +8,11 @@
module Sidekiq

##
# This is the main router in the system. This
# The main router in the system. This
# manages the processor state and fetches messages
# from Redis to be dispatched to ready processor.
# from Redis to be dispatched to an idle processor.
#
class Server
class Manager
include Util
include Celluloid

Expand All @@ -21,7 +21,7 @@ class Server
def initialize(location, options={})
log "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{location}"
verbose options.inspect
@count = options[:processor_count]
@count = options[:processor_count] || 25
@queues = options[:queues]
@queue_idx = 0
@queues_size = @queues.size
Expand Down Expand Up @@ -49,7 +49,12 @@ def start
dispatch(true)
end

def when_done
@done_callback = Proc.new
end

def processor_done(processor)
@done_callback.call(processor)
@busy.delete(processor)
if stopped?
processor.terminate
Expand Down Expand Up @@ -83,7 +88,7 @@ def find_work(queue_idx)
@busy << processor
processor.process! MultiJson.decode(msg)
end
msg
!!msg
end

def dispatch(schedule = false)
Expand Down Expand Up @@ -112,6 +117,9 @@ def dispatch(schedule = false)
end
end

# This is the polling loop that ensures we check Redis every
# second for work, even if there was nothing to do this time
# around.
after(1) { verbose('ping'); dispatch(schedule) } if schedule
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq/util.rb
Expand Up @@ -25,7 +25,7 @@ def err(msg)
end

def log(msg)
STDOUT.puts(msg)
STDOUT.puts(msg) unless $TESTING
end

def verbose(msg)
Expand Down
38 changes: 38 additions & 0 deletions test/test_manager.rb
@@ -0,0 +1,38 @@
require 'helper'
require 'sidekiq'
require 'sidekiq/manager'
require 'timed_queue'

class TestManager < MiniTest::Unit::TestCase
describe 'with redis' do
before do
Sidekiq::Client.redis = @redis = Redis.connect(:url => 'redis://localhost/sidekiq_test')
@redis.flushdb
$processed = 0
end

class IntegrationWorker
include Sidekiq::Worker

def perform(a, b)
$processed += 1
a + b
end
end

it 'processes messages' do
Sidekiq::Client.push(:foo, 'class' => IntegrationWorker, 'args' => [1, 2])
Sidekiq::Client.push(:foo, 'class' => IntegrationWorker, 'args' => [1, 2])

q = TimedQueue.new
mgr = Sidekiq::Manager.new("redis://localhost/sidekiq_test", :queues => [:foo])
mgr.when_done do |_|
q << 'done' if $processed == 2
end
mgr.start!
result = q.timed_pop
assert_equal 'done', result
mgr.stop
end
end
end
42 changes: 42 additions & 0 deletions test/timed_queue.rb
@@ -0,0 +1,42 @@
require 'thread'
require 'timeout'

class TimedQueue
def initialize
@que = []
@mutex = Mutex.new
@resource = ConditionVariable.new
end

def push(obj)
@mutex.synchronize do
@que.push obj
@resource.broadcast
end
end
alias_method :<<, :push

def timed_pop(timeout=0.5)
deadline = Time.now + timeout
@mutex.synchronize do
loop do
return @que.shift unless @que.empty?
to_wait = deadline - Time.now
raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0
@resource.wait(@mutex, to_wait)
end
end
end

def empty?
@que.empty?
end

def clear
@que.clear
end

def length
@que.length
end
end

0 comments on commit 92c51c5

Please sign in to comment.