From 7fef6b01a3011438d48136e3f95bb9a823e87ec6 Mon Sep 17 00:00:00 2001 From: Pratik Naik Date: Thu, 5 Feb 2015 16:35:11 +0530 Subject: [PATCH] No cramp and use celluloid workers to run callbacks --- Gemfile | 2 -- action_cable.gemspec | 1 - lib/action_cable.rb | 3 ++- lib/action_cable/channel/base.rb | 4 +-- lib/action_cable/server.rb | 43 ++++++++++++++++++++++++++------ lib/action_cable/worker.rb | 19 ++++++++++++++ test/test_helper.rb | 3 +-- 7 files changed, 59 insertions(+), 16 deletions(-) create mode 100644 lib/action_cable/worker.rb diff --git a/Gemfile b/Gemfile index 3ef2cb6af8c3b..7dfe51bf00030 100644 --- a/Gemfile +++ b/Gemfile @@ -1,8 +1,6 @@ source 'http://rubygems.org' gemspec -gem 'cramp', github: "lifo/cramp" - group :test do gem 'rake' gem 'puma' diff --git a/action_cable.gemspec b/action_cable.gemspec index 63ba751e9d4ab..f6fcc92fee7e7 100644 --- a/action_cable.gemspec +++ b/action_cable.gemspec @@ -10,7 +10,6 @@ Gem::Specification.new do |s| s.homepage = 'http://basecamp.com' s.add_dependency('activesupport', '~> 4.2.0') - s.add_dependency('cramp', '~> 0.15.4') s.files = Dir['README', 'lib/**/*'] s.has_rdoc = false diff --git a/lib/action_cable.rb b/lib/action_cable.rb index 7df2a8c5eba36..993c260e4989e 100644 --- a/lib/action_cable.rb +++ b/lib/action_cable.rb @@ -1,12 +1,13 @@ -require 'cramp' require 'active_support' require 'active_support/json' require 'active_support/concern' require 'active_support/core_ext/hash/indifferent_access' +require 'active_support/callbacks' module ActionCable VERSION = '0.0.1' autoload :Channel, 'action_cable/channel' + autoload :Worker, 'action_cable/worker' autoload :Server, 'action_cable/server' end diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb index ae8822d2a2112..e311cc97e95fd 100644 --- a/lib/action_cable/channel/base.rb +++ b/lib/action_cable/channel/base.rb @@ -41,13 +41,13 @@ def receive(data) def subscribe self.class.on_subscribe_callbacks.each do |callback| - EM.next_tick { send(callback) } + send(callback) end end def unsubscribe self.class.on_unsubscribe_callbacks.each do |callback| - EM.next_tick { send(callback) } + send(callback) end end diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index cdf8ea0f66057..ea22f0014ec11 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -1,10 +1,11 @@ require 'set' +require 'faye/websocket' +require 'celluloid' -module ActionCable - class Server < Cramp::Websocket - on_data :received_data - on_finish :cleanup_subscriptions +Celluloid::Actor[:worker_pool] = ActionCable::Worker.pool(size: 100) +module ActionCable + class Server class_attribute :registered_channels self.registered_channels = Set.new @@ -12,12 +13,35 @@ class << self def register_channels(*channel_classes) self.registered_channels += channel_classes end + + def call(env) + new(env).process + end end - def initialize(*) - @subscriptions = {} + def initialize(env) + @env = env + end + + def process + if Faye::WebSocket.websocket?(@env) + @subscriptions = {} + + @websocket = Faye::WebSocket.new(@env) - super + @websocket.on(:message) do |event| + message = event.data + Celluloid::Actor[:worker_pool].async.received_data(self, message) if message.is_a?(String) + end + + @websocket.on(:close) do |event| + Celluloid::Actor[:worker_pool].async.cleanup_subscriptions(self) + end + + @websocket.rack_response + else + invalid_request + end end def received_data(data) @@ -40,7 +64,7 @@ def cleanup_subscriptions end def broadcast(data) - render data + @websocket.send data end private @@ -71,5 +95,8 @@ def unsubscribe_channel(data) @subscriptions.delete(id_key) end + def invalid_request + [404, {'Content-Type' => 'text/plain'}, ['Page not found']] + end end end diff --git a/lib/action_cable/worker.rb b/lib/action_cable/worker.rb new file mode 100644 index 0000000000000..46b5f7edc0ea2 --- /dev/null +++ b/lib/action_cable/worker.rb @@ -0,0 +1,19 @@ +module ActionCable + class Worker + include ActiveSupport::Callbacks + include Celluloid + + define_callbacks :work + + def received_data(connection, data) + run_callbacks :work do + connection.received_data(data) + end + end + + def cleanup_subscriptions(connection) + connection.cleanup_subscriptions + end + + end +end diff --git a/test/test_helper.rb b/test/test_helper.rb index 5251e711b7740..10a482728195a 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -15,9 +15,8 @@ require 'logger' logger = Logger.new(File.join(File.dirname(__FILE__), "tests.log")) logger.level = Logger::DEBUG -Cramp.logger = logger -class ActionCableTest < Cramp::TestCase +class ActionCableTest < ActiveSupport::TestCase PORT = 420420 setup :start_puma_server