Skip to content

Commit

Permalink
Rollback java executor implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
pitr-ch committed May 17, 2018
1 parent 23cb3f9 commit a6fc08f
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 158 deletions.
19 changes: 13 additions & 6 deletions lib/concurrent/executor/cached_thread_pool.rb
Expand Up @@ -46,17 +46,24 @@ def initialize(opts = {})

private

# @!macro cached_thread_pool_method_initialize
# @!visibility private
def ns_initialize(opts)
super(opts)
if Concurrent.on_jruby?
if defined?(JavaThreadPoolExecutor) && self < JavaThreadPoolExecutor
# @!macro cached_thread_pool_method_initialize
# @!visibility private
def ns_initialize(opts)
super(opts)
@max_queue = 0
@executor = java.util.concurrent.Executors.newCachedThreadPool
@executor = java.util.concurrent.Executors.newCachedThreadPool
@executor.setRejectedExecutionHandler(FALLBACK_POLICY_CLASSES[@fallback_policy].new)
@executor.setKeepAliveTime(opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT), java.util.concurrent.TimeUnit::SECONDS)
self.auto_terminate = opts.fetch(:auto_terminate, true)
end
else
# @!macro cached_thread_pool_method_initialize
# @!visibility private
def ns_initialize(opts)
super(opts)
end
end

end
end
2 changes: 1 addition & 1 deletion lib/concurrent/executor/single_thread_executor.rb
Expand Up @@ -8,7 +8,7 @@ module Concurrent

SingleThreadExecutorImplementation = case
when Concurrent.on_jruby?
JavaSingleThreadExecutor
RubySingleThreadExecutor
else
RubySingleThreadExecutor
end
Expand Down
10 changes: 5 additions & 5 deletions lib/concurrent/executor/thread_pool_executor.rb
Expand Up @@ -9,7 +9,7 @@ module Concurrent

ThreadPoolExecutorImplementation = case
when Concurrent.on_jruby?
JavaThreadPoolExecutor
RubyThreadPoolExecutor
else
RubyThreadPoolExecutor
end
Expand Down Expand Up @@ -58,9 +58,9 @@ class ThreadPoolExecutor < ThreadPoolExecutorImplementation
# @!macro [new] thread_pool_executor_method_initialize
#
# Create a new thread pool.
#
#
# @param [Hash] opts the options which configure the thread pool.
#
#
# @option opts [Integer] :max_threads (DEFAULT_MAX_POOL_SIZE) the maximum
# number of threads to be created
# @option opts [Integer] :min_threads (DEFAULT_MIN_POOL_SIZE) When a new task is submitted
Expand All @@ -73,12 +73,12 @@ class ThreadPoolExecutor < ThreadPoolExecutorImplementation
# @option opts [Symbol] :fallback_policy (:abort) the policy for handling new
# tasks that are received when the queue size has reached
# `max_queue` or the executor has shut down
#
#
# @raise [ArgumentError] if `:max_threads` is less than one
# @raise [ArgumentError] if `:min_threads` is less than zero
# @raise [ArgumentError] if `:fallback_policy` is not one of the values specified
# in `FALLBACK_POLICIES`
#
#
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html

# @!method initialize(opts = {})
Expand Down
200 changes: 100 additions & 100 deletions spec/concurrent/executor/cached_thread_pool_spec.rb
Expand Up @@ -116,122 +116,122 @@ module Concurrent

context 'runtime-specific implementation' do

if Concurrent.on_jruby?

context '#initialize' do

it 'sets :fallback_policy correctly' do
clazz = java.util.concurrent.ThreadPoolExecutor::DiscardPolicy
policy = clazz.new
expect(clazz).to receive(:new).at_least(:once).with(any_args).and_return(policy)

subject = CachedThreadPool.new(fallback_policy: :discard)
expect(subject.fallback_policy).to eq :discard
end

it 'defaults :fallback_policy to :abort' do
subject = CachedThreadPool.new
expect(subject.fallback_policy).to eq :abort
end

