Permalink
Browse files

Servers can dispatch messages

  • Loading branch information...
1 parent 848e543 commit 1b417d1c6f0b8368dee4b319297571d82b02ad92 @durran durran committed Feb 13, 2014
View
@@ -114,6 +114,24 @@ def timeout
@timeout ||= options[:timeout] || TIMEOUT
end
+ # Yield the block to a connection, while handling checkin/checkout logic.
+ #
+ # @example Execute with a connection.
+ # pool.with_connection do |connection|
+ # connection.read
+ # end
+ #
+ # @return [ Object ] The result of the block.
+ #
+ # @since 3.0.0
+ def with_connection
+ begin
+ yield(checkout)
+ ensure
+ checkin
+ end
+ end
+
private
attr_reader :queue
@@ -42,6 +42,18 @@ def initialize(database, collection, number_to_return, cursor_id)
@cursor_id = cursor_id
end
+ # Get more messages require replies from the database.
+ #
+ # @example Does the message require a reply?
+ # message.replyable?
+ #
+ # @return [ true ] Always true for get more.
+ #
+ # @since 3.0.0
+ def replyable?
+ true
+ end
+
private
# The operation code required to specify a GetMore message.
@@ -47,6 +47,19 @@ class Message
# @return [Fixnum] The request id for this message
attr_reader :request_id
+ # The default for messages is not to require a reply after sending a
+ # message to the server.
+ #
+ # @example Does the message require a reply?
+ # message.replyable?
+ #
+ # @return [ false ] The default is to not require a reply.
+ #
+ # @since 3.0.0
+ def replyable?
+ false
+ end
+
# Serializes message into bytes that can be sent on the wire
#
# @param buffer [String] buffer where the message should be inserted
@@ -66,6 +66,18 @@ def initialize(database, collection, selector, options = {})
@flags = options[:flags] || []
end
+ # Query messages require replies from the database.
+ #
+ # @example Does the message require a reply?
+ # message.replyable?
+ #
+ # @return [ true ] Always true for queries.
+ #
+ # @since 3.0.0
+ def replyable?
+ true
+ end
+
private
# The operation code required to specify a Query message.
View
@@ -48,24 +48,6 @@ def ==(other)
address == other.address
end
- # Returns whether or not the server is alive - ie it is connected to and
- # healthy.
- #
- # @example Is the server alive?
- # server.alive?
- #
- # @return [ true, false ] If the server is alive and healthy.
- #
- # @since 3.0.0
- def alive?
- !!@alive
- end
-
- # @todo: Send the operation to the connection.
- def execute(operation)
-
- end
-
def initialize(address, options = {})
@address = Address.new(address)
@options = options
@@ -90,6 +72,27 @@ def refresh!
end
end
+ # Dispatch the provided messages to the server. If the last message
+ # requires a response a reply will be returned.
+ #
+ # @example Dispatch the messages.
+ # server.dispatch([ insert, command ])
+ #
+ # @note This method is named dispatch since 'send' is a core Ruby method on
+ # all objects.
+ #
+ # @param [ Array<Message> ] messages The messages to dispatch.
+ #
+ # @return [ Protocol::Reply ] The reply if needed.
+ #
+ # @since 3.0.0
+ def dispatch(messages)
+ with_connection do |connection|
+ connection.write(messages)
+ connection.read if messages.last.replyable?
+ end
+ end
+
# Get the refresh interval for the server. This will be defined via an option
# or will default to 5.
#
@@ -106,7 +109,7 @@ def refresh_interval
private
def pool
- # @pool ||= Pool.get(self)
+ @pool ||= Pool.get(self)
end
def refresh_command
@@ -117,5 +120,9 @@ def refresh_command
:limit => -1, :read => cluster.client.read_preference
)
end
+
+ def with_connection
+ pool.with_connection { |conn| yield(conn) }
+ end
end
end
@@ -109,6 +109,13 @@
end
end
+ describe '#replyable?' do
+
+ it 'returns false' do
+ expect(message).to_not be_replyable
+ end
+ end
+
describe '#serialize' do
let(:bytes) { message.serialize }
@@ -102,6 +102,13 @@
end
end
+ describe '#replyable?' do
+
+ it 'returns true' do
+ expect(message).to be_replyable
+ end
+ end
+
describe '#serialize' do
let(:bytes) { message.serialize }
@@ -111,6 +111,13 @@
end
end
+ describe '#replyable?' do
+
+ it 'returns false' do
+ expect(message).to_not be_replyable
+ end
+ end
+
describe '#serialize' do
let(:bytes) { message.serialize }
@@ -64,6 +64,13 @@
end
end
+ describe '#replyable?' do
+
+ it 'returns false' do
+ expect(message).to_not be_replyable
+ end
+ end
+
describe '#serialize' do
let(:bytes) { message.serialize }
@@ -133,6 +133,13 @@
end
end
+ describe '#replyable?' do
+
+ it 'returns true' do
+ expect(message).to be_replyable
+ end
+ end
+
describe '#serialize' do
let(:bytes) { message.serialize }
@@ -114,6 +114,13 @@
end
end
+ describe '#replyable?' do
+
+ it 'returns false' do
+ expect(message).to_not be_replyable
+ end
+ end
+
describe '#serialize' do
let(:bytes) { message.serialize }
View
@@ -2,49 +2,6 @@
describe Mongo::Server do
- describe '#alive?' do
-
- let(:address) do
- '127.0.0.1:27017'
- end
-
- let(:server) do
- described_class.new(address)
- end
-
- context 'when the server has been refreshed' do
-
- context 'when the server is alive' do
-
- before do
- server.instance_variable_set(:@alive, true)
- end
-
- it 'returns true' do
- expect(server).to be_alive
- end
- end
-
- context 'when the server is not alive' do
-
- before do
- server.instance_variable_set(:@alive, false)
- end
-
- it 'returns false' do
- expect(server).to_not be_alive
- end
- end
- end
-
- context 'when the server has not been refreshed' do
-
- it 'returns false' do
- expect(server).to_not be_alive
- end
- end
- end
-
describe '#initialize' do
let(:address) do
@@ -72,7 +29,7 @@
end
end
- describe '#refresh' do
+ describe '#refresh!' do
let(:address) do
'127.0.0.1:27017'
@@ -152,4 +109,71 @@
end
end
end
+
+ describe '#dispatch' do
+
+ let(:server) do
+ described_class.new('127.0.0.1:27017')
+ end
+
+ let(:documents) do
+ [{ 'name' => 'testing' }]
+ end
+
+ let(:insert) do
+ Mongo::Protocol::Insert.new('mongo_test', 'users', documents)
+ end
+
+ let(:query) do
+ Mongo::Protocol::Query.new('mongo_test', 'users', {})
+ end
+
+ let(:delete) do
+ Mongo::Protocol::Delete.new('mongo_test', 'users', {})
+ end
+
+ context 'when providing a single message' do
+
+ before do
+ server.dispatch([ insert ])
+ end
+
+ let(:reply) do
+ server.dispatch([ query ])
+ end
+
+ # @todo: Can remove this once we have more implemented with global hooks.
+ after do
+ server.dispatch([ delete ])
+ end
+
+ it 'it dispatchs the message to the socket' do
+ expect(reply.documents.first['name']).to eq('testing')
+ end
+ end
+
+ context 'when providing multiple messages' do
+
+ let(:selector) do
+ { :getlasterror => 1 }
+ end
+
+ let(:command) do
+ Mongo::Protocol::Query.new('mongo_test', '$cmd', selector, :limit => -1)
+ end
+
+ let(:reply) do
+ server.dispatch([ insert, command ])
+ end
+
+ # @todo: Can remove this once we have more implemented with global hooks.
+ after do
+ server.dispatch([ delete ])
+ end
+
+ it 'it dispatchs the message to the socket' do
+ expect(reply.documents.first['ok']).to eq(1.0)
+ end
+ end
+ end
end

0 comments on commit 1b417d1

Please sign in to comment.