Skip to content

Commit

Permalink
Add Concurrent.executor as an easy access point to global executors
Browse files Browse the repository at this point in the history
  • Loading branch information
pitr-ch committed Apr 22, 2015
1 parent af23a18 commit 80b2e46
Show file tree
Hide file tree
Showing 13 changed files with 170 additions and 181 deletions.
1 change: 0 additions & 1 deletion .gitignore
Expand Up @@ -35,4 +35,3 @@ ext/**/*.bundle
ext/**/*.so
ext/**/*.jar
pkg
*.gem
6 changes: 2 additions & 4 deletions lib/concurrent/agent.rb
Expand Up @@ -2,7 +2,6 @@

require 'concurrent/dereferenceable'
require 'concurrent/observable'
require 'concurrent/executor/executor_options'
require 'concurrent/utility/timeout'
require 'concurrent/logging'

Expand All @@ -15,7 +14,6 @@ module Concurrent
class Agent
include Dereferenceable
include Observable
include ExecutorOptions
include Logging

attr_reader :timeout, :io_executor, :fast_executor
Expand All @@ -42,8 +40,8 @@ def initialize(initial, opts = {})
@validator = Proc.new { |result| true }
self.observers = CopyOnWriteObserverSet.new
@serialized_execution = SerializedExecution.new
@io_executor = get_executor_from(opts) || Concurrent.global_io_executor
@fast_executor = get_executor_from(opts) || Concurrent.global_fast_executor
@io_executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
@fast_executor = Executor.executor_from_options(opts) || Concurrent.global_fast_executor
init_mutex
set_deref_options(opts)
end
Expand Down
10 changes: 10 additions & 0 deletions lib/concurrent/configuration.rb
Expand Up @@ -105,6 +105,16 @@ def self.global_timer_set
GLOBAL_TIMER_SET.value
end

# General access point to global executors.
# @param [Symbol, Executor] maps symbols:
# - :fast - {Concurrent.global_fast_executor}
# - :io - {Concurrent.global_io_executor}
# - :immediate - {Concurrent.global_immediate_executor}
# @return [Executor]
def self.executor(executor_identifier)
Executor.executor(executor_identifier)
end

def self.new_fast_executor(opts = {})
FixedThreadPool.new(
[2, Concurrent.processor_count].max,
Expand Down
4 changes: 1 addition & 3 deletions lib/concurrent/delay.rb
@@ -1,7 +1,6 @@
require 'thread'
require 'concurrent/configuration'
require 'concurrent/obligation'
require 'concurrent/executor/executor_options'
require 'concurrent/executor/immediate_executor'

module Concurrent
Expand Down Expand Up @@ -39,7 +38,6 @@ module Concurrent
# @see Concurrent::Dereferenceable
class Delay
include Obligation
include ExecutorOptions

# NOTE: Because the global thread pools are lazy-loaded with these objects
# there is a performance hit every time we post a new task to one of these
Expand All @@ -61,7 +59,7 @@ def initialize(opts = {}, &block)

init_obligation
set_deref_options(opts)
@task_executor = get_executor_from(opts)
@task_executor = Executor.executor_from_options(opts)

@task = block
@state = :pending
Expand Down
61 changes: 61 additions & 0 deletions lib/concurrent/executor/executor.rb
Expand Up @@ -6,6 +6,65 @@
module Concurrent

module Executor
include Logging

# Get the requested `Executor` based on the values set in the options hash.
#
# @param [Hash] opts the options defining the requested executor
# @option opts [Executor] :executor when set use the given `Executor` instance.
# Three special values are also supported: `:fast` returns the global fast executor,
# `:io` returns the global io executor, and `:immediate` returns a new
# `ImmediateExecutor` object.
#
# @return [Executor, nil] the requested thread pool, or nil when no option specified
#
# @!visibility private
def self.executor_from_options(opts = {}) # :nodoc:
case
when opts.key?(:executor)
if opts[:executor].nil?
nil
else
executor(opts[:executor])
end
when opts.key?(:operation) || opts.key?(:task)
if opts[:operation] == true || opts[:task] == false
Kernel.warn '[DEPRECATED] use `executor: :fast` instead'
return Concurrent.global_fast_executor
end

if opts[:operation] == false || opts[:task] == true
Kernel.warn '[DEPRECATED] use `executor: :io` instead'
return Concurrent.global_io_executor
end

raise ArgumentError.new("executor '#{opts[:executor]}' not recognized")
else
nil
end
end

def self.executor(executor_identifier)
case executor_identifier
when :fast
Concurrent.global_fast_executor
when :io
Concurrent.global_io_executor
when :immediate
Concurrent.global_immediate_executor
when :operation
Kernel.warn '[DEPRECATED] use `executor: :fast` instead'
Concurrent.global_fast_executor
when :task
Kernel.warn '[DEPRECATED] use `executor: :io` instead'
Concurrent.global_io_executor
when Executor
executor_identifier
else
raise ArgumentError, "executor not recognized by '#{executor_identifier}'"
end
end

# The policy defining how rejected tasks (tasks received once the
# queue size reaches the configured `max_queue`, or after the
# executor has shut down) are handled. Must be one of the values
Expand Down Expand Up @@ -363,4 +422,6 @@ def synchronize
end
end
end


end
62 changes: 0 additions & 62 deletions lib/concurrent/executor/executor_options.rb

This file was deleted.

12 changes: 5 additions & 7 deletions lib/concurrent/executor/ruby_thread_pool_executor.rb
Expand Up @@ -187,13 +187,11 @@ def ns_shutdown_execution

# @api private
def ns_kill_execution
ns_shutdown_execution
unless stopped_event.wait(1)
@pool.each &:kill
@pool.clear
@ready.clear
# TODO log out unprocessed tasks in queue
end
# TODO log out unprocessed tasks in queue
# TODO try to shutdown first?
@pool.each &:kill
@pool.clear
@ready.clear
end

alias_method :kill_execution, :ns_kill_execution
Expand Down
4 changes: 1 addition & 3 deletions lib/concurrent/executor/timer_set.rb
Expand Up @@ -4,7 +4,6 @@
require 'concurrent/executor/executor'
require 'concurrent/executor/single_thread_executor'
require 'concurrent/utility/monotonic_time'
require 'concurrent/executor/executor_options'

module Concurrent

Expand All @@ -15,7 +14,6 @@ module Concurrent
# @!macro monotonic_clock_warning
class TimerSet
include RubyExecutor
include ExecutorOptions

# Create a new set of timed tasks.
#
Expand All @@ -28,7 +26,7 @@ class TimerSet
# `ImmediateExecutor` object.
def initialize(opts = {})
@queue = PriorityQueue.new(order: :min)
@task_executor = get_executor_from(opts) || Concurrent.global_io_executor
@task_executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
@timer_executor = SingleThreadExecutor.new
@condition = Condition.new
init_executor
Expand Down
3 changes: 2 additions & 1 deletion lib/concurrent/file_map.rb
Expand Up @@ -5,7 +5,8 @@ module Concurrent
edge_lib_files = Dir['lib/concurrent/actor.rb',
'lib/concurrent/actor/**/*.rb',
'lib/concurrent/channel.rb',
'lib/concurrent/channel/**/*.rb'] & git_files
'lib/concurrent/channel/**/*.rb',
'lib/concurrent/next.rb'] & git_files
core_lib_files = all_lib_files - edge_lib_files

