Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 48 additions & 19 deletions lib/concurrent/executor/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
module Concurrent

module Executor

# The policy defining how rejected tasks (tasks received once the
# queue size reaches the configured `max_queue`, or after the
# executor has shut down) are handled. Must be one of the values
# specified in `FALLBACK_POLICIES`.
attr_reader :fallback_policy

# @!macro [attach] executor_module_method_can_overflow_question
#
# Does the task queue have a maximum size?
Expand All @@ -17,6 +22,31 @@ def can_overflow?
false
end

# Handler which executes the `fallback_policy` once the queue size
# reaches `max_queue`.
#
# @param [Array] args the arguments to the task which is being handled.
#
# @!visibility private
def handle_fallback(*args)
case @fallback_policy
when :abort
raise RejectedExecutionError
when :discard
false
when :caller_runs
begin
yield(*args)
rescue => ex
# let it fail
log DEBUG, ex
end
true
else
fail "Unknown fallback policy #{@fallback_policy}"
end
end

# @!macro [attach] executor_module_method_serialized_question
#
# Does this executor guarantee serialization of its operations?
Expand Down Expand Up @@ -63,6 +93,9 @@ module RubyExecutor
include Executor
include Logging

# The set of possible fallback policies that may be set at thread pool creation.
FALLBACK_POLICIES = [:abort, :discard, :caller_runs]

# @!macro [attach] executor_method_post
#
# Submit a task to the executor for asynchronous processing.
Expand All @@ -78,16 +111,8 @@ module RubyExecutor
def post(*args, &task)
raise ArgumentError.new('no block given') unless block_given?
mutex.synchronize do
unless running?
# The executor is shut down - figure out how to reject this task
if self.respond_to?(:handle_overflow, true)
# Reject this task in the same way we'd reject an overflow
return handle_overflow(*args, &task)
else
# No handle_overflow method defined - just return false
return false
end
end
# If the executor is shut down, reject this task
return handle_fallback(*args, &task) unless running?
execute(*args, &task)
true
end
Expand Down Expand Up @@ -219,16 +244,20 @@ module JavaExecutor
include Executor
java_import 'java.lang.Runnable'

# The set of possible fallback policies that may be set at thread pool creation.
FALLBACK_POLICIES = {
abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy,
discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy,
caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy
}.freeze

# @!macro executor_method_post
def post(*args)
def post(*args, &task)
raise ArgumentError.new('no block given') unless block_given?
if running?
executor_submit = @executor.java_method(:submit, [Runnable.java_class])
executor_submit.call { yield(*args) }
true
else
false
end
return handle_fallback(*args, &task) unless running?
executor_submit = @executor.java_method(:submit, [Runnable.java_class])
executor_submit.call { yield(*args) }
true
rescue Java::JavaUtilConcurrent::RejectedExecutionException
raise RejectedExecutionError
end
Expand Down
10 changes: 5 additions & 5 deletions lib/concurrent/executor/java_cached_thread_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ class JavaCachedThreadPool < JavaThreadPoolExecutor
# Create a new thread pool.
#
# @param [Hash] opts the options defining pool behavior.
# @option opts [Symbol] :overflow_policy (`:abort`) the overflow policy
# @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy
#
# @raise [ArgumentError] if `overflow_policy` is not a known policy
# @raise [ArgumentError] if `fallback_policy` is not a known policy
#
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool--
def initialize(opts = {})
@overflow_policy = opts.fetch(:overflow_policy, :abort)
@fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
@max_queue = 0

raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.keys.include?(@fallback_policy)

@executor = java.util.concurrent.Executors.newCachedThreadPool
@executor.setRejectedExecutionHandler(OVERFLOW_POLICIES[@overflow_policy].new)
@executor.setRejectedExecutionHandler(FALLBACK_POLICIES[@fallback_policy].new)

set_shutdown_hook
end
Expand Down
10 changes: 5 additions & 5 deletions lib/concurrent/executor/java_fixed_thread_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ class JavaFixedThreadPool < JavaThreadPoolExecutor
# Create a new thread pool.
#
# @param [Hash] opts the options defining pool behavior.
# @option opts [Symbol] :overflow_policy (`:abort`) the overflow policy
# @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy
#
# @raise [ArgumentError] if `num_threads` is less than or equal to zero
# @raise [ArgumentError] if `overflow_policy` is not a known policy
# @raise [ArgumentError] if `fallback_policy` is not a known policy
#
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int-
def initialize(num_threads, opts = {})
Expand All @@ -25,14 +25,14 @@ def initialize(num_threads, opts = {})
super(opts)


#@overflow_policy = opts.fetch(:overflow_policy, :abort)
#@fallback_policy = opts.fetch(:fallback_policy, :abort)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of these commented lines can be deleted. They are left over from a previous PR and I should have removed them earlier.

#@max_queue = 0
#
#raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1
#raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
#raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.keys.include?(@fallback_policy)
#
#@executor = java.util.concurrent.Executors.newFixedThreadPool(num_threads)
#@executor.setRejectedExecutionHandler(OVERFLOW_POLICIES[@overflow_policy].new)
#@executor.setRejectedExecutionHandler(FALLBACK_POLICIES[@FALLBACK_policy].new)

set_shutdown_hook
end
Expand Down
6 changes: 6 additions & 0 deletions lib/concurrent/executor/java_single_thread_executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@ class JavaSingleThreadExecutor

