Skip to content

Commit

Permalink
Add proper backwards and forwards compatibility for PONG responses
Browse files Browse the repository at this point in the history
  • Loading branch information
monorkin committed Nov 12, 2023
1 parent 34d9ae1 commit 9da4ce9
Show file tree
Hide file tree
Showing 11 changed files with 821 additions and 120 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@
!/app/assets/builds/.keep

/tags
/videos
117 changes: 5 additions & 112 deletions app/javascript/controllers/device_simulator_controller.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Controller } from "@hotwired/stimulus"
import { createConsumer, INTERNAL, adapters } from "@rails/actioncable"
import { createConsumer } from "@rails/actioncable"
import installClientSideHeartbeatMonkeyPath from "monkey_patches/action_cable/client_initiated_heartbeats"
import installHeartbeatWithPongMonkeyPath from "monkey_patches/action_cable/heartbeat_with_pong"

export default class extends Controller {
static values = {
Expand Down Expand Up @@ -105,116 +107,7 @@ export default class extends Controller {
}

monkeyPatchConsumer() {
console.log("🩹 MonkeyPatching the ActionCable consumer to add client-initiated heartbeats")

const now = () => new Date().getTime()
const indexOf = [].indexOf
const supportedProtocols = ["actioncable-v1.1-json", "actioncable-v1-json", "actioncable-unsupported"]

// Monkey patch Connection
const newConnectionOpen = function() {
console.log("🩹 Invoked monkey patched Connection#open")

if (this.isActive()) {
console.log(`Attempted to open WebSocket, but existing socket is ${this.getState()}`)
return false
} else {
console.log("🩹 Sending consumer sub-protocols first")
const socketProtocols = [...supportedProtocols]

console.log(`1 Opening WebSocket, current state is ${this.getState()}, subprotocols: ${socketProtocols}`)
if (this.webSocket) { this.uninstallEventHandlers() }
this.webSocket = new adapters.WebSocket(this.consumer.url, socketProtocols)
console.log(`2 Opening WebSocket, current state is ${this.getState()}, subprotocols: ${socketProtocols}`)
this.installEventHandlers()
this.monitor.start()
return true
}
}

this.consumer.connection.open = newConnectionOpen.bind(this.consumer.connection)

const newIsProtocolSupported = function() {
return indexOf.call(supportedProtocols, this.getProtocol()) >= 0
}

this.consumer.connection.isProtocolSupported = newIsProtocolSupported.bind(this.consumer.connection)

const originalMessageEvent = this.consumer.connection.events.message
const newMessageEvent = function(event) {
if (!this.isProtocolSupported()) { return }
const message = JSON.parse(event.data)
if (message.type === "pong") {
console.log("🩹 Received heartbeat pong")
if (message.timestamp) {
this.monitor.latency = Date.now() - message.timestamp
}
return this.monitor.recordPing()
}
else {
originalMessageEvent.apply(this, [event])
}
}
this.consumer.connection.events.message = newMessageEvent.bind(this.consumer.connection)

// Monkey patch ConnectionMonitor
this.consumer.connection.monitor.hearbeatInterval = 2

const shouldInitiateHeartbeat = function() {
console.log(`🩹 Checkign if protocol '${this.connection?.getProtocol()}' is supported`)
return this.connection?.getProtocol() === "actioncable-v1.1-json"
}
this.consumer.connection.monitor.shouldInitiateHeartbeat = shouldInitiateHeartbeat.bind(this.consumer.connection.monitor)

const beat = function() {
this.beatTimeout = setTimeout(() => {
if (this.shouldInitiateHeartbeat()) {
console.log("🩹 Sending heartbeat")
this.connection.send({ type: "ping", timestamp: Date.now() })
}
else {
console.log("🩹 Skipping heartbeat because protocol is not supported")
}
this.beat()
}
, this.hearbeatInterval * 1000)
}
this.consumer.connection.monitor.beat = beat.bind(this.consumer.connection.monitor)

const startBeating = function() {
console.log("🩹 Invoked monkey patched ConnectionMonitor#startBeating")
this.stopBeating()
this.beat()
}
this.consumer.connection.monitor.startBeating = startBeating.bind(this.consumer.connection.monitor)

const stopBeating = function() {
console.log("🩹 Invoked monkey patched ConnectionMonitor#stopBeating")
if (this.beatTimeout) clearTimeout(this.beatTimeout)
}
this.consumer.connection.monitor.stopBeating = stopBeating.bind(this.consumer.connection.monitor)

const start = function() {
if (!this.isRunning()) {
this.startedAt = now()
delete this.stoppedAt
this.startPolling()
this.startBeating()
addEventListener("visibilitychange", this.visibilityDidChange)
console.log(`ConnectionMonitor started. stale threshold = ${this.constructor.staleThreshold} s`)
}
}
this.consumer.connection.monitor.start = start.bind(this.consumer.connection.monitor)

const stop = function() {
if (this.isRunning()) {
this.stoppedAt = now()
this.stopPolling()
this.stopBeating()
removeEventListener("visibilitychange", this.visibilityDidChange)
console.log("ConnectionMonitor stopped")
}
}
this.consumer.connection.monitor.stop = stop.bind(this.consumer.connection.monitor)
// installClientSideHeartbeatMonkeyPath(this.consumer)
installHeartbeatWithPongMonkeyPath(this.consumer)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { INTERNAL, adapters } from "@rails/actioncable"

export default function install(consumer) {
console.log("🩹 MonkeyPatching the ActionCable consumer to add client-initiated heartbeats")

const now = () => new Date().getTime()
const indexOf = [].indexOf
const supportedProtocols = ["actioncable-v1.1-json", "actioncable-v1-json", "actioncable-unsupported"]

// Monkey patch Connection
const newConnectionOpen = function() {
console.log("🩹 Invoked monkey patched Connection#open")

if (this.isActive()) {
console.log(`Attempted to open WebSocket, but existing socket is ${this.getState()}`)
return false
} else {
console.log("🩹 Sending consumer sub-protocols first")
const socketProtocols = [...supportedProtocols]

console.log(`1 Opening WebSocket, current state is ${this.getState()}, subprotocols: ${socketProtocols}`)
if (this.webSocket) { this.uninstallEventHandlers() }
this.webSocket = new adapters.WebSocket(consumer.url, socketProtocols)
console.log(`2 Opening WebSocket, current state is ${this.getState()}, subprotocols: ${socketProtocols}`)
this.installEventHandlers()
this.monitor.start()
return true
}
}

consumer.connection.open = newConnectionOpen.bind(consumer.connection)

const newIsProtocolSupported = function() {
return indexOf.call(supportedProtocols, this.getProtocol()) >= 0
}

consumer.connection.isProtocolSupported = newIsProtocolSupported.bind(consumer.connection)

const originalMessageEvent = consumer.connection.events.message
const newMessageEvent = function(event) {
if (!this.isProtocolSupported()) { return }
const message = JSON.parse(event.data)
if (message.type === "pong") {
console.log("🩹 Received heartbeat pong")
if (message.timestamp) {
this.monitor.latency = Date.now() - message.timestamp
}
return this.monitor.recordPing()
}
else {
originalMessageEvent.apply(this, [event])
}
}
consumer.connection.events.message = newMessageEvent.bind(consumer.connection)

// Monkey patch ConnectionMonitor
consumer.connection.monitor.hearbeatInterval = 2

const shouldInitiateHeartbeat = function() {
console.log(`🩹 Checkign if protocol '${this.connection?.getProtocol()}' is supported`)
return this.connection?.getProtocol() === "actioncable-v1.1-json"
}
consumer.connection.monitor.shouldInitiateHeartbeat = shouldInitiateHeartbeat.bind(consumer.connection.monitor)

const beat = function() {
this.beatTimeout = setTimeout(() => {
if (this.shouldInitiateHeartbeat()) {
console.log("🩹 Sending heartbeat")
this.connection.send({ type: "ping", timestamp: Date.now() })
}
else {
console.log("🩹 Skipping heartbeat because protocol is not supported")
}
this.beat()
}
, this.hearbeatInterval * 1000)
}
consumer.connection.monitor.beat = beat.bind(consumer.connection.monitor)

const startBeating = function() {
console.log("🩹 Invoked monkey patched ConnectionMonitor#startBeating")
this.stopBeating()
this.beat()
}
consumer.connection.monitor.startBeating = startBeating.bind(consumer.connection.monitor)

const stopBeating = function() {
console.log("🩹 Invoked monkey patched ConnectionMonitor#stopBeating")
if (this.beatTimeout) clearTimeout(this.beatTimeout)
}
consumer.connection.monitor.stopBeating = stopBeating.bind(consumer.connection.monitor)

const start = function() {
if (!this.isRunning()) {
this.startedAt = now()
delete this.stoppedAt
this.startPolling()
this.startBeating()
addEventListener("visibilitychange", this.visibilityDidChange)
console.log(`ConnectionMonitor started. stale threshold = ${this.constructor.staleThreshold} s`)
}
}
consumer.connection.monitor.start = start.bind(consumer.connection.monitor)

const stop = function() {
if (this.isRunning()) {
this.stoppedAt = now()
this.stopPolling()
this.stopBeating()
removeEventListener("visibilitychange", this.visibilityDidChange)
console.log("ConnectionMonitor stopped")
}
}
consumer.connection.monitor.stop = stop.bind(consumer.connection.monitor)
}
58 changes: 58 additions & 0 deletions app/javascript/monkey_patches/action_cable/heartbeat_with_pong.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { INTERNAL, adapters } from "@rails/actioncable"

export default function install(consumer) {
console.log("🩹 MonkeyPatching the ActionCable consumer to respond to PINGs with PONGs")

const now = () => new Date().getTime()
const indexOf = [].indexOf
const supportedProtocols = ["actioncable-v1.1-json", "actioncable-v1-json", "actioncable-unsupported"]

// Monkey patch Connection
const newConnectionOpen = function() {
console.log("🩹 Invoked monkey patched Connection#open")

if (this.isActive()) {
console.log(`Attempted to open WebSocket, but existing socket is ${this.getState()}`)
return false
} else {
console.log("🩹 Sending consumer sub-protocols first")
const socketProtocols = [...supportedProtocols]

console.log(`1 Opening WebSocket, current state is ${this.getState()}, subprotocols: ${socketProtocols}`)
if (this.webSocket) { this.uninstallEventHandlers() }
this.webSocket = new adapters.WebSocket(consumer.url, socketProtocols)
console.log(`2 Opening WebSocket, current state is ${this.getState()}, subprotocols: ${socketProtocols}`)
this.installEventHandlers()
this.monitor.start()
return true
}
}

consumer.connection.open = newConnectionOpen.bind(consumer.connection)

const newIsProtocolSupported = function() {
return indexOf.call(supportedProtocols, this.getProtocol()) >= 0
}

consumer.connection.isProtocolSupported = newIsProtocolSupported.bind(consumer.connection)

const originalMessageHandler = consumer.connection.events.message

consumer.connection.events.message = function(event) {
console.log("🩹 Invoked patched ActionCable.Connection.events.message")

const {type, message} = JSON.parse(event.data)
console.log(`🩹 Received message of type ${type}`)
switch (type) {
case "ping":
console.log("🩹 Received server-side PING")
console.log(`🩹 Sending back PONG with message: ${message}`)
this.send({ type: "pong", message: message })
console.log("🩹 Logging received PING")
return this.monitor.recordPing()
default:
console.log("🩹 Unpatched message. Passing it through to original handler...")
originalMessageHandler.apply(this, [event])
}
}
}

0 comments on commit 9da4ce9

Please sign in to comment.