it 'raises an exception if given an invalid :fallback_policy' do
expect {
CachedThreadPool.new(fallback_policy: :bogus)
}.to raise_error(ArgumentError)
end
# if Concurrent.on_jruby?
#
# context '#initialize' do
#
# it 'sets :fallback_policy correctly' do
# clazz = java.util.concurrent.ThreadPoolExecutor::DiscardPolicy
# policy = clazz.new
# expect(clazz).to receive(:new).at_least(:once).with(any_args).and_return(policy)
#
# subject = CachedThreadPool.new(fallback_policy: :discard)
# expect(subject.fallback_policy).to eq :discard
# end
#
# it 'defaults :fallback_policy to :abort' do
# subject = CachedThreadPool.new
# expect(subject.fallback_policy).to eq :abort
# end
#
# it 'raises an exception if given an invalid :fallback_policy' do
# expect {
# CachedThreadPool.new(fallback_policy: :bogus)
# }.to raise_error(ArgumentError)
# end
# end
#
# else

context 'garbage collection' do

subject { described_class.new(idletime: 0.1, max_threads: 2, gc_interval: 0) }

it 'removes from pool any thread that has been idle too long' do
latch = Concurrent::CountDownLatch.new(4)
4.times { subject.post { sleep 0.1; latch.count_down } }
expect(latch.wait(1)).to be true
sleep 0.2
subject.post {}
sleep 0.2
expect(subject.length).to be < 4
end

else

context 'garbage collection' do

subject { described_class.new(idletime: 0.1, max_threads: 2, gc_interval: 0) }
it 'deals with dead threads' do
expect(subject).to receive(:ns_worker_died).exactly(5).times.and_call_original

it 'removes from pool any thread that has been idle too long' do
latch = Concurrent::CountDownLatch.new(4)
4.times { subject.post { sleep 0.1; latch.count_down } }
expect(latch.wait(1)).to be true
sleep 0.2
subject.post {}
sleep 0.2
expect(subject.length).to be < 4
end

it 'deals with dead threads' do
expect(subject).to receive(:ns_worker_died).exactly(5).times.and_call_original

dead_threads_queue = Queue.new
5.times { subject.post { sleep 0.1; dead_threads_queue.push Thread.current; raise Exception } }
sleep(0.2)
latch = Concurrent::CountDownLatch.new(5)
5.times { subject.post { sleep 0.1; latch.count_down } }
expect(latch.wait(1)).to be true
dead_threads_queue = Queue.new
5.times { subject.post { sleep 0.1; dead_threads_queue.push Thread.current; raise Exception } }
sleep(0.2)
latch = Concurrent::CountDownLatch.new(5)
5.times { subject.post { sleep 0.1; latch.count_down } }
expect(latch.wait(1)).to be true

dead_threads = []
dead_threads << dead_threads_queue.pop until dead_threads_queue.empty?
expect(dead_threads.all? { |t| !t.alive? }).to be true
end
dead_threads = []
dead_threads << dead_threads_queue.pop until dead_threads_queue.empty?
expect(dead_threads.all? { |t| !t.alive? }).to be true
end
end

context 'worker creation and caching' do
context 'worker creation and caching' do

subject { described_class.new(idletime: 1, max_threads: 5) }
subject { described_class.new(idletime: 1, max_threads: 5) }

it 'creates new workers when there are none available' do
expect(subject.length).to eq 0
5.times { sleep(0.1); subject << proc { sleep(1) } }
sleep(1)
expect(subject.length).to eq 5
end
it 'creates new workers when there are none available' do
expect(subject.length).to eq 0
5.times { sleep(0.1); subject << proc { sleep(1) } }
sleep(1)
expect(subject.length).to eq 5
end

it 'uses existing idle threads' do
5.times { subject << proc { sleep(0.1) } }
sleep(1)
expect(subject.length).to be >= 5
3.times { subject << proc { sleep(1) } }
sleep(0.1)
expect(subject.length).to be >= 5
end
it 'uses existing idle threads' do
5.times { subject << proc { sleep(0.1) } }
sleep(1)
expect(subject.length).to be >= 5
3.times { subject << proc { sleep(1) } }
sleep(0.1)
expect(subject.length).to be >= 5
end
end
end

