Permalink
Browse files

Merge pull request #89 from tarcieri/reel-websockets

Reel websocket support
  • Loading branch information...
2 parents bfd357e + 99028bb commit 27366405db53a0d1877836ca05f65444b13e5b01 @tarcieri tarcieri committed Jan 13, 2013
Showing with 93 additions and 16 deletions.
  1. +23 −16 lib/webmachine/adapters/reel.rb
  2. +1 −0 spec/spec_helper.rb
  3. +69 −0 spec/webmachine/adapters/reel_spec.rb
View
39 lib/webmachine/adapters/reel.rb
@@ -14,30 +14,37 @@ def run
:port => configuration.port,
:host => configuration.ip
}.merge(configuration.adapter_options)
+
server = ::Reel::Server.supervise(options[:host], options[:port], &method(:process))
trap("INT"){ server.terminate; exit 0 }
sleep
end
def process(connection)
while wreq = connection.request
- header = Webmachine::Headers[wreq.headers.dup]
- host_parts = header.fetch('Host').split(':')
- path_parts = wreq.url.split('?')
- requri = URI::HTTP.build({}.tap do |h|
- h[:host] = host_parts.first
- h[:port] = host_parts.last.to_i if host_parts.length == 2
- h[:path] = path_parts.first
- h[:query] = path_parts.last if path_parts.length == 2
- end)
- request = Webmachine::Request.new(wreq.method.to_s.upcase,
- requri,
- header,
- LazyRequestBody.new(wreq))
- response = Webmachine::Response.new
- @dispatcher.dispatch(request,response)
+ case wreq
+ when ::Reel::Request
+ header = Webmachine::Headers[wreq.headers.dup]
+ host_parts = header.fetch('Host').split(':')
+ path_parts = wreq.url.split('?')
+ requri = URI::HTTP.build({}.tap do |h|
+ h[:host] = host_parts.first
+ h[:port] = host_parts.last.to_i if host_parts.length == 2
+ h[:path] = path_parts.first
+ h[:query] = path_parts.last if path_parts.length == 2
+ end)
+ request = Webmachine::Request.new(wreq.method.to_s.upcase,
+ requri,
+ header,
+ LazyRequestBody.new(wreq))
+ response = Webmachine::Response.new
+ @dispatcher.dispatch(request,response)
- connection.respond ::Reel::Response.new(response.code, response.headers, response.body)
+ connection.respond ::Reel::Response.new(response.code, response.headers, response.body)
+ when ::Reel::WebSocket
+ handler = configuration.adapter_options[:websocket_handler]
+ handler.call(wreq) if handler
+ end
end
end
end
View
1 spec/spec_helper.rb
@@ -10,6 +10,7 @@
config.filter_run :focus => true
config.run_all_when_everything_filtered = true
config.treat_symbols_as_metadata_keys_with_true_values = true
+ config.formatter = :documentation if ENV['CI']
if defined?(::Java)
config.seed = Time.now.utc
else
View
69 spec/webmachine/adapters/reel_spec.rb
@@ -19,5 +19,74 @@
it 'implements #process' do
adapter.should respond_to(:process)
end
+
+ context 'websockets' do
+ let(:configuration) do
+ config = Webmachine::Configuration.default
+
+ # FIXME: It seems existing specs leave another server running
+ config.port += 1
+ config
+ end
+ let(:example_host) { "www.example.com" }
+ let(:example_path) { "/example"}
+ let(:example_url) { "ws://#{example_host}#{example_path}" }
+ let :handshake_headers do
+ {
+ "Host" => example_host,
+ "Upgrade" => "websocket",
+ "Connection" => "Upgrade",
+ "Sec-WebSocket-Key" => "dGhlIHNhbXBsZSBub25jZQ==",
+ "Origin" => "http://example.com",
+ "Sec-WebSocket-Protocol" => "chat, superchat",
+ "Sec-WebSocket-Version" => "13"
+ }
+ end
+ let(:client_message) { "Hi server!" }
+ let(:server_message) { "Hi client!" }
+
+ it 'supports websockets' do
+ configuration.adapter_options[:websocket_handler] = proc do |socket|
+ socket.read.should eq client_message
+ socket << server_message
+ end
+
+ reel_server(described_class.new(configuration, dispatcher)) do |client|
+ client << WebSocket::ClientHandshake.new(:get, example_url, handshake_headers).to_data
+
+ # Discard handshake response
+ # FIXME: hax
+ client.readpartial(4096)
+
+ client << WebSocket::Message.new(client_message).to_data
+ parser = WebSocket::Parser.new
+ parser.append client.readpartial(4096) until message = parser.next_message
+
+ message.should eq server_message
+ end
+ end
+ end
+
+ def reel_server(adptr = adapter)
+ thread = Thread.new { adptr.run }
+ begin
+ timeout(5) do
+ begin
+ sock = TCPSocket.new(adptr.configuration.ip, adptr.configuration.port)
+ begin
+ yield(sock)
+ ensure
+ sock.close
+ end
+ rescue Errno::ECONNREFUSED
+ Thread.pass
+ retry
+ end
+ end
+ ensure
+ # FIXME: graceful shutdown would be nice
+ thread.kill
+ end
+ end
end
end

0 comments on commit 2736640

Please sign in to comment.