diff --git a/lib/puma/client.rb b/lib/puma/client.rb new file mode 100644 index 0000000000..7aee5d8e10 --- /dev/null +++ b/lib/puma/client.rb @@ -0,0 +1,142 @@ +module Puma + class Client + include Puma::Const + + def initialize(io, env) + @io = io + @to_io = io.to_io + @proto_env = env + @env = env.dup + + @parser = HttpParser.new + @parsed_bytes = 0 + @read_header = true + @body = nil + @buffer = nil + + @timeout_at = nil + end + + attr_reader :env, :to_io, :body, :io, :timeout_at + + def set_timeout(val) + @timeout_at = Time.now + val + end + + def reset + @parser.reset + @read_header = true + @env = @proto_env.dup + @body = nil + @parsed_bytes = 0 + + if @buffer + @parsed_bytes = @parser.execute(@env, @buffer, @parsed_bytes) + + if @parser.finished? + return setup_body + elsif @parsed_bytes >= MAX_HEADER + raise HttpParserError, + "HEADER is longer than allowed, aborting client early." + end + + return false + end + end + + def close + @io.close + end + + EmptyBody = NullIO.new + + def setup_body + body = @parser.body + cl = @env[CONTENT_LENGTH] + + unless cl + @buffer = body.empty? ? nil : body + @body = EmptyBody + return true + end + + remain = cl.to_i - body.bytesize + + if remain <= 0 + @body = StringIO.new(body) + return true + end + + if remain > MAX_BODY + @body = Tempfile.new(Const::PUMA_TMP_BASE) + @body.binmode + else + # The body[0,0] trick is to get an empty string in the same + # encoding as body. + @body = StringIO.new body[0,0] + end + + @body.write body + + @body_remain = remain + + @read_header = false + + return false + end + + def try_to_finish + return read_body unless @read_header + + data = @io.readpartial(CHUNK_SIZE) + + if @buffer + @buffer << data + else + @buffer = data + end + + @parsed_bytes = @parser.execute(@env, @buffer, @parsed_bytes) + + if @parser.finished? + return setup_body + elsif @parsed_bytes >= MAX_HEADER + raise HttpParserError, + "HEADER is longer than allowed, aborting client early." + end + + false + end + + def read_body + # Read an odd sized chunk so we can read even sized ones + # after this + remain = @body_remain + + if remain > CHUNK_SIZE + want = CHUNK_SIZE + else + want = remain + end + + chunk = @io.readpartial(want) + + # No chunk means a closed socket + unless chunk + @body.close + raise EOFError + end + + remain -= @body.write(chunk) + + if remain <= 0 + @body.rewind + return true + end + + @body_remain = remain + + false + end + end +end diff --git a/lib/puma/reactor.rb b/lib/puma/reactor.rb new file mode 100644 index 0000000000..2a19f8b914 --- /dev/null +++ b/lib/puma/reactor.rb @@ -0,0 +1,82 @@ +module Puma + class Reactor + DefaultSleepFor = 5 + + def initialize(events, app_pool) + @events = events + @app_pool = app_pool + + @mutex = Mutex.new + @ready, @trigger = IO.pipe + @input = [] + @sleep_for = DefaultSleepFor + @timeouts = [] + end + + def run + sockets = [@ready] + + while true + ready = IO.select sockets, nil, nil, @sleep_for + + if ready and reads = ready[0] + reads.each do |c| + if c == @ready + @mutex.synchronize do + @ready.read(1) # drain + sockets += @input + @input.clear + end + else + begin + if c.try_to_finish + @app_pool << c + sockets.delete c + end + # The client doesn't know HTTP well + rescue HttpParserError => e + @events.parse_error self, c.env, e + + rescue EOFError + c.close + sockets.delete c + end + end + end + end + + unless @timeouts.empty? + now = Time.now + + while @timeouts.first.timeout_at < now + c = @timeouts.shift + sockets.delete c + c.close + + if @timeouts.empty? + @sleep_for = DefaultSleepFor + break + end + end + end + end + end + + def run_in_thread + @thread = Thread.new { run } + end + + def add(c) + @mutex.synchronize do + @input << c + @trigger << "!" + + if c.timeout_at + @timeouts << c + @timeouts.sort! { |a,b| a.timeout_at <=> b.timeout_at } + @sleep_for = @timeouts.first.timeout_at.to_f - Time.now.to_f + end + end + end + end +end diff --git a/lib/puma/server.rb b/lib/puma/server.rb index 0ceb92d049..6b9682d88c 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -6,6 +6,8 @@ require 'puma/events' require 'puma/null_io' require 'puma/compat' +require 'puma/reactor' +require 'puma/client' require 'puma/puma_http11' @@ -196,10 +198,14 @@ def run(background=true) @status = :run - @thread_pool = ThreadPool.new(@min_threads, @max_threads) do |client, env| - process_client(client, env) + @thread_pool = ThreadPool.new(@min_threads, @max_threads) do |client| + process_client(client) end + @reactor = Reactor.new @events, @thread_pool + + @reactor.run_in_thread + if @auto_trim_time @thread_pool.auto_trim!(@auto_trim_time) end @@ -225,7 +231,8 @@ def handle_servers if sock == check break if handle_check else - pool << [sock.accept, @envs.fetch(sock, @proto_env)] + c = Client.new sock.accept, @envs.fetch(sock, @proto_env) + @reactor.add c end end rescue Errno::ECONNABORTED @@ -270,61 +277,23 @@ def handle_check # indicates that it supports keep alive, wait for another request before # returning. # - def process_client(client, proto_env) - parser = HttpParser.new - close_socket = true - + def process_client(client) begin - while true - parser.reset - - env = proto_env.dup - data = client.readpartial(CHUNK_SIZE) - nparsed = 0 - - # Assumption: nparsed will always be less since data will get filled - # with more after each parsing. If it doesn't get more then there was - # a problem with the read operation on the client socket. - # Effect is to stop processing when the socket can't fill the buffer - # for further parsing. - while nparsed < data.bytesize - nparsed = parser.execute(env, data, nparsed) - - if parser.finished? - cl = env[CONTENT_LENGTH] - - case handle_request(env, client, parser.body, cl) - when false - return - when :async - close_socket = false - return - end - - nparsed += parser.body.bytesize if cl - - if data.bytesize > nparsed - data.slice!(0, nparsed) - parser.reset - env = @proto_env.dup - nparsed = 0 - else - unless ret = IO.select([client, @persistent_check], nil, nil, @persistent_timeout) - raise EOFError, "Timed out persistent connection" - end + close_socket = true - return if ret.first.include? @persistent_check - end - else - # Parser is not done, queue up more data to read and continue parsing - chunk = client.readpartial(CHUNK_SIZE) - return if !chunk or chunk.length == 0 # read failed, stop processing - - data << chunk - if data.bytesize >= MAX_HEADER - raise HttpParserError, - "HEADER is longer than allowed, aborting client early." - end + while true + case handle_request(client) + when false + return + when :async + close_socket = false + return + when true + unless client.reset + close_socket = false + client.set_timeout @persistent_timeout + @reactor.add client + return end end end @@ -335,7 +304,7 @@ def process_client(client, proto_env) # The client doesn't know HTTP well rescue HttpParserError => e - @events.parse_error self, env, e + @events.parse_error self, client.env, e # Server error rescue StandardError => e @@ -403,17 +372,15 @@ def normalize_env(env, client) # was one. This is an optimization to keep from having to look # it up again. # - def handle_request(env, client, body, cl) + def handle_request(req) + env = req.env + client = req.io + normalize_env env, client env[PUMA_SOCKET] = client - if cl - body = read_body env, client, body, cl - return false unless body - else - body = EmptyBody - end + body = req.body env[RACK_INPUT] = body env[RACK_URL_SCHEME] = env[HTTPS_KEY] ? HTTPS : HTTP diff --git a/test/hello-post.ru b/test/hello-post.ru new file mode 100644 index 0000000000..ff0e1a0aaa --- /dev/null +++ b/test/hello-post.ru @@ -0,0 +1,4 @@ +run lambda { |env| + p :body => env['rack.input'].read + [200, {"Content-Type" => "text/plain"}, ["Hello World"]] +}