Skip to content

Commit

Permalink
Compress large payloads in all Action Cable subscription adapters wit…
Browse files Browse the repository at this point in the history
…h ActiveSupport::Gzip
  • Loading branch information
brunoprietog committed Mar 24, 2024
1 parent 68b20b6 commit 195e0a0
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 5 deletions.
6 changes: 6 additions & 0 deletions actioncable/CHANGELOG.md
@@ -1,3 +1,9 @@
* Compress large payloads in all subscription adapters with ActiveSupport::Gzip

This will also help broadcast payloads on average 2 to 3 times larger on adapters such as PostgreSQL that has a size limit

*Bruno Prieto*

* Add two new assertion methods for ActionCable test cases: `assert_has_no_stream`
and `assert_has_no_stream_for`. These methods can be used to assert that a
stream has been stopped, e.g. via `stop_stream` or `stop_stream_for`. They complement
Expand Down
42 changes: 42 additions & 0 deletions actioncable/lib/action_cable/subscription_adapter/compressor.rb
@@ -0,0 +1,42 @@
# frozen_string_literal: true

require "active_support/gzip"
require "active_support/core_ext/numeric/bytes"
require "active_support/core_ext/string/access"

module ActionCable
module SubscriptionAdapter
class Compressor
SCHEMA_VERSION = "v1"
PREFIX_SIZE = "#{SCHEMA_VERSION}/./".size

def initialize(threshold: 1.kilobyte, compressor: ActiveSupport::Gzip)
@threshold = threshold
@compressor = compressor
end

def compress(data)
return "#{SCHEMA_VERSION}/0/#{data}" if data.size < @threshold

"#{SCHEMA_VERSION}/1/#{@compressor.compress(data)}"
end

def decompress(data)
return data unless valid_schema?(data)

return data.from(PREFIX_SIZE) unless compressed?(data)

@compressor.decompress(data.from(PREFIX_SIZE))
end

def compressed?(data)
data.start_with?("#{SCHEMA_VERSION}/1/")
end

private
def valid_schema?(data)
data.start_with?(/#{SCHEMA_VERSION}\/(0|1)\//)
end
end
end
end
13 changes: 10 additions & 3 deletions actioncable/lib/action_cable/subscription_adapter/postgresql.rb
Expand Up @@ -14,11 +14,14 @@ class PostgreSQL < Base # :nodoc:
def initialize(*)
super
@listener = nil
@compressor = Compressor.new
end

def broadcast(channel, payload)
payload = @compressor.compress(payload)
with_broadcast_connection do |pg_conn|
pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel_identifier(channel))}, '#{pg_conn.escape_string(payload)}'")
escaped_payload = @compressor.compressed?(payload) ? pg_conn.escape_bytea(payload) : pg_conn.escape_string(payload)
pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel_identifier(channel))}, '#{escaped_payload}'")
end
end

Expand Down Expand Up @@ -63,7 +66,11 @@ def channel_identifier(channel)
end

def listener
@listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
@listener || @server.mutex.synchronize do
@listener ||= Listener.new(self, @server.event_loop).tap do |listener|
listener.compress_with(@compressor)
end
end
end

def verify!(pg_conn)
Expand Down Expand Up @@ -105,7 +112,7 @@ def listen
end

pg_conn.wait_for_notify(1) do |chan, pid, message|
broadcast(chan, message)
broadcast(chan, pg_conn.unescape_bytea(message))
end
end
end
Expand Down
9 changes: 7 additions & 2 deletions actioncable/lib/action_cable/subscription_adapter/redis.rb
Expand Up @@ -23,10 +23,11 @@ def initialize(*)
super
@listener = nil
@redis_connection_for_broadcasts = nil
@compressor = Compressor.new
end

def broadcast(channel, payload)
redis_connection_for_broadcasts.publish(channel, payload)
redis_connection_for_broadcasts.publish(channel, @compressor.compress(payload))
end

def subscribe(channel, callback, success_callback = nil)
Expand All @@ -47,7 +48,11 @@ def redis_connection_for_subscriptions

private
def listener
@listener || @server.mutex.synchronize { @listener ||= Listener.new(self, config_options, @server.event_loop) }
@listener || @server.mutex.synchronize do
@listener ||= Listener.new(self, config_options, @server.event_loop).tap do |listener|
listener.compress_with(@compressor)
end
end
end

def redis_connection_for_broadcasts
Expand Down
Expand Up @@ -10,6 +10,10 @@ def initialize
@sync = Mutex.new
end

def compress_with(compressor)
@compressor = compressor
end

def add_subscriber(channel, subscriber, on_success)
@sync.synchronize do
new_channel = !@subscribers.key?(channel)
Expand All @@ -36,6 +40,7 @@ def remove_subscriber(channel, subscriber)
end

def broadcast(channel, message)
message = @compressor.decompress(message) if @compressor
list = @sync.synchronize do
return if !@subscribers.key?(channel)
@subscribers[channel].dup
Expand Down
10 changes: 10 additions & 0 deletions actioncable/test/subscription_adapter/common.rb
Expand Up @@ -128,4 +128,14 @@ def test_long_identifiers
end
end
end

def test_large_payload_broadcast
message = "hello world" * 10000

subscribe_as_queue("channel") do |queue|
@tx_adapter.broadcast("channel", message)

assert_equal message, queue.pop
end
end
end
35 changes: 35 additions & 0 deletions actioncable/test/subscription_adapter/compressor_test.rb
@@ -0,0 +1,35 @@
# frozen_string_literal: true

require "test_helper"
require "active_support/core_ext/numeric/bytes"
require "active_support/core_ext/string/access"

class ActionCable::SubscriptionAdapter::CompressorTest < ActionCable::TestCase
SCHEMA_VERSION = ActionCable::SubscriptionAdapter::Compressor::SCHEMA_VERSION

setup do
@compressor = ActionCable::SubscriptionAdapter::Compressor.new
@uncompressed_data = "hello" * 1000
@compressed_data = @compressor.compress(@uncompressed_data)
end

test "#compress returns uncompressed data if it's smaller than the threshold" do
assert @compressor.compress("hello").start_with?("#{SCHEMA_VERSION}/0/")
end

test "#compress returns compressed data if it's larger than the threshold" do
assert @compressor.compress(@uncompressed_data).start_with?("#{SCHEMA_VERSION}/1/")
end

test "#decompress returns uncompressed data if it's not compressed" do
assert_equal "hello", @compressor.decompress("#{SCHEMA_VERSION}/0/hello")
end

test "#decompress returns uncompressed data if it's compressed" do
assert_equal @uncompressed_data, @compressor.decompress(@compressed_data)
end

test "#decompress returns original data if schema is not valid" do
assert_equal "hello", @compressor.decompress("hello")
end
end
8 changes: 8 additions & 0 deletions guides/source/action_cable_overview.md
Expand Up @@ -780,7 +780,15 @@ The PostgreSQL adapter uses Active Record's connection pool, and thus the
application's `config/database.yml` database configuration, for its connection.
This may change in the future. [#27214](https://github.com/rails/rails/issues/27214)

```yaml
production:
adapter: postgresql
channel_prefix: appname_production
```

NOTE: PostgreSQL has a [8000 bytes limit](https://www.postgresql.org/docs/current/sql-notify.html) on `NOTIFY` (the command used under the hood for sending notifications) which might be a constraint when dealing with large payloads.
Nevertheless, Action Cable compresses large payloads to address this limitation,
which should be sufficient for most use cases.

### Allowed Request Origins

Expand Down

0 comments on commit 195e0a0

Please sign in to comment.