Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Allows batch_size and timeout to be received by Moped

  • Loading branch information...
commit 73f11163ad77872a1a9bd1020a55ec20ec324cc0 1 parent a0303a6
@gshiftlabs-dbuttineau gshiftlabs-dbuttineau authored durran committed
View
28 lib/moped/cursor.rb
@@ -41,12 +41,29 @@ def each
#
# @since 1.0.0
def get_more
- reply = @node.get_more @database, @collection, @cursor_id, @limit
+ reply = @node.get_more @database, @collection, @cursor_id, request_limit
@limit -= reply.count if limited?
@cursor_id = reply.cursor_id
reply.documents
end
+ # Determine the request limit for the query
+ #
+ # @example What is the cursor request_limit
+ # cursor.request_limit
+ #
+ # @return [ Integer ]
+ #
+ # @since 1.0.0
+
+ def request_limit
+ if limited?
+ @batch_size < @limit ? @batch_size : @limit
+ else
+ @batch_size
+ end
+ end
+
# Initialize the new cursor.
#
# @example Create the new cursor.
@@ -66,13 +83,14 @@ def initialize(session, query_operation)
@cursor_id = 0
@limit = query_operation.limit
@limited = @limit > 0
+ @batch_size = query_operation.batch_size || @limit
@options = {
request_id: query_operation.request_id,
flags: query_operation.flags,
limit: query_operation.limit,
skip: query_operation.skip,
- fields: query_operation.fields
+ fields: query_operation.fields,
}
end
@@ -111,9 +129,13 @@ def limited?
def load_docs
consistency = session.consistency
@options[:flags] |= [:slave_ok] if consistency == :eventual
+ @options[:flags] |= [:no_cursor_timeout] if @options[:no_timeout]
+
+ options = @options.clone
+ options[:limit] = @options[:batch_size] if @options[:batch_size]
reply, @node = session.context.with_node do |node|
- [ node.query(@database, @collection, @selector, @options), node ]
+ [ node.query(@database, @collection, @selector, options), node ]
end
@limit -= reply.count if limited?
View
10 lib/moped/protocol/query.rb
@@ -78,6 +78,12 @@ def op_code
# @return [String, Symbol] the collection to query
attr_reader :collection
+ attr_accessor :batch_size
+
+ def no_timeout=(enable)
+ @flags |= [:no_cursor_timeout] if enable
+ end
+
# Create a new query command.
#
# @example
@@ -105,10 +111,11 @@ def initialize(database, collection, selector, options = {})
@full_collection_name = "#{database}.#{collection}"
@selector = selector
@request_id = options[:request_id]
- @flags = options[:flags]
+ @flags = options[:flags] || []
@limit = options[:limit]
@skip = options[:skip]
@fields = options[:fields]
+ @batch_size = options[:batch_size]
end
def log_inspect
@@ -121,6 +128,7 @@ def log_inspect
fields << ["flags=%s", flags.inspect]
fields << ["limit=%s", limit.inspect]
fields << ["skip=%s", skip.inspect]
+ fields << ["batch_size=%s", batch_size.inspect]
fields << ["fields=%s", self.fields.inspect]
f, v = fields.transpose
f.join(" ") % v
View
28 lib/moped/query.rb
@@ -173,6 +173,34 @@ def limit(limit)
self
end
+ # Set the query's batch size.
+ #
+ # @example Set the batch size.
+ # db[:people].find.batch_size(20)
+ #
+ # @param [ Integer ] limit The number of documents per batch.
+ #
+ # @return [ Query ] self
+ #
+ # @since 1.0.0
+ def batch_size(batch_size)
+ operation.batch_size = batch_size
+ self
+ end
+
+ # Disable cursor timeout
+ #
+ # @example Disable cursor timeout.
+ # db[:people].find.no_timeout
+ #
+ # @return [ Query ] self
+ #
+ # @since 1.0.0
+ def no_timeout
+ operation.no_timeout = true
+ self
+ end
+
# Execute a $findAndModify on the query.
#
# @example Find and modify a document, returning the original.
View
71 spec/moped/cursor_spec.rb
@@ -0,0 +1,71 @@
+require "spec_helper"
+
+describe Moped::Cursor do
+
+ describe "#request_limit" do
+
+ let(:session) do
+ Moped::Session.new([ "localhost:27017" ], database: "moped_test")
+ end
+
+ context "when the query has a limit" do
+
+ let(:query) do
+ session[:users].find.limit(10)
+ end
+
+ let(:cursor) do
+ described_class.new(session, query.operation)
+ end
+
+ it "returns the query limit" do
+ cursor.request_limit.should eq(10)
+ end
+ end
+
+ context "when the query has no limit" do
+
+ let(:query) do
+ session[:users].find
+ end
+
+ let(:cursor) do
+ described_class.new(session, query.operation)
+ end
+
+ it "returns 0" do
+ cursor.request_limit.should eq(0)
+ end
+ end
+
+ context "when the query has a batch size" do
+
+ let(:query) do
+ session[:users].find.batch_size(10)
+ end
+
+ let(:cursor) do
+ described_class.new(session, query.operation)
+ end
+
+ it "returns the batch size" do
+ cursor.request_limit.should eq(10)
+ end
+ end
+
+ context "when the query has a limit and batch size" do
+
+ let(:query) do
+ session[:users].find.limit(1000).batch_size(100)
+ end
+
+ let(:cursor) do
+ described_class.new(session, query.operation)
+ end
+
+ it "returns the smaller value" do
+ cursor.request_limit.should eq(100)
+ end
+ end
+ end
+end
View
10 spec/moped/protocol/query_spec.rb
@@ -84,6 +84,16 @@
end
end
+ context "with a batch_size option" do
+ let(:query) do
+ described_class.new "db", "coll", {}, batch_size: 5
+ end
+
+ it "sets the batch_size" do
+ query.batch_size.should eq 5
+ end
+ end
+
context "with a fields option" do
let(:query) do
described_class.new "db", "coll", {}, fields: { a: 1 }
Please sign in to comment.
Something went wrong with that request. Please try again.