Skip to content

Commit

Permalink
Merge pull request #22950 from maclover7/adapterize-storage-actioncable
Browse files Browse the repository at this point in the history
Adapterize storage for ActionCable
  • Loading branch information
matthewd committed Jan 20, 2016
2 parents 8f208e0 + ae31da2 commit 56a9341
Show file tree
Hide file tree
Showing 28 changed files with 347 additions and 74 deletions.
7 changes: 1 addition & 6 deletions Gemfile.lock
Expand Up @@ -32,9 +32,8 @@ PATH
actioncable (5.0.0.beta1)
actionpack (= 5.0.0.beta1)
coffee-rails (~> 4.1.0)
em-hiredis (~> 0.3.0)
eventmachine (~> 1.0)
faye-websocket (~> 0.10.0)
redis (~> 3.0)
websocket-driver (~> 0.6.1)
actionmailer (5.0.0.beta1)
actionpack (= 5.0.0.beta1)
Expand Down Expand Up @@ -140,9 +139,6 @@ GEM
delayed_job_active_record (4.1.0)
activerecord (>= 3.0, < 5)
delayed_job (>= 3.0, < 5)
em-hiredis (0.3.0)
eventmachine (~> 1.0)
hiredis (~> 0.5.0)
erubis (2.7.0)
eventmachine (1.0.9.1)
execjs (2.6.0)
Expand All @@ -154,7 +150,6 @@ GEM
ffi (1.9.10-x86-mingw32)
globalid (0.3.6)
activesupport (>= 4.1.0)
hiredis (0.5.2)
hitimes (1.2.3)
hitimes (1.2.3-x86-mingw32)
i18n (0.7.0)
Expand Down
8 changes: 5 additions & 3 deletions actioncable/actioncable.gemspec
Expand Up @@ -21,11 +21,13 @@ Gem::Specification.new do |s|
s.add_dependency 'actionpack', version

s.add_dependency 'coffee-rails', '~> 4.1.0'
s.add_dependency 'eventmachine', '~> 1.0'
s.add_dependency 'faye-websocket', '~> 0.10.0'
s.add_dependency 'websocket-driver', '~> 0.6.1'
s.add_dependency 'em-hiredis', '~> 0.3.0'
s.add_dependency 'redis', '~> 3.0'

s.add_development_dependency 'puma'
s.add_development_dependency 'em-hiredis', '~> 0.3.0'
s.add_development_dependency 'mocha'
s.add_development_dependency 'pg'
s.add_development_dependency 'puma'
s.add_development_dependency 'redis', '~> 3.0'
end
1 change: 1 addition & 0 deletions actioncable/lib/action_cable.rb
Expand Up @@ -47,4 +47,5 @@ module ActionCable
autoload :Connection
autoload :Channel
autoload :RemoteConnections
autoload :SubscriptionAdapter
end
4 changes: 2 additions & 2 deletions actioncable/lib/action_cable/channel/base.rb
Expand Up @@ -133,8 +133,8 @@ def initialize(connection, identifier, params = {})
@identifier = identifier
@params = params

# When a channel is streaming via redis pubsub, we want to delay the confirmation
# transmission until redis pubsub subscription is confirmed.
# When a channel is streaming via pubsub, we want to delay the confirmation
# transmission until pubsub subscription is confirmed.
@defer_subscription_confirmation = false

@reject_subscription = nil
Expand Down
6 changes: 3 additions & 3 deletions actioncable/lib/action_cable/channel/streams.rb
Expand Up @@ -76,10 +76,10 @@ def stream_from(broadcasting, callback = nil)
streams << [ broadcasting, callback ]

EM.next_tick do
pubsub.subscribe(broadcasting, &callback).callback do |reply|
pubsub.subscribe(broadcasting, callback, lambda do |reply|
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end
end)
end
end

Expand All @@ -92,7 +92,7 @@ def stream_for(model, callback = nil)

def stop_all_streams
streams.each do |broadcasting, callback|
pubsub.unsubscribe_proc broadcasting, callback
pubsub.unsubscribe broadcasting, callback
logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
end.clear
end
Expand Down
2 changes: 1 addition & 1 deletion actioncable/lib/action_cable/connection/base.rb
Expand Up @@ -60,7 +60,7 @@ def initialize(server, env)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)

@_internal_redis_subscriptions = nil
@_internal_subscriptions = nil
@started_at = Time.now
end