FILE_MAP = {
Expand Down
4 changes: 1 addition & 3 deletions lib/concurrent/future.rb
Expand Up @@ -2,7 +2,6 @@

require 'concurrent/ivar'
require 'concurrent/executor/safe_task_executor'
require 'concurrent/executor/executor_options'

module Concurrent

Expand All @@ -12,7 +11,6 @@ module Concurrent
# @see http://clojuredocs.org/clojure_core/clojure.core/future Clojure's future function
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html java.util.concurrent.Future
class Future < IVar
include ExecutorOptions

# Create a new `Future` in the `:unscheduled` state.
#
Expand All @@ -29,7 +27,7 @@ def initialize(opts = {}, &block)
super(IVar::NO_VALUE, opts)
@state = :unscheduled
@task = block
@executor = get_executor_from(opts) || Concurrent.global_io_executor
@executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
@args = get_arguments_from(opts)
end

Expand Down
4 changes: 1 addition & 3 deletions lib/concurrent/promise.rb
@@ -1,7 +1,6 @@
require 'thread'

require 'concurrent/obligation'
require 'concurrent/executor/executor_options'

module Concurrent

Expand Down Expand Up @@ -183,7 +182,6 @@ module Concurrent
# - `rescue` is aliased by `catch` and `on_error`
class Promise
include Obligation
include ExecutorOptions

# Initialize a new Promise with the provided options.
#
Expand All @@ -204,7 +202,7 @@ class Promise
def initialize(opts = {}, &block)
opts.delete_if { |k, v| v.nil? }

@executor = get_executor_from(opts) || Concurrent.global_io_executor
@executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
@args = get_arguments_from(opts)

@parent = opts.fetch(:parent) { nil }
Expand Down
4 changes: 1 addition & 3 deletions lib/concurrent/scheduled_task.rb
@@ -1,7 +1,6 @@
require 'concurrent/ivar'
require 'concurrent/utility/timer'
require 'concurrent/executor/safe_task_executor'
require 'concurrent/executor/executor_options'

module Concurrent

Expand Down Expand Up @@ -134,7 +133,6 @@ module Concurrent
#
# @!macro monotonic_clock_warning
class ScheduledTask < IVar
include ExecutorOptions

attr_reader :delay

Expand Down Expand Up @@ -166,7 +164,7 @@ def initialize(delay, opts = {}, &block)
self.observers = CopyOnNotifyObserverSet.new
@state = :unscheduled
@task = block
@executor = get_executor_from(opts) || Concurrent.global_io_executor
@executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
end

# Execute an `:unscheduled` `ScheduledTask`. Immediately sets the state to `:pending`
Expand Down

0 comments on commit 80b2e46

Please sign in to comment.