Skip to content

Commit

Permalink
Revert "Revert "Eliminate the EventMachine dependency""
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewd committed Jan 29, 2016
1 parent c8818df commit 74497ea
Show file tree
Hide file tree
Showing 27 changed files with 385 additions and 106 deletions.
7 changes: 2 additions & 5 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ PATH
actioncable (5.0.0.beta1.1)
actionpack (= 5.0.0.beta1.1)
coffee-rails (~> 4.1.0)
eventmachine (~> 1.0)
faye-websocket (~> 0.10.0)
nio4r (~> 1.2)
websocket-driver (~> 0.6.1)
actionmailer (5.0.0.beta1.1)
actionpack (= 5.0.0.beta1.1)
Expand Down Expand Up @@ -128,9 +127,6 @@ GEM
erubis (2.7.0)
eventmachine (1.0.9.1)
execjs (2.6.0)
faye-websocket (0.10.2)
eventmachine (>= 0.12.0)
websocket-driver (>= 0.5.1)
ffi (1.9.10)
ffi (1.9.10-x64-mingw32)
ffi (1.9.10-x86-mingw32)
Expand Down Expand Up @@ -166,6 +162,7 @@ GEM
mysql2 (0.4.2)
mysql2 (0.4.2-x64-mingw32)
mysql2 (0.4.2-x86-mingw32)
nio4r (1.2.0)
nokogiri (1.6.7.2)
mini_portile2 (~> 2.0.0.rc2)
nokogiri (1.6.7.2-x64-mingw32)
Expand Down
3 changes: 1 addition & 2 deletions actioncable/actioncable.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ Gem::Specification.new do |s|
s.add_dependency 'actionpack', version

s.add_dependency 'coffee-rails', '~> 4.1.0'
s.add_dependency 'eventmachine', '~> 1.0'
s.add_dependency 'faye-websocket', '~> 0.10.0'
s.add_dependency 'nio4r', '~> 1.2'
s.add_dependency 'websocket-driver', '~> 0.6.1'

s.add_development_dependency 'em-hiredis', '~> 0.3.0'
Expand Down
4 changes: 2 additions & 2 deletions actioncable/lib/action_cable/channel/periodic_timers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ def active_periodic_timers

def start_periodic_timers
self.class.periodic_timers.each do |callback, options|
active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do
active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do
connection.worker_pool.async_run_periodic_timer(self, callback)
end
end
end

def stop_periodic_timers
active_periodic_timers.each { |timer| timer.cancel }
active_periodic_timers.each { |timer| timer.shutdown }
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion actioncable/lib/action_cable/channel/streams.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def stream_from(broadcasting, callback = nil)
callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]

EM.next_tick do
Concurrent.global_io_executor.post do
pubsub.subscribe(broadcasting, callback, lambda do
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
Expand Down
5 changes: 4 additions & 1 deletion actioncable/lib/action_cable/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ module Connection
eager_autoload do
autoload :Authorization
autoload :Base
autoload :ClientSocket
autoload :Identification
autoload :InternalChannel
autoload :MessageBuffer
autoload :WebSocket
autoload :Stream
autoload :StreamEventLoop
autoload :Subscriptions
autoload :TaggedLoggerProxy
autoload :WebSocket
end
end
end
32 changes: 20 additions & 12 deletions actioncable/lib/action_cable/connection/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ class Base
include Authorization

attr_reader :server, :env, :subscriptions, :logger
delegate :worker_pool, :pubsub, to: :server
delegate :stream_event_loop, :worker_pool, :pubsub, to: :server

def initialize(server, env)
@server, @env = server, env

@logger = new_tagged_logger

@websocket = ActionCable::Connection::WebSocket.new(env)
@websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)

Expand All @@ -70,10 +70,6 @@ def process
logger.info started_request_message

if websocket.possible? && allow_request_origin?
websocket.on(:open) { |event| send_async :on_open }
websocket.on(:message) { |event| on_message event.data }
websocket.on(:close) { |event| send_async :on_close }

respond_to_successful_request
else
respond_to_invalid_request
Expand Down Expand Up @@ -121,6 +117,22 @@ def beat
transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
end

def on_open # :nodoc:
send_async :handle_open
end

def on_message(message) # :nodoc:
message_buffer.append message
end

def on_error(message) # :nodoc:
# ignore
end

def on_close # :nodoc:
send_async :handle_close
end

protected
# The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
def request
Expand All @@ -139,7 +151,7 @@ def cookies
attr_reader :message_buffer

private
def on_open
def handle_open
connect if respond_to?(:connect)
subscribe_to_internal_channel
beat
Expand All @@ -150,11 +162,7 @@ def on_open
respond_to_invalid_request
end

def on_message(message)
message_buffer.append message
end

def on_close
def handle_close
logger.info finished_request_message

