Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cable message encoding #24233

Merged
merged 1 commit into from
Mar 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 10 additions & 5 deletions actioncable/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
* Add ActiveSupport::Notifications to ActionCable::Channel.
* Pubsub: automatic stream decoding.

*Matthew Wear*
stream_for @room, coder: ActiveSupport::JSON do |message|
# `message` is a Ruby hash here instead of a JSON string

* Allow channel identifiers with no backslahes/escaping to be accepted
by the subscription storer.
The `coder` must respond to `#decode`. Defaults to `coder: nil`
which skips decoding entirely.

*Jon Moss*
*Jeremy Daer*

* Add ActiveSupport::Notifications to ActionCable::Channel.

*Matthew Wear*

* Safely support autoloading and class unloading, by preventing concurrent
loads, and disconnecting all cables during reload.
Expand Down
6 changes: 3 additions & 3 deletions actioncable/lib/action_cable/channel/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def transmit(data, via: nil)

payload = { channel_class: self.class.name, data: data, via: via }
ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do
connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data)
connection.transmit identifier: @identifier, message: data
end
end

Expand Down Expand Up @@ -274,7 +274,7 @@ def transmit_subscription_confirmation
logger.info "#{self.class.name} is transmitting the subscription confirmation"

ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", channel_class: self.class.name) do
connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation])
connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]
@subscription_confirmation_sent = true
end
end
Expand All @@ -289,7 +289,7 @@ def transmit_subscription_rejection
logger.info "#{self.class.name} is transmitting the subscription rejection"

ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do
connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection])
connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]
end
end
end
Expand Down
32 changes: 21 additions & 11 deletions actioncable/lib/action_cable/channel/streams.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ module Channel
# def subscribed
# @room = Chat::Room[params[:room_number]]
#
# stream_for @room, -> (encoded_message) do
# message = ActiveSupport::JSON.decode(encoded_message)
#
# stream_for @room, coder: ActiveSupport::JSON do |message|
# if message['originated_at'].present?
# elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
#
Expand All @@ -71,16 +69,23 @@ module Streams

# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
# instead of the default of just transmitting the updates straight to the subscriber.
def stream_from(broadcasting, callback = nil)
# Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback.
# Defaults to `coder: nil` which does no decoding, passes raw messages.
def stream_from(broadcasting, callback = nil, coder: nil, &block)
broadcasting = String(broadcasting)
# Don't send the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!

callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
if handler = callback || block
handler = -> message { handler.(coder.decode(message)) } if coder
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrap the callback in a decoding handler if a coder was provided.

else
handler = default_stream_handler(broadcasting, coder: coder)
end

streams << [ broadcasting, handler ]

connection.server.event_loop.post do
pubsub.subscribe(broadcasting, callback, lambda do
pubsub.subscribe(broadcasting, handler, lambda do
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
Expand All @@ -90,8 +95,11 @@ def stream_from(broadcasting, callback = nil)
# Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a
# <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight
# to the subscriber.
def stream_for(model, callback = nil)
stream_from(broadcasting_for([ channel_name, model ]), callback)
#
# Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback.
# Defaults to `coder: nil` which does no decoding, passes raw messages.
def stream_for(model, callback = nil, coder: nil, &block)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it still make sense to support stream_for @room, -> (encoded_message) {}, when we capture the block, going forward?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure. I meant to track down why we designed for a lambda arg vs a block before deprecating+removing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder)
end

# Unsubscribes all streams associated with this channel from the pubsub queue.
Expand All @@ -109,9 +117,11 @@ def streams
@_streams ||= []
end

def default_stream_callback(broadcasting)
def default_stream_handler(broadcasting, coder:)
coder ||= ActiveSupport::JSON

-> (message) do
transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}"
transmit coder.decode(message), via: "streamed from #{broadcasting}"
end
end
end
Expand Down
38 changes: 24 additions & 14 deletions actioncable/lib/action_cable/connection/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class Base
attr_reader :server, :env, :subscriptions, :logger, :worker_pool
delegate :event_loop, :pubsub, to: :server

