Skip to content

Commit

Permalink
No cramp and use celluloid workers to run callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
lifo committed Feb 5, 2015
1 parent 55c956b commit 7fef6b0
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 16 deletions.
2 changes: 0 additions & 2 deletions Gemfile
@@ -1,8 +1,6 @@
source 'http://rubygems.org'
gemspec

gem 'cramp', github: "lifo/cramp"

group :test do
gem 'rake'
gem 'puma'
Expand Down
1 change: 0 additions & 1 deletion action_cable.gemspec
Expand Up @@ -10,7 +10,6 @@ Gem::Specification.new do |s|
s.homepage = 'http://basecamp.com'

s.add_dependency('activesupport', '~> 4.2.0')
s.add_dependency('cramp', '~> 0.15.4')

s.files = Dir['README', 'lib/**/*']
s.has_rdoc = false
Expand Down
3 changes: 2 additions & 1 deletion lib/action_cable.rb
@@ -1,12 +1,13 @@
require 'cramp'
require 'active_support'
require 'active_support/json'
require 'active_support/concern'
require 'active_support/core_ext/hash/indifferent_access'
require 'active_support/callbacks'

module ActionCable
VERSION = '0.0.1'

autoload :Channel, 'action_cable/channel'
autoload :Worker, 'action_cable/worker'
autoload :Server, 'action_cable/server'
end
4 changes: 2 additions & 2 deletions lib/action_cable/channel/base.rb
Expand Up @@ -41,13 +41,13 @@ def receive(data)

def subscribe
self.class.on_subscribe_callbacks.each do |callback|
EM.next_tick { send(callback) }
send(callback)
end
end

def unsubscribe
self.class.on_unsubscribe_callbacks.each do |callback|
EM.next_tick { send(callback) }
send(callback)
end
end

Expand Down
43 changes: 35 additions & 8 deletions lib/action_cable/server.rb
@@ -1,23 +1,47 @@
require 'set'
require 'faye/websocket'
require 'celluloid'

module ActionCable
class Server < Cramp::Websocket
on_data :received_data
on_finish :cleanup_subscriptions
Celluloid::Actor[:worker_pool] = ActionCable::Worker.pool(size: 100)

module ActionCable
class Server
class_attribute :registered_channels
self.registered_channels = Set.new

class << self
def register_channels(*channel_classes)
self.registered_channels += channel_classes
end

def call(env)
new(env).process
end
end

def initialize(*)
@subscriptions = {}
def initialize(env)
@env = env
end

def process
if Faye::WebSocket.websocket?(@env)
@subscriptions = {}

@websocket = Faye::WebSocket.new(@env)

super
@websocket.on(:message) do |event|
message = event.data
Celluloid::Actor[:worker_pool].async.received_data(self, message) if message.is_a?(String)
end

@websocket.on(:close) do |event|
Celluloid::Actor[:worker_pool].async.cleanup_subscriptions(self)
end

@websocket.rack_response
else
invalid_request
end
end

def received_data(data)
Expand All @@ -40,7 +64,7 @@ def cleanup_subscriptions
end

def broadcast(data)
render data
@websocket.send data
end

private
Expand Down Expand Up @@ -71,5 +95,8 @@ def unsubscribe_channel(data)
@subscriptions.delete(id_key)
end

def invalid_request
[404, {'Content-Type' => 'text/plain'}, ['Page not found']]
end
end
end
19 changes: 19 additions & 0 deletions lib/action_cable/worker.rb
@@ -0,0 +1,19 @@
module ActionCable
class Worker
include ActiveSupport::Callbacks
include Celluloid

define_callbacks :work

def received_data(connection, data)
run_callbacks :work do
connection.received_data(data)
end
end

def cleanup_subscriptions(connection)
connection.cleanup_subscriptions
end

end
end
3 changes: 1 addition & 2 deletions test/test_helper.rb
Expand Up @@ -15,9 +15,8 @@
require 'logger'
logger = Logger.new(File.join(File.dirname(__FILE__), "tests.log"))
logger.level = Logger::DEBUG
Cramp.logger = logger

class ActionCableTest < Cramp::TestCase
class ActionCableTest < ActiveSupport::TestCase
PORT = 420420

setup :start_puma_server
Expand Down

0 comments on commit 7fef6b0

Please sign in to comment.