Skip to content

Commit

Permalink
Merge pull request #343 from janko-m/stream-response-body
Browse files Browse the repository at this point in the history
Stream the response to the client
  • Loading branch information
igrigorik committed Oct 4, 2017
2 parents 922b2d6 + 8c2b57f commit a920d03
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 30 deletions.
1 change: 1 addition & 0 deletions goliath.gemspec
Expand Up @@ -38,6 +38,7 @@ Gem::Specification.new do |s|
s.add_development_dependency 'amqp', '>=0.7.1'
s.add_development_dependency 'em-websocket-client'
s.add_development_dependency 'em-eventsource'
s.add_development_dependency 'rack', '< 2'

s.add_development_dependency 'tilt', '>=1.2.2'
s.add_development_dependency 'haml', '>=3.0.25'
Expand Down
13 changes: 8 additions & 5 deletions lib/goliath/connection.rb
Expand Up @@ -42,7 +42,8 @@ def post_init
end

@parser.on_body = proc do |data|
@requests.first.parse(data)
req = @requests.first
req.parse(data) unless req.env[:terminate_connection]
end

@parser.on_message_complete = proc do
Expand Down Expand Up @@ -84,15 +85,17 @@ def unbind
end

def terminate_request(keep_alive)
if req = @pending.shift
@current = req
@current.succeed
elsif @current
if @current
@current.close
@current = nil
end

close_connection_after_writing rescue nil if !keep_alive

if req = @pending.shift
@current = req
@current.succeed
end
end

def remote_address
Expand Down
12 changes: 3 additions & 9 deletions lib/goliath/rack/default_response_format.rb
Expand Up @@ -4,17 +4,11 @@ class DefaultResponseFormat
include Goliath::Rack::AsyncMiddleware

def post_process(env, status, headers, body)
return [status, headers, body] if body.respond_to?(:to_ary)

new_body = []
if body.respond_to?(:each)
body.each { |chunk| new_body << chunk }
if body.is_a?(String)
[status, headers, [body]]
else
new_body << body
[status, headers, body]
end
new_body.collect! { |item| item.to_s }

[status, headers, new_body.flatten]
end
end
end
Expand Down
44 changes: 28 additions & 16 deletions lib/goliath/request.rb
Expand Up @@ -197,29 +197,46 @@ def post_process(results)
callback do
begin
@response.status, @response.headers, @response.body = status, headers, body
@response.each { |chunk| @conn.send_data(chunk) }

elapsed_time = (Time.now.to_f - @env[:start_time]) * 1000
begin
Goliath::Request.log_block.call(@env, @response, elapsed_time)
rescue => err
# prevent an infinite loop if the block raised an error
@env[RACK_LOGGER].error("log block raised #{err}")
end

@conn.terminate_request(keep_alive)
stream_data(@response.each) do
terminate_request
end
rescue Exception => e
server_exception(e)
end
end

rescue Exception => e
server_exception(e)
end
end

private

# Writes each chunk of the response data in a new tick. This achieves
# streaming, because EventMachine flushes the sent data to the socket at
# the end of each tick.
def stream_data(chunks, &block)
@conn.send_data(chunks.next)
EM.next_tick { stream_data(chunks, &block) }
rescue StopIteration
block.call
rescue Exception => e
server_exception(e)
end

# Logs the response time and terminates the request.
def terminate_request
elapsed_time = (Time.now.to_f - @env[:start_time]) * 1000
begin
Goliath::Request.log_block.call(@env, @response, elapsed_time)
rescue => err
# prevent an infinite loop if the block raised an error
@env[RACK_LOGGER].error("log block raised #{err}")
end

@conn.terminate_request(keep_alive)
end

# Handles logging server exceptions
#
# @param e [Exception] The exception to log
Expand All @@ -242,11 +259,6 @@ def server_exception(e)
headers['Content-Length'] = body.bytesize.to_s
@env[:terminate_connection] = true
post_process([status, headers, body])

# Mark the request as complete to force a flush on the response.
# Note: #on_body and #response hooks may still fire if the data
# is already in the parser buffer.
succeed
end

# Used to determine if the connection should be kept open
Expand Down
2 changes: 2 additions & 0 deletions lib/goliath/response.rb
Expand Up @@ -79,6 +79,8 @@ def close
# @yield [String] The header line, headers and body content
# @return [Nil]
def each
return enum_for(__method__) unless block_given?

yield head
yield headers_output

Expand Down

0 comments on commit a920d03

Please sign in to comment.