Expand Down
12 changes: 6 additions & 6 deletions actioncable/lib/action_cable/connection/internal_channel.rb
Expand Up @@ -5,24 +5,24 @@ module InternalChannel
extend ActiveSupport::Concern

private
def internal_redis_channel
def internal_channel
"action_cable/#{connection_identifier}"
end

def subscribe_to_internal_channel
if connection_identifier.present?
callback = -> (message) { process_internal_message(message) }
@_internal_redis_subscriptions ||= []
@_internal_redis_subscriptions << [ internal_redis_channel, callback ]
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]

EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) }
EM.next_tick { pubsub.subscribe(internal_channel, callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end

def unsubscribe_from_internal_channel
if @_internal_redis_subscriptions.present?
@_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } }
if @_internal_subscriptions.present?
@_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } }
end
end

Expand Down
6 changes: 3 additions & 3 deletions actioncable/lib/action_cable/engine.rb
Expand Up @@ -24,11 +24,11 @@ class Railtie < Rails::Engine # :nodoc:
options = app.config.action_cable
options.allowed_request_origins ||= "http://localhost:3000" if ::Rails.env.development?

app.paths.add "config/redis/cable", with: "config/redis/cable.yml"
app.paths.add "config/cable", with: "config/cable.yml"

ActiveSupport.on_load(:action_cable) do
if (redis_cable_path = Pathname.new(app.config.paths["config/redis/cable"].first)).exist?
self.redis = Rails.application.config_for(redis_cable_path).with_indifferent_access
if (config_path = Pathname.new(app.config.paths["config/cable"].first)).exist?
self.cable = Rails.application.config_for(config_path).with_indifferent_access
end

options.each { |k,v| send("#{k}=", v) }
Expand Down
2 changes: 1 addition & 1 deletion actioncable/lib/action_cable/remote_connections.rb
Expand Up @@ -39,7 +39,7 @@ def initialize(server, ids)

# Uses the internal channel to disconnect the connection.
def disconnect
server.broadcast internal_redis_channel, type: 'disconnect'
server.broadcast internal_channel, type: 'disconnect'
end

# Returns all the identifiers that were applied to this connection.
Expand Down
17 changes: 2 additions & 15 deletions actioncable/lib/action_cable/server/base.rb
@@ -1,5 +1,3 @@
require 'em-hiredis'

module ActionCable
module Server
# A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
Expand Down Expand Up @@ -47,20 +45,9 @@ def channel_classes
end
end

# The redis pubsub adapter used for all streams/broadcasting.
# Adapter used for all streams/broadcasting.
def pubsub
@pubsub ||= redis.pubsub
end

# The EventMachine Redis instance used by the pubsub adapter.
def redis
@redis ||= EM::Hiredis.connect(config.redis[:url]).tap do |redis|
redis.on(:reconnect_failed) do
logger.info "[ActionCable] Redis reconnect failed."
# logger.info "[ActionCable] Redis reconnected. Closing all the open connections."
# @connections.map &:close
end
end
@pubsub ||= config.pubsub_adapter.new(self)
end

# All the identifiers applied to the connection class associated with this server.
Expand Down
9 changes: 1 addition & 8 deletions actioncable/lib/action_cable/server/broadcasting.rb
@@ -1,5 +1,3 @@
require 'redis'

module ActionCable
module Server
# Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these
Expand Down Expand Up @@ -31,11 +29,6 @@ def broadcaster_for(broadcasting)
Broadcaster.new(self, broadcasting)
end

# The redis instance used for broadcasting. Not intended for direct user use.
def broadcasting_redis
@broadcasting_redis ||= Redis.new(config.redis)
end

private
class Broadcaster
attr_reader :server, :broadcasting
Expand All @@ -46,7 +39,7 @@ def initialize(server, broadcasting)

def broadcast(message)
server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
server.broadcasting_redis.publish broadcasting, ActiveSupport::JSON.encode(message)
server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message)
end
end
end
Expand Down
24 changes: 21 additions & 3 deletions actioncable/lib/action_cable/server/configuration.rb
Expand Up @@ -5,9 +5,9 @@ module Server
class Configuration
attr_accessor :logger, :log_tags
attr_accessor :connection_class, :worker_pool_size
attr_accessor :redis, :channels_path
attr_accessor :channels_path
attr_accessor :disable_request_forgery_protection, :allowed_request_origins
attr_accessor :url
attr_accessor :cable, :url

