Skip to content

Commit

Permalink
Support streaming both requests and responses.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Apr 8, 2018
1 parent 80438c9 commit bbe30cd
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 56 deletions.
85 changes: 46 additions & 39 deletions lib/async/http/body.rb
Expand Up @@ -22,42 +22,6 @@

module Async
module HTTP
class BufferedBody
def initialize(body)
@chunks = []

body.each do |chunk|
@chunks << chunk
end
end

def each(&block)
@chunks.each(&block)
end

def read
@buffer ||= @chunks.join
end

def closed?
true
end

module Reader
def read
self.body ? self.body.read : nil
end

def close
return if self.body.nil? or self.body.closed?

unless self.body.is_a? BufferedBody
self.body = BufferedBody.new(self.body)
end
end
end
end

class Body < Async::Queue
def initialize
super
Expand All @@ -75,28 +39,69 @@ def each
while chunk = self.dequeue
yield chunk
end

@closed = true
end

def read
buffer = BinaryString.new
buffer = Async::IO::BinaryString.new

while chunk = self.dequeue
self.each do |chunk|
buffer << chunk
end

return buffer
end

alias join read

def write(chunk)
self.enqueue(chunk)
end

def close
@closed = true
self.enqueue(nil)
end
end

class BufferedBody
def initialize(body)
@chunks = []

body.each do |chunk|
@chunks << chunk
end
end

def each(&block)
@chunks.each(&block)
end

def read
@buffer ||= @chunks.join
end

alias join read

def closed?
true
end

module Reader
def read
self.body ? self.body.read : nil
end

def close
return if self.body.nil? or self.body.closed?

unless self.body.is_a? BufferedBody
self.body = BufferedBody.new(self.body)
end
end
end
end

class FixedBody
CHUNK_LENGTH = 1024*1024

Expand Down Expand Up @@ -127,6 +132,8 @@ def read

return buffer
end

alias join read
end
end
end
6 changes: 3 additions & 3 deletions lib/async/http/protocol/http11.rb
Expand Up @@ -173,8 +173,8 @@ def write_body(body, chunked = true)
buffer = String.new
body.each{|chunk| buffer << chunk}

@stream.write("Content-Length: #{chunk.bytesize}\r\n\r\n")
@stream.write(chunk)
@stream.write("Content-Length: #{buffer.bytesize}\r\n\r\n")
@stream.write(buffer)
end
end

Expand Down Expand Up @@ -208,7 +208,7 @@ def each
end

def read
buffer = BinaryString.new
buffer = Async::IO::BinaryString.new

self.each do |chunk|
buffer << chunk
Expand Down
21 changes: 10 additions & 11 deletions lib/async/http/protocol/http2.rb
Expand Up @@ -106,9 +106,6 @@ def receive_requests(task: Task.current, &block)
request.headers = {}
request.body = Body.new

# stream.on(:active) { } # fires when stream transitions to open state
# stream.on(:close) { } # stream is closed by client and server

stream.on(:headers) do |headers|
headers.each do |key, value|
if key == METHOD
Expand All @@ -124,12 +121,14 @@ def receive_requests(task: Task.current, &block)
end

stream.on(:data) do |chunk|
request.body.write(chunk)
request.body.write(chunk.to_s) unless chunk.empty?
end

stream.on(:half_close) do
response = yield request

request.body.close

# send response
headers = {STATUS => response[0].to_s}
headers.update(response[1])
Expand All @@ -140,7 +139,7 @@ def receive_requests(task: Task.current, &block)
stream.data(chunk, end_stream: false)
end

stream.close
stream.data("", end_stream: true)
end
end

Expand All @@ -165,8 +164,8 @@ def send_request(authority, method, path, headers = {}, body = nil)
body.each do |chunk|
stream.data(chunk, end_stream: false)
end

stream.close
stream.data("", end_stream: true)
end

finished = Async::Notification.new
Expand All @@ -193,16 +192,16 @@ def send_request(authority, method, path, headers = {}, body = nil)
end

stream.on(:data) do |chunk|
# Async.logger.debug(self) {"Stream data: #{chunk.inspect}"}
response.body.write(chunk.to_s)
Async.logger.debug(self) {"Stream data: #{chunk.inspect}"}
response.body.write(chunk.to_s) unless chunk.empty?
end

stream.on(:half_close) do
# Async.logger.debug(self) {"Stream half-closed."}
Async.logger.debug(self) {"Stream half-closed."}
end

stream.on(:close) do
# Async.logger.debug(self) {"Stream closed, sending signal."}
Async.logger.debug(self) {"Stream closed, sending signal."}
response.body.close
end

Expand Down
47 changes: 44 additions & 3 deletions spec/async/http/body_spec.rb
Expand Up @@ -29,7 +29,48 @@
require 'async/rspec/ssl'

RSpec.shared_examples Async::HTTP::Body do
it "client can get resource" do
it "can stream requests" do
client = Async::HTTP::Client.new(client_endpoint, described_class)

notification = Async::Notification.new

server = Async::HTTP::Server.new(server_endpoint, described_class) do |request, peer, address|
input = request.body
output = Async::HTTP::Body.new

Async::Task.current.async do |task|
input.each do |chunk|
output.write(chunk.reverse)
end

output.close
end

[200, {}, output]
end

server_task = reactor.async do
server.run
end

output = Async::HTTP::Body.new

reactor.async do |task|
output.write("Hello World!")
output.close
end

response = client.post("/", {}, output) do |response|
input = response.body
reversed = input.read
end

expect(response).to be_success
server_task.stop
client.close
end

it "can stream response" do
client = Async::HTTP::Client.new(client_endpoint, described_class)

notification = Async::Notification.new
Expand Down Expand Up @@ -70,7 +111,7 @@
end
end

RSpec.describe Async::HTTP::Protocol::HTTP1 do
RSpec.describe Async::HTTP::Protocol::HTTP1, timeout: 2 do
include_context Async::RSpec::Reactor

let(:endpoint) {Async::HTTP::URLEndpoint.parse('http://127.0.0.1:9296', reuse_port: true)}
Expand All @@ -80,7 +121,7 @@
it_should_behave_like Async::HTTP::Body
end

RSpec.describe Async::HTTP::Protocol::HTTPS do
RSpec.describe Async::HTTP::Protocol::HTTPS, timeout: 2 do
include_context Async::RSpec::Reactor
include_context Async::RSpec::SSL::ValidCertificate

Expand Down

0 comments on commit bbe30cd

Please sign in to comment.