# Create a new thread pool.
#
# @option opts [Symbol] :fallback_policy (:discard) the policy
# for handling new tasks that are received when the queue size
# has reached `max_queue` or after the executor has shut down
#
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
def initialize(opts = {})
@executor = java.util.concurrent.Executors.newSingleThreadExecutor
@fallback_policy = opts.fetch(:fallback_policy, :discard)
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.keys.include?(@fallback_policy)
set_shutdown_hook
end
end
Expand Down
31 changes: 10 additions & 21 deletions lib/concurrent/executor/java_thread_pool_executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,14 @@ class JavaThreadPoolExecutor
# before being reclaimed.
DEFAULT_THREAD_IDLETIMEOUT = 60

# The set of possible overflow policies that may be set at thread pool creation.
OVERFLOW_POLICIES = {
abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy,
discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy,
caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy
}.freeze

# The maximum number of threads that may be created in the pool.
attr_reader :max_length

# The maximum number of tasks that may be waiting in the work queue at any one time.
# When the queue size reaches `max_queue` subsequent tasks will be rejected in
# accordance with the configured `overflow_policy`.
# accordance with the configured `fallback_policy`.
attr_reader :max_queue

# The policy defining how rejected tasks (tasks received once the queue size reaches
# the configured `max_queue`) are handled. Must be one of the values specified in
# `OVERFLOW_POLICIES`.
attr_reader :overflow_policy

# Create a new thread pool.
#
# @param [Hash] opts the options which configure the thread pool
Expand All @@ -52,27 +40,28 @@ class JavaThreadPoolExecutor
# number of seconds a thread may be idle before being reclaimed
# @option opts [Integer] :max_queue (DEFAULT_MAX_QUEUE_SIZE) the maximum
# number of tasks allowed in the work queue at any one time; a value of
# zero means the queue may grow without bounnd
# @option opts [Symbol] :overflow_policy (:abort) the policy for handling new
# tasks that are received when the queue size has reached `max_queue`
# zero means the queue may grow without bound
# @option opts [Symbol] :fallback_policy (:abort) the policy for handling new
# tasks that are received when the queue size has reached
# `max_queue` or the executir has shut down
#
# @raise [ArgumentError] if `:max_threads` is less than one
# @raise [ArgumentError] if `:min_threads` is less than zero
# @raise [ArgumentError] if `:overflow_policy` is not one of the values specified
# in `OVERFLOW_POLICIES`
# @raise [ArgumentError] if `:fallback_policy` is not one of the values specified
# in `FALLBACK_POLICIES`
#
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
def initialize(opts = {})
min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
@overflow_policy = opts.fetch(:overflow_policy, :abort)
@fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))

raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
raise ArgumentError.new('min_threads cannot be less than zero') if min_length < 0
raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length
raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)

if @max_queue == 0
queue = java.util.concurrent.LinkedBlockingQueue.new
Expand All @@ -83,7 +72,7 @@ def initialize(opts = {})
@executor = java.util.concurrent.ThreadPoolExecutor.new(
min_length, max_length,
idletime, java.util.concurrent.TimeUnit::SECONDS,
queue, OVERFLOW_POLICIES[@overflow_policy].new)
queue, FALLBACK_POLICIES[@fallback_policy].new)

set_shutdown_hook
end
Expand Down
10 changes: 5 additions & 5 deletions lib/concurrent/executor/ruby_cached_thread_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ class RubyCachedThreadPool < RubyThreadPoolExecutor
# Create a new thread pool.
#
# @param [Hash] opts the options defining pool behavior.
# number of seconds a thread may be idle before it is reclaimed
# @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy
#
# @raise [ArgumentError] if `overflow_policy` is not a known policy
# @raise [ArgumentError] if `fallback_policy` is not a known policy
def initialize(opts = {})
overflow_policy = opts.fetch(:overflow_policy, :abort)
fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))

raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(overflow_policy)
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(fallback_policy)

opts = opts.merge(
min_threads: 0,
max_threads: DEFAULT_MAX_POOL_SIZE,
overflow_policy: overflow_policy,
fallback_policy: fallback_policy,
max_queue: DEFAULT_MAX_QUEUE_SIZE,
idletime: DEFAULT_THREAD_IDLETIMEOUT
)
Expand Down
10 changes: 5 additions & 5 deletions lib/concurrent/executor/ruby_fixed_thread_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ class RubyFixedThreadPool < RubyThreadPoolExecutor
#
# @param [Integer] num_threads the number of threads to allocate
# @param [Hash] opts the options defining pool behavior.
# @option opts [Symbol] :overflow_policy (`:abort`) the overflow policy
# @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy
#
# @raise [ArgumentError] if `num_threads` is less than or equal to zero
# @raise [ArgumentError] if `overflow_policy` is not a known policy
# @raise [ArgumentError] if `fallback_policy` is not a known policy
def initialize(num_threads, opts = {})
overflow_policy = opts.fetch(:overflow_policy, :abort)
fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))

raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(overflow_policy)
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(fallback_policy)

opts = {
min_threads: num_threads,
max_threads: num_threads,
overflow_policy: overflow_policy,
fallback_policy: fallback_policy,
max_queue: DEFAULT_MAX_QUEUE_SIZE,
idletime: DEFAULT_THREAD_IDLETIMEOUT,
}.merge(opts)
Expand Down
6 changes: 6 additions & 0 deletions lib/concurrent/executor/ruby_single_thread_executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,18 @@ class RubySingleThreadExecutor

# Create a new thread pool.
#
# @option opts [Symbol] :fallback_policy (:discard) the policy for
# handling new tasks that are received when the queue size has
# reached `max_queue` or after the executor has shut down
#
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
def initialize(opts = {})
@queue = Queue.new
@thread = nil
@fallback_policy = opts.fetch(:fallback_policy, :discard)
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
init_executor
end

Expand Down
Loading