Skip to content

Commit 9365cbf

Browse files
committed
Use global_operation_pool for jobs which may block
1 parent 1ca7374 commit 9365cbf

File tree

9 files changed

+25
-25
lines changed

9 files changed

+25
-25
lines changed

lib/concurrent/dataflow.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def update(time, value, reason)
6161
# @raise [ArgumentError] if no block is given
6262
# @raise [ArgumentError] if any of the inputs are not `IVar`s
6363
def dataflow(*inputs, &block)
64-
dataflow_with(Concurrent.configuration.global_task_pool, *inputs, &block)
64+
dataflow_with(Concurrent.configuration.global_operation_pool, *inputs, &block)
6565
end
6666
module_function :dataflow
6767

lib/concurrent/executor/timer_set.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class TimerSet
2323
# this executor rather than the global thread pool (overrides :operation)
2424
def initialize(opts = {})
2525
@queue = PriorityQueue.new(order: :min)
26-
@task_executor = OptionsParser::get_executor_from(opts)
26+
@task_executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_task_pool
2727
@timer_executor = SingleThreadExecutor.new
2828
@condition = Condition.new
2929
init_executor

lib/concurrent/future.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def initialize(opts = {}, &block)
6161
super(IVar::NO_VALUE, opts)
6262
@state = :unscheduled
6363
@task = block
64-
@executor = OptionsParser::get_executor_from(opts)
64+
@executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_operation_pool
6565
end
6666

6767
# Execute an `:unscheduled` `Future`. Immediately sets the state to `:pending` and

lib/concurrent/options_parser.rb

+4-2
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,16 @@ module OptionsParser
1010
# @option opts [Boolean] :operation (`false`) when true use the global operation pool
1111
# @option opts [Boolean] :task (`true`) when true use the global task pool
1212
#
13-
# @return [Executor] the requested thread pool (default: global task pool)
13+
# @return [Executor, nil] the requested thread pool, or nil when no option specified
1414
def get_executor_from(opts = {})
1515
if opts[:executor]
1616
opts[:executor]
1717
elsif opts[:operation] == true || opts[:task] == false
1818
Concurrent.configuration.global_operation_pool
19-
else
19+
elsif opts[:operation] == false || opts[:task] == true
2020
Concurrent.configuration.global_task_pool
21+
else
22+
nil
2123
end
2224
end
2325

lib/concurrent/promise.rb

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
module Concurrent
77

8+
# TODO unify promise and future to single class, with dataflow
89
class Promise
910
include Obligation
1011

@@ -32,7 +33,7 @@ class Promise
3233
def initialize(opts = {}, &block)
3334
opts.delete_if { |k, v| v.nil? }
3435

35-
@executor = OptionsParser::get_executor_from(opts)
36+
@executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_operation_pool
3637
@parent = opts.fetch(:parent) { nil }
3738
@on_fulfill = opts.fetch(:on_fulfill) { Proc.new { |result| result } }
3839
@on_reject = opts.fetch(:on_reject) { Proc.new { |reason| raise reason } }

lib/concurrent/scheduled_task.rb

+6-5
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,17 @@ def initialize(intended_time, opts = {}, &block)
1515
super(NO_VALUE, opts)
1616

1717
self.observers = CopyOnNotifyObserverSet.new
18-
@intended_time = intended_time
19-
@state = :unscheduled
20-
@task = block
18+
@intended_time = intended_time
19+
@state = :unscheduled
20+
@task = block
21+
@executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_operation_pool
2122
end
2223

2324
# @since 0.5.0
2425
def execute
2526
if compare_and_set_state(:pending, :unscheduled)
2627
@schedule_time = TimerSet.calculate_schedule_time(@intended_time)
27-
Concurrent::timer(@schedule_time.to_f - Time.now.to_f, &method(:process_task))
28+
Concurrent::timer(@schedule_time.to_f - Time.now.to_f) { @executor.post &method(:process_task) }
2829
self
2930
end
3031
end
@@ -71,7 +72,7 @@ def process_task
7172
end
7273

7374
time = Time.now
74-
observers.notify_and_delete_observers{ [time, self.value, reason] }
75+
observers.notify_and_delete_observers { [time, self.value, reason] }
7576
end
7677
end
7778
end

spec/concurrent/dataflow_spec.rb

+4-4
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ module Concurrent
1212
expect { Concurrent::dataflow_with(root_executor) }.to raise_error(ArgumentError)
1313
end
1414

15-
specify '#dataflow uses the global task pool' do
15+
specify '#dataflow uses the global operation pool' do
1616
input = Future.execute{0}
1717
expect(Concurrent).to receive(:dataflow_with).once.
18-
with(Concurrent.configuration.global_task_pool, input)
18+
with(Concurrent.configuration.global_operation_pool, input)
1919
Concurrent::dataflow(input){0}
2020
end
2121

@@ -229,9 +229,9 @@ def fib_with_dot(n)
229229
end
230230
end
231231

232-
expected = fib_with_dot(14)
232+
expected = fib_with_dot(7)
233233
sleep(0.1)
234-
expect(expected.value).to eq 377
234+
expect(expected.value).to eq 13
235235
end
236236

237237
end

spec/concurrent/future_spec.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,8 @@ def trigger_observable(observable)
114114
Future.execute(task: true){ nil }
115115
end
116116

117-
it 'uses the global task pool by default' do
118-
expect(Concurrent.configuration).to receive(:global_task_pool).and_return(executor)
117+
it 'uses the global operation pool by default' do
118+
expect(Concurrent.configuration).to receive(:global_operation_pool).and_return(executor)
119119
Future.execute{ nil }
120120
end
121121
end

spec/concurrent/options_parser_spec.rb

+4-8
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,12 @@ module Concurrent
3939
OptionsParser::get_executor_from(task: true)
4040
end
4141

42-
it 'returns the global task pool when :executor is nil' do
43-
expect(Concurrent.configuration).to receive(:global_task_pool).
44-
and_return(:task_pool)
45-
OptionsParser::get_executor_from(executor: nil)
42+
it 'returns nil when :executor is nil' do
43+
expect(OptionsParser::get_executor_from(executor: nil)).to be_nil
4644
end
4745

48-
it 'returns the global task pool when no option is given' do
49-
expect(Concurrent.configuration).to receive(:global_task_pool).
50-
and_return(:task_pool)
51-
OptionsParser::get_executor_from
46+
it 'returns nil task pool when no option is given' do
47+
expect(OptionsParser::get_executor_from).to be_nil
5248
end
5349

5450
specify ':executor overrides :operation' do

0 commit comments

Comments
 (0)