/
cursor.rb
279 lines (241 loc) · 8.41 KB
/
cursor.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# Copyright (C) 2008-2009 10gen Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'mongo/message'
require 'mongo/util/byte_buffer'
require 'mongo/util/bson'
module Mongo
# A cursor over query results. Returned objects are hashes.
class Cursor
include Enumerable
RESPONSE_HEADER_SIZE = 20
attr_reader :db, :collection, :query
# Create a new cursor.
#
# Should not be called directly by application developers.
def initialize(db, collection, query, admin=false)
@db, @collection, @query, @admin = db, collection, query, admin
@cache = []
@closed = false
@query_run = false
end
# Return the next object or nil if there are no more. Raises an error
# if necessary.
def next_object
refill_via_get_more if num_remaining == 0
o = @cache.shift
if o && o['$err']
err = o['$err']
# If the server has stopped being the master (e.g., it's one of a
# pair but it has died or something like that) then we close that
# connection. If the db has auto connect option and a pair of
# servers, next request will re-open on master server.
@db.close if err == "not master"
raise err
end
o
end
# Get the size of the results set for this query.
#
# Returns the number of objects in the results set for this query. Does
# not take limit and skip into account. Raises OperationFailure on a
# database error.
def count
command = OrderedHash["count", @collection.name,
"query", @query.selector,
"fields", @query.fields()]
response = @db.db_command(command)
return response['n'].to_i if response['ok'] == 1
return 0 if response['errmsg'] == "ns missing"
raise OperationFailure, "Count failed: #{response['errmsg']}"
end
# Sort this cursor's result
#
# Takes either a hash of field names as keys and 1/-1 as values; 1 ==
# ascending, -1 == descending, or array of field names (all assumed to be
# sorted in ascending order).
#
# Raises InvalidOperation if this cursor has already been used.
#
# This method overrides any sort order specified in the Collection#find
# method, and only the last sort applied has an effect
def sort(order)
check_modifiable
@query.order_by = order
self
end
# Limits the number of results to be returned by this cursor.
#
# Raises InvalidOperation if this cursor has already been used.
#
# This method overrides any limit specified in the Collection#find method,
# and only the last limit applied has an effect.
def limit(number_to_return)
check_modifiable
raise ArgumentError, "limit requires an integer" unless number_to_return.is_a? Integer
@query.number_to_return = number_to_return
self
end
# Skips the first +number_to_skip+ results of this cursor.
#
# Raises InvalidOperation if this cursor has already been used.
#
# This method overrides any offset specified in the Collection#find method,
# and only the last skip applied has an effect.
def skip(number_to_skip)
check_modifiable
raise ArgumentError, "skip requires an integer" unless number_to_skip.is_a? Integer
@query.number_to_skip = number_to_skip
self
end
# Iterate over each document in this cursor, yielding it to the given
# block.
#
# Iterating over an entire cursor will close it.
def each
num_returned = 0
while more? && (@query.number_to_return <= 0 || num_returned < @query.number_to_return)
yield next_object()
num_returned += 1
end
end
# Return all of the documents in this cursor as an array of hashes.
#
# Raises InvalidOperation if this cursor has already been used (including
# any previous calls to this method).
#
# Use of this method is discouraged - iterating over a cursor is much
# more efficient in most cases.
def to_a
raise InvalidOperation, "can't call Cursor#to_a on a used cursor" if @query_run
rows = []
num_returned = 0
while more? && (@query.number_to_return <= 0 || num_returned < @query.number_to_return)
rows << next_object()
num_returned += 1
end
rows
end
# Returns an explain plan record for this cursor.
def explain
limit = @query.number_to_return
@query.explain = true
@query.number_to_return = -limit.abs
c = Cursor.new(@db, @collection, @query)
explanation = c.next_object
c.close
@query.explain = false
@query.number_to_return = limit
explanation
end
# Close the cursor.
#
# Note: if a cursor is read until exhausted (read until OP_QUERY or
# OP_GETMORE returns zero for the cursor id), there is no need to
# close it by calling this method.
#
# Collection#find takes an optional block argument which can be used to
# ensure that your cursors get closed. See the documentation for
# Collection#find for details.
def close
@db.send_to_db(KillCursorsMessage.new(@cursor_id)) if @cursor_id
@cache = []
@cursor_id = 0
@closed = true
end
# Returns true if this cursor is closed, false otherwise.
def closed?; @closed; end
private
def read_all
read_message_header
read_response_header
read_objects_off_wire
end
def read_objects_off_wire
while doc = next_object_on_wire
@cache << doc
end
end
def read_message_header
MessageHeader.new.read_header(@db)
end
def read_response_header
header_buf = ByteBuffer.new
header_buf.put_array(@db.receive_full(RESPONSE_HEADER_SIZE).unpack("C*"))
raise "Short read for DB response header; expected #{RESPONSE_HEADER_SIZE} bytes, saw #{header_buf.length}" unless header_buf.length == RESPONSE_HEADER_SIZE
header_buf.rewind
@result_flags = header_buf.get_int
@cursor_id = header_buf.get_long
@starting_from = header_buf.get_int
@n_returned = header_buf.get_int
@n_remaining = @n_returned
end
def num_remaining
refill_via_get_more if @cache.length == 0
@cache.length
end
# Internal method, not for general use. Return +true+ if there are
# more records to retrieve. We do not check @query.number_to_return;
# #each is responsible for doing that.
def more?
num_remaining > 0
end
def next_object_on_wire
# if @n_remaining is 0 but we have a non-zero cursor, there are more
# to fetch, so do a GetMore operation, but don't do it here - do it
# when someone pulls an object out of the cache and it's empty
return nil if @n_remaining == 0
object_from_stream
end
def refill_via_get_more
if send_query_if_needed or @cursor_id == 0
return
end
@db._synchronize {
@db.send_to_db(GetMoreMessage.new(@admin ? 'admin' : @db.name, @collection.name, @cursor_id))
read_all
}
end
def object_from_stream
buf = ByteBuffer.new
buf.put_array(@db.receive_full(4).unpack("C*"))
buf.rewind
size = buf.get_int
buf.put_array(@db.receive_full(size - 4).unpack("C*"), 4)
@n_remaining -= 1
buf.rewind
BSON.new.deserialize(buf)
end
def send_query_if_needed
# Run query first time we request an object from the wire
if @query_run
false
else
@db._synchronize {
@db.send_query_message(QueryMessage.new(@admin ? 'admin' : @db.name, @collection.name, @query))
@query_run = true
read_all
}
true
end
end
def to_s
"DBResponse(flags=#@result_flags, cursor_id=#@cursor_id, start=#@starting_from, n_returned=#@n_returned)"
end
def check_modifiable
if @query_run || @closed
raise InvalidOperation, "Cannot modify the query once it has been run or closed."
end
end
end
end