From 314d9991b5f031e043585ae579ce2b065f2a860b Mon Sep 17 00:00:00 2001 From: Rob Day Date: Sun, 7 Dec 2014 22:59:46 +0000 Subject: [PATCH 1/3] Unify overflow/fallback handling between all four Executors --- lib/concurrent/executor/executor.rb | 64 +++++++++++++------ .../executor/java_single_thread_executor.rb | 5 ++ .../executor/java_thread_pool_executor.rb | 7 -- .../executor/ruby_single_thread_executor.rb | 5 ++ .../executor/ruby_thread_pool_executor.rb | 31 --------- 5 files changed, 56 insertions(+), 56 deletions(-) diff --git a/lib/concurrent/executor/executor.rb b/lib/concurrent/executor/executor.rb index ce82e0c4c..f60aee7f9 100644 --- a/lib/concurrent/executor/executor.rb +++ b/lib/concurrent/executor/executor.rb @@ -5,7 +5,11 @@ module Concurrent module Executor - + # The policy defining how rejected tasks (tasks received once the queue size reaches + # the configured `max_queue`) are handled. Must be one of the values specified in + # `OVERFLOW_POLICIES`. + attr_reader :overflow_policy + # @!macro [attach] executor_module_method_can_overflow_question # # Does the task queue have a maximum size? @@ -17,6 +21,31 @@ def can_overflow? false end + # Handler which executes the `overflow_policy` once the queue size + # reaches `max_queue`. + # + # @param [Array] args the arguments to the task which is being handled. + # + # @!visibility private + def handle_overflow(*args) + case @overflow_policy + when :abort + raise RejectedExecutionError + when :discard + false + when :caller_runs + begin + yield(*args) + rescue => ex + # let it fail + log DEBUG, ex + end + true + else + fail "Unknown overflow policy #{@overflow_policy}" + end + end + # @!macro [attach] executor_module_method_serialized_question # # Does this executor guarantee serialization of its operations? @@ -63,6 +92,9 @@ module RubyExecutor include Executor include Logging + # The set of possible overflow policies that may be set at thread pool creation. + OVERFLOW_POLICIES = [:abort, :discard, :caller_runs] + # @!macro [attach] executor_method_post # # Submit a task to the executor for asynchronous processing. @@ -78,16 +110,8 @@ module RubyExecutor def post(*args, &task) raise ArgumentError.new('no block given') unless block_given? mutex.synchronize do - unless running? - # The executor is shut down - figure out how to reject this task - if self.respond_to?(:handle_overflow, true) - # Reject this task in the same way we'd reject an overflow - return handle_overflow(*args, &task) - else - # No handle_overflow method defined - just return false - return false - end - end + # If the executor is shut down, reject this task + return handle_overflow(*args, &task) unless running? execute(*args, &task) true end @@ -219,16 +243,20 @@ module JavaExecutor include Executor java_import 'java.lang.Runnable' + # The set of possible overflow policies that may be set at thread pool creation. + OVERFLOW_POLICIES = { + abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy, + discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy, + caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy + }.freeze + # @!macro executor_method_post def post(*args) raise ArgumentError.new('no block given') unless block_given? - if running? - executor_submit = @executor.java_method(:submit, [Runnable.java_class]) - executor_submit.call { yield(*args) } - true - else - false - end + return handle_overflow(*args, &task) unless running? + executor_submit = @executor.java_method(:submit, [Runnable.java_class]) + executor_submit.call { yield(*args) } + true rescue Java::JavaUtilConcurrent::RejectedExecutionException raise RejectedExecutionError end diff --git a/lib/concurrent/executor/java_single_thread_executor.rb b/lib/concurrent/executor/java_single_thread_executor.rb index c96c2065e..9d6fd6bd6 100644 --- a/lib/concurrent/executor/java_single_thread_executor.rb +++ b/lib/concurrent/executor/java_single_thread_executor.rb @@ -10,11 +10,16 @@ class JavaSingleThreadExecutor # Create a new thread pool. # + # @option opts [Symbol] :overflow_policy (:discard) the policy for handling new + # tasks that are received when the queue size has reached `max_queue` + # # @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html # @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html def initialize(opts = {}) @executor = java.util.concurrent.Executors.newSingleThreadExecutor + @overflow_policy = opts.fetch(:overflow_policy, :discard) + raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy) set_shutdown_hook end end diff --git a/lib/concurrent/executor/java_thread_pool_executor.rb b/lib/concurrent/executor/java_thread_pool_executor.rb index 1660c3dee..39aaf2705 100644 --- a/lib/concurrent/executor/java_thread_pool_executor.rb +++ b/lib/concurrent/executor/java_thread_pool_executor.rb @@ -20,13 +20,6 @@ class JavaThreadPoolExecutor # before being reclaimed. DEFAULT_THREAD_IDLETIMEOUT = 60 - # The set of possible overflow policies that may be set at thread pool creation. - OVERFLOW_POLICIES = { - abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy, - discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy, - caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy - }.freeze - # The maximum number of threads that may be created in the pool. attr_reader :max_length diff --git a/lib/concurrent/executor/ruby_single_thread_executor.rb b/lib/concurrent/executor/ruby_single_thread_executor.rb index a512fd114..5cf9eac1f 100644 --- a/lib/concurrent/executor/ruby_single_thread_executor.rb +++ b/lib/concurrent/executor/ruby_single_thread_executor.rb @@ -9,12 +9,17 @@ class RubySingleThreadExecutor # Create a new thread pool. # + # @option opts [Symbol] :overflow_policy (:discard) the policy for handling new + # tasks that are received when the queue size has reached `max_queue` + # # @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html # @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html def initialize(opts = {}) @queue = Queue.new @thread = nil + @overflow_policy = opts.fetch(:overflow_policy, :discard) + raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(@overflow_policy) init_executor end diff --git a/lib/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent/executor/ruby_thread_pool_executor.rb index c79697818..4a4f93144 100644 --- a/lib/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent/executor/ruby_thread_pool_executor.rb @@ -23,9 +23,6 @@ class RubyThreadPoolExecutor # before being reclaimed. DEFAULT_THREAD_IDLETIMEOUT = 60 - # The set of possible overflow policies that may be set at thread pool creation. - OVERFLOW_POLICIES = [:abort, :discard, :caller_runs] - # The maximum number of threads that may be created in the pool. attr_reader :max_length @@ -49,11 +46,6 @@ class RubyThreadPoolExecutor # accordance with the configured `overflow_policy`. attr_reader :max_queue - # The policy defining how rejected tasks (tasks received once the queue size reaches - # the configured `max_queue`) are handled. Must be one of the values specified in - # `OVERFLOW_POLICIES`. - attr_reader :overflow_policy - # Create a new thread pool. # # @param [Hash] opts the options which configure the thread pool @@ -224,29 +216,6 @@ def ensure_capacity? capacity end - # Handler which executes the `overflow_policy` once the queue size - # reaches `max_queue`. - # - # @param [Array] args the arguments to the task which is being handled. - # - # @!visibility private - def handle_overflow(*args) - case @overflow_policy - when :abort - raise RejectedExecutionError - when :discard - false - when :caller_runs - begin - yield(*args) - rescue => ex - # let it fail - log DEBUG, ex - end - true - end - end - # Scan all threads in the pool and reclaim any that are dead or # have been idle too long. Will check the last time the pool was # pruned and only run if the configured garbage collection From 5148d1043abb3b68e4c026f57d45c63ebf01d419 Mon Sep 17 00:00:00 2001 From: Rob Day Date: Mon, 8 Dec 2014 00:10:05 +0000 Subject: [PATCH 2/3] Rename 'overflow_policy' to 'fallback_policy'. This reflects its broader scope (e.g. deciding how to handle tasks after shutdown). --- lib/concurrent/executor/executor.rb | 29 +++++++-------- .../executor/java_cached_thread_pool.rb | 10 +++--- .../executor/java_fixed_thread_pool.rb | 10 +++--- .../executor/java_single_thread_executor.rb | 9 ++--- .../executor/java_thread_pool_executor.rb | 24 ++++++------- .../executor/ruby_cached_thread_pool.rb | 10 +++--- .../executor/ruby_fixed_thread_pool.rb | 10 +++--- .../executor/ruby_single_thread_executor.rb | 9 ++--- .../executor/ruby_thread_pool_executor.rb | 19 +++++----- .../executor/thread_pool_executor.rb | 8 ++--- .../executor/cached_thread_pool_shared.rb | 2 +- .../executor/fixed_thread_pool_shared.rb | 24 ++++++------- .../executor/java_cached_thread_pool_spec.rb | 16 ++++----- .../executor/java_fixed_thread_pool_spec.rb | 8 ++--- .../java_thread_pool_executor_spec.rb | 8 ++--- .../executor/ruby_cached_thread_pool_spec.rb | 2 +- .../executor/ruby_fixed_thread_pool_spec.rb | 2 +- .../ruby_thread_pool_executor_spec.rb | 36 +++++++++++++++---- .../executor/thread_pool_executor_shared.rb | 18 +++++----- 19 files changed, 138 insertions(+), 116 deletions(-) diff --git a/lib/concurrent/executor/executor.rb b/lib/concurrent/executor/executor.rb index f60aee7f9..b9f2175dc 100644 --- a/lib/concurrent/executor/executor.rb +++ b/lib/concurrent/executor/executor.rb @@ -5,10 +5,11 @@ module Concurrent module Executor - # The policy defining how rejected tasks (tasks received once the queue size reaches - # the configured `max_queue`) are handled. Must be one of the values specified in - # `OVERFLOW_POLICIES`. - attr_reader :overflow_policy + # The policy defining how rejected tasks (tasks received once the + # queue size reaches the configured `max_queue`, or after the + # executor has shut down) are handled. Must be one of the values + # specified in `FALLBACK_POLICIES`. + attr_reader :fallback_policy # @!macro [attach] executor_module_method_can_overflow_question # @@ -21,14 +22,14 @@ def can_overflow? false end - # Handler which executes the `overflow_policy` once the queue size + # Handler which executes the `fallback_policy` once the queue size # reaches `max_queue`. # # @param [Array] args the arguments to the task which is being handled. # # @!visibility private - def handle_overflow(*args) - case @overflow_policy + def handle_fallback(*args) + case @fallback_policy when :abort raise RejectedExecutionError when :discard @@ -42,7 +43,7 @@ def handle_overflow(*args) end true else - fail "Unknown overflow policy #{@overflow_policy}" + fail "Unknown fallback policy #{@fallback_policy}" end end @@ -92,8 +93,8 @@ module RubyExecutor include Executor include Logging - # The set of possible overflow policies that may be set at thread pool creation. - OVERFLOW_POLICIES = [:abort, :discard, :caller_runs] + # The set of possible fallback policies that may be set at thread pool creation. + FALLBACK_POLICIES = [:abort, :discard, :caller_runs] # @!macro [attach] executor_method_post # @@ -111,7 +112,7 @@ def post(*args, &task) raise ArgumentError.new('no block given') unless block_given? mutex.synchronize do # If the executor is shut down, reject this task - return handle_overflow(*args, &task) unless running? + return handle_fallback(*args, &task) unless running? execute(*args, &task) true end @@ -243,8 +244,8 @@ module JavaExecutor include Executor java_import 'java.lang.Runnable' - # The set of possible overflow policies that may be set at thread pool creation. - OVERFLOW_POLICIES = { + # The set of possible fallback policies that may be set at thread pool creation. + FALLBACK_POLICIES = { abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy, discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy, caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy @@ -253,7 +254,7 @@ module JavaExecutor # @!macro executor_method_post def post(*args) raise ArgumentError.new('no block given') unless block_given? - return handle_overflow(*args, &task) unless running? + return handle_fallback(*args, &task) unless running? executor_submit = @executor.java_method(:submit, [Runnable.java_class]) executor_submit.call { yield(*args) } true diff --git a/lib/concurrent/executor/java_cached_thread_pool.rb b/lib/concurrent/executor/java_cached_thread_pool.rb index 3535802a7..93c52b869 100644 --- a/lib/concurrent/executor/java_cached_thread_pool.rb +++ b/lib/concurrent/executor/java_cached_thread_pool.rb @@ -10,19 +10,19 @@ class JavaCachedThreadPool < JavaThreadPoolExecutor # Create a new thread pool. # # @param [Hash] opts the options defining pool behavior. - # @option opts [Symbol] :overflow_policy (`:abort`) the overflow policy + # @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy # - # @raise [ArgumentError] if `overflow_policy` is not a known policy + # @raise [ArgumentError] if `fallback_policy` is not a known policy # # @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool-- def initialize(opts = {}) - @overflow_policy = opts.fetch(:overflow_policy, :abort) + @fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort)) @max_queue = 0 - raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy) + raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.keys.include?(@fallback_policy) @executor = java.util.concurrent.Executors.newCachedThreadPool - @executor.setRejectedExecutionHandler(OVERFLOW_POLICIES[@overflow_policy].new) + @executor.setRejectedExecutionHandler(FALLBACK_POLICIES[@fallback_policy].new) set_shutdown_hook end diff --git a/lib/concurrent/executor/java_fixed_thread_pool.rb b/lib/concurrent/executor/java_fixed_thread_pool.rb index b792e8032..1869db020 100644 --- a/lib/concurrent/executor/java_fixed_thread_pool.rb +++ b/lib/concurrent/executor/java_fixed_thread_pool.rb @@ -10,10 +10,10 @@ class JavaFixedThreadPool < JavaThreadPoolExecutor # Create a new thread pool. # # @param [Hash] opts the options defining pool behavior. - # @option opts [Symbol] :overflow_policy (`:abort`) the overflow policy + # @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy # # @raise [ArgumentError] if `num_threads` is less than or equal to zero - # @raise [ArgumentError] if `overflow_policy` is not a known policy + # @raise [ArgumentError] if `fallback_policy` is not a known policy # # @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int- def initialize(num_threads, opts = {}) @@ -25,14 +25,14 @@ def initialize(num_threads, opts = {}) super(opts) - #@overflow_policy = opts.fetch(:overflow_policy, :abort) + #@fallback_policy = opts.fetch(:fallback_policy, :abort) #@max_queue = 0 # #raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1 - #raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy) + #raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.keys.include?(@fallback_policy) # #@executor = java.util.concurrent.Executors.newFixedThreadPool(num_threads) - #@executor.setRejectedExecutionHandler(OVERFLOW_POLICIES[@overflow_policy].new) + #@executor.setRejectedExecutionHandler(FALLBACK_POLICIES[@FALLBACK_policy].new) set_shutdown_hook end diff --git a/lib/concurrent/executor/java_single_thread_executor.rb b/lib/concurrent/executor/java_single_thread_executor.rb index 9d6fd6bd6..074c3e333 100644 --- a/lib/concurrent/executor/java_single_thread_executor.rb +++ b/lib/concurrent/executor/java_single_thread_executor.rb @@ -10,16 +10,17 @@ class JavaSingleThreadExecutor # Create a new thread pool. # - # @option opts [Symbol] :overflow_policy (:discard) the policy for handling new - # tasks that are received when the queue size has reached `max_queue` + # @option opts [Symbol] :fallback_policy (:discard) the policy + # for handling new tasks that are received when the queue size + # has reached `max_queue` or after the executor has shut down # # @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html # @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html def initialize(opts = {}) @executor = java.util.concurrent.Executors.newSingleThreadExecutor - @overflow_policy = opts.fetch(:overflow_policy, :discard) - raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy) + @fallback_policy = opts.fetch(:fallback_policy, :discard) + raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.keys.include?(@fallback_policy) set_shutdown_hook end end diff --git a/lib/concurrent/executor/java_thread_pool_executor.rb b/lib/concurrent/executor/java_thread_pool_executor.rb index 39aaf2705..7a73fe1b7 100644 --- a/lib/concurrent/executor/java_thread_pool_executor.rb +++ b/lib/concurrent/executor/java_thread_pool_executor.rb @@ -25,14 +25,9 @@ class JavaThreadPoolExecutor # The maximum number of tasks that may be waiting in the work queue at any one time. # When the queue size reaches `max_queue` subsequent tasks will be rejected in - # accordance with the configured `overflow_policy`. + # accordance with the configured `fallback_policy`. attr_reader :max_queue - # The policy defining how rejected tasks (tasks received once the queue size reaches - # the configured `max_queue`) are handled. Must be one of the values specified in - # `OVERFLOW_POLICIES`. - attr_reader :overflow_policy - # Create a new thread pool. # # @param [Hash] opts the options which configure the thread pool @@ -45,14 +40,15 @@ class JavaThreadPoolExecutor # number of seconds a thread may be idle before being reclaimed # @option opts [Integer] :max_queue (DEFAULT_MAX_QUEUE_SIZE) the maximum # number of tasks allowed in the work queue at any one time; a value of - # zero means the queue may grow without bounnd - # @option opts [Symbol] :overflow_policy (:abort) the policy for handling new - # tasks that are received when the queue size has reached `max_queue` + # zero means the queue may grow without bound + # @option opts [Symbol] :fallback_policy (:abort) the policy for handling new + # tasks that are received when the queue size has reached + # `max_queue` or the executir has shut down # # @raise [ArgumentError] if `:max_threads` is less than one # @raise [ArgumentError] if `:min_threads` is less than zero - # @raise [ArgumentError] if `:overflow_policy` is not one of the values specified - # in `OVERFLOW_POLICIES` + # @raise [ArgumentError] if `:fallback_policy` is not one of the values specified + # in `FALLBACK_POLICIES` # # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html def initialize(opts = {}) @@ -60,12 +56,12 @@ def initialize(opts = {}) max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i - @overflow_policy = opts.fetch(:overflow_policy, :abort) + @fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort)) raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0 raise ArgumentError.new('min_threads cannot be less than zero') if min_length < 0 raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length - raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy) + raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy) if @max_queue == 0 queue = java.util.concurrent.LinkedBlockingQueue.new @@ -76,7 +72,7 @@ def initialize(opts = {}) @executor = java.util.concurrent.ThreadPoolExecutor.new( min_length, max_length, idletime, java.util.concurrent.TimeUnit::SECONDS, - queue, OVERFLOW_POLICIES[@overflow_policy].new) + queue, FALLBACK_POLICIES[@fallback_policy].new) set_shutdown_hook end diff --git a/lib/concurrent/executor/ruby_cached_thread_pool.rb b/lib/concurrent/executor/ruby_cached_thread_pool.rb index 97a771022..951cadc4e 100644 --- a/lib/concurrent/executor/ruby_cached_thread_pool.rb +++ b/lib/concurrent/executor/ruby_cached_thread_pool.rb @@ -8,18 +8,18 @@ class RubyCachedThreadPool < RubyThreadPoolExecutor # Create a new thread pool. # # @param [Hash] opts the options defining pool behavior. - # number of seconds a thread may be idle before it is reclaimed + # @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy # - # @raise [ArgumentError] if `overflow_policy` is not a known policy + # @raise [ArgumentError] if `fallback_policy` is not a known policy def initialize(opts = {}) - overflow_policy = opts.fetch(:overflow_policy, :abort) + fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort)) - raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(overflow_policy) + raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(fallback_policy) opts = opts.merge( min_threads: 0, max_threads: DEFAULT_MAX_POOL_SIZE, - overflow_policy: overflow_policy, + fallback_policy: fallback_policy, max_queue: DEFAULT_MAX_QUEUE_SIZE, idletime: DEFAULT_THREAD_IDLETIMEOUT ) diff --git a/lib/concurrent/executor/ruby_fixed_thread_pool.rb b/lib/concurrent/executor/ruby_fixed_thread_pool.rb index 77119655b..ed29a386b 100644 --- a/lib/concurrent/executor/ruby_fixed_thread_pool.rb +++ b/lib/concurrent/executor/ruby_fixed_thread_pool.rb @@ -9,20 +9,20 @@ class RubyFixedThreadPool < RubyThreadPoolExecutor # # @param [Integer] num_threads the number of threads to allocate # @param [Hash] opts the options defining pool behavior. - # @option opts [Symbol] :overflow_policy (`:abort`) the overflow policy + # @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy # # @raise [ArgumentError] if `num_threads` is less than or equal to zero - # @raise [ArgumentError] if `overflow_policy` is not a known policy + # @raise [ArgumentError] if `fallback_policy` is not a known policy def initialize(num_threads, opts = {}) - overflow_policy = opts.fetch(:overflow_policy, :abort) + fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort)) raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1 - raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(overflow_policy) + raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(fallback_policy) opts = { min_threads: num_threads, max_threads: num_threads, - overflow_policy: overflow_policy, + fallback_policy: fallback_policy, max_queue: DEFAULT_MAX_QUEUE_SIZE, idletime: DEFAULT_THREAD_IDLETIMEOUT, }.merge(opts) diff --git a/lib/concurrent/executor/ruby_single_thread_executor.rb b/lib/concurrent/executor/ruby_single_thread_executor.rb index 5cf9eac1f..d42345e9f 100644 --- a/lib/concurrent/executor/ruby_single_thread_executor.rb +++ b/lib/concurrent/executor/ruby_single_thread_executor.rb @@ -9,8 +9,9 @@ class RubySingleThreadExecutor # Create a new thread pool. # - # @option opts [Symbol] :overflow_policy (:discard) the policy for handling new - # tasks that are received when the queue size has reached `max_queue` + # @option opts [Symbol] :fallback_policy (:discard) the policy for + # handling new tasks that are received when the queue size has + # reached `max_queue` or after the executor has shut down # # @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html @@ -18,8 +19,8 @@ class RubySingleThreadExecutor def initialize(opts = {}) @queue = Queue.new @thread = nil - @overflow_policy = opts.fetch(:overflow_policy, :discard) - raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(@overflow_policy) + @fallback_policy = opts.fetch(:fallback_policy, :discard) + raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy) init_executor end diff --git a/lib/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent/executor/ruby_thread_pool_executor.rb index 4a4f93144..334399397 100644 --- a/lib/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent/executor/ruby_thread_pool_executor.rb @@ -43,7 +43,7 @@ class RubyThreadPoolExecutor # The maximum number of tasks that may be waiting in the work queue at any one time. # When the queue size reaches `max_queue` subsequent tasks will be rejected in - # accordance with the configured `overflow_policy`. + # accordance with the configured `fallback_policy`. attr_reader :max_queue # Create a new thread pool. @@ -58,14 +58,15 @@ class RubyThreadPoolExecutor # number of seconds a thread may be idle before being reclaimed # @option opts [Integer] :max_queue (DEFAULT_MAX_QUEUE_SIZE) the maximum # number of tasks allowed in the work queue at any one time; a value of - # zero means the queue may grow without bounnd - # @option opts [Symbol] :overflow_policy (:abort) the policy for handling new - # tasks that are received when the queue size has reached `max_queue` + # zero means the queue may grow without bound + # @option opts [Symbol] :fallback_policy (:abort) the policy for handling new + # tasks that are received when the queue size has reached + # `max_queue` or the executor has shut down # # @raise [ArgumentError] if `:max_threads` is less than one # @raise [ArgumentError] if `:min_threads` is less than zero - # @raise [ArgumentError] if `:overflow_policy` is not one of the values specified - # in `OVERFLOW_POLICIES` + # @raise [ArgumentError] if `:fallback_policy` is not one of the values specified + # in `FALLBACK_POLICIES` # # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html def initialize(opts = {}) @@ -73,11 +74,11 @@ def initialize(opts = {}) @max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i @idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i - @overflow_policy = opts.fetch(:overflow_policy, :abort) + @fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort)) raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0 raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0 - raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(@overflow_policy) + raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy) raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length init_executor @@ -161,7 +162,7 @@ def execute(*args, &task) @scheduled_task_count += 1 @queue << [args, task] else - handle_overflow(*args, &task) if @max_queue != 0 && @queue.length >= @max_queue + handle_fallback(*args, &task) if @max_queue != 0 && @queue.length >= @max_queue end end diff --git a/lib/concurrent/executor/thread_pool_executor.rb b/lib/concurrent/executor/thread_pool_executor.rb index fe6f3e832..c06032202 100644 --- a/lib/concurrent/executor/thread_pool_executor.rb +++ b/lib/concurrent/executor/thread_pool_executor.rb @@ -40,13 +40,13 @@ module Concurrent # * `idletime`: The number of seconds that a thread may be idle before being reclaimed. # * `max_queue`: The maximum number of tasks that may be waiting in the work queue at # any one time. When the queue size reaches `max_queue` subsequent tasks will be - # rejected in accordance with the configured `overflow_policy`. - # * `overflow_policy`: The policy defining how rejected tasks are handled. # + # rejected in accordance with the configured `fallback_policy`. + # * `fallback_policy`: The policy defining how rejected tasks are handled. # # - # Three overflow policies are supported: + # Three fallback policies are supported: # # * `:abort`: Raise a `RejectedExecutionError` exception and discard the task. - # * `:discard`: Silently discard the task and return `nil` as the task result. + # * `:discard`: Discard the task and return false. # * `:caller_runs`: Execute the task on the calling thread. # # {include:file:doc/thread_pools.md} diff --git a/spec/concurrent/executor/cached_thread_pool_shared.rb b/spec/concurrent/executor/cached_thread_pool_shared.rb index f16202b6a..1ec647c05 100644 --- a/spec/concurrent/executor/cached_thread_pool_shared.rb +++ b/spec/concurrent/executor/cached_thread_pool_shared.rb @@ -4,7 +4,7 @@ shared_examples :cached_thread_pool do subject do - described_class.new(overflow_policy: :discard) + described_class.new(fallback_policy: :discard) end after(:each) do diff --git a/spec/concurrent/executor/fixed_thread_pool_shared.rb b/spec/concurrent/executor/fixed_thread_pool_shared.rb index 7a8437d47..8f3a4bf61 100644 --- a/spec/concurrent/executor/fixed_thread_pool_shared.rb +++ b/spec/concurrent/executor/fixed_thread_pool_shared.rb @@ -26,8 +26,8 @@ expect(subject.max_length).to eq 5 end - it 'defaults :overflow_policy to :abort' do - expect(subject.overflow_policy).to eq :abort + it 'defaults :fallback_policy to :abort' do + expect(subject.fallback_policy).to eq :abort end @@ -55,9 +55,9 @@ expect(subject.max_queue).to eq 10 end - it 'correctly sets valid :overflow_policy' do - subject = described_class.new(5, :overflow_policy => :caller_runs) - expect(subject.overflow_policy).to eq :caller_runs + it 'correctly sets valid :fallback_policy' do + subject = described_class.new(5, :fallback_policy => :caller_runs) + expect(subject.fallback_policy).to eq :caller_runs end it "correctly sets valid :idletime" do @@ -65,9 +65,9 @@ expect(subject.idletime).to eq 10 end - it 'raises an exception if given an invalid :overflow_policy' do + it 'raises an exception if given an invalid :fallback_policy' do expect { - described_class.new(5, overflow_policy: :bogus) + described_class.new(5, fallback_policy: :bogus) }.to raise_error(ArgumentError) end @@ -180,7 +180,7 @@ end end - context 'overflow policy' do + context 'fallback policy' do before(:each) do @queue = Queue.new @@ -195,7 +195,7 @@ latch = Concurrent::CountDownLatch.new(5) mutex = Mutex.new - subject = described_class.new(2, :max_queue => 2, :overflow_policy => :abort) + subject = described_class.new(2, :max_queue => 2, :fallback_policy => :abort) expect { 5.times do |i| subject.post do @@ -209,11 +209,11 @@ end # On discard, we'd expect no error, but also not all five results - it 'discards when overflow is :discard' do + it 'discards when fallback_policy is :discard' do latch = Concurrent::CountDownLatch.new(5) mutex = Mutex.new - subject = described_class.new(2, :max_queue => 2, :overflow_policy => :discard) + subject = described_class.new(2, :max_queue => 2, :fallback_policy => :discard) 5.times do |i| subject.post do sleep 0.1 @@ -233,7 +233,7 @@ latch = Concurrent::CountDownLatch.new(5) mutex = Mutex.new - subject = described_class.new(2, :max_queue => 2, :overflow_policy => :caller_runs) + subject = described_class.new(2, :max_queue => 2, :fallback_policy => :caller_runs) 5.times do |i| subject.post do diff --git a/spec/concurrent/executor/java_cached_thread_pool_spec.rb b/spec/concurrent/executor/java_cached_thread_pool_spec.rb index 00c842c9b..91e4a7a7a 100644 --- a/spec/concurrent/executor/java_cached_thread_pool_spec.rb +++ b/spec/concurrent/executor/java_cached_thread_pool_spec.rb @@ -8,7 +8,7 @@ module Concurrent describe JavaCachedThreadPool, :type=>:jruby do - subject { described_class.new(overflow_policy: :discard) } + subject { described_class.new(fallback_policy: :discard) } after(:each) do subject.kill @@ -19,23 +19,23 @@ module Concurrent context '#initialize' do - it 'sets :overflow_policy correctly' do + it 'sets :fallback_policy correctly' do clazz = java.util.concurrent.ThreadPoolExecutor::DiscardPolicy policy = clazz.new expect(clazz).to receive(:new).at_least(:once).with(any_args).and_return(policy) - subject = JavaCachedThreadPool.new(overflow_policy: :discard) - expect(subject.overflow_policy).to eq :discard + subject = JavaCachedThreadPool.new(fallback_policy: :discard) + expect(subject.fallback_policy).to eq :discard end - it 'defaults :overflow_policy to :abort' do + it 'defaults :fallback_policy to :abort' do subject = JavaCachedThreadPool.new - expect(subject.overflow_policy).to eq :abort + expect(subject.fallback_policy).to eq :abort end - it 'raises an exception if given an invalid :overflow_policy' do + it 'raises an exception if given an invalid :fallback_policy' do expect { - JavaCachedThreadPool.new(overflow_policy: :bogus) + JavaCachedThreadPool.new(fallback_policy: :bogus) }.to raise_error(ArgumentError) end end diff --git a/spec/concurrent/executor/java_fixed_thread_pool_spec.rb b/spec/concurrent/executor/java_fixed_thread_pool_spec.rb index 4ddb86cc0..305b8da47 100644 --- a/spec/concurrent/executor/java_fixed_thread_pool_spec.rb +++ b/spec/concurrent/executor/java_fixed_thread_pool_spec.rb @@ -8,7 +8,7 @@ module Concurrent describe JavaFixedThreadPool, :type=>:jruby do - subject { described_class.new(5, overflow_policy: :discard) } + subject { described_class.new(5, fallback_policy: :discard) } after(:each) do subject.kill @@ -20,13 +20,13 @@ module Concurrent context '#initialize' do - it 'sets :overflow_policy correctly' do + it 'sets :fallback_policy correctly' do clazz = java.util.concurrent.ThreadPoolExecutor::DiscardPolicy policy = clazz.new expect(clazz).to receive(:new).at_least(:once).with(any_args).and_return(policy) - subject = JavaFixedThreadPool.new(5, overflow_policy: :discard) - expect(subject.overflow_policy).to eq :discard + subject = JavaFixedThreadPool.new(5, fallback_policy: :discard) + expect(subject.fallback_policy).to eq :discard end end diff --git a/spec/concurrent/executor/java_thread_pool_executor_spec.rb b/spec/concurrent/executor/java_thread_pool_executor_spec.rb index ad86f4c3e..4119dc690 100644 --- a/spec/concurrent/executor/java_thread_pool_executor_spec.rb +++ b/spec/concurrent/executor/java_thread_pool_executor_spec.rb @@ -19,7 +19,7 @@ module Concurrent max_threads: 5, idletime: 60, max_queue: 10, - overflow_policy: :discard + fallback_policy: :discard ) end @@ -36,7 +36,7 @@ module Concurrent max_threads: 5, idletime: 60, max_queue: 10, - overflow_policy: :abort + fallback_policy: :abort ) end @@ -49,7 +49,7 @@ module Concurrent max_threads: 5, idletime: 60, max_queue: 10, - overflow_policy: :discard + fallback_policy: :discard ) end @@ -62,7 +62,7 @@ module Concurrent max_threads: 5, idletime: 60, max_queue: 10, - overflow_policy: :caller_runs + fallback_policy: :caller_runs ) end end diff --git a/spec/concurrent/executor/ruby_cached_thread_pool_spec.rb b/spec/concurrent/executor/ruby_cached_thread_pool_spec.rb index 85dae0916..bb5c8b693 100644 --- a/spec/concurrent/executor/ruby_cached_thread_pool_spec.rb +++ b/spec/concurrent/executor/ruby_cached_thread_pool_spec.rb @@ -7,7 +7,7 @@ module Concurrent subject do described_class.new( - overflow_policy: :discard, + fallback_policy: :discard, gc_interval: 0 ) end diff --git a/spec/concurrent/executor/ruby_fixed_thread_pool_spec.rb b/spec/concurrent/executor/ruby_fixed_thread_pool_spec.rb index a3c147498..9f7196934 100644 --- a/spec/concurrent/executor/ruby_fixed_thread_pool_spec.rb +++ b/spec/concurrent/executor/ruby_fixed_thread_pool_spec.rb @@ -5,7 +5,7 @@ module Concurrent describe RubyFixedThreadPool, :type=> :mrirbx do - subject { described_class.new(5, overflow_policy: :discard) } + subject { described_class.new(5, fallback_policy: :discard) } after(:each) do subject.kill diff --git a/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb b/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb index 7ec824897..ec66c68a5 100644 --- a/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb +++ b/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb @@ -16,7 +16,7 @@ module Concurrent max_threads: 5, idletime: 60, max_queue: 10, - overflow_policy: :discard + fallback_policy: :discard ) end @@ -32,7 +32,7 @@ module Concurrent max_threads: 20, idletime: 60, max_queue: expected_max, - overflow_policy: :discard + fallback_policy: :discard ) end @@ -49,7 +49,7 @@ module Concurrent end end - context '#overload_policy' do + context '#fallback_policy' do let!(:min_threads){ 1 } let!(:max_threads){ 1 } @@ -64,7 +64,7 @@ module Concurrent max_threads: max_threads, idletime: idletime, max_queue: max_queue, - overflow_policy: :abort + fallback_policy: :abort ) end @@ -125,7 +125,7 @@ module Concurrent max_threads: max_threads, idletime: idletime, max_queue: max_queue, - overflow_policy: :discard + fallback_policy: :discard ) end @@ -179,7 +179,7 @@ module Concurrent max_threads: 1, idletime: idletime, max_queue: 1, - overflow_policy: :caller_runs + fallback_policy: :caller_runs ) end @@ -190,7 +190,7 @@ module Concurrent end specify '#<< executes the task on the current thread when the queue is at capacity' do - expect(subject).to receive(:handle_overflow).with(any_args).at_least(:once) + expect(subject).to receive(:handle_fallback).with(any_args).at_least(:once) 5.times{ subject << proc { sleep(0.1) } } end @@ -216,5 +216,27 @@ module Concurrent end end end + + context '#fallback_policy' do + context ':caller_runs is honoured even if the old fallback_policy arg is used' do + + subject do + described_class.new( + min_threads: 1, + max_threads: 1, + idletime: 60, + max_queue: 1, + fallback_policy: :caller_runs + ) + end + + specify '#<< executes the task on the current thread when the executor is shutting down' do + latch = Concurrent::CountDownLatch.new(1) + subject.shutdown + subject << proc { latch.count_down } + latch.wait(0.1) + end + end + end end end diff --git a/spec/concurrent/executor/thread_pool_executor_shared.rb b/spec/concurrent/executor/thread_pool_executor_shared.rb index e3738d248..05043e128 100644 --- a/spec/concurrent/executor/thread_pool_executor_shared.rb +++ b/spec/concurrent/executor/thread_pool_executor_shared.rb @@ -30,8 +30,8 @@ expect(subject.max_queue).to eq described_class::DEFAULT_MAX_QUEUE_SIZE end - it 'defaults :overflow_policy to :abort' do - expect(subject.overflow_policy).to eq :abort + it 'defaults :fallback_policy to :abort' do + expect(subject.fallback_policy).to eq :abort end end @@ -55,10 +55,10 @@ }.to raise_error(ArgumentError) end - it 'accepts all valid overflow policies' do - Concurrent::RubyThreadPoolExecutor::OVERFLOW_POLICIES.each do |policy| - subject = described_class.new(overflow_policy: policy) - expect(subject.overflow_policy).to eq policy + it 'accepts all valid fallback policies' do + Concurrent::RubyThreadPoolExecutor::FALLBACK_POLICIES.each do |policy| + subject = described_class.new(fallback_policy: policy) + expect(subject.fallback_policy).to eq policy end end @@ -75,9 +75,9 @@ }.to raise_error(ArgumentError) end - it 'raises an exception if given an invalid :overflow_policy' do + it 'raises an exception if given an invalid :fallback_policy' do expect { - described_class.new(overflow_policy: :bogus) + described_class.new(fallback_policy: :bogus) }.to raise_error(ArgumentError) end end @@ -114,7 +114,7 @@ min_threads: 2, max_threads: 5, max_queue: expected_max, - overflow_policy: :discard + fallback_policy: :discard ) end From 6bd961c6a57f944f0675f7bd188bff8c7876c21d Mon Sep 17 00:00:00 2001 From: Rob Day Date: Mon, 8 Dec 2014 00:40:09 +0000 Subject: [PATCH 3/3] Fixes from testing under JRuby --- lib/concurrent/executor/executor.rb | 2 +- .../ruby_thread_pool_executor_spec.rb | 190 ----------------- .../executor/thread_pool_executor_shared.rb | 192 ++++++++++++++++++ 3 files changed, 193 insertions(+), 191 deletions(-) diff --git a/lib/concurrent/executor/executor.rb b/lib/concurrent/executor/executor.rb index b9f2175dc..d89d6b45f 100644 --- a/lib/concurrent/executor/executor.rb +++ b/lib/concurrent/executor/executor.rb @@ -252,7 +252,7 @@ module JavaExecutor }.freeze # @!macro executor_method_post - def post(*args) + def post(*args, &task) raise ArgumentError.new('no block given') unless block_given? return handle_fallback(*args, &task) unless running? executor_submit = @executor.java_method(:submit, [Runnable.java_class]) diff --git a/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb b/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb index ec66c68a5..b3a2a0eb1 100644 --- a/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb +++ b/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb @@ -48,195 +48,5 @@ module Concurrent expect(subject.remaining_capacity).to be < expected_max end end - - context '#fallback_policy' do - - let!(:min_threads){ 1 } - let!(:max_threads){ 1 } - let!(:idletime){ 60 } - let!(:max_queue){ 1 } - - context ':abort' do - - subject do - described_class.new( - min_threads: min_threads, - max_threads: max_threads, - idletime: idletime, - max_queue: max_queue, - fallback_policy: :abort - ) - end - - specify '#post raises an error when the queue is at capacity' do - expect { - 100.times{ subject.post{ sleep(1) } } - }.to raise_error(Concurrent::RejectedExecutionError) - end - - specify '#<< raises an error when the queue is at capacity' do - expect { - 100.times{ subject << proc { sleep(1) } } - }.to raise_error(Concurrent::RejectedExecutionError) - end - - specify '#post raises an error when the executor is shutting down' do - expect { - subject.shutdown; subject.post{ sleep(1) } - }.to raise_error(Concurrent::RejectedExecutionError) - end - - specify '#<< raises an error when the executor is shutting down' do - expect { - subject.shutdown; subject << proc { sleep(1) } - }.to raise_error(Concurrent::RejectedExecutionError) - end - - specify 'a #post task is never executed when the queue is at capacity' do - executed = Concurrent::AtomicFixnum.new(0) - 10.times do - begin - subject.post{ executed.increment; sleep(0.1) } - rescue - end - end - sleep(0.2) - expect(executed.value).to be < 10 - end - - specify 'a #<< task is never executed when the queue is at capacity' do - executed = Concurrent::AtomicFixnum.new(0) - 10.times do - begin - subject << proc { executed.increment; sleep(0.1) } - rescue - end - end - sleep(0.2) - expect(executed.value).to be < 10 - end - end - - context ':discard' do - - subject do - described_class.new( - min_threads: min_threads, - max_threads: max_threads, - idletime: idletime, - max_queue: max_queue, - fallback_policy: :discard - ) - end - - specify 'a #post task is never executed when the queue is at capacity' do - executed = Concurrent::AtomicFixnum.new(0) - 1000.times do - subject.post{ executed.increment } - end - sleep(0.1) - expect(executed.value).to be < 1000 - end - - specify 'a #<< task is never executed when the queue is at capacity' do - executed = Concurrent::AtomicFixnum.new(0) - 1000.times do - subject << proc { executed.increment } - end - sleep(0.1) - expect(executed.value).to be < 1000 - end - - specify 'a #post task is never executed when the executor is shutting down' do - executed = Concurrent::AtomicFixnum.new(0) - subject.shutdown - subject.post{ executed.increment } - sleep(0.1) - expect(executed.value).to be 0 - end - - specify 'a #<< task is never executed when the executor is shutting down' do - executed = Concurrent::AtomicFixnum.new(0) - subject.shutdown - subject << proc { executed.increment } - sleep(0.1) - expect(executed.value).to be 0 - end - - specify '#post returns false when the executor is shutting down' do - executed = Concurrent::AtomicFixnum.new(0) - subject.shutdown - ret = subject.post{ executed.increment } - expect(ret).to be false - end - end - - context ':caller_runs' do - - subject do - described_class.new( - min_threads: 1, - max_threads: 1, - idletime: idletime, - max_queue: 1, - fallback_policy: :caller_runs - ) - end - - specify '#post does not create any new threads when the queue is at capacity' do - initial = Thread.list.length - 5.times{ subject.post{ sleep(0.1) } } - expect(Thread.list.length).to be < initial + 5 - end - - specify '#<< executes the task on the current thread when the queue is at capacity' do - expect(subject).to receive(:handle_fallback).with(any_args).at_least(:once) - 5.times{ subject << proc { sleep(0.1) } } - end - - specify '#post executes the task on the current thread when the queue is at capacity' do - latch = Concurrent::CountDownLatch.new(5) - subject.post{ sleep(1) } - 5.times{|i| subject.post{ latch.count_down } } - latch.wait(0.1) - end - - specify '#post executes the task on the current thread when the executor is shutting down' do - latch = Concurrent::CountDownLatch.new(1) - subject.shutdown - subject.post{ latch.count_down } - latch.wait(0.1) - end - - specify '#<< executes the task on the current thread when the executor is shutting down' do - latch = Concurrent::CountDownLatch.new(1) - subject.shutdown - subject << proc { latch.count_down } - latch.wait(0.1) - end - end - end - - context '#fallback_policy' do - context ':caller_runs is honoured even if the old fallback_policy arg is used' do - - subject do - described_class.new( - min_threads: 1, - max_threads: 1, - idletime: 60, - max_queue: 1, - fallback_policy: :caller_runs - ) - end - - specify '#<< executes the task on the current thread when the executor is shutting down' do - latch = Concurrent::CountDownLatch.new(1) - subject.shutdown - subject << proc { latch.count_down } - latch.wait(0.1) - end - end - end end end diff --git a/spec/concurrent/executor/thread_pool_executor_shared.rb b/spec/concurrent/executor/thread_pool_executor_shared.rb index 05043e128..735772522 100644 --- a/spec/concurrent/executor/thread_pool_executor_shared.rb +++ b/spec/concurrent/executor/thread_pool_executor_shared.rb @@ -171,4 +171,196 @@ expect(subject.remaining_capacity).to eq expected_max end end + + context '#fallback_policy' do + + let!(:min_threads){ 1 } + let!(:max_threads){ 1 } + let!(:idletime){ 60 } + let!(:max_queue){ 1 } + + context ':abort' do + + subject do + described_class.new( + min_threads: min_threads, + max_threads: max_threads, + idletime: idletime, + max_queue: max_queue, + fallback_policy: :abort + ) + end + + specify '#post raises an error when the queue is at capacity' do + expect { + 100.times{ subject.post{ sleep(1) } } + }.to raise_error(Concurrent::RejectedExecutionError) + end + + specify '#<< raises an error when the queue is at capacity' do + expect { + 100.times{ subject << proc { sleep(1) } } + }.to raise_error(Concurrent::RejectedExecutionError) + end + + specify '#post raises an error when the executor is shutting down' do + expect { + subject.shutdown; subject.post{ sleep(1) } + }.to raise_error(Concurrent::RejectedExecutionError) + end + + specify '#<< raises an error when the executor is shutting down' do + expect { + subject.shutdown; subject << proc { sleep(1) } + }.to raise_error(Concurrent::RejectedExecutionError) + end + + specify 'a #post task is never executed when the queue is at capacity' do + executed = Concurrent::AtomicFixnum.new(0) + 10.times do + begin + subject.post{ executed.increment; sleep(0.1) } + rescue + end + end + sleep(0.2) + expect(executed.value).to be < 10 + end + + specify 'a #<< task is never executed when the queue is at capacity' do + executed = Concurrent::AtomicFixnum.new(0) + 10.times do + begin + subject << proc { executed.increment; sleep(0.1) } + rescue + end + end + sleep(0.2) + expect(executed.value).to be < 10 + end + end + + context ':discard' do + + subject do + described_class.new( + min_threads: min_threads, + max_threads: max_threads, + idletime: idletime, + max_queue: max_queue, + fallback_policy: :discard + ) + end + + specify 'a #post task is never executed when the queue is at capacity' do + executed = Concurrent::AtomicFixnum.new(0) + 1000.times do + subject.post{ executed.increment } + end + sleep(0.1) + expect(executed.value).to be < 1000 + end + + specify 'a #<< task is never executed when the queue is at capacity' do + executed = Concurrent::AtomicFixnum.new(0) + 1000.times do + subject << proc { executed.increment } + end + sleep(0.1) + expect(executed.value).to be < 1000 + end + + specify 'a #post task is never executed when the executor is shutting down' do + executed = Concurrent::AtomicFixnum.new(0) + subject.shutdown + subject.post{ executed.increment } + sleep(0.1) + expect(executed.value).to be 0 + end + + specify 'a #<< task is never executed when the executor is shutting down' do + executed = Concurrent::AtomicFixnum.new(0) + subject.shutdown + subject << proc { executed.increment } + sleep(0.1) + expect(executed.value).to be 0 + end + + specify '#post returns false when the executor is shutting down' do + executed = Concurrent::AtomicFixnum.new(0) + subject.shutdown + ret = subject.post{ executed.increment } + expect(ret).to be false + end + end + + context ':caller_runs' do + + subject do + described_class.new( + min_threads: 1, + max_threads: 1, + idletime: idletime, + max_queue: 1, + fallback_policy: :caller_runs + ) + end + + specify '#post does not create any new threads when the queue is at capacity' do + initial = Thread.list.length + 5.times{ subject.post{ sleep(0.1) } } + expect(Thread.list.length).to be < initial + 5 + end + + specify '#<< executes the task on the current thread when the queue is at capacity' do + latch = Concurrent::CountDownLatch.new(5) + subject.post{ sleep(1) } + 5.times{|i| subject << proc { latch.count_down } } + latch.wait(0.1) + end + + specify '#post executes the task on the current thread when the queue is at capacity' do + latch = Concurrent::CountDownLatch.new(5) + subject.post{ sleep(1) } + 5.times{|i| subject.post{ latch.count_down } } + latch.wait(0.1) + end + + specify '#post executes the task on the current thread when the executor is shutting down' do + latch = Concurrent::CountDownLatch.new(1) + subject.shutdown + subject.post{ latch.count_down } + latch.wait(0.1) + end + + specify '#<< executes the task on the current thread when the executor is shutting down' do + latch = Concurrent::CountDownLatch.new(1) + subject.shutdown + subject << proc { latch.count_down } + latch.wait(0.1) + end + end + end + + context '#overflow_policy' do + context ':caller_runs is honoured even if the old overflow_policy arg is used' do + + subject do + described_class.new( + min_threads: 1, + max_threads: 1, + idletime: 60, + max_queue: 1, + overflow_policy: :caller_runs + ) + end + + specify '#<< executes the task on the current thread when the executor is shutting down' do + latch = Concurrent::CountDownLatch.new(1) + subject.shutdown + subject << proc { latch.count_down } + latch.wait(0.1) + end + end + end end