Skip to content

Commit

Permalink
Introduce Scheduler#load and Async::Idler for load shedding and s…
Browse files Browse the repository at this point in the history
…aturation. (#309)

Introduce Scheduler#load which is a 1-second load average. This load
average can be used to detect overload conditions in the event loop. In
addition, introduce Async::Idler which will schedule tasks up to a given
maximum_load.
  • Loading branch information
ioquatix committed Mar 5, 2024
1 parent 13d3f47 commit 7d8c770
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 1 deletion.
2 changes: 1 addition & 1 deletion async.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ Gem::Specification.new do |spec|

spec.add_dependency "console", "~> 1.10"
spec.add_dependency "fiber-annotation"
spec.add_dependency "io-event", "~> 1.1"
spec.add_dependency "io-event", "~> 1.5", ">= 1.5.1"
spec.add_dependency "timers", "~> 4.1"
end
27 changes: 27 additions & 0 deletions examples/load/test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env ruby

require_relative '../../lib/async'
require_relative '../../lib/async/idler'

Async do
idler = Async::Idler.new(0.8)

Async do
while true
idler.async do
$stdout.write '.'
while true
sleep 0.1
end
end
end
end

scheduler = Fiber.scheduler
while true
load = scheduler.load

$stdout.write "\nLoad: #{load} "
sleep 1.0
end
end
39 changes: 39 additions & 0 deletions lib/async/idler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

module Async
class Idler
def initialize(maximum_load = 0.8, backoff: 0.01, parent: nil)
@maximum_load = maximum_load
@backoff = backoff
@parent = parent
end

def async(*arguments, parent: (@parent or Task.current), **options, &block)
wait

# It is crucial that we optimistically execute the child task, so that we prevent a tight loop invoking this method from consuming all available resources.
parent.async(*arguments, **options, &block)
end

def wait
scheduler = Fiber.scheduler
backoff = nil

while true
load = scheduler.load
break if load < @maximum_load

if backoff
sleep(backoff)
backoff *= 2.0
else
scheduler.yield
backoff = @backoff
end
end
end
end
end
35 changes: 35 additions & 0 deletions lib/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,33 @@ def initialize(parent = nil, selector: nil)

@blocked = 0

@busy_time = 0.0
@idle_time = 0.0

@timers = ::Timers::Group.new
end

# Compute the scheduler load according to the busy and idle times that are updated by the run loop.
# @returns [Float] The load of the scheduler. 0.0 means no load, 1.0 means fully loaded or over-loaded.
def load
total_time = @busy_time + @idle_time

# If the total time is zero, then the load is zero:
return 0.0 if total_time.zero?

# We normalize to a 1 second window:
if total_time > 1.0
ratio = 1.0 / total_time
@busy_time *= ratio
@idle_time *= ratio

# We don't need to divide here as we've already normalised it to a 1s window:
return @busy_time
else
return @busy_time / total_time
end
end

def scheduler_close
# If the execution context (thread) was handling an exception, we want to exit as quickly as possible:
unless $!
Expand Down Expand Up @@ -267,6 +291,8 @@ def run_once(timeout = nil)
# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
# @returns [Boolean] Whether there is more work to do.
private def run_once!(timeout = 0)
start_time = Async::Clock.now

interval = @timers.wait_interval

# If there is no interval to wait (thus no timers), and no tasks, we could be done:
Expand All @@ -288,6 +314,15 @@ def run_once(timeout = nil)

@timers.fire

# Compute load:
end_time = Async::Clock.now
total_duration = end_time - start_time
idle_duration = @selector.idle_duration
busy_duration = total_duration - idle_duration

@busy_time += busy_duration
@idle_time += idle_duration

# The reactor still has work to do:
return true
end
Expand Down
35 changes: 35 additions & 0 deletions test/async/idler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2018-2023, by Samuel Williams.

require 'async/idler'
require 'sus/fixtures/async'

require 'chainable_async'

describe Async::Idler do
include Sus::Fixtures::Async::ReactorContext
let(:idler) {subject.new(0.5)}

it 'can schedule tasks up to the desired load' do
# Generate the load:
Async do
while true
idler.async do
while true
sleep 0.1
end
end
end
end

# This test must be longer than the test window...
sleep 1.1

# Verify that the load is within the desired range:
expect(Fiber.scheduler.load).to be_within(0.1).of(0.5)
end

it_behaves_like ChainableAsync
end

0 comments on commit 7d8c770

Please sign in to comment.