Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

256 lines (228 sloc) 6.416 kb
require 'helper'
class TestGirlFridayQueue < MiniTest::Unit::TestCase
# Error handler class passed to queues via the :error_handler option.
# It mixes in MiniTest::Assertions so that the #handle method, which
# individual tests attach at runtime with define_method, can call
# assert_equal and friends from inside the handler.
class TestErrorHandler
include MiniTest::Assertions
end
# Pushes a single message and checks that the worker block receives it
# unchanged. Once the payload has been verified, the worker shuts the queue
# down and fires the async callback from the shutdown block.
def test_should_process_messages
  async_test do |done|
    work_queue = GirlFriday::WorkQueue.new('process') do |message|
      assert_equal 'foo', message[:text]
      work_queue.shutdown { done.call }
    end

    work_queue.push(:text => 'foo')
  end
end
# Checks that an exception raised inside the worker block is handed to the
# class given as :error_handler, with its class and message intact.
def test_should_handle_worker_error
  async_test do |done|
    # Declared up front so the handler closure below can reach the queue
    # that is only assigned after the handler has been defined.
    work_queue = nil

    TestErrorHandler.send(:define_method, :handle) do |error|
      assert_equal 'oops', error.message
      assert_equal 'RuntimeError', error.class.name
      work_queue.shutdown { done.call }
    end

    work_queue = GirlFriday::WorkQueue.new('error', :error_handler => TestErrorHandler) do |_message|
      raise 'oops'
    end
    work_queue.push(:text => 'foo')
  end
end
# Uses a single-worker queue whose first message (1) raises; the worker then
# re-enqueues a follow-up message (0), and processing that follow-up shuts
# the queue down and completes the test.
def test_should_handle_worker_error_with_retry
  async_test do |done|
    # The handler deliberately does nothing; the test only cares that the
    # queue keeps working after the failure.
    TestErrorHandler.send(:define_method, :handle) { |_error| }

    work_queue = GirlFriday::WorkQueue.new('error', :error_handler => TestErrorHandler, :size => 1) do |message|
      begin
        raise 'oops' if message == 1
        work_queue.shutdown { done.call }
      ensure
        # NOTE(review): this runs for every message, including the follow-up
        # 0 that has just requested shutdown -- confirm that is intended.
        work_queue.push(0)
      end
    end

    work_queue.push(1)
  end
end
# Builds a queue without an :error_handler option and checks that its
# @error_handlers instance variable is nevertheless non-empty.
def test_should_use_a_default_error_handler_when_none_specified
  async_test do |done|
    work_queue = GirlFriday::WorkQueue.new('error') { |_message| }
    work_queue.shutdown { done.call }
    work_queue.push(:text => 'foo') # Redundant

    # Not an ideal method, but I can't see a better way without complex stubbing.
    handlers = work_queue.instance_variable_get(:@error_handlers)
    assert handlers.length > 0
  end
end
# Pushes a message together with a completion block and checks that the
# block is invoked with the value returned by the worker ('camel').
def test_should_call_callback_when_complete
  async_test do |done|
    work_queue = GirlFriday::WorkQueue.new('callback', :size => 1) do |message|
      assert_equal 'foo', message[:text]
      'camel' # the worker's return value is what the push block should see
    end

    work_queue.push(:text => 'foo') do |outcome|
      assert_equal 'camel', outcome
      work_queue.shutdown { done.call }
    end
  end
end
# Floods a three-worker queue with 200 messages, captures queue.status from
# inside the worker that handles the 100th message, and then checks the
# metrics reported under the queue's name ('status').
def test_should_provide_status
  lock = Mutex.new
  total = 200
  processed = 0
  # Thread-safe increment; evaluates to the counter value after the bump.
  bump = lambda { lock.synchronize { processed += 1 } }
  snapshot = nil

  async_test do |done|
    work_queue = GirlFriday::WorkQueue.new('status', :size => 3) do |_message|
      seen = bump.call
      snapshot = work_queue.status if seen == 100
      if seen == total
        work_queue.shutdown { done.call }
      end
    end

    total.times { work_queue.push(:text => 'foo') }
  end

  refute_nil snapshot
  refute_nil snapshot['status']

  metrics = snapshot['status']
  assert metrics[:total_queued] > 0
  assert metrics[:total_queued] <= total
  assert_equal 3, metrics[:pool_size]
  assert_equal 3, metrics[:busy]
  assert_equal 0, metrics[:ready]
  assert(metrics[:backlog] > 0)
  assert(metrics[:total_processed] > 0)