def initialize(server, env)
@server, @env = server, env
def initialize(server, env, coder: ActiveSupport::JSON)
@server, @env, @coder = server, env, coder

@worker_pool = server.worker_pool
@logger = new_tagged_logger
Expand All @@ -67,7 +67,7 @@ def initialize(server, env)

# Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
# This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks.
def process # :nodoc:
def process #:nodoc:
logger.info started_request_message

if websocket.possible? && allow_request_origin?
Expand All @@ -77,20 +77,22 @@ def process # :nodoc:
end
end

# Data received over the WebSocket connection is handled by this method. It's expected that everything inbound is JSON encoded.
# The data is routed to the proper channel that the connection has subscribed to.
def receive(data_in_json)
# Decodes WebSocket messages and dispatches them to subscribed channels.
# WebSocket message transfer encoding is always JSON.
def receive(websocket_message) #:nodoc:
send_async :dispatch_websocket_message, websocket_message
end

def dispatch_websocket_message(websocket_message) #:nodoc:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer expect callers to reach in and do async dispatch. Do it ourselves.

if websocket.alive?
subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
subscriptions.execute_command decode(websocket_message)
else
logger.error "Received data without a live WebSocket (#{data_in_json.inspect})"
logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})"
end
end

# Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
# Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
def transmit(data) # :nodoc:
websocket.transmit data
def transmit(cable_message) # :nodoc:
websocket.transmit encode(cable_message)
end

# Close the WebSocket connection.
Expand All @@ -115,7 +117,7 @@ def statistics
end

def beat
transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i)
transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i
end

def on_open # :nodoc:
Expand Down Expand Up @@ -152,6 +154,14 @@ def cookies
attr_reader :message_buffer

private
def encode(cable_message)
@coder.encode cable_message
end

def decode(websocket_message)
@coder.decode websocket_message
end

def handle_open
connect if respond_to?(:connect)
subscribe_to_internal_channel
Expand All @@ -178,7 +188,7 @@ def send_welcome_message
# Send welcome message to the internal connection monitor channel.
# This ensures the connection monitor state is reset after a successful
# websocket connection.
transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:welcome])
transmit type: ActionCable::INTERNAL[:message_types][:welcome]
end

def allow_request_origin?
Expand Down
4 changes: 1 addition & 3 deletions actioncable/lib/action_cable/connection/internal_channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def internal_channel

def subscribe_to_internal_channel
if connection_identifier.present?
callback = -> (message) { process_internal_message(message) }
callback = -> (message) { process_internal_message decode(message) }
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]

Expand All @@ -27,8 +27,6 @@ def unsubscribe_from_internal_channel
end

def process_internal_message(message)
message = ActiveSupport::JSON.decode(message)

case message['type']
when 'disconnect'
logger.info "Removing connection (#{connection_identifier})"
Expand Down
4 changes: 2 additions & 2 deletions actioncable/lib/action_cable/connection/message_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ def process!

protected
attr_reader :connection
attr_accessor :buffered_messages
attr_reader :buffered_messages
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💅: Could meld this together with the other attr_reader just above.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did that, actually, but I put it back because all the other attrs are multiline.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Funny, peeked in some files and the first two I clicked which used attrs, channels/base and subscriptions, did so on one line. But it doesn't matter 😁


private
def valid?(message)
message.is_a?(String)
end

def receive(message)
connection.send_async :receive, message
connection.receive message
end

def buffer(message)
Expand Down
25 changes: 6 additions & 19 deletions actioncable/lib/action_cable/connection/subscriptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,21 @@ def execute_command(data)
end

def add(data)
id_options = decode_hash(data['identifier'])
identifier = normalize_identifier(id_options)
id_key = data['identifier']
id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access

subscription_klass = connection.server.channel_classes[id_options[:channel]]

if subscription_klass
subscriptions[identifier] ||= subscription_klass.new(connection, identifier, id_options)
subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
else
logger.error "Subscription class not found (#{data.inspect})"
end
end