server.remove_connection(self)
Expand Down
152 changes: 152 additions & 0 deletions actioncable/lib/action_cable/connection/client_socket.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
require 'websocket/driver'

module ActionCable
module Connection
#--
# This class is heavily based on faye-websocket-ruby
#
# Copyright (c) 2010-2015 James Coglan
class ClientSocket # :nodoc:
def self.determine_url(env)
scheme = secure_request?(env) ? 'wss:' : 'ws:'
"#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }"
end

def self.secure_request?(env)
return true if env['HTTPS'] == 'on'
return true if env['HTTP_X_FORWARDED_SSL'] == 'on'
return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https'
return true if env['HTTP_X_FORWARDED_PROTO'] == 'https'
return true if env['rack.url_scheme'] == 'https'

return false
end

CONNECTING = 0
OPEN = 1
CLOSING = 2
CLOSED = 3

attr_reader :env, :url

def initialize(env, event_target, stream_event_loop)
@env = env
@event_target = event_target
@stream_event_loop = stream_event_loop

@url = ClientSocket.determine_url(@env)

@driver = @driver_started = nil

@ready_state = CONNECTING

# The driver calls +env+, +url+, and +write+
@driver = ::WebSocket::Driver.rack(self)

@driver.on(:open) { |e| open }
@driver.on(:message) { |e| receive_message(e.data) }
@driver.on(:close) { |e| begin_close(e.reason, e.code) }
@driver.on(:error) { |e| emit_error(e.message) }

@stream = ActionCable::Connection::Stream.new(@stream_event_loop, self)

if callback = @env['async.callback']
callback.call([101, {}, @stream])
end
end

def start_driver
return if @driver.nil? || @driver_started
@driver_started = true
@driver.start
end

def rack_response
start_driver
[ -1, {}, [] ]
end

def write(data)
@stream.write(data)
end

def transmit(message)
return false if @ready_state > OPEN
case message
when Numeric then @driver.text(message.to_s)
when String then @driver.text(message)
when Array then @driver.binary(message)
else false
end
end

def close(code = nil, reason = nil)
code ||= 1000
reason ||= ''

unless code == 1000 or (code >= 3000 and code <= 4999)
raise ArgumentError, "Failed to execute 'close' on WebSocket: " +
"The code must be either 1000, or between 3000 and 4999. " +
"#{code} is neither."
end

@ready_state = CLOSING unless @ready_state == CLOSED
@driver.close(reason, code)
end

def parse(data)
@driver.parse(data)
end

def client_gone
finalize_close
end

def alive?
@ready_state == OPEN
end

private
def open
return unless @ready_state == CONNECTING
@ready_state = OPEN

@event_target.on_open
end

def receive_message(data)
return unless @ready_state == OPEN

@event_target.on_message(data)
end

def emit_error(message)
return if @ready_state >= CLOSING

@event_target.on_error(message)
end

def begin_close(reason, code)
return if @ready_state == CLOSED
@ready_state = CLOSING
@close_params = [reason, code]

if @stream
@stream.shutdown
else
finalize_close
end
end

def finalize_close
return if @ready_state == CLOSED
@ready_state = CLOSED

reason = @close_params ? @close_params[0] : ''
code = @close_params ? @close_params[1] : 1006

@event_target.on_close(code, reason)
end
end
end
end
4 changes: 2 additions & 2 deletions actioncable/lib/action_cable/connection/internal_channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ def subscribe_to_internal_channel
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]

EM.next_tick { pubsub.subscribe(internal_channel, callback) }
Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end

def unsubscribe_from_internal_channel
if @_internal_subscriptions.present?
@_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } }
@_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } }
end
end

Expand Down
59 changes: 59 additions & 0 deletions actioncable/lib/action_cable/connection/stream.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
module ActionCable
module Connection
#--
# This class is heavily based on faye-websocket-ruby
#
# Copyright (c) 2010-2015 James Coglan
class Stream
def initialize(event_loop, socket)
@event_loop = event_loop
@socket_object = socket
@stream_send = socket.env['stream.send']

@rack_hijack_io = nil

hijack_rack_socket
end

def each(&callback)
@stream_send ||= callback
end

def close
shutdown
@socket_object.client_gone
end

def shutdown
clean_rack_hijack
end

def write(data)
return @rack_hijack_io.write(data) if @rack_hijack_io
return @stream_send.call(data) if @stream_send
rescue EOFError
@socket_object.client_gone
end

def receive(data)
@socket_object.parse(data)
end

private
def hijack_rack_socket
return unless @socket_object.env['rack.hijack']

@socket_object.env['rack.hijack'].call
@rack_hijack_io = @socket_object.env['rack.hijack_io']

@event_loop.attach(@rack_hijack_io, self)
end

def clean_rack_hijack
return unless @rack_hijack_io
@event_loop.detach(@rack_hijack_io, self)
@rack_hijack_io = nil
end
end
end
end
Loading

0 comments on commit 74497ea

Please sign in to comment.