Skip to content

Commit

Permalink
Client ensures subscribe command is confirmed. (#41581)
Browse files Browse the repository at this point in the history
A SubscriptionGuarantor maintains a set of pending subscriptions,
resending the subscribe command unless and until the subscription
is confirmed or rejected by the server or cancelled client-side.

A race condition in the ActionCable server - where an unsubscribe
is sent, followed rapidly by a subscribe, but handled in the reverse
order - necessitates this enhancement.  Indeed, the subscriptions created
and torn down by Turbo Streams amplifies the existence of this race
condition.
  • Loading branch information
spinosa committed Sep 26, 2021
1 parent c04cf69 commit 6d7c122
Show file tree
Hide file tree
Showing 10 changed files with 270 additions and 9 deletions.
12 changes: 12 additions & 0 deletions actioncable/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
* The Action Cable client now ensures successful channel subscriptions:

* The client maintains a set of pending subscriptions until either
the server confirms the subscription or the channel is torn down.
* Rectifies the race condition where an unsubscribe is rapidly followed
by a subscribe (on the same channel identifier) and the requests are
handled out of order by the ActionCable server, thereby ignoring the
subscribe command.

*Daniel Spinosa*


## Rails 7.0.0.alpha2 (September 15, 2021) ##

* No changes.
Expand Down
53 changes: 51 additions & 2 deletions actioncable/app/assets/javascripts/action_cable.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 52 additions & 3 deletions actioncable/app/assets/javascripts/actioncable.esm.js
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ Connection.prototype.events = {
return this.monitor.recordPing();

case message_types.confirmation:
this.subscriptions.confirmSubscription(identifier);
return this.subscriptions.notify(identifier, "connected");

case message_types.rejection:
Expand Down Expand Up @@ -321,9 +322,47 @@ class Subscription {
}
}

class SubscriptionGuarantor {
constructor(subscriptions) {
this.subscriptions = subscriptions;
this.pendingSubscriptions = [];
}
guarantee(subscription) {
if (this.pendingSubscriptions.indexOf(subscription) == -1) {
logger.log(`SubscriptionGuarantor guaranteeing ${subscription.identifier}`);
this.pendingSubscriptions.push(subscription);
} else {
logger.log(`SubscriptionGuarantor already guaranteeing ${subscription.identifier}`);
}
this.startGuaranteeing();
}
forget(subscription) {
logger.log(`SubscriptionGuarantor forgetting ${subscription.identifier}`);
this.pendingSubscriptions = this.pendingSubscriptions.filter((s => s !== subscription));
}
startGuaranteeing() {
this.stopGuaranteeing();
this.retrySubscribing();
}
stopGuaranteeing() {
clearTimeout(this.retryTimeout);
}
retrySubscribing() {
this.retryTimeout = setTimeout((() => {
if (this.subscriptions && typeof this.subscriptions.subscribe === "function") {
this.pendingSubscriptions.map((subscription => {
logger.log(`SubscriptionGuarantor resubscribing ${subscription.identifier}`);
this.subscriptions.subscribe(subscription);
}));
}
}), 500);
}
}

class Subscriptions {
constructor(consumer) {
this.consumer = consumer;
this.guarantor = new SubscriptionGuarantor(this);
this.subscriptions = [];
}
create(channelName, mixin) {
Expand All @@ -338,7 +377,7 @@ class Subscriptions {
this.subscriptions.push(subscription);
this.consumer.ensureActiveConnection();
this.notify(subscription, "initialized");
this.sendCommand(subscription, "subscribe");
this.subscribe(subscription);
return subscription;
}
remove(subscription) {
Expand All @@ -356,14 +395,15 @@ class Subscriptions {
}));
}
forget(subscription) {
this.guarantor.forget(subscription);
this.subscriptions = this.subscriptions.filter((s => s !== subscription));
return subscription;
}
findAll(identifier) {
return this.subscriptions.filter((s => s.identifier === identifier));
}
reload() {
return this.subscriptions.map((subscription => this.sendCommand(subscription, "subscribe")));
return this.subscriptions.map((subscription => this.subscribe(subscription)));
}
notifyAll(callbackName, ...args) {
return this.subscriptions.map((subscription => this.notify(subscription, callbackName, ...args)));
Expand All @@ -377,6 +417,15 @@ class Subscriptions {
}
return subscriptions.map((subscription => typeof subscription[callbackName] === "function" ? subscription[callbackName](...args) : undefined));
}
subscribe(subscription) {
if (this.sendCommand(subscription, "subscribe")) {
this.guarantor.guarantee(subscription);
}
}
confirmSubscription(identifier) {
logger.log(`Subscription confirmed ${identifier}`);
this.findAll(identifier).map((subscription => this.guarantor.forget(subscription)));
}
sendCommand(subscription, command) {
const {identifier: identifier} = subscription;
return this.consumer.send({
Expand Down Expand Up @@ -439,4 +488,4 @@ function getConfig(name) {
}
}

export { Connection, ConnectionMonitor, Consumer, INTERNAL, Subscription, Subscriptions, adapters, createConsumer, createWebSocketURL, getConfig, logger };
export { Connection, ConnectionMonitor, Consumer, INTERNAL, Subscription, SubscriptionGuarantor, Subscriptions, adapters, createConsumer, createWebSocketURL, getConfig, logger };
53 changes: 51 additions & 2 deletions actioncable/app/assets/javascripts/actioncable.js
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@
return this.monitor.recordPing();

case message_types.confirmation:
this.subscriptions.confirmSubscription(identifier);
return this.subscriptions.notify(identifier, "connected");

case message_types.rejection:
Expand Down Expand Up @@ -310,9 +311,46 @@
return this.consumer.subscriptions.remove(this);
}
}
class SubscriptionGuarantor {
constructor(subscriptions) {
this.subscriptions = subscriptions;
this.pendingSubscriptions = [];
}
guarantee(subscription) {
if (this.pendingSubscriptions.indexOf(subscription) == -1) {
logger.log(`SubscriptionGuarantor guaranteeing ${subscription.identifier}`);
this.pendingSubscriptions.push(subscription);
} else {
logger.log(`SubscriptionGuarantor already guaranteeing ${subscription.identifier}`);
}
this.startGuaranteeing();
}
forget(subscription) {
logger.log(`SubscriptionGuarantor forgetting ${subscription.identifier}`);
this.pendingSubscriptions = this.pendingSubscriptions.filter((s => s !== subscription));
}
startGuaranteeing() {
this.stopGuaranteeing();
this.retrySubscribing();
}
stopGuaranteeing() {
clearTimeout(this.retryTimeout);
}
retrySubscribing() {
this.retryTimeout = setTimeout((() => {
if (this.subscriptions && typeof this.subscriptions.subscribe === "function") {
this.pendingSubscriptions.map((subscription => {
logger.log(`SubscriptionGuarantor resubscribing ${subscription.identifier}`);
this.subscriptions.subscribe(subscription);
}));
}
}), 500);
}
}
class Subscriptions {
constructor(consumer) {
this.consumer = consumer;
this.guarantor = new SubscriptionGuarantor(this);
this.subscriptions = [];
}
create(channelName, mixin) {
Expand All @@ -327,7 +365,7 @@
this.subscriptions.push(subscription);
this.consumer.ensureActiveConnection();
this.notify(subscription, "initialized");
this.sendCommand(subscription, "subscribe");
this.subscribe(subscription);
return subscription;
}
remove(subscription) {
Expand All @@ -345,14 +383,15 @@
}));
}
forget(subscription) {
this.guarantor.forget(subscription);
this.subscriptions = this.subscriptions.filter((s => s !== subscription));
return subscription;
}
findAll(identifier) {
return this.subscriptions.filter((s => s.identifier === identifier));
}
reload() {
return this.subscriptions.map((subscription => this.sendCommand(subscription, "subscribe")));
return this.subscriptions.map((subscription => this.subscribe(subscription)));
}
notifyAll(callbackName, ...args) {
return this.subscriptions.map((subscription => this.notify(subscription, callbackName, ...args)));
Expand All @@ -366,6 +405,15 @@
}
return subscriptions.map((subscription => typeof subscription[callbackName] === "function" ? subscription[callbackName](...args) : undefined));
}
subscribe(subscription) {
if (this.sendCommand(subscription, "subscribe")) {
this.guarantor.guarantee(subscription);
}
}
confirmSubscription(identifier) {
logger.log(`Subscription confirmed ${identifier}`);
this.findAll(identifier).map((subscription => this.guarantor.forget(subscription)));
}
sendCommand(subscription, command) {
const {identifier: identifier} = subscription;
return this.consumer.send({
Expand Down Expand Up @@ -428,6 +476,7 @@
exports.Consumer = Consumer;
exports.INTERNAL = INTERNAL;
exports.Subscription = Subscription;
exports.SubscriptionGuarantor = SubscriptionGuarantor;
exports.Subscriptions = Subscriptions;
exports.adapters = adapters;
exports.createConsumer = createConsumer;
Expand Down
1 change: 1 addition & 0 deletions actioncable/app/javascript/action_cable/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ Connection.prototype.events = {
case message_types.ping:
return this.monitor.recordPing()
case message_types.confirmation:
this.subscriptions.confirmSubscription(identifier)
return this.subscriptions.notify(identifier, "connected")
case message_types.rejection:
return this.subscriptions.reject(identifier)
Expand Down
2 changes: 2 additions & 0 deletions actioncable/app/javascript/action_cable/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import Consumer, { createWebSocketURL } from "./consumer"
import INTERNAL from "./internal"
import Subscription from "./subscription"
import Subscriptions from "./subscriptions"
import SubscriptionGuarantor from "./subscription_guarantor"
import adapters from "./adapters"
import logger from "./logger"

Expand All @@ -14,6 +15,7 @@ export {
INTERNAL,
Subscription,
Subscriptions,
SubscriptionGuarantor,
adapters,
createWebSocketURL,
logger,
Expand Down
Loading

0 comments on commit 6d7c122

Please sign in to comment.