Skip to content

Commit

Permalink
Add TimerTask.new(interval_type:) option to configure interval calc…
Browse files Browse the repository at this point in the history
…ulation

Can be either `:fixed_delay` or `:fixed_rate`, default to `:fixed_delay`
  • Loading branch information
bensheldon authored and eregon committed Jan 16, 2024
1 parent 18ffea9 commit fb19d0e
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 4 deletions.
54 changes: 51 additions & 3 deletions lib/concurrent-ruby/concurrent/timer_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ module Concurrent
# be tested separately then passed to the `TimerTask` for scheduling and
# running.
#
# A `TimerTask` supports two different types of interval calculations.
# A fixed delay will always wait the same amount of time between the
# completion of one task and the start of the next. A fixed rate will
# attempt to maintain a constant rate of execution regardless of the
# duration of the task. For example, if a fixed rate task is scheduled
# to run every 60 seconds but the task itself takes 10 seconds to
# complete, the next task will be scheduled to run 50 seconds after
# the start of the previous task. If the task takes 70 seconds to
# complete, the next task will be start immediately after the previous
# task completes. Tasks will not be executed concurrently.
#
# In some cases it may be necessary for a `TimerTask` to affect its own
# execution cycle. To facilitate this, a reference to the TimerTask instance
# is passed as an argument to the provided block every time the task is
Expand Down Expand Up @@ -74,6 +85,12 @@ module Concurrent
#
# #=> 'Boom!'
#
# @example Configuring `:interval_type` with either :fixed_delay or :fixed_rate, default is :fixed_delay
# task = Concurrent::TimerTask.new(execution_interval: 5, interval_type: :fixed_rate) do
# puts 'Boom!'
# end
# task.interval_type #=> :fixed_rate
#
# @example Last `#value` and `Dereferenceable` mixin
# task = Concurrent::TimerTask.new(
# dup_on_deref: true,
Expand Down Expand Up @@ -152,8 +169,16 @@ class TimerTask < RubyExecutorService
# Default `:execution_interval` in seconds.
EXECUTION_INTERVAL = 60

# Default `:timeout_interval` in seconds.
TIMEOUT_INTERVAL = 30
# Maintain the interval between the end of one execution and the start of the next execution.
FIXED_DELAY = :fixed_delay

# Maintain the interval between the start of one execution and the start of the next.
# If execution time exceeds the interval, the next execution will start immediately
# after the previous execution finishes. Executions will not run concurrently.
FIXED_RATE = :fixed_rate

# Default `:interval_type`
DEFAULT_INTERVAL_TYPE = FIXED_DELAY

# Create a new TimerTask with the given task and configuration.
#
Expand All @@ -164,6 +189,9 @@ class TimerTask < RubyExecutorService
# @option opts [Boolean] :run_now Whether to run the task immediately
# upon instantiation or to wait until the first # execution_interval
# has passed (default: false)
# @options opts [Symbol] :interval_type method to calculate the interval
# between executions, can be either :fixed_rate or :fixed_delay.
# (default: :fixed_delay)
# @option opts [Executor] executor, default is `global_io_executor`
#
# @!macro deref_options
Expand Down Expand Up @@ -243,6 +271,10 @@ def execution_interval=(value)
end
end

# @!attribute [r] interval_type
# @return [Symbol] method to calculate the interval between executions
attr_reader :interval_type

# @!attribute [rw] timeout_interval
# @return [Fixnum] Number of seconds the task can run before it is
# considered to have failed.
Expand All @@ -265,10 +297,15 @@ def ns_initialize(opts, &task)
set_deref_options(opts)

self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
if opts[:interval_type] && ![FIXED_DELAY, FIXED_RATE].include?(opts[:interval_type])
raise ArgumentError.new('interval_type must be either :fixed_delay or :fixed_rate')
end
if opts[:timeout] || opts[:timeout_interval]
warn 'TimeTask timeouts are now ignored as these were not able to be implemented correctly'
end

