Skip to content

Commit

Permalink
Use net_tcp_client gem to wrap socket and provide for better resiliency
Browse files Browse the repository at this point in the history
  • Loading branch information
cheerfulstoic committed May 16, 2018
1 parent 0a3de89 commit ff0d8a5
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 119 deletions.
117 changes: 14 additions & 103 deletions lib/neo4j/core/cypher_session/adaptors/bolt.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
require 'neo4j/core/cypher_session/adaptors/bolt/pack_stream'
require 'neo4j/core/cypher_session/adaptors/bolt/chunk_writer_io'
require 'neo4j/core/cypher_session/responses/bolt'
require 'io/wait'
require 'socket'
require 'openssl'
require 'net/tcp_client'

# TODO: Work with `Query` objects?
module Neo4j
Expand All @@ -25,6 +23,10 @@ class Bolt < Base
def initialize(url, options = {})
self.url = url
@options = options
@net_tcp_client_options = {read_timeout: options[:read_timeout],
write_timeout: options[:write_timeout],
connect_timeout: options[:connect_timeout],
ssl: options.fetch(:ssl, {})}

open_socket
end
Expand All @@ -44,12 +46,6 @@ def connect
def query_set(transaction, queries, options = {})
setup_queries!(queries, transaction, skip_instrumentation: options[:skip_instrumentation])

if @socket.ready?
debug_remaining_buffer
fail "Making query, but expected there to be no buffer remaining!\n"\
"Queries: #{queries.map(&:cypher)}"
end

self.class.instrument_request(self) do
send_query_jobs(queries)

Expand All @@ -65,7 +61,7 @@ def version(session)
end

def connected?
!!@socket
!!@tcp_client
end

def indexes(session)
Expand Down Expand Up @@ -100,7 +96,7 @@ def self.transaction_class
end

def ssl?
@socket.is_a?(SecureSocketWrapper)
@tcp_client.socket.is_a?(OpenSSL::SSL::SSLSocket)
end

private
Expand All @@ -124,17 +120,6 @@ def handle_failure!(error_data)
fail CypherError.new_from(error_data['code'], error_data['message'])
end

def debug_remaining_buffer
logger.debug 'Remaining buffer:'

i = 0
while @socket.ready?
i += 1
logger.debug "Message set #{i}:"
flush_messages
end
end

def send_query_jobs(queries)
send_job do |job|
queries.each do |query|
Expand All @@ -153,20 +138,11 @@ def secure_connection?
end

def open_socket
ssl_options = @options.fetch(:ssl, {})
@socket = if ssl_options == false
SocketWrapper.new(host, port)
else
SecureSocketWrapper.new(host, port, ssl_options)
end
@tcp_client = Net::TCPClient.new(@net_tcp_client_options.merge(server: "#{host}:#{port}"))
rescue Errno::ECONNREFUSED => e
raise Neo4j::Core::CypherSession::ConnectionFailedError, e.message
end

# See below on how to upgrade a socket to a ssl_socket
# https://github.com/rocketjob/net_tcp_client/blob/master/lib/net/tcp_client/tcp_client.rb#L678
# maybe the verify step as well.

GOGOBOLT = "\x60\x60\xB0\x17"
def handshake
log_message :C, :handshake, nil
Expand All @@ -176,13 +152,12 @@ def handshake
agreed_version = recvmsg(4).unpack('l>*')[0]

if agreed_version.zero?
@socket.shutdown(Socket::SHUT_RDWR)
@socket.close
@tcp_client.close

fail "Couldn't agree on a version (Sent versions #{SUPPORTED_VERSIONS.inspect})"
end

logger.debug "Agreed to version: #{agreed_version}"
logger.debug { "Agreed to version: #{agreed_version}" }
end

def init
Expand Down Expand Up @@ -210,17 +185,13 @@ def send_job

def sendmsg(message)
log_message :C, message
@socket.send_message(message)
@tcp_client.write(message)
end

def recvmsg(size, timeout = timeout_option)
Timeout.timeout(timeout) do
@socket.receive_message(size) do |result|
log_message :S, result
end
def recvmsg(size)
@tcp_client.read(size) do |result|
log_message :S, result
end
rescue Timeout::Error
raise "Timed out waiting for #{size} bytes from Neo4j (after #{timeout} seconds)"
end

def flush_messages
Expand Down Expand Up @@ -260,10 +231,6 @@ def flush_response
[].tap { |r| while arg = unpacker.unpack_value!; r << arg; end }
end

def timeout_option
@options.fetch(:timeout) { 10 }
end

# Represents messages sent to or received from the server
class Message
TYPE_CODES = {
Expand Down Expand Up @@ -364,62 +331,6 @@ def to_s
@messages.join(' | ')
end
end

class SocketWrapper
extend Forwardable

def initialize(host, port)
@socket = TCPSocket.open(host, port)
end

def_delegators :@socket, :ready?, :shutdown, :close

def send_message(message)
@socket.send(message, 0)
end

def receive_message(size, &block)
@socket.recv(size).tap(&block)
end
end

class SecureSocketWrapper < SocketWrapper
def initialize(host, port, ssl_options)
super(host, port)

ssl_context = OpenSSL::SSL::SSLContext.new
ssl_context.set_params(ssl_params_from_options(ssl_options))

@ssl_socket = OpenSSL::SSL::SSLSocket.new(@socket, ssl_context).tap do |socket|
socket.sync_close = true
socket.connect
end
end

def_delegators :@ssl_socket, :close
# SSLSocket does not have a :shutdown method, so we'll use the
# underlying TCPSocket @socket. Not sure if we need to use ssl_socket.io
# instead when attempting to shutdown the socket

def ready?
@ssl_socket.state == 'SSLOK'
end

def send_message(message)
@ssl_socket.write(message)
end

def receive_message(size, &block)
@ssl_socket.read(size).tap(&block)
end

private

def ssl_params_from_options(ssl_options)
default_options = {verify_mode: OpenSSL::SSL::VERIFY_PEER, cert_store: nil}
ssl_options.is_a?(Hash) ? default_options.merge!(ssl_options) : default_options
end
end
end
end
end
Expand Down
1 change: 1 addition & 0 deletions neo4j-core.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ DESCRIPTION
s.add_dependency('json')
s.add_dependency('multi_json')
s.add_dependency('typhoeus', '>= 1.1.2')
s.add_dependency('net_tcp_client', '>= 2.0.1')

s.add_development_dependency('dryspec')
s.add_development_dependency('neo4j-rake_tasks', '>= 0.3.0')
Expand Down
16 changes: 0 additions & 16 deletions spec/neo4j/core/cypher_session/adaptors/bolt_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,4 @@

it_behaves_like 'Neo4j::Core::CypherSession::Adaptor'
end

describe 'timeouts' do
it 'defaults to 10' do
expect(Timeout).to receive(:timeout).with(10)
adaptor.send(:recvmsg, 1)
end

context 'when a timeout is configured' do
let(:extra_options) { {timeout: 20} }

it 'uses the configured timeout' do
expect(Timeout).to receive(:timeout).with(20)
adaptor.send(:recvmsg, 1)
end
end
end
end

0 comments on commit ff0d8a5

Please sign in to comment.