context 'stress', notravis: true do
configurations = [
{ min_threads: 2,
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
auto_terminate: false,
idletime: 0.1, # 1 minute
max_queue: 0, # unlimited
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
gc_interval: 0.1 },
{ min_threads: 2,
max_threads: 4,
auto_terminate: false,
idletime: 0.1, # 1 minute
max_queue: 0, # unlimited
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
gc_interval: 0.1 }
]

configurations.each do |config|
specify do
pool = RubyThreadPoolExecutor.new(config)

10.times do
count = Concurrent::CountDownLatch.new(100)
100.times do
pool.post { count.count_down }
end
count.wait
sleep 0.01 # let the tasks end after count_down
expect(pool.length).to be <= [200, config[:max_threads]].min
if pool.length > [110, config[:max_threads]].min
puts "ERRORSIZE #{pool.length} max #{config[:max_threads]}"
end
context 'stress', notravis: true do
configurations = [
{ min_threads: 2,
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
auto_terminate: false,
idletime: 0.1, # 1 minute
max_queue: 0, # unlimited
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
gc_interval: 0.1 },
{ min_threads: 2,
max_threads: 4,
auto_terminate: false,
idletime: 0.1, # 1 minute
max_queue: 0, # unlimited
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
gc_interval: 0.1 }
]

configurations.each do |config|
specify do
pool = RubyThreadPoolExecutor.new(config)

10.times do
count = Concurrent::CountDownLatch.new(100)
100.times do
pool.post { count.count_down }
end
count.wait
sleep 0.01 # let the tasks end after count_down
expect(pool.length).to be <= [200, config[:max_threads]].min
if pool.length > [110, config[:max_threads]].min
puts "ERRORSIZE #{pool.length} max #{config[:max_threads]}"
end
end
end
end
# end
end
end
end
60 changes: 30 additions & 30 deletions spec/concurrent/executor/fixed_thread_pool_spec.rb
Expand Up @@ -259,41 +259,41 @@ module Concurrent

context 'runtime-specific implementation' do

if Concurrent.on_jruby?

it 'sets :fallback_policy correctly' do
clazz = java.util.concurrent.ThreadPoolExecutor::DiscardPolicy
policy = clazz.new
expect(clazz).to receive(:new).at_least(:once).with(any_args).and_return(policy)

subject = FixedThreadPool.new(5, fallback_policy: :discard)
expect(subject.fallback_policy).to eq :discard
end

else

context 'exception handling' do

it 'restarts threads that experience exception' do
count = subject.length
count.times{ subject << proc{ raise StandardError } }
sleep(1)
expect(subject.length).to eq count
end
# if Concurrent.on_jruby?
#
# it 'sets :fallback_policy correctly' do
# clazz = java.util.concurrent.ThreadPoolExecutor::DiscardPolicy
# policy = clazz.new
# expect(clazz).to receive(:new).at_least(:once).with(any_args).and_return(policy)
#
# subject = FixedThreadPool.new(5, fallback_policy: :discard)
# expect(subject.fallback_policy).to eq :discard
# end
#
# else

context 'exception handling' do

it 'restarts threads that experience exception' do
count = subject.length
count.times { subject << proc { raise StandardError } }
sleep(1)
expect(subject.length).to eq count
end
end

context 'worker creation and caching' do
context 'worker creation and caching' do

it 'creates new workers when there are none available' do
pool = described_class.new(5)
expect(pool.length).to eq 0
5.times{ pool << proc{ sleep(1) } }
sleep(0.1)
expect(pool.length).to eq 5
pool.kill
end
it 'creates new workers when there are none available' do
pool = described_class.new(5)
expect(pool.length).to eq 0
5.times { pool << proc { sleep(1) } }
sleep(0.1)
expect(pool.length).to eq 5
pool.kill
end
end
# end
end
end
end

0 comments on commit a6fc08f

Please sign in to comment.