Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Replace custom WebSocket code with faye-websocket #41

Merged
merged 1 commit into from

2 participants

@jcoglan

Hey there. Thanks a lot for the Thin and Rainbows backends -- could never have got started on faye-websocket without them. Having now extracted them, and added a more complete set of parsers, I thought I'd try porting Cramp to my library, and it turned out to be easy.

Fortunately your transport code is already nicely separated from the application APIs. This change just means we can maintain the WebSocket part independently of application frameworks.

@lifo lifo merged commit 9a6662c into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jan 16, 2012
  1. @jcoglan
This page is out of date. Refresh to see the latest.
View
3  Gemfile.lock
@@ -4,6 +4,7 @@ PATH
cramp (0.15.1)
activesupport (~> 3.0.9)
eventmachine (~> 1.0.0.beta.3)
+ faye-websocket (~> 0.3.0)
rack (~> 1.3.2)
thor (~> 0.14.6)
@@ -39,6 +40,8 @@ GEM
erubis (2.7.0)
escape_utils (0.2.3)
eventmachine (1.0.0.beta.3)
+ faye-websocket (0.3.0)
+ eventmachine (>= 0.12.0)
http_router (0.10.0)
rack (>= 1.0.0)
url_mount (~> 0.2.1)
View
9 cramp.gemspec
@@ -12,10 +12,11 @@ Gem::Specification.new do |s|
# Not in a very distant future
# s.required_ruby_version = '>=1.9.2'
- s.add_dependency('activesupport', '~> 3.0.9')
- s.add_dependency('rack', '~> 1.3.2')
- s.add_dependency('eventmachine', '~> 1.0.0.beta.3')
- s.add_dependency('thor', '~> 0.14.6')
+ s.add_dependency('activesupport', '~> 3.0.9')
+ s.add_dependency('rack', '~> 1.3.2')
+ s.add_dependency('eventmachine', '~> 1.0.0.beta.3')
+ s.add_dependency('faye-websocket', '~> 0.3.0')
+ s.add_dependency('thor', '~> 0.14.6')
s.files = Dir['README', 'MIT-LICENSE', 'lib/**/*', 'bin/**/*']
s.has_rdoc = false
View
1  lib/cramp.rb
@@ -14,6 +14,7 @@
require 'active_support/buffered_logger'
require 'rack'
+require 'faye/websocket'
begin
require 'fiber'
View
25 lib/cramp/action.rb
@@ -5,7 +5,14 @@ class Action < Abstract
def initialize(env)
super
- @env['websocket.receive_callback'] = method(:_on_data_receive)
+
+ if Faye::WebSocket.websocket?(env)
+ @web_socket = Faye::WebSocket.new(env)
+ @web_socket.onmessage = lambda do |event|
+ message = event.data
+ _invoke_data_callbacks(message) if message.is_a?(String)
+ end
+ end
end
protected
@@ -74,13 +81,7 @@ def render_sse(data, options = {})
end
def render_websocket(body, *)
- if websockets_protocol_10?
- data = encode(protocol10_parser.send_text_frame(body), 'BINARY')
- else
- data = ["\x00", body, "\xFF"].map(&method(:encode)) * ''
- end
-
- @body.call(data)
+ @web_socket.send(body)
end
CHUNKED_TERM = "\r\n"
@@ -112,13 +113,5 @@ def finish
super
end
- def websockets_protocol_10?
- [7, 8, 9, 10].include?(@env['HTTP_SEC_WEBSOCKET_VERSION'].to_i)
- end
-
- def protocol10_parser
- @protocol10_parser ||= Protocol10FrameParser.new
- end
-
end
end
View
20 lib/cramp/callbacks.rb
@@ -62,26 +62,8 @@ def callback_wrapper
end
end
- def _on_data_receive(data)
- websockets_protocol_10? ? _receive_protocol10_data(data) : _receive_protocol76_data(data)
- end
-
protected
- def _receive_protocol10_data(data)
- protocol10_parser.data << data
-
- messages = @protocol10_parser.process_data
- messages.each do |type, content|
- _invoke_data_callbacks(content) if type == :text
- end
- end
-
- def _receive_protocol76_data(data)
- data = data.split(/\000([^\377]*)\377/).select{|d| !d.empty? }.collect{|d| d.gsub(/^\x00|\xff$/, '') }
- data.each {|message| _invoke_data_callbacks(message) }
- end
-
def _invoke_data_callbacks(message)
self.class.on_data_callbacks.each do |callback|
callback_wrapper { send(callback, message) }
@@ -106,4 +88,4 @@ def handle_exception(exception)
end
end
-end
+end
View
2  lib/cramp/websocket.rb
@@ -5,7 +5,7 @@ class Websocket < Action
class << self
def backend=(backend)
raise "Websocket backend #{backend} is unknown" unless [:thin, :rainbows].include?(backend.to_sym)
- require "cramp/websocket/#{backend}_backend.rb"
+ Faye::WebSocket.load_adapter(backend.to_s)
end
end
View
92 lib/cramp/websocket/extension.rb
@@ -1,92 +0,0 @@
-require 'base64'
-require 'digest/sha1'
-
-module Cramp
- module WebsocketExtension
- WEBSOCKET_RECEIVE_CALLBACK = 'websocket.receive_callback'.freeze
-
- def protocol_class
- @env['HTTP_SEC_WEBSOCKET_VERSION'] ? Protocol10 : Protocol76
- end
-
- def websocket?
- ['WebSocket', 'websocket'].include?(@env['HTTP_UPGRADE'])
- end
-
- def secure_websocket?
- if @env.has_key?('HTTP_X_FORWARDED_PROTO')
- @env['HTTP_X_FORWARDED_PROTO'] == 'https'
- else
- @env['HTTP_ORIGIN'] =~ /^https:/i
- end
- end
-
- def websocket_url
- scheme = secure_websocket? ? 'wss:' : 'ws:'
- @env['websocket.url'] = "#{ scheme }//#{ @env['HTTP_HOST'] }#{ @env['REQUEST_URI'] }"
- end
-
- class WebSocketHandler
- def initialize(env, websocket_url, body = nil)
- @env = env
- @websocket_url = websocket_url
- @body = body
- end
- end
-
- class Protocol10 < WebSocketHandler
- MAGIC_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11".freeze
-
- def handshake
- digest = Base64.encode64(Digest::SHA1.digest("#{@env['HTTP_SEC_WEBSOCKET_KEY']}#{MAGIC_GUID}")).chomp
-
- upgrade = "HTTP/1.1 101 Switching Protocols\r\n"
- upgrade << "Upgrade: websocket\r\n"
- upgrade << "Connection: Upgrade\r\n"
- upgrade << "Sec-WebSocket-Accept: #{digest}\r\n\r\n"
- upgrade
- end
- end
-
- class Protocol76 < WebSocketHandler
- def handshake
- key1 = @env['HTTP_SEC_WEBSOCKET_KEY1']
- value1 = number_from_key(key1) / spaces_in_key(key1)
-
- key2 = @env['HTTP_SEC_WEBSOCKET_KEY2']
- value2 = number_from_key(key2) / spaces_in_key(key2)
-
- hash = Digest::MD5.digest(big_endian(value1) +
- big_endian(value2) +
- @body)
-
- upgrade = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
- upgrade << "Upgrade: WebSocket\r\n"
- upgrade << "Connection: Upgrade\r\n"
- upgrade << "Sec-WebSocket-Origin: #{@env['HTTP_ORIGIN']}\r\n"
- upgrade << "Sec-WebSocket-Location: #{@websocket_url}\r\n\r\n"
- upgrade << hash
- upgrade
- end
-
- private
-
- def number_from_key(key)
- key.scan(/[0-9]/).join('').to_i(10)
- end
-
- def spaces_in_key(key)
- key.scan(/ /).size
- end
-
- def big_endian(number)
- string = ''
- [24,16,8,0].each do |offset|
- string << (number >> offset & 0xFF).chr
- end
- string
- end
- end
-
- end
-end
View
241 lib/cramp/websocket/protocol10_frame_parser.rb
@@ -1,241 +0,0 @@
-# encoding: BINARY
-
-# The MIT License - Copyright (c) 2009 Ilya Grigorik
-# Thank you https://github.com/igrigorik/em-websocket
-#
-# Copyright (c) 2009 Ilya Grigorik
-#
-# Permission is hereby granted, free of charge, to any person obtaining
-# a copy of this software and associated documentation files (the
-# "Software"), to deal in the Software without restriction, including
-# without limitation the rights to use, copy, modify, merge, publish,
-# distribute, sublicense, and/or sell copies of the Software, and to
-# permit persons to whom the Software is furnished to do so, subject to
-# the following conditions:
-#
-# The above copyright notice and this permission notice shall be
-# included in all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
-# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
-# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
-# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
-# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
-# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-
-module Cramp
- class Protocol10FrameParser
- class WebSocketError < RuntimeError; end
-
- class MaskedString < String
- # Read a 4 bit XOR mask - further requested bytes will be unmasked
- def read_mask
- if respond_to?(:encoding) && encoding.name != "ASCII-8BIT"
- raise "MaskedString only operates on BINARY strings"
- end
- raise "Too short" if bytesize < 4 # TODO - change
- @masking_key = String.new(self[0..3])
- end
-
- # Removes the mask, behaves like a normal string again
- def unset_mask
- @masking_key = nil
- end
-
- def slice_mask
- slice!(0, 4)
- end
-
- def getbyte(index)
- if @masking_key
- masked_char = super
- masked_char ? masked_char ^ @masking_key.getbyte(index % 4) : nil
- else
- super
- end
- end
-
- def getbytes(start_index, count)
- data = ''
- count.times do |i|
- data << getbyte(start_index + i)
- end
- data
- end
- end
-
- attr_accessor :data
-
- def initialize
- @data = MaskedString.new
- @application_data_buffer = '' # Used for MORE frames
- end
-
- def process_data
- messages = []
- error = false
-
- while !error && @data.size >= 2
- pointer = 0
-
- fin = (@data.getbyte(pointer) & 0b10000000) == 0b10000000
- # Ignoring rsv1-3 for now
- opcode = @data.getbyte(pointer) & 0b00001111
- pointer += 1
-
- mask = (@data.getbyte(pointer) & 0b10000000) == 0b10000000
- length = @data.getbyte(pointer) & 0b01111111
- pointer += 1
-
- raise WebSocketError, 'Data from client must be masked' unless mask
-
- payload_length = case length
- when 127 # Length defined by 8 bytes
- # Check buffer size
- if @data.getbyte(pointer+8-1) == nil
- debug [:buffer_incomplete, @data]
- error = true
- next
- end
-
- # Only using the last 4 bytes for now, till I work out how to
- # unpack 8 bytes. I'm sure 4GB frames will do for now :)
- l = @data.getbytes(pointer+4, 4).unpack('N').first
- pointer += 8
- l
- when 126 # Length defined by 2 bytes
- # Check buffer size
- if @data.getbyte(pointer+2-1) == nil
- debug [:buffer_incomplete, @data]
- error = true
- next
- end
-
- l = @data.getbytes(pointer, 2).unpack('n').first
- pointer += 2
- l
- else
- length
- end
-
- # Compute the expected frame length
- frame_length = pointer + payload_length
- frame_length += 4 if mask
-
- # Check buffer size
- if @data.getbyte(frame_length - 1) == nil
- debug [:buffer_incomplete, @data]
- error = true
- next
- end
-
- # Remove frame header
- @data.slice!(0...pointer)
- pointer = 0
-
- # Read application data (unmasked if required)
- @data.read_mask if mask
- pointer += 4 if mask
- application_data = @data.getbytes(pointer, payload_length)
- pointer += payload_length
- @data.unset_mask if mask
-
- # Throw away data up to pointer
- @data.slice!(0...pointer)
-
- frame_type = opcode_to_type(opcode)
-
- if frame_type == :continuation && !@frame_type
- raise WebSocketError, 'Continuation frame not expected'
- end
-
- if !fin
- debug [:moreframe, frame_type, application_data]
- @application_data_buffer << application_data
- @frame_type = frame_type
- else
- # Message is complete
- if frame_type == :continuation
- @application_data_buffer << application_data
- messages << [@frame_type, @application_data_buffer]
- @application_data_buffer = ''
- @frame_type = nil
- else
- messages << [frame_type, application_data]
- end
- end
- end # end while
-
- messages
- end
-
- def send_frame(frame_type, application_data)
- debug [:sending_frame, frame_type, application_data]
-
- # Protocol10FrameParser doesn't have any knowledge of :closing in Cramp
- # if @state == :closing && data_frame?(frame_type)
- # raise WebSocketError, "Cannot send data frame since connection is closing"
- # end
-
- frame = ''
-
- opcode = type_to_opcode(frame_type)
- byte1 = opcode | 0b10000000 # fin bit set, rsv1-3 are 0
- frame << byte1
-
- length = application_data.size
- if length <= 125
- byte2 = length # since rsv4 is 0
- frame << byte2
- elsif length < 65536 # write 2 byte length
- frame << 126
- frame << [length].pack('n')
- else # write 8 byte length
- frame << 127
- frame << [length >> 32, length & 0xFFFFFFFF].pack("NN")
- end
-
- frame << application_data
- end
-
- def send_text_frame(data)
- send_frame(:text, data)
- end
-
- private
-
- FRAME_TYPES = {
- :continuation => 0,
- :text => 1,
- :binary => 2,
- :close => 8,
- :ping => 9,
- :pong => 10,
- }
- FRAME_TYPES_INVERSE = FRAME_TYPES.invert
- # Frames are either data frames or control frames
- DATA_FRAMES = [:text, :binary, :continuation]
-
- def type_to_opcode(frame_type)
- FRAME_TYPES[frame_type] || raise("Unknown frame type")
- end
-
- def opcode_to_type(opcode)
- FRAME_TYPES_INVERSE[opcode] || raise(DataError, "Unknown opcode")
- end
-
- def data_frame?(type)
- DATA_FRAMES.include?(type)
- end
-
- def debug(*data)
- if @debug
- require 'pp'
- pp data
- puts
- end
- end
-
- end
-end
View
42 lib/cramp/websocket/rainbows.rb
@@ -1,42 +0,0 @@
-class Cramp::Websocket::Rainbows < Rainbows::EventMachine::Client
- include Cramp::WebsocketExtension
-
- def receive_data(data)
- case @state
- when :websocket
- callback = @env[WEBSOCKET_RECEIVE_CALLBACK]
- callback.call(data) if callback
- else
- super
- end
- end
-
- def on_read(data)
- if @state == :headers
- @hp.add_parse(data) or return want_more
- @state = :body
- if 0 == @hp.content_length && !websocket?
- app_call NULL_IO # common case
- else # nil or len > 0
- prepare_request_body
- end
- elsif @state == :body && websocket? && @hp.body_eof?
- @state = :websocket
- @input.rewind
-
- write(protocol_class.new(@env, websocket_url, @buf).handshake)
- app_call NULL_IO
- else
- super
- end
- rescue => e
- handle_error(e)
- end
-
- def write_response(status, headers, body, alive)
- write_headers(status, headers, alive) unless websocket?
- write_body_each(body)
- ensure
- body.close if body.respond_to?(:close)
- end
-end
View
8 lib/cramp/websocket/rainbows_backend.rb
@@ -1,8 +0,0 @@
-# :enddoc:
-require "rainbows"
-class Cramp::Websocket
- # we use autoload since Rainbows::EventMachine::Client should only be
- # loaded in the worker proceses and we want to be preload_app-friendly
- autoload :Rainbows, "cramp/websocket/rainbows"
-end
-Rainbows::O[:em_client_class] = "Cramp::Websocket::Rainbows"
View
53 lib/cramp/websocket/thin_backend.rb
@@ -1,53 +0,0 @@
-require 'thin'
-
-silence_warnings { Thin::Server::DEFAULT_TIMEOUT = 0 }
-
-class Thin::Connection
- # Called when data is received from the client.
- def receive_data(data)
- trace { data }
-
- case @serving
- when :websocket
- callback = @request.env[Thin::Request::WEBSOCKET_RECEIVE_CALLBACK]
- callback.call(data) if callback
- else
- if @request.parse(data)
- if @request.websocket?
- @response.persistent!
- @response.websocket_upgrade_data = @request.websocket_upgrade_data
- @serving = :websocket
- end
-
- process
- end
- end
- rescue Thin::InvalidRequest => e
- log "!! Invalid request"
- log_error e
- close_connection
- end
-end
-
-class Thin::Request
- include Cramp::WebsocketExtension
-
- def websocket_upgrade_data
- protocol_class.new(@env, websocket_url, body.read).handshake
- end
-
-end
-
-class Thin::Response
- # Headers for sending Websocket upgrade
- attr_accessor :websocket_upgrade_data
-
- def each
- websocket_upgrade_data ? yield(websocket_upgrade_data) : yield(head)
- if @body.is_a?(String)
- yield @body
- else
- @body.each { |chunk| yield chunk }
- end
- end
-end
View
32 test/controller/websocket_test.rb
@@ -1,32 +0,0 @@
-require 'test_helper'
-
-class WebSocketTest < Cramp::TestCase
-
- class WebSocketAction < Cramp::Websocket
- cattr_accessor :logs
- self.logs = []
-
- on_data :write_logs
-
- def write_logs(data)
- self.logs << data
- end
- end
-
- def app
- WebSocketAction
- end
-
- def test_sending_data_over_websocket
- env = Rack::MockRequest.env_for('/')
- env['async.callback'] = proc {|resp| }
-
- EM.run do
- catch(:async) { app.call(env) }
- env['websocket.receive_callback'].call("\000Hello Websock!\377")
- EM.stop
- end
-
- assert_equal ['Hello Websock!'], WebSocketAction.logs
- end
-end
Something went wrong with that request. Please try again.