-
Notifications
You must be signed in to change notification settings - Fork 0
/
heartbeat_with_pong.rb
114 lines (98 loc) · 4.82 KB
/
heartbeat_with_pong.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
require "action_cable"
module MonkeyPatches
module ActionCable
module HeartbeatWithPong
class << self
def install!
Rails.logger.debug "Installing ActionCable client-side hearbeat moneky patch"
::ActionCable::INTERNAL[:message_types][:pong] = "pong"
::ActionCable::INTERNAL[:disconnect_reasons][:heartbeat_timeout] = "heartbeat_timeout"
::ActionCable::INTERNAL[:protocols] = ::ActionCable::INTERNAL[:protocols].dup.prepend("actioncable-v1.1-json").freeze
::ActionCable::Connection::Base.prepend(ConnectionExtensions)
::ActionCable::Connection::Subscriptions.prepend(SubscriptionsExtensions)
end
end
module ConnectionExtensions
# Patched to check if the client responded with a PONG message to the
# server's hearbeat PING.
#
# If the client supports PONGs and didn't respond with a PONG message
# to the server's PING within the expected timeframe, the client is
# assumed to have disconnected and the connection is closed.
def beat
return super unless expects_pong_response_to_heartbeat?
if connection_half_open?
logger.debug "🩹 PONG received too long ago. Closing connection (#{connection_identifier}) due to client-side heartbeat timeout"
subscriptions.unsubscribe_from_all
close(reason: ::ActionCable::INTERNAL[:disconnect_reasons][:heartbeat_timeout])
return server.remove_connection(self)
end
# Send a PING to the client, but with a more precise timestamp
logger.debug "🩹 Transmiting new PING message"
transmit type: ::ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_f
rescue Exception => e
rescue_with_handler(e)
logger.error "Beat error [#{e.class} - #{e.message}]: #{e.backtrace.first(5).join(" | ")}"
end
# Processes incoming PONG messages from the client
def register_client_pong!(data)
logger.debug "🩹 Invoked ActionCable::Connection::Base#register_client_pong! data: #{data}"
@last_pong_at = Time.now
latency = Time.now.to_f - data["message"].to_f
if latency.negative?
logger.info "🩹 We have a time traveler! Latency: #{latency}ms (#{connection_identifier})"
else
logger.info "🩹 Latency: #{latency}ms (#{connection_identifier})"
ActiveSupport::Notifications.instrument(
"connection.latency",
value: latency,
action: :timing,
connection_identifier: connection_identifier,
identifiers: identifiers.map { |id| [id, instance_variable_get("@#{id}")] }.to_h
)
end
rescue Exception => e
rescue_with_handler(e)
logger.error "#register_client_pong! error [#{e.class} - #{e.message}]: #{e.backtrace.first(5).join(" | ")}"
end
# Checks if the connection is expecting PONG messages from the client in
# response to a heartbeat PING
def expects_pong_response_to_heartbeat?
protocol&.start_with?("actioncable-v1.1-")
end
# Returns true if the connection is considered to be half-open
#
# A half-open connection means that the client has disconnected
# without closing the connection, so the server's OS keeps the connection
# open hoping that the client will continue the connection.
#
# To detect a half-open connection we check if the connection was
# started within, or if last heartbeat PONG response came within, a
# certain timeframe.
def connection_half_open?
last_message_timestamp = @last_pong_at || @started_at
last_message_timestamp&.before?(half_open_connection_treshold.ago)
end
# Returns the time after which a connection is considered to be half-open
def half_open_connection_treshold
::ActionCable::Server::Connections::BEAT_INTERVAL.seconds * 2
end
end
module SubscriptionsExtensions
# Patched to process incoming PONG messages from the client.
def execute_command(data)
logger.debug "🩹 Invoked patched ActionCable::Connection::Subscriptions#execute_command"
if data["type"] == ::ActionCable::INTERNAL[:message_types][:pong]
logger.debug "🩹 Received client PONG message"
return @connection.register_client_pong!(data)
end
logger.info "🩹 Invoked standard execute_command handler"
super(data)
rescue Exception => e
@connection.rescue_with_handler(e)
logger.error "Could not execute command from (#{data.inspect}) [#{e.class} - #{e.message}]: #{e.backtrace.first(5).join(" | ")}"
end
end
end
end
end