Permalink
Browse files

Add separate IO reactor to defeat slow clients

Previously, the app thread would be in charge of reading the request
directly from the client. This resulted in a set of slow clients being
able to completely starve the app thread pool and prevent any further
connections from being handled.

This new organization uses a seperate reactor thread that is in charge
of responding when a client has more data, buffering the data and
attempting to parse the data. When the data represents a fully realized
request, only then is it handed to the app thread pool. This means we
trust apps to not starve the pool, but don't trust clients.
  • Loading branch information...
1 parent 5b11c5e commit 6777c771d829a31634b968c74a829cc53b80a144 @evanphx evanphx committed Jul 23, 2012
Showing with 259 additions and 64 deletions.
  1. +142 −0 lib/puma/client.rb
  2. +82 −0 lib/puma/reactor.rb
  3. +31 −64 lib/puma/server.rb
  4. +4 −0 test/hello-post.ru
View
@@ -0,0 +1,142 @@
+module Puma
+ class Client
+ include Puma::Const
+
+ def initialize(io, env)
+ @io = io
+ @to_io = io.to_io
+ @proto_env = env
+ @env = env.dup
+
+ @parser = HttpParser.new
+ @parsed_bytes = 0
+ @read_header = true
+ @body = nil
+ @buffer = nil
+
+ @timeout_at = nil
+ end
+
+ attr_reader :env, :to_io, :body, :io, :timeout_at
+
+ def set_timeout(val)
+ @timeout_at = Time.now + val
+ end
+
+ def reset
+ @parser.reset
+ @read_header = true
+ @env = @proto_env.dup
+ @body = nil
+ @parsed_bytes = 0
+
+ if @buffer
+ @parsed_bytes = @parser.execute(@env, @buffer, @parsed_bytes)
+
+ if @parser.finished?
+ return setup_body
+ elsif @parsed_bytes >= MAX_HEADER
+ raise HttpParserError,
+ "HEADER is longer than allowed, aborting client early."
+ end
+
+ return false
+ end
+ end
+
+ def close
+ @io.close
+ end
+
+ EmptyBody = NullIO.new
+
+ def setup_body
+ body = @parser.body
+ cl = @env[CONTENT_LENGTH]
+
+ unless cl
+ @buffer = body.empty? ? nil : body
+ @body = EmptyBody
+ return true
+ end
+
+ remain = cl.to_i - body.bytesize
+
+ if remain <= 0
+ @body = StringIO.new(body)
+ return true
+ end
+
+ if remain > MAX_BODY
+ @body = Tempfile.new(Const::PUMA_TMP_BASE)
+ @body.binmode
+ else
+ # The body[0,0] trick is to get an empty string in the same
+ # encoding as body.
+ @body = StringIO.new body[0,0]
+ end
+
+ @body.write body
+
+ @body_remain = remain
+
+ @read_header = false
+
+ return false
+ end
+
+ def try_to_finish
+ return read_body unless @read_header
+
+ data = @io.readpartial(CHUNK_SIZE)
+
+ if @buffer
+ @buffer << data
+ else
+ @buffer = data
+ end
+
+ @parsed_bytes = @parser.execute(@env, @buffer, @parsed_bytes)
+
+ if @parser.finished?
+ return setup_body
+ elsif @parsed_bytes >= MAX_HEADER
+ raise HttpParserError,
+ "HEADER is longer than allowed, aborting client early."
+ end
+
+ false
+ end
+
+ def read_body
+ # Read an odd sized chunk so we can read even sized ones
+ # after this
+ remain = @body_remain
+
+ if remain > CHUNK_SIZE
+ want = CHUNK_SIZE
+ else
+ want = remain
+ end
+
+ chunk = @io.readpartial(want)
+
+ # No chunk means a closed socket
+ unless chunk
+ @body.close
+ raise EOFError
+ end
+
+ remain -= @body.write(chunk)
+
+ if remain <= 0
+ @body.rewind
+ return true
+ end
+
+ @body_remain = remain
+
+ false
+ end
+ end
+end
View
@@ -0,0 +1,82 @@
+module Puma
+ class Reactor
+ DefaultSleepFor = 5
+
+ def initialize(events, app_pool)
+ @events = events
+ @app_pool = app_pool
+
+ @mutex = Mutex.new
+ @ready, @trigger = IO.pipe
+ @input = []
+ @sleep_for = DefaultSleepFor
+ @timeouts = []
+ end
+
+ def run
+ sockets = [@ready]
+
+ while true
+ ready = IO.select sockets, nil, nil, @sleep_for
+
+ if ready and reads = ready[0]
+ reads.each do |c|
+ if c == @ready
+ @mutex.synchronize do
+ @ready.read(1) # drain
+ sockets += @input
+ @input.clear
+ end
+ else
+ begin
+ if c.try_to_finish
+ @app_pool << c
+ sockets.delete c
+ end
+ # The client doesn't know HTTP well
+ rescue HttpParserError => e
+ @events.parse_error self, c.env, e
+
+ rescue EOFError
+ c.close
+ sockets.delete c
+ end
+ end
+ end
+ end
+
+ unless @timeouts.empty?
+ now = Time.now
+
+ while @timeouts.first.timeout_at < now
+ c = @timeouts.shift
+ sockets.delete c
+ c.close
+
+ if @timeouts.empty?
+ @sleep_for = DefaultSleepFor
+ break
+ end
+ end
+ end
+ end
+ end
+
+ def run_in_thread
+ @thread = Thread.new { run }
+ end
+
+ def add(c)
+ @mutex.synchronize do
+ @input << c
+ @trigger << "!"
+
+ if c.timeout_at
+ @timeouts << c
+ @timeouts.sort! { |a,b| a.timeout_at <=> b.timeout_at }
+ @sleep_for = @timeouts.first.timeout_at.to_f - Time.now.to_f
+ end
+ end
+ end
+ end
+end
View
@@ -6,6 +6,8 @@
require 'puma/events'
require 'puma/null_io'
require 'puma/compat'
+require 'puma/reactor'
+require 'puma/client'
require 'puma/puma_http11'
@@ -196,10 +198,14 @@ def run(background=true)
@status = :run
- @thread_pool = ThreadPool.new(@min_threads, @max_threads) do |client, env|
- process_client(client, env)
+ @thread_pool = ThreadPool.new(@min_threads, @max_threads) do |client|
+ process_client(client)
end
+ @reactor = Reactor.new @events, @thread_pool
+
+ @reactor.run_in_thread
+
if @auto_trim_time
@thread_pool.auto_trim!(@auto_trim_time)
end
@@ -225,7 +231,8 @@ def handle_servers
if sock == check
break if handle_check
else
- pool << [sock.accept, @envs.fetch(sock, @proto_env)]
+ c = Client.new sock.accept, @envs.fetch(sock, @proto_env)
+ @reactor.add c
end
end
rescue Errno::ECONNABORTED
@@ -270,61 +277,23 @@ def handle_check
# indicates that it supports keep alive, wait for another request before
# returning.
#
- def process_client(client, proto_env)
- parser = HttpParser.new
- close_socket = true
-
+ def process_client(client)
begin
- while true
- parser.reset
-
- env = proto_env.dup
- data = client.readpartial(CHUNK_SIZE)
- nparsed = 0
-
- # Assumption: nparsed will always be less since data will get filled
- # with more after each parsing. If it doesn't get more then there was
- # a problem with the read operation on the client socket.
- # Effect is to stop processing when the socket can't fill the buffer
- # for further parsing.
- while nparsed < data.bytesize
- nparsed = parser.execute(env, data, nparsed)
-
- if parser.finished?
- cl = env[CONTENT_LENGTH]
-
- case handle_request(env, client, parser.body, cl)
- when false
- return
- when :async
- close_socket = false
- return
- end
-
- nparsed += parser.body.bytesize if cl
-
- if data.bytesize > nparsed
- data.slice!(0, nparsed)
- parser.reset
- env = @proto_env.dup
- nparsed = 0
- else
- unless ret = IO.select([client, @persistent_check], nil, nil, @persistent_timeout)
- raise EOFError, "Timed out persistent connection"
- end
+ close_socket = true
- return if ret.first.include? @persistent_check
- end
- else
- # Parser is not done, queue up more data to read and continue parsing
- chunk = client.readpartial(CHUNK_SIZE)
- return if !chunk or chunk.length == 0 # read failed, stop processing
-
- data << chunk
- if data.bytesize >= MAX_HEADER
- raise HttpParserError,
- "HEADER is longer than allowed, aborting client early."
- end
+ while true
+ case handle_request(client)
+ when false
+ return
+ when :async
+ close_socket = false
+ return
+ when true
+ unless client.reset
+ close_socket = false
+ client.set_timeout @persistent_timeout
+ @reactor.add client
+ return
end
end
end
@@ -335,7 +304,7 @@ def process_client(client, proto_env)
# The client doesn't know HTTP well
rescue HttpParserError => e
- @events.parse_error self, env, e
+ @events.parse_error self, client.env, e
# Server error
rescue StandardError => e
@@ -403,17 +372,15 @@ def normalize_env(env, client)
# was one. This is an optimization to keep from having to look
# it up again.
#
- def handle_request(env, client, body, cl)
+ def handle_request(req)
+ env = req.env
+ client = req.io
+
normalize_env env, client
env[PUMA_SOCKET] = client
- if cl
- body = read_body env, client, body, cl
- return false unless body
- else
- body = EmptyBody
- end
+ body = req.body
env[RACK_INPUT] = body
env[RACK_URL_SCHEME] = env[HTTPS_KEY] ? HTTPS : HTTP
View
@@ -0,0 +1,4 @@
+run lambda { |env|
+ p :body => env['rack.input'].read
+ [200, {"Content-Type" => "text/plain"}, ["Hello World"]]
+}

0 comments on commit 6777c77

Please sign in to comment.