Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Slack Socket level ping/pong #226

Merged
merged 20 commits into from
Oct 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
### 0.13.2 (Next)

* Your contribution here.
* [#226](https://github.com/slack-ruby/slack-ruby-client/pull/226): Added periodic ping that reconnects on failure for `async-websocket` - [@RodneyU215](https://github.com/RodneyU215), [@dblock](https://github.com/dblock), [@ioquatix](https://github.com/ioquatix).

### 0.13.1 (2018/9/30)

Expand Down
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ The following settings are supported.
setting | description
----------------|-----------------------------------------------------------------------------------------------------
token | Slack API token.
websocket_ping | The number of seconds that indicates how often the WebSocket should send ping frames, default is 30.
websocket_ping | How long the socket can be idle before sending a ping message to confirm it's still connected, default is 30.
websocket_proxy | Connect via proxy, include `:origin` and `:headers`.
store_class | Local store class name, default is an in-memory `Slack::RealTime::Stores::Store`.
start_method | Optional start method, either `:rtm_start` or `:rtm_connect`.
Expand All @@ -321,6 +321,17 @@ See a fully working example in [examples/hi_real_time](examples/hi_real_time/hi.

![](examples/hi_real_time/hi.gif)

#### Caveats

##### `websocket_ping`
This setting determines how long the socket can be idle before sending a ping message to confirm it's still connected.

To disable this feature; set `websocket_ping` to 0.

It's important to note that if a ping message was sent and no response was received within the amount of time specified in `websocket_ping`; the client will attempt to reestablish it's connection to the message server.

`websocket_ping` is currently only implemented for `async-websocket`. We hope to [implement this for EventMachine and Celluloid in the future.](https://github.com/slack-ruby/slack-ruby-client/issues/223)

### Connection Methods

The RealTime client uses either [rtm.start](https://api.slack.com/methods/rtm.start) or [rtm.connect](https://api.slack.com/methods/rtm.connect) to open a connection. The former retrieves a lot of team information while the latter only serves connection purposes and is preferred. You should let the library choose the right method for you based on the `store_class` used and override this behavior with `start_method` when necessary.
Expand Down
30 changes: 27 additions & 3 deletions lib/slack/real_time/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ def on(type, &block)
# Start RealTime client and block until it disconnects.
def start!(&block)
@callback = block if block_given?
@socket = build_socket
build_socket
@socket.start_sync(self)
end

# Start RealTime client and return immediately.
# The RealTime::Client will run in the background.
def start_async(&block)
@callback = block if block_given?
@socket = build_socket
build_socket
@socket.start_async(self)
end

Expand Down Expand Up @@ -102,6 +102,30 @@ def run_loop
end
end

def run_ping!
return if websocket_ping.nil? || websocket_ping < 1
begin
loop do
yield websocket_ping if block_given?
run_ping
end
rescue Slack::RealTime::Client::ClientNotStartedError
@socket.restart_async(self)
retry if started?
end
end

def run_ping
return if @socket.time_since_last_message < websocket_ping

if @socket.time_since_last_message > (websocket_ping * 2)
@socket.disconnect!
@socket.close
end

ping
end

protected

# @return [Slack::RealTime::Socket]
Expand All @@ -111,7 +135,7 @@ def build_socket
data = Slack::Messages::Message.new(start)
@url = data.url
@store = @store_class.new(data) if @store_class
socket_class.new(@url, socket_options)
@socket = socket_class.new(@url, socket_options)
end

def rtm_start_method
Expand Down
22 changes: 20 additions & 2 deletions lib/slack/real_time/concurrency/async.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,30 @@ class Socket < Slack::RealTime::Socket

def start_async(client)
Thread.new do
::Async::Reactor.run do
client.run_loop
::Async::Reactor.run do |task|
task.async do
client.run_loop
end
task.async do |subtask|
client.run_ping! do |delay|
subtask.sleep delay
end
end
end
end
end

def restart_async(client)
::Async::Reactor.run do
client.build_socket
client.run_loop
end
end

def current_time
Async::Clock.now
end

def connect!
super
run_loop
Expand Down
17 changes: 17 additions & 0 deletions lib/slack/real_time/socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def initialize(url, options = {})
@options = options
@driver = nil
@logger = options.delete(:logger) || Slack::RealTime::Config.logger || Slack::Config.logger
@last_message_at = nil
end

def send_data(message)
Expand All @@ -30,6 +31,10 @@ def connect!
connect
logger.debug("#{self.class}##{__method__}") { driver.class }

driver.on :message do
@last_message_at = current_time
end

yield driver if block_given?
end

Expand All @@ -53,6 +58,18 @@ def start_async(_client)
raise NotImplementedError, "Expected #{self.class} to implement #{__method__}."
end

def restart_async(_client)
raise NotImplementedError, "Expected #{self.class} to implement #{__method__}."
end

def time_since_last_message
current_time - @last_message_at
end

def current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def close
@driver = nil
end
Expand Down
40 changes: 40 additions & 0 deletions spec/slack/real_time/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,46 @@
end
end
end
describe '#start_async' do
let(:socket) { double(Slack::RealTime::Socket, connected?: true) }
before do
allow(Slack::RealTime::Socket).to receive(:new).with(url, ping: 30, logger: Slack::Logger.default).and_return(socket)
allow(socket).to receive(:connect!)
allow(socket).to receive(:start_async)
client.start_async
end
describe '#run_ping' do
it 'sends ping messages when the connection is idle' do
allow(socket).to receive(:time_since_last_message).and_return(30)
expect(socket).to receive(:send_data).with('{"type":"ping","id":1}')
client.run_ping
end
it 'disconnects the websocket when the connection is idle for too long' do
allow(socket).to receive(:time_since_last_message).and_return(75)
allow(socket).to receive(:connected?).and_return(false)

expect(socket).to receive(:disconnect!)
expect(socket).to receive(:close)
expect { client.run_ping }.to raise_error Slack::RealTime::Client::ClientNotStartedError
end
end
describe '#run_ping!' do
it 'returns if websocket_ping is less than 1' do
client.websocket_ping = 0
expect(client).to_not receive(:run_ping)
client.run_ping!
end
it 'reconnects the websocket if an exception is thrown' do
allow(socket).to receive(:time_since_last_message).and_return(75)
allow(socket).to receive(:disconnect!)
allow(socket).to receive(:close)
allow(socket).to receive(:connected?).and_return(false)

expect(socket).to receive(:restart_async)
client.run_ping!
end
end
end
end
context 'client with starter store', vcr: { cassette_name: 'web/rtm_connect' } do
let(:client) { Slack::RealTime::Client.new(store_class: Slack::RealTime::Stores::Starter) }
Expand Down
1 change: 1 addition & 0 deletions spec/slack/real_time/concurrency/eventmachine_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
describe '#connect!' do
before do
allow(ws).to receive(:on).with(:close)
allow(ws).to receive(:on).with(:message)
end
it 'connects' do
allow(Faye::WebSocket::Client).to receive(:new).and_return(ws)
Expand Down