def remove(data)
logger.info "Unsubscribing from channel: #{data['identifier']}"
remove_subscription subscriptions[normalize_identifier(data['identifier'])]
remove_subscription subscriptions[data['identifier']]
end

def remove_subscription(subscription)
Expand All @@ -46,7 +46,7 @@ def remove_subscription(subscription)
end

def perform_action(data)
find(data).perform_action(decode_hash(data['data']))
find(data).perform_action ActiveSupport::JSON.decode(data['data'])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this reintroduce the issue of how we handle JSON that has no backslashes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep! See the fourth bullet in the commit message for rationale.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok - should we have a sample of what the new payload should look like?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New payload = ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean the fact that we are taking in the identifier and data as "Hashes / JSON objects rather than as opaque JSON-encoded strings" -- this removal effectively breaks compatibility with allowing non-backslashed JSON in, and we should be explicit in communicating this to end users.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. There's no new payload; it's the same format. Permitting non-JSON-encoded identifier and data values was enabled for a while on master, but it hasn't been released and none of our tooling uses it. I'll reference the original PRs and issues to close the loop.

end

def identifiers
Expand All @@ -63,21 +63,8 @@ def unsubscribe_from_all
private
delegate :logger, to: :connection

def normalize_identifier(identifier)
identifier = ActiveSupport::JSON.encode(identifier) if identifier.is_a?(Hash)
identifier
end

# If `data` is a Hash, this means that the original JSON
# sent by the client had no backslashes in it, and does
# not need to be decoded again.
def decode_hash(data)
data = ActiveSupport::JSON.decode(data) unless data.is_a?(Hash)
data.with_indifferent_access
end

def find(data)
if subscription = subscriptions[normalize_identifier(data['identifier'])]
if subscription = subscriptions[data['identifier']]
subscription
else
raise "Unable to find subscription with identifier: #{data['identifier']}"
Expand Down
19 changes: 10 additions & 9 deletions actioncable/lib/action_cable/server/broadcasting.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,28 @@ module Server
# new Notification data['title'], body: data['body']
module Broadcasting
# Broadcast a hash directly to a named <tt>broadcasting</tt>. This will later be JSON encoded.
def broadcast(broadcasting, message)
broadcaster_for(broadcasting).broadcast(message)
def broadcast(broadcasting, message, coder: ActiveSupport::JSON)
broadcaster_for(broadcasting, coder: coder).broadcast(message)
end

# Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have an object that
# may need multiple spots to transmit to a specific broadcasting over and over.
def broadcaster_for(broadcasting)
Broadcaster.new(self, String(broadcasting))
def broadcaster_for(broadcasting, coder: ActiveSupport::JSON)
Broadcaster.new(self, String(broadcasting), coder: coder)
end

private
class Broadcaster
attr_reader :server, :broadcasting
attr_reader :server, :broadcasting, :coder

def initialize(server, broadcasting)
@server, @broadcasting = server, broadcasting
def initialize(server, broadcasting, coder:)
@server, @broadcasting, @coder = server, broadcasting, coder
end

def broadcast(message)
server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message)
server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message.inspect}"
encoded = coder ? coder.encode(message) : message
server.pubsub.broadcast broadcasting, encoded
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions actioncable/test/channel/base_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,12 @@ def rm_rf
test "transmitting data" do
@channel.perform_action 'action' => :get_latest

expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "message" => { "data" => "latest" }
expected = { "identifier" => "{id: 1}", "message" => { "data" => "latest" }}
assert_equal expected, @connection.last_transmission
end

test "subscription confirmation" do
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
expected = { "identifier" => "{id: 1}", "type" => "confirm_subscription" }
assert_equal expected, @connection.last_transmission
end

Expand Down
2 changes: 1 addition & 1 deletion actioncable/test/channel/rejection_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def subscribed
@connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) }
@channel = SecretChannel.new @connection, "{id: 1}", { id: 1 }

expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "reject_subscription"
expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" }
assert_equal expected, @connection.last_transmission
end

Expand Down