Permalink
Browse files

Async responses are now sent using chunked encoding under HTTP 1.1.

  • Loading branch information...
1 parent 31f7388 commit e93ff15423867227351f52a57e4c4678d1de817f @macournoyer committed Aug 6, 2012
View
@@ -6,22 +6,23 @@ module Protocols
# EventMachine HTTP protocol.
# Supports:
# * Rack specifications v1.1: http://rack.rubyforge.org/doc/SPEC.html
- # * Asynchronous responses, via the <tt>env['async.callback']</tt> or <tt>throw :async</tt>.
+ # * Asynchronous responses with chunked encoding, via the <tt>env['async.callback']</tt> or <tt>throw :async</tt>.
# * Keep-alive.
# * File streaming.
# * Calling the Rack app from pooled threads.
class Http < EM::Connection
# Http class has to be defined before requiring those.
require "thin/protocols/http/request"
require "thin/protocols/http/response"
+ require "thin/protocols/http/chunked_body"
attr_accessor :server
attr_accessor :listener
attr_reader :request, :response
- # == EM callbacks
+ # == EM callback methods
# Get the connection ready to process a request.
def post_init
@@ -34,18 +35,24 @@ def receive_data(data)
@parser << data
rescue HTTP::Parser::Error => e
$stderr.puts "Parse error: #{e}"
- send_response Response.error(400) # Bad Request
+ send_response_and_reset Response.error(400) # Bad Request
end
# Called when the connection is unbinded from the socket
# and can no longer be used to process requests.
def unbind
- @request.close if @request
- @response.close if @response
+ if @request
+ @request.close
+ @request = nil
+ end
+ if @response
+ @response.close
+ @response = nil
+ end
end
- # == Parser callbacks
+ # == Parser callback methods
def on_message_begin
@request = Request.new
@@ -74,7 +81,7 @@ def on_message_complete
end
- # == Request processing
+ # == Request processing methods
# Starts the processing of the current request in <tt>@request</tt>.
def process
@@ -95,68 +102,116 @@ def call_app
# Connection may be closed unless the App#call response was a [-1, ...]
# It should be noted that connection objects will linger until this
# callback is no longer referenced, so be tidy!
- @request.async_callback = method(:process_response)
+ @request.async_callback = method(:process_async_response)
# Call the Rack application
response = Response::ASYNC # `throw :async` will result in this response
catch(:async) do
response = @server.app.call(@request.env)
end
- # We're done with the request
- @request.close
-
response
rescue Exception
handle_error
nil # Signals that the request could not be processed
end
+
+ def prepare_response(response)
+ return unless response
+
+ Response.new(*response)
+ end
# Process the response returns by +call_app+.
def process_response(response)
- return unless response
+ @response = prepare_response(response)
- @response = Response.new(*response)
-
# We're going to respond later (async).
return if @response.async?
- # If the body is being deferred, then terminate afterward.
- @response.callback = method(:reset) if @response.callback?
+ # Close the resources used by the request as soon as possible.
+ @request.close
+
+ # Send the response.
+ send_response_and_reset
+ rescue Exception
+ handle_error
+ end
+
+ # Process the response sent asynchronously via <tt>body.call</tt>.
+ # The response will automatically be send using chunked encoding under
+ # HTTP 1.1 protocol.
+ def process_async_response(response)
+ @response = prepare_response(response)
+
+ # Terminate the connection on callback from the response's body.
+ @response.body_callback = method(:terminate_async_response)
+
+ # Use chunked encoding if available.
+ if @request.support_encoding_chunked?
+ @response.chunked_encoding!
+ @response.body = ChunkedBody.new(@response.body)
+ end
+
# Send the response.
send_response
-
+
rescue Exception
handle_error
end
+ # Called after an asynchronous response is done sending the body.
+ def terminate_async_response
+ if @request.support_encoding_chunked?
+ # Send tail chunk. 0 length signals we're done w/ HTTP chunked encoding.
+ send_chunk ChunkedBody::TAIL
+ end
+
+ reset
+
+ rescue Exception
+ handle_error
+ end
+
+ # Reset the connection and prepare for another request if keep-alive is
+ # requested.
+ # Else, closes the connection.
+ def reset
+ if @response && @response.keep_alive?
+ # Prepare the connection for another request if the client
+ # requested a persistent connection (keep-alive).
+ post_init
+ else
+ close_connection_after_writing
+ end
+
+ unbind
+ end
+
- # == Support methods
+ # == Response sending methods
# Send the HTTP response back to the client.
def send_response(response=@response)
@response = response
- # Keep connection alive if requested by the client
+ # Keep connection alive if requested by the client.
@response.keep_alive! if @request && @request.keep_alive?
+ # Prepare the response for sending.
+ @response.http_version = @request.http_version
@response.finish
if @response.file?
send_file
return
end
- @response.each do |chunk|
- print chunk if $DEBUG
- send_data chunk
- end
+ @response.each(&method(:send_chunk))
puts if $DEBUG
- reset
-
rescue Exception => e
# In case there's an error sending the response, we give up and just
# close the connection to prevent recursion and consuming too much
@@ -165,69 +220,59 @@ def send_response(response=@response)
close_connection
end
+ def send_response_and_reset(response=@response)
+ send_response(response)
+ reset
+ end
+
# Sending a file using EM streaming and HTTP 1.1 style chunked-encoding if
- # supported by client.
+ # supported by the client.
def send_file
# Use HTTP 1.1 style chunked-encoding to send the file if supported
if @request.support_encoding_chunked?
- @response.headers['Transfer-Encoding'] = 'chunked'
- send_data @response.head
+ @response.chunked_encoding!
+ send_chunk @response.head
deferrable = stream_file_data @response.filename, :http_chunks => true
else
- send_data @response.head
+ send_chunk @response.head
deferrable = stream_file_data @response.filename
end
deferrable.callback(&method(:reset))
deferrable.errback(&method(:reset))
if $DEBUG
- puts @response.head
puts "<Serving file #{@response.filename} with streaming ...>"
puts
end
end
- # Returns IP address of peer as a string.
- def socket_address
- if listener.unix?
- ""
- else
- Socket.unpack_sockaddr_in(get_peername)[1]
- end
- rescue Exception => e
- $stderr.puts "Can't get socket address: #{e}"
- ""
+ def send_chunk(data)
+ print data if $DEBUG
+ send_data data
end
- # Output the error to stderr and sends back a 500 error.
- def handle_error(e=$!)
- $stderr.puts "Error processing request: #{e}"
- $stderr.print "#{e}\n\t" + e.backtrace.join("\n\t") if $DEBUG
- send_response Response.error(500) # Internal Server Error
- end
+ private
+ # == Support methods
- # Reset the connection and prepare for another request if keep-alive is
- # requested.
- # Else, closes the connection.
- def reset
- if @response && @response.keep_alive?
- # Prepare the connection for another request if the client
- # requested a persistent connection (keep-alive).
- post_init
- else
- close_connection_after_writing
- end
-
- if @request
- @request.close
- @request = nil
+ # Returns IP address of peer as a string.
+ def socket_address
+ if listener.unix?
+ ""
+ else
+ Socket.unpack_sockaddr_in(get_peername)[1]
+ end
+ rescue Exception => e
+ $stderr.puts "Can't get socket address: #{e}"
+ ""
end
- if @response
- @response.close
- @response = nil
+
+ # Output the error to stderr and sends back a 500 error.
+ def handle_error(e=$!)
+ $stderr.puts "[ERROR] #{e}"
+ $stderr.puts "\t" + e.backtrace.join("\n\t") if $DEBUG
+ send_response_and_reset Response.error(500) # Internal Server Error
end
- end
end
end
end
@@ -0,0 +1,32 @@
+module Thin
+ module Protocols
+ class Http
+ # Same as Rack::Chunked::Body, but doesn't send the tail automaticaly.
+ class ChunkedBody
+ TERM = "\r\n"
+ TAIL = "0#{TERM}#{TERM}"
+
+ include Rack::Utils
+
+ def initialize(body)
+ @body = body
+ end
+
+ def each
+ term = TERM
+ @body.each do |chunk|
+ size = bytesize(chunk)
+ next if size == 0
+
+ chunk = chunk.dup.force_encoding(Encoding::BINARY) if chunk.respond_to?(:force_encoding)
+ yield [size.to_s(16), term, chunk, term].join
+ end
+ end
+
+ def close
+ @body.close if @body.respond_to?(:close)
+ end
+ end
+ end
+ end
+end
@@ -110,6 +110,10 @@ def http_version=(string)
@env[HTTP_VERSION] = string
@env[SERVER_PROTOCOL] = string
end
+
+ def http_version
+ @env[HTTP_VERSION]
+ end
def path=(path)
@env[PATH_INFO] = path
@@ -59,13 +59,16 @@ def to_s
attr_accessor :body
# Headers key-value hash
- attr_reader :headers
+ attr_reader :headers
+
+ attr_reader :http_version
def initialize(status=200, headers=nil, body=nil)
@headers = Headers.new
@status = status
@keep_alive = false
@body = body
+ @http_version = "HTTP/1.1"
self.headers = headers if headers
end
@@ -112,7 +115,7 @@ def finish
# containing the status code and response headers.
def head
status_message = Rack::Utils::HTTP_STATUS_CODES[@status.to_i]
- "HTTP/1.1 #{@status} #{status_message}\r\n#{@headers.to_s}\r\n"
+ "#{@http_version} #{@status} #{status_message}\r\n#{@headers.to_s}\r\n"
end
# Close any resource used by the response
@@ -156,14 +159,19 @@ def file?
def filename
@body.to_path
end
-
- def callback?
- @body.respond_to?(:callback) && @body.respond_to?(:errback)
+
+ def body_callback=(proc)
+ @body.callback(&proc) if @body.respond_to?(:callback)
+ @body.errback(&proc) if @body.respond_to?(:errback)
+ end
+
+ def chunked_encoding!
+ @headers['Transfer-Encoding'] = 'chunked'
end
- def callback=(proc)
- @body.callback(&proc)
- @body.errback(&proc)
+ def http_version=(string)
+ return unless string && string == "HTTP/1.1" || string == "HTTP/1.0"
+ @http_version = string
end
def self.error(status=500, message=Rack::Utils::HTTP_STATUS_CODES[status])
Oops, something went wrong.

2 comments on commit e93ff15

Contributor

atotic commented on e93ff15 Sep 28, 2012

Please add a method to turn off async chunked encoding. Chunked encoding can get in the way when:

  • you are sending lots of little chunks, encoding sends tons of little chunks
  • you are streaming back to the reverse proxy, which decides whether to chunk-encode or not
  • for debugging
Owner

macournoyer replied Oct 4, 2012

Noted. Will add an option to turn this off. Thx!

Please sign in to comment.