Skip to content

Commit

Permalink
[6-stable] Add TCP keepalive in Twitter::Streaming::Connection (#856)
Browse files Browse the repository at this point in the history
* Add TCP keepalive in Twitter::Streaming::Connection

Without correct TCP keepalive setup, the Twitter streaming client is
prone to hang on long-lived connections. This is due to the nature of
the protocol: a single request is made, then the client simply waits for
data to read on a socket. Without keepalive, it may be indistinguishable
to the client whether the connection has failed or there is simply no
data to read.

These changes add the ability to set TCP keepalive providing the OS
supports it. They also provide a set of sensible defaults to the
keepalive for this implementation, while disabled by default.

* Add tests around keepalive setup
  • Loading branch information
prograhamer authored and sferik committed Nov 8, 2017
1 parent a83d6e0 commit d9cde55
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 2 deletions.
26 changes: 24 additions & 2 deletions lib/twitter/streaming/connection.rb
Expand Up @@ -5,15 +5,23 @@
module Twitter
module Streaming
class Connection
attr_reader :tcp_socket_class, :ssl_socket_class, :keepalive

DEFAULT_KEEPALIVE_SETTINGS = {
idle_timeout: 60,
interval: 10,
count: 6,
}.freeze

def initialize(opts = {})
@tcp_socket_class = opts.fetch(:tcp_socket_class) { TCPSocket }
@ssl_socket_class = opts.fetch(:ssl_socket_class) { OpenSSL::SSL::SSLSocket }
@keepalive = DEFAULT_KEEPALIVE_SETTINGS.merge(opts.fetch(:keepalive) { {enabled: false} })
end
attr_reader :tcp_socket_class, :ssl_socket_class

def stream(request, response)
client_context = OpenSSL::SSL::SSLContext.new
client = @tcp_socket_class.new(Resolv.getaddress(request.socket_host), request.socket_port)
client = new_tcp_socket(request.socket_host, request.socket_port)
ssl_client = @ssl_socket_class.new(client, client_context)

ssl_client.connect
Expand All @@ -22,6 +30,20 @@ def stream(request, response)
response << body
end
end

private

def new_tcp_socket(host, port)
@tcp_socket_class.new(Resolv.getaddress(host), port).tap do |socket|
# Check that Socket::TCP_KEEPIDLE is present, so we know we can access these socket options
if @keepalive[:enabled] && defined?(Socket::TCP_KEEPIDLE)
socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPIDLE, @keepalive[:idle_timeout])
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPINTVL, @keepalive[:interval])
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPCNT, @keepalive[:count])
end
end
end
end
end
end
72 changes: 72 additions & 0 deletions spec/twitter/streaming/connection_spec.rb
Expand Up @@ -9,6 +9,10 @@
expect(connection.tcp_socket_class).to eq TCPSocket
expect(connection.ssl_socket_class).to eq OpenSSL::SSL::SSLSocket
end

it 'sets keepalive as disabled' do
expect(connection.keepalive).to include(enabled: false)
end
end

context 'custom socket classes provided in opts' do
Expand All @@ -24,5 +28,73 @@ class DummySSLSocket; end
expect(connection.ssl_socket_class).to eq DummySSLSocket
end
end

context 'custom keepalive settings' do
let(:keepalive_settings) do
{
enabled: true,
idle_timeout: 30,
interval: 15,
count: 10,
}
end

subject(:connection) { Twitter::Streaming::Connection.new(keepalive: keepalive_settings) }

it 'uses the custom keepalive settings' do
expect(connection.keepalive).to eq keepalive_settings
end
end
end

describe 'new_tcp_socket' do
subject(:connection) { Twitter::Streaming::Connection.new(keepalive: keepalive_settings) }

let(:socket) { instance_double(TCPSocket) }

before do
allow(Resolv).to receive(:getaddress).and_return('104.244.42.1')
allow(TCPSocket).to receive(:new).and_return(socket)
allow(socket).to receive(:setsockopt)
end

context 'with keepalive enabled' do
let(:keepalive_settings) do
{
enabled: true,
idle_timeout: 30,
interval: 15,
count: 10,
}
end

it 'sets the keepalive settings if able' do
stub_const('Socket::SO_KEEPALIVE', :SO_KEEPALIVE)
stub_const('Socket::TCP_KEEPIDLE', :TCP_KEEPIDLE)
stub_const('Socket::TCP_KEEPINTVL', :TCP_KEEPINTVL)
stub_const('Socket::TCP_KEEPCNT', :TCP_KEEPCNT)
stub_const('Socket::SOL_SOCKET', :SOL_SOCKET)
stub_const('Socket::IPPROTO_TCP', :IPPROTO_TCP)

connection.send(:new_tcp_socket, 'twitter.com', '80')

expect(socket).to have_received(:setsockopt).with(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
expect(socket).to have_received(:setsockopt).with(Socket::IPPROTO_TCP, Socket::TCP_KEEPIDLE, 30)
expect(socket).to have_received(:setsockopt).with(Socket::IPPROTO_TCP, Socket::TCP_KEEPINTVL, 15)
expect(socket).to have_received(:setsockopt).with(Socket::IPPROTO_TCP, Socket::TCP_KEEPCNT, 10)
end
end

context 'with keepalive disabled' do
let(:keepalive_settings) { {enabled: false} }

it 'does not attempt to set keepalive settings' do
stub_const('Socket::TCP_KEEPIDLE', :TCP_KEEPIDLE)

connection.send(:new_tcp_socket, 'twitter.com', '80')

expect(socket).not_to have_received(:setsockopt)
end
end
end
end

0 comments on commit d9cde55

Please sign in to comment.