@run_now = opts[:now] || opts[:run_now]
@interval_type = opts[:interval_type] || DEFAULT_INTERVAL_TYPE
@task = Concurrent::SafeTaskExecutor.new(task)
@executor = opts[:executor] || Concurrent.global_io_executor
@running = Concurrent::AtomicBoolean.new(false)
Expand Down Expand Up @@ -298,16 +335,27 @@ def schedule_next_task(interval = execution_interval)
# @!visibility private
def execute_task(completion)
return nil unless @running.true?
start_time = Concurrent.monotonic_time
_success, value, reason = @task.execute(self)
if completion.try?
self.value = value
schedule_next_task
schedule_next_task(calculate_next_interval(start_time))
time = Time.now
observers.notify_observers do
[time, self.value, reason]
end
end
nil
end

# @!visibility private
def calculate_next_interval(start_time)
if @interval_type == FIXED_RATE
run_time = Concurrent.monotonic_time - start_time
[execution_interval - run_time, 0].max
else # FIXED_DELAY
execution_interval
end
end
end
end
74 changes: 73 additions & 1 deletion spec/concurrent/timer_task_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ def trigger_observable(observable)
expect(subject.execution_interval).to eq 5
end

it 'raises an exception if :interval_type is not a valid value' do
expect {
Concurrent::TimerTask.new(interval_type: :cat) { nil }
}.to raise_error(ArgumentError)
end

it 'uses the default :interval_type when no type is given' do
subject = TimerTask.new { nil }
expect(subject.interval_type).to eq TimerTask::FIXED_DELAY
end

it 'uses the given interval type' do
subject = TimerTask.new(interval_type: TimerTask::FIXED_RATE) { nil }
expect(subject.interval_type).to eq TimerTask::FIXED_RATE
end
end

context '#kill' do
Expand Down Expand Up @@ -113,7 +128,6 @@ def trigger_observable(observable)
end

specify '#execution_interval is writeable' do

latch = CountDownLatch.new(1)
subject = TimerTask.new(timeout_interval: 1,
execution_interval: 1,
Expand All @@ -133,6 +147,28 @@ def trigger_observable(observable)
subject.kill
end

it 'raises on invalid interval_type' do
expect {
fixed_delay = TimerTask.new(interval_type: TimerTask::FIXED_DELAY,
execution_interval: 0.1,
run_now: true) { nil }
fixed_delay.kill
}.not_to raise_error

expect {
fixed_rate = TimerTask.new(interval_type: TimerTask::FIXED_RATE,
execution_interval: 0.1,
run_now: true) { nil }
fixed_rate.kill
}.not_to raise_error

expect {
TimerTask.new(interval_type: :unknown,
execution_interval: 0.1,
run_now: true) { nil }
}.to raise_error(ArgumentError)
end

specify '#timeout_interval being written produces a warning' do
subject = TimerTask.new(timeout_interval: 1,
execution_interval: 0.1,
Expand Down Expand Up @@ -209,6 +245,42 @@ def trigger_observable(observable)

expect(executor).to have_received(:post)
end

it 'uses a fixed delay when set' do
finished = []
latch = CountDownLatch.new(2)
subject = TimerTask.new(interval_type: TimerTask::FIXED_DELAY,
execution_interval: 0.1,
run_now: true) do |task|
sleep(0.2)
finished << Concurrent.monotonic_time
latch.count_down
end
subject.execute
latch.wait(1)
subject.kill

expect(latch.count).to eq(0)
expect(finished[1] - finished[0]).to be >= 0.3
end

it 'uses a fixed rate when set' do
finished = []
latch = CountDownLatch.new(2)
subject = TimerTask.new(interval_type: TimerTask::FIXED_RATE,
execution_interval: 0.1,
run_now: true) do |task|
sleep(0.2)
finished << Concurrent.monotonic_time
latch.count_down
end
subject.execute
latch.wait(1)
subject.kill

expect(latch.count).to eq(0)
expect(finished[1] - finished[0]).to be < 0.3
end
end

context 'observation' do
Expand Down

0 comments on commit fb19d0e

Please sign in to comment.