Skip to content

Further reworking of queue overflow tests #211

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 13, 2014
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 149 additions & 37 deletions spec/concurrent/executor/thread_pool_executor_shared.rb
Original file line number Diff line number Diff line change
Expand Up @@ -216,30 +216,100 @@
end

specify 'a #post task is never executed when the queue is at capacity' do
executed = Concurrent::AtomicFixnum.new(0)
10.times do
begin
subject.post{ executed.increment; sleep(0.1) }
rescue
end
all_tasks_posted = Concurrent::Event.new

latch = Concurrent::CountDownLatch.new(max_threads)

initial_executed = Concurrent::AtomicFixnum.new(0)
subsequent_executed = Concurrent::AtomicFixnum.new(0)

# Fill up all the threads (with a task that won't complete until
# all tasks are posted)
max_threads.times do
subject.post{ latch.count_down; all_tasks_posted.wait ; initial_executed.increment;}
end
sleep(0.2)
expect(executed.value).to be < 10

# Wait for all those tasks to be taken off the queue onto a
# worker thread and start executing
latch.wait

# Fill up the queue (with a task that won't complete until
# all tasks are posted)
max_queue.times do
subject.post{ all_tasks_posted.wait; initial_executed.increment; }
end

# Inject 100 more tasks, which should throw an exception
100.times do
expect {
subject.post { subsequent_executed.increment; }
}.to raise_error(Concurrent::RejectedExecutionError)
end

# Trigger the event, so that the tasks in the threads and on
# the queue can run to completion
all_tasks_posted.set

# Wait for all tasks to finish
subject.shutdown
subject.wait_for_termination

# The tasks should have run until all the threads and the
# queue filled up...
expect(initial_executed.value).to be (max_threads + max_queue)

# ..but been dropped after that
expect(subsequent_executed.value).to be 0
end

specify 'a #<< task is never executed when the queue is at capacity' do
executed = Concurrent::AtomicFixnum.new(0)
10.times do
begin
subject << proc { executed.increment; sleep(0.1) }
rescue
end
all_tasks_posted = Concurrent::Event.new

latch = Concurrent::CountDownLatch.new(max_threads)

initial_executed = Concurrent::AtomicFixnum.new(0)
subsequent_executed = Concurrent::AtomicFixnum.new(0)

# Fill up all the threads (with a task that won't complete until
# all tasks are posted)
max_threads.times do
subject << proc { latch.count_down; all_tasks_posted.wait ; initial_executed.increment;}
end

# Wait for all those tasks to be taken off the queue onto a
# worker thread and start executing
latch.wait

# Fill up the queue (with a task that won't complete until
# all tasks are posted)
max_queue.times do
subject << proc { all_tasks_posted.wait; initial_executed.increment; }
end
sleep(0.2)
expect(executed.value).to be < 10

# Inject 100 more tasks, which should throw an exeption
100.times do
expect {
subject << proc { subsequent_executed.increment; }
}.to raise_error(Concurrent::RejectedExecutionError)
end

# Trigger the event, so that the tasks in the threads and on
# the queue can run to completion
all_tasks_posted.set

# Wait for all tasks to finish
subject.shutdown
subject.wait_for_termination

# The tasks should have run until all the threads and the
# queue filled up...
expect(initial_executed.value).to be (max_threads + max_queue)

# ..but been rejected after that
expect(subsequent_executed.value).to be 0
end
end

context ':discard' do

subject do
Expand All @@ -253,38 +323,37 @@
end

specify 'a #post task is never executed when the queue is at capacity' do
lock = Mutex.new
lock.lock
all_tasks_posted = Concurrent::Event.new

latch = Concurrent::CountDownLatch.new(max_threads)

initial_executed = Concurrent::AtomicFixnum.new(0)
subsequent_executed = Concurrent::AtomicFixnum.new(0)

# Fill up all the threads (with a task that won't run until
# lock.unlock is called)
# Fill up all the threads (with a task that won't complete until
# all tasks are posted)
max_threads.times do
subject.post{ latch.count_down; lock.lock; initial_executed.increment; lock.unlock }
subject.post{ latch.count_down; all_tasks_posted.wait ; initial_executed.increment;}
end

# Wait for all those tasks to be taken off the queue onto a
# worker thread and start executing
latch.wait

# Fill up the queue (with a task that won't run until
# lock.unlock is called)
# Fill up the queue (with a task that won't complete until
# all tasks are posted)
max_queue.times do
subject.post{ lock.lock; initial_executed.increment; lock.unlock }
subject.post{ all_tasks_posted.wait; initial_executed.increment; }
end

# Inject 100 more tasks, which should be dropped without an exception
100.times do
subject.post{ subsequent_executed.increment; }
end

# Unlock the lock, so that the tasks in the threads and on
# Trigger the event, so that the tasks in the threads and on
# the queue can run to completion
lock.unlock
all_tasks_posted.set

# Wait for all tasks to finish
subject.shutdown
Expand All @@ -299,34 +368,77 @@
end

specify 'a #<< task is never executed when the queue is at capacity' do
executed = Concurrent::AtomicFixnum.new(0)
1000.times do
subject << proc { sleep; executed.increment }
all_tasks_posted = Concurrent::Event.new

latch = Concurrent::CountDownLatch.new(max_threads)

initial_executed = Concurrent::AtomicFixnum.new(0)
subsequent_executed = Concurrent::AtomicFixnum.new(0)

# Fill up all the threads (with a task that won't complete until
# all tasks are posted)
max_threads.times do
subject << proc { latch.count_down; all_tasks_posted.wait ; initial_executed.increment;}
end
sleep(0.1)
expect(executed.value).to be 0

# Wait for all those tasks to be taken off the queue onto a
# worker thread and start executing
latch.wait

# Fill up the queue (with a task that won't complete until
# all tasks are posted)
max_queue.times do
subject << proc { all_tasks_posted.wait; initial_executed.increment; }
end

# Inject 100 more tasks, which should be dropped without an exception
100.times do
subject << proc { subsequent_executed.increment; }
end

# Trigger the event, so that the tasks in the threads and on
# the queue can run to completion
all_tasks_posted.set

# Wait for all tasks to finish
subject.shutdown
subject.wait_for_termination

# The tasks should have run until all the threads and the
# queue filled up...
expect(initial_executed.value).to be (max_threads + max_queue)

# ..but been dropped after that
expect(subsequent_executed.value).to be 0
end

specify 'a #post task is never executed when the executor is shutting down' do
executed = Concurrent::AtomicFixnum.new(0)

subject.shutdown
subject.post{ sleep; executed.increment }
sleep(0.1)
subject.post{ executed.increment }

# Wait for all tasks to finish
subject.wait_for_termination

expect(executed.value).to be 0
end

specify 'a #<< task is never executed when the executor is shutting down' do
executed = Concurrent::AtomicFixnum.new(0)

subject.shutdown
subject << proc { executed.increment }
sleep(0.1)

# Wait for all tasks to finish
subject.wait_for_termination

expect(executed.value).to be 0
end

specify '#post returns false when the executor is shutting down' do
executed = Concurrent::AtomicFixnum.new(0)
subject.shutdown
ret = subject.post{ executed.increment }
ret = subject.post{ nil }
expect(ret).to be false
end
end
Expand Down