Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Implement writing to a connection

  • Loading branch information...
commit 60b12646241d6823807615b347b434969dc6b762 1 parent 8f56d1b
@durran durran authored
View
23 lib/mongo/pool/connection.rb
@@ -91,14 +91,33 @@ def initialize(host, port, timeout = nil, options = {})
def read
end
- def write(message)
+ # Write messages to the connection in a single network call.
+ #
+ # @example Write the messages to the connection.
+ # connection.write([ insert ])
+ #
+ # @note All messages must be instances of Mongo::Protocol::Message.
+ #
+ # @param [ Array<Message> ] messages The messages to write.
+ # @param [ String ] buffer The buffer to write to.
+ #
+ # @return [ Integer ] The number of bytes written.
+ #
+ # @since 3.0.0
+ def write(messages, buffer = '')
+ messages.each do |message|
+ message.serialize(buffer)
+ end
+ ensure_connected do |socket|
+ socket.write(buffer)
+ end
end
private
attr_reader :socket, :ssl_opts
- def connected
+ def ensure_connected
connect! if socket.nil? || !socket.alive?
yield socket
end
View
71 lib/mongo/pool/socket/base.rb
@@ -60,46 +60,41 @@ def close
# Reads data from the socket instance.
#
- # @example
+ # @example Read from the socket.
# socket.read(4096)
#
- # @param length [Integer] The length of data to read.
+ # @param [ Integer ] length The length of data to read.
#
- # @return [Object] The data read from the socket.
+ # @return [ Object ] The data read from the socket.
+ #
+ # @since 3.0.0
def read(length)
handle_socket_error { @socket.read(length) }
end
# Writes data to the socket instance.
#
- # @example
+ # @example Write to the socket.
# socket.write(data)
#
- # @param *args [Object] The data to be written.
+ # @param [ Array<Object> ] args The data to be written.
#
- # @return [Integer] The length of bytes written to the socket.
+ # @return [ Integer ] The length of bytes written to the socket.
+ #
+ # @since 3.0.0
def write(*args)
- handle_socket_error { @socket.write(args) }
+ handle_socket_error { @socket.write(*args) }
end
private
- # Helper method to handle connection logic for tcp socket types and
- # all possible socket address families.
- #
- # @api private
- #
- # @example
- # handle_connect
- #
- # @return [Socket] The connected socket instance.
def handle_connect
- error = nil
- addr_info = ::Socket.getaddrinfo(@host, nil, AF_UNSPEC, SOCK_STREAM)
+ error = nil
+ addr_info = ::Socket.getaddrinfo(host, nil, AF_UNSPEC, SOCK_STREAM)
addr_info.each do |info|
begin
sock = create_socket(info[4])
- socket_addr = ::Socket.pack_sockaddr_in(@port, info[3])
+ socket_addr = ::Socket.pack_sockaddr_in(port, info[3])
sock.connect(socket_addr)
return sock
rescue IOError, SystemCallError => e
@@ -109,54 +104,24 @@ def handle_connect
raise error
end
- # Initializes a new socket instance with default options and encoding.
- #
- # @api private
- #
- # @example
- # create_socket(Socket::AF_INET)
- # create_socket(Socket::AF_INET6)
- # create_socket(Socket::AF_UNIX)
- #
- # @param family [Integer] The socket address family.
- #
- # @return [Socket] The newly created socket instance.
def create_socket(family)
sock = ::Socket.new(family, SOCK_STREAM, 0)
sock.set_encoding('binary') if sock.respond_to?(:set_encoding)
sock.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1) if family != AF_UNIX
-
- timeout_value = [@timeout, 0].pack('l_2')
+ timeout_value = [timeout, 0].pack('l_2')
sock.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeout_value)
sock.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeout_value)
-
sock
end
- # Utility method of handing socket exceptions, generating an
- # appropriate error message and raising them as a Mongo::Error
- # exception.
- #
- # @api private
- #
- # @example
- # handle_socket_error do
- # socket.write(payload)
- # end
- #
- # @return [Object] The yield result.
def handle_socket_error
yield
rescue Errno::ETIMEDOUT
- raise Mongo::SocketTimeoutError,
- 'Socket request timed out.'
+ raise Mongo::SocketTimeoutError, 'Socket request timed out.'
rescue IOError, SystemCallError
- raise Mongo::SocketError,
- 'A socket error occurred.'
+ raise Mongo::SocketError, 'A socket error occurred.'
rescue OpenSSL::SSL::SSLError
- raise Mongo::SocketError,
- 'SSL handshake failed. MongoDB ' +
- 'may not be configured with SSL support.'
+ raise Mongo::SocketError, 'SSL handshake failed. MongoDB may not be configured with SSL support.'
end
end
end
View
8 lib/mongo/protocol/message.rb
@@ -226,6 +226,14 @@ def self.deserialize_field(message, io, field)
field[:type].deserialize(io)
)
end
+
+
+ private
+
+ # @durran: temp for testing until read from connection is done.
+ def self.reset_request_id
+ @@request_id -= 1
+ end
end
end
end
View
64 spec/mongo/pool/connection_spec.rb
@@ -125,4 +125,68 @@
end
end
end
+
+ describe '#write' do
+
+ let(:connection) do
+ described_class.new('127.0.0.1', 27017, 5)
+ end
+
+ let(:documents) do
+ [{ 'name' => 'testing' }]
+ end
+
+ let(:message) do
+ Mongo::Protocol::Insert.new('mongo_test', 'users', documents)
+ end
+
+ let(:socket) do
+ connection.send(:socket)
+ end
+
+ context 'when providing a single message' do
+
+ let!(:serialized) do
+ message.serialize('')
+ end
+
+ before do
+ connection.connect!
+ Mongo::Protocol::Message.send(:reset_request_id)
+ expect(socket).to receive(:write).with(serialized)
+ end
+
+ it 'it writes the message to the socket' do
+ connection.write([ message ])
+ end
+ end
+
+ context 'when providing multiple messages' do
+
+ let(:selector) do
+ { :getlasterror => 1 }
+ end
+
+ let(:command) do
+ Mongo::Protocol::Query.new('mongo_test', '$cmd', selector, :limit => -1)
+ end
+
+ let(:buffer) { '' }
+
+ let!(:serialized) do
+ message.serialize(buffer)
+ command.serialize(buffer)
+ end
+
+ before do
+ connection.connect!
+ 2.times { Mongo::Protocol::Message.send(:reset_request_id) }
+ expect(socket).to receive(:write).with(serialized)
+ end
+
+ it 'it writes the message to the socket' do
+ connection.write([ message, command ])
+ end
+ end
+ end
end
Please sign in to comment.
Something went wrong with that request. Please try again.