Skip to content

Commit

Permalink
Merge 3a45019 into 2160db1
Browse files Browse the repository at this point in the history
  • Loading branch information
Petr Chalupa committed May 22, 2014
2 parents 2160db1 + 3a45019 commit 9ac427a
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 168 deletions.
1 change: 0 additions & 1 deletion lib/concurrent/atomic/event.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
require 'thread'
require 'concurrent/utilities'
require 'concurrent/atomic/condition'

module Concurrent
Expand Down
89 changes: 48 additions & 41 deletions lib/concurrent/configuration.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'thread'
require 'concurrent/delay'
require 'concurrent/executor/thread_pool_executor'
require 'concurrent/executor/timer_set'
require 'concurrent/utility/processor_count'
Expand All @@ -8,57 +9,48 @@ module Concurrent
# An error class to be raised when errors occur during configuration.
ConfigurationError = Class.new(StandardError)

class << self
attr_accessor :configuration
end

# Perform gem-level configuration.
#
# @yield the configuration commands
# @yieldparam [Configuration] the current configuration object
def self.configure
(@mutex ||= Mutex.new).synchronize do
yield(configuration)

# initialize the global thread pools if necessary
configuration.global_task_pool
configuration.global_operation_pool
configuration.global_timer_set
end
end

# A gem-level configuration object.
class Configuration

# Create a new configuration object.
def initialize
@cores ||= Concurrent::processor_count
@cores = Concurrent::processor_count

@global_task_pool = Delay.new do
Concurrent::ThreadPoolExecutor.new(
min_threads: [2, @cores].max,
max_threads: [20, @cores * 15].max,
idletime: 2 * 60, # 2 minutes
max_queue: 0, # unlimited
overflow_policy: :abort # raise an exception
)
end

@global_operation_pool = Delay.new do
Concurrent::ThreadPoolExecutor.new(
min_threads: [2, @cores].max,
max_threads: [2, @cores].max,
idletime: 10 * 60, # 10 minutes
max_queue: [20, @cores * 15].max,
overflow_policy: :abort # raise an exception
)
end

@global_timer_set = Delay.new { Concurrent::TimerSet.new }
end

# Global thread pool optimized for short *tasks*.
#
# @return [ThreadPoolExecutor] the thread pool
def global_task_pool
@global_task_pool ||= Concurrent::ThreadPoolExecutor.new(
min_threads: [2, @cores].max,
max_threads: [20, @cores * 15].max,
idletime: 2 * 60, # 2 minutes
max_queue: 0, # unlimited
overflow_policy: :abort # raise an exception
)
@global_task_pool.value
end

# Global thread pool optimized for long *operations*.
#
# @return [ThreadPoolExecutor] the thread pool
def global_operation_pool
@global_operation_pool ||= Concurrent::ThreadPoolExecutor.new(
min_threads: [2, @cores].max,
max_threads: [2, @cores].max,
idletime: 10 * 60, # 10 minutes
max_queue: [20, @cores * 15].max,
overflow_policy: :abort # raise an exception
)
@global_operation_pool.value
end

# Global thread pool optimized for *timers*
Expand All @@ -67,7 +59,7 @@ def global_operation_pool
#
# @see Concurrent::timer
def global_timer_set
@global_timer_set ||= Concurrent::TimerSet.new
@global_timer_set.value
end

# Global thread pool optimized for short *tasks*.
Expand All @@ -85,8 +77,8 @@ def global_timer_set
#
# @raise [ConfigurationError] if this thread pool has already been set
def global_task_pool=(executor)
raise ConfigurationError.new('global task pool was already set') unless @global_task_pool.nil?
@global_task_pool = executor
@global_task_pool.reconfigure { executor } or
raise ConfigurationError.new('global task pool was already set')
end

# Global thread pool optimized for long *operations*.
Expand All @@ -104,11 +96,28 @@ def global_task_pool=(executor)
#
# @raise [ConfigurationError] if this thread pool has already been set
def global_operation_pool=(executor)
raise ConfigurationError.new('global operation pool was already set') unless @global_operation_pool.nil?
@global_operation_pool = executor
@global_operation_pool.reconfigure { executor } or
raise ConfigurationError.new('global operation pool was already set')
end
end

# create the default configuration on load
@configuration = Configuration.new
singleton_class.send :attr_reader, :configuration

# Perform gem-level configuration.
#
# @yield the configuration commands
# @yieldparam [Configuration] the current configuration object
def self.configure
yield(configuration)

# initialize the global thread pools if necessary
configuration.global_task_pool
configuration.global_operation_pool
configuration.global_timer_set
end

private

# Attempt to properly shutdown the given executor using the `shutdown` or
Expand All @@ -129,8 +138,6 @@ def self.finalize_executor(executor)
false
end

# create the default configuration on load
self.configuration = Configuration.new

# set exit hook to shutdown global thread pools
at_exit do
Expand Down
37 changes: 27 additions & 10 deletions lib/concurrent/delay.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'thread'
require 'concurrent/obligation'

module Concurrent

Expand Down Expand Up @@ -48,7 +49,7 @@ def initialize(opts = {}, &block)

init_obligation
@state = :pending
@task = block
@task = block
set_deref_options(opts)
end

Expand Down Expand Up @@ -78,18 +79,34 @@ def value
mutex.unlock
end

# reconfigures the block returning the value if still #incomplete?
# @yield the delayed operation to perform
# @returns [true, false] if success
def reconfigure(&block)
mutex.lock
raise ArgumentError.new('no block given') unless block_given?
if @state == :pending
@task = block
true
else
false
end
ensure
mutex.unlock
end

private

def execute_task_once
if @state == :pending
begin
@value = @task.call
@state = :fulfilled
rescue => ex
@reason = ex
@state = :rejected
end
def execute_task_once
if @state == :pending
begin
@value = @task.call
@state = :fulfilled
rescue => ex
@reason = ex
@state = :rejected
end
end
end
end
end
Loading

0 comments on commit 9ac427a

Please sign in to comment.