Inject small delay for busy workers to improve requests distribution #2079

Merged (1 commit, May 11, 2020)
1 change: 1 addition & 0 deletions History.md
@@ -10,6 +10,7 @@
* Increases maximum URI path length from 2048 to 8196 bytes (#2167)
* Force shutdown responses can be overridden by using the `lowlevel_error_handler` config (#2203)
* Faster phased restart and worker timeout (#2121)
* Inject small delay for busy workers to improve requests distribution (#2079)

* Deprecations, Removals and Breaking API Changes
* `Puma.stats` now returns a Hash instead of a JSON string (#2086)
102 changes: 102 additions & 0 deletions benchmarks/wrk/cpu_spin.sh
@@ -0,0 +1,102 @@
#!/bin/bash

set -eo pipefail

ITERATIONS=400000
HOST=127.0.0.1:9292
URL="http://$HOST/cpu/$ITERATIONS"

MIN_WORKERS=1
MAX_WORKERS=4

MIN_THREADS=4
MAX_THREADS=4

DURATION=2
MIN_CONCURRENT=1
MAX_CONCURRENT=8

retry() {
  local tries="$1"
  local sleep="$2"
  shift 2

  for i in $(seq 1 $tries); do
    if eval "$@"; then
      return 0
    fi

    sleep "$sleep"
  done

  return 1
}

ms() {
  VALUE=$(cat)
  FRAC=${VALUE%%[ums]*}
  case "$VALUE" in
    *us)
      echo "scale=1; ${FRAC}/1000" | bc
      ;;
    *ms)
      echo "scale=1; ${FRAC}/1" | bc
      ;;
    *s)
      echo "scale=1; ${FRAC}*1000/1" | bc
      ;;
  esac
}

run_wrk() {
  result=$(wrk -H "Connection: Close" -c "$wrk_c" -t "$wrk_t" -d "$DURATION" --latency "$@" | tee -a wrk.txt)
  req_sec=$(echo "$result" | grep "^Requests/sec:" | awk '{print $2}')
  latency_avg=$(echo "$result" | grep "^\s*Latency.*%" | awk '{print $2}' | ms)
  latency_stddev=$(echo "$result" | grep "^\s*Latency.*%" | awk '{print $3}' | ms)
  latency_50=$(echo "$result" | grep "^\s*50%" | awk '{print $2}' | ms)
  latency_75=$(echo "$result" | grep "^\s*75%" | awk '{print $2}' | ms)
  latency_90=$(echo "$result" | grep "^\s*90%" | awk '{print $2}' | ms)
  latency_99=$(echo "$result" | grep "^\s*99%" | awk '{print $2}' | ms)

  echo -e "$workers\t$threads\t$wrk_c\t$wrk_t\t$req_sec\t$latency_avg\t$latency_stddev\t$latency_50\t$latency_75\t$latency_90\t$latency_99"
}

run_concurrency_tests() {
  echo
  echo -e "PUMA_W\tPUMA_T\tWRK_C\tWRK_T\tREQ_SEC\tL_AVG\tL_DEV\tL_50%\tL_75%\tL_90%\tL_99%"
  for wrk_c in $(seq $MIN_CONCURRENT $MAX_CONCURRENT); do
    wrk_t="$wrk_c"
    eval "$@"
    sleep 1
  done
  echo
}

with_puma() {
  # start Puma in the background
  bundle exec bin/puma -w "$workers" -t "$threads" -b "tcp://$HOST" -C test/config/cpu_spin.rb &
  local puma_pid=$!
  trap "kill $puma_pid" EXIT

  # wait up to 10s for Puma to be up
  if ! retry 10 1s curl --fail "$URL" &>/dev/null; then
    echo "Failed to connect to $URL."
    return 1
  fi

  # execute testing command
  eval "$@"
  kill "$puma_pid" || true
  trap - EXIT
  wait
}

for workers in $(seq $MIN_WORKERS $MAX_WORKERS); do
  for threads in $(seq $MIN_THREADS $MAX_THREADS); do
    with_puma \
      run_concurrency_tests \
        run_wrk "$URL"
  done
done
12 changes: 12 additions & 0 deletions lib/puma/dsl.rb
@@ -664,6 +664,18 @@ def shutdown_debug(val=true)
  @options[:shutdown_debug] = val
end

# Controls the delay (in seconds) injected before a worker
# accepts a new socket when it is already processing
# requests; it gives less busy workers time to pick up
# the work first.
#
# This only affects the Ruby MRI implementation.
# The best value is between 0.001 (1ms) and 0.010 (10ms).
def wait_for_less_busy_worker(val)
  @options[:wait_for_less_busy_worker] = val.to_f
end

# Control how the remote address of the connection is set. This
# is configurable because to calculate the true socket peer address
# a kernel syscall is required which for very fast rack handlers
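The new DSL option can be set in a Puma config file. A minimal sketch (the values are illustrative; 0.005 sits in the recommended 1–10ms range):

```
# puma config sketch (e.g. config/puma.rb) — values are illustrative
workers 4
threads 4, 4

# inject up to 5ms of delay before a busy worker accepts a new socket,
# giving less busy workers a chance to pick up the connection first
wait_for_less_busy_worker 0.005
```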
3 changes: 3 additions & 0 deletions lib/puma/server.rb
@@ -283,6 +283,9 @@ def handle_servers
else
  begin
    pool.wait_until_not_full
    pool.wait_for_less_busy_worker(
      @options[:wait_for_less_busy_worker].to_f)

    if io = sock.accept_nonblock
      client = Client.new io, @binder.env(sock)
      if remote_addr_value
19 changes: 19 additions & 0 deletions lib/puma/thread_pool.rb
@@ -228,6 +228,25 @@ def wait_until_not_full
end
end

def wait_for_less_busy_worker(delay_s)
  # Ruby MRI has a GVL, which can cause processing
  # contention when multiple threads (requests)
  # are running concurrently
  return unless Puma.mri?
This is adding two method calls to the hot path, but for now I'll leave them in. Just pointing out for future optimization that we could metaprogram these out based on config.

@ghiculescu (Contributor), Jul 7, 2020:


It might be worth swapping the order of calls.

require "benchmark/ips"

def mri?
  RUBY_ENGINE == 'ruby' || RUBY_ENGINE.nil?
end

num = rand(-10..10)

Benchmark.ips do |b|
  b.report("mri?") { mri? }

  b.report("delay_s > 0") { num > 0 }

  b.report("delay_s > 0 with presence check") { num && num > 0 }

  b.compare!
end
Warming up --------------------------------------
                mri?     1.043M i/100ms
         delay_s > 0     2.328M i/100ms
delay_s > 0 with presence check
                         2.281M i/100ms
Calculating -------------------------------------
                mri?     11.013M (± 3.6%) i/s -     55.264M in   5.025004s
         delay_s > 0     25.234M (± 3.0%) i/s -    128.063M in   5.079836s
delay_s > 0 with presence check
                         22.287M (± 4.6%) i/s -    111.772M in   5.026304s

Comparison:
         delay_s > 0: 25233810.8 i/s
delay_s > 0 with presence check: 22286998.1 i/s - 1.13x  (± 0.00) slower
                mri?: 11013083.0 i/s - 2.29x  (± 0.00) slower

(I'm not sure how hot this path runs / how much it matters.)

  return unless delay_s > 0

  with_mutex do
    return if @shutdown

    # do not delay if we are not busy
    return unless busy_threads > 0

    # this will be signaled once a request finishes,
    # which can happen earlier than the delay
    @not_full.wait @mutex, delay_s
  end
end
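The `@not_full.wait @mutex, delay_s` call relies on `ConditionVariable#wait` with a timeout: the worker sleeps at most `delay_s`, but wakes immediately when another thread signals that a request finished. A self-contained sketch of that mechanism (not Puma code — the `not_full` / `delay_s` names mirror the pool's, everything else is illustrative):

```ruby
mutex    = Mutex.new
not_full = ConditionVariable.new
delay_s  = 5.0  # deliberately large upper bound for the demo

woken_early = nil
waiter = Thread.new do
  mutex.synchronize do
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # sleeps at most delay_s, but returns as soon as it is signaled
    not_full.wait(mutex, delay_s)
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    woken_early = elapsed < 1.0
  end
end

sleep 0.2                              # let the waiter block first
mutex.synchronize { not_full.signal }  # a finishing request signals the pool
waiter.join
puts woken_early  # the signal cuts the 5s wait short
```

This is why the injected delay is cheap in practice: a busy worker never sleeps longer than it has to.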

# If there are any free threads in the pool, tell one to go ahead
# and exit. If +force+ is true, then a trim request is requested
# even if all threads are being utilized.
17 changes: 17 additions & 0 deletions test/config/cpu_spin.rb
@@ -0,0 +1,17 @@
# call with "GET /cpu/<d> HTTP/1.1\r\n\r\n",
# where <d> is the number of iterations

require 'benchmark'

# configure `wait_for_less_busy_worker` based on ENV, default `0.005`
wait_for_less_busy_worker ENV.fetch('WAIT_FOR_LESS_BUSY_WORKERS', '0.005').to_f

app do |env|
  iterations = (env['REQUEST_PATH'][/\/cpu\/(\d.*)/,1] || '1000').to_i

  duration = Benchmark.measure do
    iterations.times { rand }
  end

  [200, {"Content-Type" => "text/plain"}, ["Run for #{duration.total} #{Process.pid}"]]
end
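The iteration count comes from the request path, falling back to 1000 when no number is given. The parsing can be exercised on its own (`iterations_for` is a made-up helper, not part of the config):

```ruby
# Mirrors the path parsing in the app block above; helper name is illustrative
def iterations_for(path)
  (path[/\/cpu\/(\d.*)/, 1] || '1000').to_i
end

puts iterations_for('/cpu/400000')  # => 400000
puts iterations_for('/')            # => 1000 (default)
```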
1 change: 1 addition & 0 deletions test/helper.rb
@@ -116,6 +116,7 @@ def skip_unless(eng, bt: caller)
when :darwin then "Skip unless darwin" unless RUBY_PLATFORM[/darwin/]
when :jruby then "Skip unless JRuby" unless Puma.jruby?
when :windows then "Skip unless Windows" unless Puma.windows?
when :mri then "Skip unless MRI" unless Puma.mri?
else false
end
skip skip_msg, bt if skip_msg
106 changes: 106 additions & 0 deletions test/test_busy_worker.rb
@@ -0,0 +1,106 @@
require_relative "helper"
require "puma/events"

class TestBusyWorker < Minitest::Test
  parallelize_me!

  def setup
    @ios = []
    @server = nil
  end

  def teardown
    @server.stop(true) if @server
    @ios.each { |i| i.close unless i.closed? }
  end

  def new_connection
    TCPSocket.new('127.0.0.1', @server.connected_ports[0]).tap { |s| @ios << s }
  rescue IOError
    Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
    retry
  end

  def send_http(req)
    new_connection << req
  end

  def send_http_and_read(req)
    send_http(req).read
  end

  def with_server(**options, &app)
    @requests_count = 0       # number of requests processed
    @requests_running = 0     # current number of requests running
    @requests_max_running = 0 # max number of requests running in parallel
    @mutex = Mutex.new

    request_handler = ->(env) do
      @mutex.synchronize do
        @requests_count += 1
        @requests_running += 1
        if @requests_running > @requests_max_running
          @requests_max_running = @requests_running
        end
      end

      begin
        yield(env)
      ensure
        @mutex.synchronize do
          @requests_running -= 1
        end
      end
    end

    @server = Puma::Server.new request_handler, Puma::Events.strings, **options
    @server.min_threads = options[:min_threads] || 0
    @server.max_threads = options[:max_threads] || 10
    @server.add_tcp_listener '127.0.0.1', 0
    @server.run
  end

  # Multiple concurrent requests are not processed
  # in parallel, as a small delay is introduced
  def test_multiple_requests_waiting_on_less_busy_worker
    skip_unless :mri

    with_server(wait_for_less_busy_worker: 1.0) do |_|
      sleep(0.1)

      [200, {}, [""]]
    end

    n = 2

    Array.new(n) do
      Thread.new { send_http_and_read "GET / HTTP/1.0\r\n\r\n" }
    end.each(&:join)

    assert_equal n, @requests_count, "number of requests needs to match"
    assert_equal 0, @requests_running, "no requests should still be running"
    assert_equal 1, @requests_max_running, "maximum number of concurrent requests needs to be 1"
  end

  # Multiple concurrent requests are processed
  # in parallel, as the delay is disabled
  def test_multiple_requests_processing_in_parallel
    skip_unless :mri

    with_server(wait_for_less_busy_worker: 0.0) do |_|
      sleep(0.1)

      [200, {}, [""]]
    end

    n = 4

    Array.new(n) do
      Thread.new { send_http_and_read "GET / HTTP/1.0\r\n\r\n" }
    end.each(&:join)

    assert_equal n, @requests_count, "number of requests needs to match"
    assert_equal 0, @requests_running, "no requests should still be running"
    assert_equal n, @requests_max_running, "maximum number of concurrent requests needs to match"
  end
end