Added support for batchsize parameter in collection.py/__real_find_with_cursor (#226)
* Added support for batchsize parameter in collection.py/__real_find_with_cursor
Handle CursorNotFound in handle_REPLY

* Pulled in changes from upstream/master (Collection.aggregate)

* fixed naming convention issues (PEP8 compliance)
added tests

* Cleaned up imports

* - Styling issues and consistency in variable naming/error messages
- Ensure compatibility with Twisted 14 in tests
evert2410 authored and psi29a committed Mar 9, 2018
1 parent 311644d commit fcae7d7
Showing 5 changed files with 97 additions and 5 deletions.
8 changes: 8 additions & 0 deletions docs/source/NEWS.rst
@@ -1,6 +1,9 @@
Changelog
=========




Release 18.1.0 (UNRELEASED)
---------------------------

@@ -18,6 +21,11 @@ Bugfixes

- Fixed compatibility with PyMongo 3.6

Features
^^^^^^^^

- Added support for paged requests: new batch_size parameter of Collection.find_with_cursor


Release 17.1.0 (2017-08-11)
---------------------------
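
For reference, a minimal usage sketch of the batch_size parameter announced in the changelog entry above, following the batch-iteration pattern exercised by the tests below. Here coll is assumed to be an already-connected txmongo Collection and process_batch is a placeholder for application code:

from twisted.internet import defer


@defer.inlineCallbacks
def iterate_in_batches(coll, process_batch):
    # Request at most 50 documents per server reply; each following batch
    # is obtained by waiting on the deferred returned alongside the batch.
    docs, dfr = yield coll.find_with_cursor({}, batch_size=50)
    while docs:
        process_batch(docs)
        docs, dfr = yield dfr
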
26 changes: 25 additions & 1 deletion tests/test_protocol.py
@@ -17,11 +17,14 @@

from bson import BSON
from twisted.trial import unittest
from twisted.internet import defer
from twisted.python.compat import unicode

from tests.utils import SingleCollectionTest

from txmongo.protocol import MongoClientProtocol, MongoDecoder, Insert, Query, \
KillCursors, Getmore, Update, Delete, UPDATE_MULTI, UPDATE_UPSERT, \
DELETE_SINGLE_REMOVE
DELETE_SINGLE_REMOVE, CursorNotFound


class _FakeTransport(object):
@@ -86,3 +89,24 @@ def test_EncodeDecodeDelete(self):
request = Delete(flags=DELETE_SINGLE_REMOVE, collection="coll",
selector=BSON.encode({'x': 42}))
self.__test_encode_decode(request)

class TestCursors(SingleCollectionTest):

@defer.inlineCallbacks
def test_CursorNotFound(self):

yield self.coll.insert([{'v': i} for i in range(140)], safe=True)

protocol = yield self.conn.getprotocol()

query = Query(query={}, n_to_return=10, collection=str(self.coll))

query_result = yield protocol.send_QUERY(query)

cursor_id = query_result.cursor_id

yield protocol.send_KILL_CURSORS(KillCursors(cursors=[cursor_id]))

yield self.assertFailure(protocol.send_GETMORE(Getmore(collection=str(self.coll), cursor_id=cursor_id, n_to_return=10)),
CursorNotFound)
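
With the new REPLY_CURSOR_NOT_FOUND handling (see txmongo/protocol.py below), a dead cursor now surfaces as pymongo's CursorNotFound instead of an opaque reply. A hedged sketch of how calling code might react, assuming the same Collection API used elsewhere in these tests:

from pymongo.errors import CursorNotFound
from twisted.internet import defer


@defer.inlineCallbacks
def drain(coll):
    # If the server has already discarded the cursor (for example after a
    # timeout), the pending getMore now errbacks with CursorNotFound, so the
    # caller can decide to restart the query instead of failing silently.
    docs, dfr = yield coll.find_with_cursor({}, batch_size=100)
    fetched = []
    try:
        while docs:
            fetched.extend(docs)
            docs, dfr = yield dfr
    except CursorNotFound:
        # Cursor expired server-side; re-issue the query here if needed.
        pass
    defer.returnValue(fetched)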

36 changes: 36 additions & 0 deletions tests/test_queries.py
@@ -94,6 +94,42 @@ def test_FindWithCursorLimit(self):
docs, d = yield d
self.assertEqual(total, 150)

@defer.inlineCallbacks
def test_FindWithCursorBatchsize(self):
yield self.coll.insert([{'v': i} for i in range(140)], safe=True)

docs, d = yield self.coll.find_with_cursor(batch_size=50)
lengths = []
while docs:
lengths.append(len(docs))
docs, d = yield d
self.assertEqual(lengths, [50, 50, 40])

@defer.inlineCallbacks
def test_FindWithCursorBatchsizeLimit(self):
yield self.coll.insert([{'v': i} for i in range(140)], safe=True)

docs, d = yield self.coll.find_with_cursor(batch_size=50, limit=10)
lengths = []
while docs:
lengths.append(len(docs))
docs, d = yield d
self.assertEqual(lengths, [10])

@defer.inlineCallbacks
def test_FindWithCursorZeroBatchsize(self):
yield self.coll.insert([{'v': i} for i in range(140)], safe=True)

docs, d = yield self.coll.find_with_cursor(batch_size=0)
lengths = []
while docs:
lengths.append(len(docs))
docs, d = yield d
self.assertEqual(lengths, [101, 39])

@defer.inlineCallbacks
def test_LargeData(self):
yield self.coll.insert([{'v': ' '*(2**19)} for _ in range(4)], safe=True)
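
The [101, 39] expectation in test_FindWithCursorZeroBatchsize comes from the server's default behaviour when numberToReturn is 0: with small documents the first reply carries 101 documents and later getMore replies carry the rest. A quick restatement of that arithmetic, not part of the change itself:

total_docs = 140
default_first_batch = 101          # server default when numberToReturn == 0
remainder = total_docs - default_first_batch

assert [default_first_batch, remainder] == [101, 39]
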
25 changes: 22 additions & 3 deletions txmongo/collection.py
@@ -367,7 +367,7 @@ def __apply_find_filter(spec, c_filter):

@timeout
def find_with_cursor(self, *args, **kwargs):
"""find_with_cursor(filter=None, projection=None, skip=0, limit=0, sort=None, **kwargs)
"""find_with_cursor(filter=None, projection=None, skip=0, limit=0, sort=None, batch_size=0, **kwargs)
Find documents in a collection and return them in one batch at a time.
@@ -389,7 +389,8 @@ def query():
new_kwargs = self._find_args_compat(*args, **kwargs)
return self.__real_find_with_cursor(**new_kwargs)

def __real_find_with_cursor(self, filter=None, projection=None, skip=0, limit=0, sort=None, **kwargs):
def __real_find_with_cursor(self, filter=None, projection=None, skip=0, limit=0, sort=None, batch_size=0, **kwargs):

if filter is None:
filter = SON()

@@ -401,6 +402,8 @@ def __real_find_with_cursor(self, filter=None, projection=None, skip=0, limit=0,
raise TypeError("TxMongo: skip must be an instance of int.")
if not isinstance(limit, int):
raise TypeError("TxMongo: limit must be an instance of int.")
if not isinstance(batch_size, int):
raise TypeError("TxMongo: batch_size must be an instance of int.")

projection = self._normalize_fields_projection(projection)

@@ -414,8 +417,15 @@ def after_connection(protocol):

check_deadline(kwargs.pop("_deadline", None))

if batch_size and limit:
n_to_return = min(batch_size, limit)
elif batch_size:
n_to_return = batch_size
else:
n_to_return = limit

query = Query(flags=flags, collection=str(self),
n_to_skip=skip, n_to_return=limit,
n_to_skip=skip, n_to_return=n_to_return,
query=filter, fields=projection)

deferred_query = protocol.send_QUERY(query)
@@ -427,8 +437,10 @@ def after_reply(reply, protocol, this_func, fetched=0):
# reference between closure and function object which will add unnecessary
# work for GC.
def after_reply(reply, protocol, this_func, fetched=0):

documents = reply.documents
docs_count = len(documents)

if limit > 0:
docs_count = min(docs_count, limit - fetched)
fetched += docs_count
@@ -439,8 +451,13 @@ def after_reply(reply, protocol, this_func, fetched=0):
out = [document.decode(codec_options=options) for document in documents[:docs_count]]

if reply.cursor_id:
# Note: this branch is not reached when batch_size == 1, because the
# server closes the cursor after the first reply (numberToReturn == 1
# behaves like -1; see the OP_QUERY wire protocol documentation):
# https://docs.mongodb.com/manual/reference/mongodb-wire-protocol/#wire-op-query
if limit == 0:
to_fetch = 0 # no limit
if batch_size:
to_fetch = batch_size
elif limit < 0:
# We won't actually get here because MongoDB won't
# create cursor when limit < 0
Expand All @@ -449,6 +466,8 @@ def after_reply(reply, protocol, this_func, fetched=0):
to_fetch = limit - fetched
if to_fetch <= 0:
to_fetch = None # close cursor
elif batch_size:
to_fetch = min(batch_size, to_fetch)

if to_fetch is None:
protocol.send_KILL_CURSORS(KillCursors(cursors=[reply.cursor_id]))
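
The interplay between batch_size and limit in __real_find_with_cursor above can be restated as a small standalone helper. The function name is illustrative only, but the branching mirrors the committed code:

def initial_n_to_return(batch_size=0, limit=0):
    # First OP_QUERY: with both set, request the smaller of the two;
    # otherwise whichever is non-zero; 0 lets the server pick its default.
    if batch_size and limit:
        return min(batch_size, limit)
    elif batch_size:
        return batch_size
    return limit


assert initial_n_to_return(batch_size=50, limit=10) == 10  # limit caps the batch
assert initial_n_to_return(batch_size=50) == 50            # pure batch_size paging
assert initial_n_to_return() == 0                          # server default first batch
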
7 changes: 6 additions & 1 deletion txmongo/protocol.py
@@ -22,7 +22,7 @@
import logging
from pymongo import auth
from pymongo.errors import AutoReconnect, ConnectionFailure, DuplicateKeyError, OperationFailure, \
NotMasterError
NotMasterError, CursorNotFound
from random import SystemRandom
import struct
from twisted.internet import defer, protocol, error
@@ -381,6 +381,11 @@ def handle_REPLY(self, request):
df.errback(err)
if fail_conn:
self.transport.loseConnection()
elif request.response_flags & REPLY_CURSOR_NOT_FOUND:
# Inspired by pymongo handling
msg = "Cursor not found, cursor id: %d" % (request.cursor_id,)
errobj = {"ok": 0, "errmsg": msg, "code": 43}
df.errback(CursorNotFound(msg, 43, errobj))
else:
df.callback(request)

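For completeness, a sketch of what the errback built in handle_REPLY delivers. CursorNotFound subclasses pymongo's OperationFailure, so its code and details attributes expose the synthesized error document (the cursor id here is illustrative):

from pymongo.errors import CursorNotFound

msg = "Cursor not found, cursor id: 12345"
exc = CursorNotFound(msg, 43, {"ok": 0, "errmsg": msg, "code": 43})

assert exc.code == 43
assert exc.details["errmsg"] == msg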