def initialize
@log_tags = []
Expand All @@ -29,7 +29,25 @@ def channel_class_names
Pathname.new(channel_path).basename.to_s.split('.').first.camelize
end
end

# Returns constant of subscription adapter specified in config/cable.yml.
# If the adapter cannot be found, this will default to the Redis adapter.
# Also makes sure proper dependencies are required.
def pubsub_adapter
adapter = (cable.fetch('adapter') { 'redis' })
path_to_adapter = "action_cable/subscription_adapter/#{adapter}"
begin
require path_to_adapter
rescue Gem::LoadError => e
raise Gem::LoadError, "Specified '#{adapter}' for Action Cable pubsub adapter, but the gem is not loaded. Add `gem '#{e.name}'` to your Gemfile (and ensure its version is at the minimum required by Action Cable)."
rescue LoadError => e
raise LoadError, "Could not load '#{path_to_adapter}'. Make sure that the adapter in config/cable.yml is valid. If you use an adapter other than 'postgresql' or 'redis' add the necessary adapter gem to the Gemfile.", e.backtrace
end

adapter = adapter.camelize
adapter = 'PostgreSQL' if adapter == 'Postgresql'
"ActionCable::SubscriptionAdapter::#{adapter}".constantize
end
end
end
end

5 changes: 5 additions & 0 deletions actioncable/lib/action_cable/subscription_adapter.rb
@@ -0,0 +1,5 @@
module ActionCable
module SubscriptionAdapter
autoload :Base, 'action_cable/subscription_adapter/base'
end
end
24 changes: 24 additions & 0 deletions actioncable/lib/action_cable/subscription_adapter/base.rb
@@ -0,0 +1,24 @@
module ActionCable
module SubscriptionAdapter
class Base
attr_reader :logger, :server

def initialize(server)
@server = server
@logger = @server.logger
end

def broadcast(channel, payload)
raise NotImplementedError
end

def subscribe(channel, message_callback, success_callback = nil)
raise NotImplementedError
end

def unsubscribe(channel, message_callback)
raise NotImplementedError
end
end
end
end
98 changes: 98 additions & 0 deletions actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -0,0 +1,98 @@
gem 'pg', '~> 0.18'
require 'pg'
require 'thread'

module ActionCable
module SubscriptionAdapter
class PostgreSQL < Base # :nodoc:
def broadcast(channel, payload)
with_connection do |pg_conn|
pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'")
end
end

def subscribe(channel, callback, success_callback = nil)
listener.subscribe_to(channel, callback, success_callback)
end

def unsubscribe(channel, callback)
listener.unsubscribe_from(channel, callback)
end

def with_connection(&block) # :nodoc:
ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
pg_conn = ar_conn.raw_connection

unless pg_conn.is_a?(PG::Connection)
raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
end

yield pg_conn
end
end

private
def listener
@listener ||= Listener.new(self)
end

class Listener
def initialize(adapter)
@adapter = adapter
@subscribers = Hash.new { |h,k| h[k] = [] }
@sync = Mutex.new
@queue = Queue.new

Thread.new do
Thread.current.abort_on_exception = true
listen
end
end

def listen
@adapter.with_connection do |pg_conn|
loop do
until @queue.empty?
action, channel, callback = @queue.pop(true)
escaped_channel = pg_conn.escape_identifier(channel)

if action == :listen
pg_conn.exec("LISTEN #{escaped_channel}")
::EM.next_tick(&callback) if callback
elsif action == :unlisten
pg_conn.exec("UNLISTEN #{escaped_channel}")
end
end

pg_conn.wait_for_notify(1) do |chan, pid, message|
@subscribers[chan].each do |callback|
::EM.next_tick { callback.call(message) }
end
end
end
end
end

def subscribe_to(channel, callback, success_callback)
@sync.synchronize do
if @subscribers[channel].empty?
@queue.push([:listen, channel, success_callback])
end

@subscribers[channel] << callback
end
end

def unsubscribe_from(channel, callback)
@sync.synchronize do
@subscribers[channel].delete(callback)

if @subscribers[channel].empty?
@queue.push([:unlisten, channel])
end
end
end
end
end
end
end

0 comments on commit 56a9341

Please sign in to comment.