Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Revert "Merge pull request #22977 from rails/revert-22934-master"
This reverts commit d0393fc, reversing
changes made to 3b7ccad.
  • Loading branch information
dhh committed Jan 16, 2016
1 parent f5065ef commit 01c3200
Show file tree
Hide file tree
Showing 15 changed files with 46 additions and 36 deletions.
2 changes: 1 addition & 1 deletion Gemfile.lock
Expand Up @@ -31,8 +31,8 @@ PATH
specs:
actioncable (5.0.0.beta1)
actionpack (= 5.0.0.beta1)
celluloid (~> 0.17.2)
coffee-rails (~> 4.1.0)
concurrent-ruby (~> 1.0.0)
em-hiredis (~> 0.3.0)
faye-websocket (~> 0.10.0)
redis (~> 3.0)
Expand Down
2 changes: 1 addition & 1 deletion actioncable/README.md
Expand Up @@ -427,7 +427,7 @@ messages back and forth over the WebSocket cable connection. This dependency may
be alleviated in the future, but for the moment that's what it is. So be sure to have
Redis installed and running.

The Ruby side of things is built on top of [faye-websocket](https://github.com/faye/faye-websocket-ruby) and [celluloid](https://github.com/celluloid/celluloid).
The Ruby side of things is built on top of [faye-websocket](https://github.com/faye/faye-websocket-ruby) and [concurrent-ruby](https://github.com/ruby-concurrency/concurrent-ruby).


## Deployment
Expand Down
1 change: 0 additions & 1 deletion actioncable/actioncable.gemspec
Expand Up @@ -23,7 +23,6 @@ Gem::Specification.new do |s|
s.add_dependency 'coffee-rails', '~> 4.1.0'
s.add_dependency 'faye-websocket', '~> 0.10.0'
s.add_dependency 'websocket-driver', '~> 0.6.1'
s.add_dependency 'celluloid', '~> 0.17.2'
s.add_dependency 'em-hiredis', '~> 0.3.0'
s.add_dependency 'redis', '~> 3.0'

Expand Down
2 changes: 1 addition & 1 deletion actioncable/lib/action_cable/channel/periodic_timers.rb
Expand Up @@ -28,7 +28,7 @@ def active_periodic_timers
def start_periodic_timers
self.class.periodic_timers.each do |callback, options|
active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do
connection.worker_pool.async.run_periodic_timer(self, callback)
connection.worker_pool.async_run_periodic_timer(self, callback)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion actioncable/lib/action_cable/connection/base.rb
Expand Up @@ -103,7 +103,7 @@ def close

# Invoke a method on the connection asynchronously through the pool of thread workers.
def send_async(method, *arguments)
worker_pool.async.invoke(self, method, *arguments)
worker_pool.async_invoke(self, method, *arguments)
end

# Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.
Expand Down
3 changes: 0 additions & 3 deletions actioncable/lib/action_cable/process/logging.rb
@@ -1,10 +1,7 @@
require 'action_cable/server'
require 'eventmachine'
require 'celluloid'

EM.error_handler do |e|
puts "Error raised inside the event loop: #{e.message}"
puts e.backtrace.join("\n")
end

Celluloid.logger = ActionCable.server.logger
3 changes: 0 additions & 3 deletions actioncable/lib/action_cable/server/base.rb
@@ -1,6 +1,3 @@
# FIXME: Cargo culted fix from https://github.com/celluloid/celluloid-pool/issues/10
require 'celluloid/current'

require 'em-hiredis'

module ActionCable
Expand Down
55 changes: 42 additions & 13 deletions actioncable/lib/action_cable/server/worker.rb
@@ -1,39 +1,68 @@
require 'celluloid'
require 'active_support/callbacks'
require 'active_support/core_ext/module/attribute_accessors_per_thread'
require 'concurrent'

module ActionCable
module Server
# Worker used by Server.send_async to do connection work in threads. Only for internal use.
class Worker
include ActiveSupport::Callbacks
include Celluloid

attr_reader :connection
thread_mattr_accessor :connection
define_callbacks :work
include ActiveRecordConnectionManagement

def initialize(max_size: 5)
@pool = Concurrent::ThreadPoolExecutor.new(
min_threads: 1,
max_threads: max_size,
max_queue: 0,
)
end

def async_invoke(receiver, method, *args)
@pool.post do
invoke(receiver, method, *args)
end
end

def invoke(receiver, method, *args)
@connection = receiver
begin
self.connection = receiver

run_callbacks :work do
receiver.send method, *args
run_callbacks :work do
receiver.send method, *args
end
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"
logger.error e.backtrace.join("\n")

receiver.handle_exception if receiver.respond_to?(:handle_exception)
ensure
self.connection = nil
end
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"
logger.error e.backtrace.join("\n")
end

receiver.handle_exception if receiver.respond_to?(:handle_exception)
def async_run_periodic_timer(channel, callback)
@pool.post do
run_periodic_timer(channel, callback)
end
end

def run_periodic_timer(channel, callback)
@connection = channel.connection
begin
self.connection = channel.connection

run_callbacks :work do
callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
run_callbacks :work do
callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
end
ensure
self.connection = nil
end
end

private

def logger
ActionCable.server.logger
end
Expand Down
1 change: 0 additions & 1 deletion actioncable/test/connection/authorization_test.rb
Expand Up @@ -10,7 +10,6 @@ def connect
end

def send_async(method, *args)
# Bypass Celluloid
send method, *args
end
end
Expand Down
1 change: 0 additions & 1 deletion actioncable/test/connection/base_test.rb
Expand Up @@ -14,7 +14,6 @@ def disconnect
end

def send_async(method, *args)
# Bypass Celluloid
send method, *args
end
end
Expand Down
1 change: 0 additions & 1 deletion actioncable/test/connection/cross_site_forgery_test.rb
Expand Up @@ -6,7 +6,6 @@ class ActionCable::Connection::CrossSiteForgeryTest < ActionCable::TestCase

class Connection < ActionCable::Connection::Base
def send_async(method, *args)
# Bypass Celluloid
send method, *args
end
end
Expand Down
1 change: 0 additions & 1 deletion actioncable/test/connection/string_identifier_test.rb
Expand Up @@ -10,7 +10,6 @@ def connect
end

def send_async(method, *args)
# Bypass Celluloid
send method, *args
end
end
Expand Down
1 change: 0 additions & 1 deletion actioncable/test/connection/subscriptions_test.rb
Expand Up @@ -5,7 +5,6 @@ class Connection < ActionCable::Connection::Base
attr_reader :websocket

def send_async(method, *args)
# Bypass Celluloid
send method, *args
end
end
Expand Down
5 changes: 0 additions & 5 deletions actioncable/test/test_helper.rb
Expand Up @@ -14,11 +14,6 @@
# Require all the stubs and models
Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file }

$CELLULOID_DEBUG = false
$CELLULOID_TEST = false
require 'celluloid'
Celluloid.logger = Logger.new(StringIO.new)

require 'faye/websocket'
class << Faye::WebSocket
remove_method :ensure_reactor_running
Expand Down
2 changes: 0 additions & 2 deletions actioncable/test/worker_test.rb
Expand Up @@ -17,8 +17,6 @@ def connection
end

setup do
Celluloid.boot

@worker = ActionCable::Server::Worker.new
@receiver = Receiver.new
end
Expand Down

0 comments on commit 01c3200

Please sign in to comment.