end
# Runs 100 messages through a two-worker queue backed by
# GirlFriday::Store::Redis using a ConnectionPool of Redis clients.
# The test prints a notice and returns early when the redis/connection_pool
# gems cannot be loaded or the connection is refused.
def test_should_persist_with_redis_connection_pool
  begin
    require 'redis'
    require 'connection_pool'
    pool = ConnectionPool.new(:size => 5, :timeout => 2) { Redis.new }
    # Start from an empty database so earlier runs cannot leave work behind.
    pool.with { |redis| redis.flushdb }
  rescue LoadError => e
    return puts "Skipping redis test, 'redis' gem not found: #{e.message}"
  rescue Errno::ECONNREFUSED
    return puts 'Skipping redis test, not running locally'
  end

  mutex = Mutex.new
  total = 100
  count = 0
  # Thread-safe increment; evaluates to the counter value after the bump.
  incr = Proc.new do
    mutex.synchronize do
      count += 1
    end
  end

  async_test(2.0) do |cb|
    queue = GirlFriday::WorkQueue.new('redis-pool', :size => 2, :store => GirlFriday::Store::Redis, :store_config => { :pool => pool }) do |msg|
      # Compare the value returned from inside the mutex rather than
      # re-reading the shared counter: with two workers an unsynchronized
      # read could let both see count == total and shut down twice.
      processed = incr.call
      if processed == total
        queue.shutdown do
          cb.call
        end
      end
    end

    total.times do
      queue.push(:text => 'foo')
    end

    refute_nil queue.status['redis-pool'][:backlog]
  end
end
# Selecting the Redis store without supplying :store_config is expected to
# raise ArgumentError when the queue is constructed.
def test_should_raise_if_no_store_config_passed_in_for_redis_backend
  assert_raises(ArgumentError) do
    # The worker body is irrelevant here; construction itself should fail.
    GirlFriday::WorkQueue.new('raise-test', :store => GirlFriday::Store::Redis) { |_message| }
  end
end
# Pushes 100 messages onto a two-worker queue, gives the workers a moment to
# drain them, then checks that exactly one queue is registered and that
# GirlFriday.shutdown! returns 0.
def test_should_allow_graceful_shutdown
  mutex = Mutex.new
  total = 100
  count = 0
  # Thread-safe increment; evaluates to the counter value after the bump.
  incr = Proc.new do
    mutex.synchronize do
      count += 1
    end
  end

  async_test do |cb|
    queue = GirlFriday::WorkQueue.new('shutdown', :size => 2) do |msg|
      # Use the value produced under the mutex instead of re-reading the
      # shared counter, so only the worker that processed the final message
      # fires the callback.
      cb.call if incr.call == total
    end

    total.times do
      queue.push(:text => 'foo')
    end

    sleep 0.1
    assert_equal 1, GirlFriday.queues.size

    # Keep the shutdown result in its own local. Assigning it to `count`
    # would overwrite the processed-message counter that the worker closure
    # above shares with this block.
    # NOTE(review): presumably the number of queues that failed to shut
    # down in time -- confirm against GirlFriday.shutdown!.
    remaining = GirlFriday.shutdown!
    assert_equal 0, remaining
    cb.call
  end
end
# Queues 8 one-second jobs on a ten-worker queue and calls
# GirlFriday.shutdown! straight away; afterwards every job must have been
# counted both by the queue (@total_processed) and by the local counter.
def test_should_allow_in_progress_work_to_finish
  lock = Mutex.new
  total = 8
  finished = 0
  # Thread-safe increment; evaluates to the counter value after the bump.
  bump = lambda { lock.synchronize { finished += 1 } }

  async_test(10) do |done|
    work_queue = GirlFriday::WorkQueue.new('finish', :size => 10) do |_message|
      sleep 1
      bump.call
    end

    total.times { work_queue.push(:text => 'foo') }

    GirlFriday.shutdown!
    assert_equal total, work_queue.instance_variable_get(:@total_processed)
    assert_equal total, finished
    done.call
  end
end
# Checks that @ready_workers exists but is nil on a freshly built queue, and
# that it holds one entry while the first message is being processed.
def test_should_create_workers_lazily
  async_test do |done|
    work_queue = GirlFriday::Queue.new('lazy', :size => 2) do |_message|
      ready = work_queue.instance_variable_get(:@ready_workers)
      assert_equal 1, ready.size
      work_queue.shutdown { done.call }
    end

    assert work_queue.instance_variable_defined?(:@ready_workers)
    assert_nil work_queue.instance_variable_get(:@ready_workers)

    # Worker threads must not be instantiated until work is actually put
    # onto the queue, which happens here.
    work_queue << 'empty msg'
  end
end
# Stubs #push with flexmock so that it invokes the processor directly, then
# checks the processor ran on the same thread that called push.
def test_stubbing_girl_friday_with_flexmock
  calling_thread = Thread.current.to_s
  processing_thread = nil
  # Records which thread the processor was executed on.
  processor = Proc.new { |_message| processing_thread = Thread.current.to_s }

  async_test do |done|
    work_queue = GirlFriday::Queue.new('flexmock', :size => 2, &processor)
    flexmock(work_queue).should_receive(:push).zero_or_more_times.and_return { |msg| processor.call(msg) }

    work_queue.push 'hello world!'
    assert_equal calling_thread, processing_thread

    work_queue.shutdown { done.call }
  end
end
end
Jump to Line
Something went wrong with that request. Please try again.