
Merge a1038cc into 85a7ac7
trenton42 committed Feb 26, 2018
2 parents 85a7ac7 + a1038cc commit 3bea089
Showing 2 changed files with 67 additions and 59 deletions.
20 changes: 14 additions & 6 deletions docs/source/NEWS.rst
@@ -1,6 +1,14 @@
Changelog
=========

Release 18.1.0 (UNRELEASED)
---------------------------

Bugfixes
^^^^^^^^

- Fixed compatibility of ``Collection.aggregate()`` with PyMongo 3.6

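A minimal sketch (not part of this commit) of driving ``Collection.aggregate()`` from a Twisted ``inlineCallbacks`` coroutine; the host, database, collection and pipeline below are illustrative placeholders::

    from twisted.internet import defer, task

    import txmongo


    @defer.inlineCallbacks
    def run_aggregation():
        # Hypothetical local server; host and port are placeholders.
        connection = yield txmongo.MongoConnection("localhost", 27017)
        coll = connection.exampledb.orders

        # Illustrative pipeline: total order amounts per customer.
        pipeline = [
            {"$match": {"status": "A"}},
            {"$group": {"_id": "$cust_id", "total": {"$sum": "$amount"}}},
        ]
        results = yield coll.aggregate(pipeline)
        for doc in results:
            print(doc)

        yield connection.disconnect()


    if __name__ == "__main__":
        task.react(lambda reactor: run_aggregation())
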
Release 18.0.0 (2018-01-02)
---------------------------

@@ -30,8 +38,8 @@ Features
- Client authentication by X509 certificates. Use your client certificate when connecting
to MongoDB and then call ``Database.authenticate`` with certificate subject as username,
empty password and ``mechanism="MONGODB-X509"`` (see the sketch after this feature list).
- ``get_version()`` to approximate the behaviour of get_version in PyMongo. One notable exception
is the omission of searching by random (unindexed) meta-data, which should be considered a bad idea
as it may create *very* variable conditions in terms of loading and timing.
- New ``ConnectionPool.drop_database()`` method for easy and convenient destruction of all your precious data.
- ``count()`` to return the number of versions of any given file in GridFS.
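
A rough sketch (not part of this commit) of the X509 flow noted above. It assumes the connection accepts a Twisted ``ssl_context_factory`` and that the certificate subject matches a user defined in MongoDB's ``$external`` database; paths, host and subject string are placeholders::

    from twisted.internet import defer, ssl, task

    import txmongo


    @defer.inlineCallbacks
    def x509_login():
        # Placeholder PEM file containing the client certificate and key.
        with open("client.pem", "rb") as fp:
            cert = ssl.PrivateCertificate.loadPEM(fp.read())

        # Assumption: TLS is configured through a Twisted context factory.
        connection = yield txmongo.MongoConnection(
            "mongodb.example.com", 27017, ssl_context_factory=cert.options())

        # Username is the certificate subject, password is empty,
        # as described in the changelog entry above.
        subject = "CN=client,OU=example,O=Example Corp"
        yield connection["$external"].authenticate(
            subject, "", mechanism="MONGODB-X509")

        yield connection.disconnect()


    if __name__ == "__main__":
        task.react(lambda reactor: x509_login())
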
@@ -41,14 +49,14 @@ API Changes

- ``find()``, ``find_one()``, ``find_with_cursor()``, ``count()`` and ``distinct()`` signatures
changed to more closely match PyMongo's counterparts. New signatures are:

- ``find(filter=None, projection=None, skip=0, limit=0, sort=None, **kwargs)``
- ``find_with_cursor(filter=None, projection=None, skip=0, limit=0, sort=None, **kwargs)``
- ``find_one(filter=None, projection=None, **kwargs)``
- ``count(filter=None, **kwargs)``
- ``distinct(key, filter=None, **kwargs)``

Old signatures are now deprecated and will be supported in this and one subsequent release.
After that only new signatures will be valid.
- ``cursor`` argument to ``find()`` is deprecated. Please use ``find_with_cursor()`` directly
if you need to iterate over results by batches. ``cursor`` will be supported in this and
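
A rough sketch (not part of this commit) of the new-style calls, assuming an already connected txmongo ``Collection`` named ``coll``; field names and queries are illustrative::

    from twisted.internet import defer

    import txmongo.filter as qf


    @defer.inlineCallbacks
    def query_examples(coll):
        # New keyword names mirror PyMongo: filter / projection / skip / limit / sort.
        docs = yield coll.find(
            {"status": "A"},
            projection={"_id": 0, "name": 1},   # replaces the old `fields`
            limit=10,
            sort=qf.sort(qf.ASCENDING("name")),
        )

        one = yield coll.find_one({"name": "example"}, projection={"name": 1})
        total = yield coll.count({"status": "A"})
        names = yield coll.distinct("name", {"status": "A"})

        # Batched iteration goes through find_with_cursor() instead of find(cursor=True);
        # each yield returns the current batch plus a deferred firing with the next one.
        batch, next_batch = yield coll.find_with_cursor({"status": "A"})
        while batch:
            for doc in batch:
                pass  # process doc
            batch, next_batch = yield next_batch

        defer.returnValue((docs, one, total, names))
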
@@ -78,7 +86,7 @@ Features
- ``codec_options`` properties for ``ConnectionPool``, ``Database`` and ``Collection``.
``Collection.with_options(codec_options=CodecOptions(document_class=...))`` is now preferred
over ``Collection.find(..., as_class=...)``.

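A minimal sketch (not part of this commit) of that preferred pattern, assuming an already connected ``coll``; it decodes results into ``SON`` documents instead of plain dicts::

    from bson.codec_options import CodecOptions
    from bson.son import SON
    from twisted.internet import defer


    @defer.inlineCallbacks
    def find_as_son(coll):
        # Clone the collection with different codec options instead of
        # passing the deprecated as_class= argument to find().
        son_coll = coll.with_options(
            codec_options=CodecOptions(document_class=SON))

        doc = yield son_coll.find_one({"status": "A"})
        # `doc` is a bson.son.SON instance, or None if nothing matched.
        defer.returnValue(doc)
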
Bugfixes
^^^^^^^^

106 changes: 53 additions & 53 deletions txmongo/collection.py
@@ -1,6 +1,7 @@
# Copyright 2009-2015 The TxMongo Developers. All rights reserved.
# Use of this source code is governed by the Apache License that can be
# found in the LICENSE file.
"""Copyright 2009-2018 The TxMongo Developers. All rights reserved.
Use of this source code is governed by the Apache License that can be
found in the LICENSE file."""
# pylint: disable=W0622

from __future__ import absolute_import, division
import io
@@ -12,8 +13,10 @@
from bson.son import SON
from bson.codec_options import CodecOptions
from pymongo.bulk import _Bulk, _COMMANDS, _merge_command
from pymongo.errors import InvalidName, BulkWriteError, InvalidOperation, OperationFailure, DuplicateKeyError, \
from pymongo.errors import (
InvalidName, BulkWriteError, InvalidOperation, OperationFailure, DuplicateKeyError,
WriteError, WTimeoutError, WriteConcernError
)
from pymongo.message import _OP_MAP, _INSERT
from pymongo.results import InsertOneResult, InsertManyResult, UpdateResult, \
DeleteResult, BulkWriteResult
@@ -91,7 +94,9 @@ def __init__(self, database, name, write_concern=None, codec_options=None):
msg = "TxMongo: collection names must not contain '$', '{0}'".format(repr(name))
raise InvalidName(msg)
if name[0] == "." or name[-1] == ".":
msg = "TxMongo: collection names must not start or end with '.', '{0}'".format(repr(name))
msg = "TxMongo: collection names must not start or end with '.', '{0}'".format(
repr(name)
)
raise InvalidName(msg)
if "\x00" in name:
raise InvalidName("TxMongo: collection names must not contain the null character.")
@@ -222,12 +227,10 @@ def on_ok(response):
first_batch = response["cursor"]["firstBatch"]
if first_batch:
return first_batch[0]
else:
return None
return None

return self._database.command(
SON([("listCollections", 1),
("filter", {"name": self.name})])).addCallback(on_ok)
SON([("listCollections", 1), ("filter", {"name": self.name})])).addCallback(on_ok)

@timeout
def options(self, _deadline=None):
@@ -286,8 +289,7 @@ def new(filter=None, projection=None, skip=0, limit=0, sort=None, **kwargs):

if any(old_if):
return old(*args, **kwargs)
else:
return new(*args, **kwargs)
return new(*args, **kwargs)

@timeout
def find(self, *args, **kwargs):
@@ -345,8 +347,7 @@ def on_ok(result, this_func):
if docs:
rows.extend(docs)
return dfr.addCallback(this_func, this_func)
else:
return rows
return rows

return self.__real_find_with_cursor(filter, projection, skip, limit, sort,
**kwargs).addCallback(on_ok, on_ok)
@@ -561,11 +562,10 @@ def _get_write_concern(self, safe=None, **options):
elif safe:
if self.write_concern.acknowledged:
return self.write_concern
else:
# Edge case: MongoConnection(w=0).db.coll.insert(..., safe=True)
# In this case safe=True must issue getLastError without args
# even if connection-level write concern was unacknowledged
return WriteConcern()
# Edge case: MongoConnection(w=0).db.coll.insert(..., safe=True)
# In this case safe=True must issue getLastError without args
# even if connection-level write concern was unacknowledged
return WriteConcern()

return WriteConcern(w=0)

@@ -625,8 +625,8 @@ def on_proto(proto):

write_concern = self._get_write_concern(safe, **kwargs)
if write_concern.acknowledged:
return proto.get_last_error(str(self._database), **write_concern.document)\
.addCallback(lambda _: ids)
return proto.get_last_error(
str(self._database), **write_concern.document).addCallback(lambda _: ids)

return ids

@@ -639,10 +639,9 @@ def _insert_one(self, document, _deadline):
("ordered", True),
("writeConcern", self.write_concern.document)])
return self._database.command(command, _deadline=_deadline)
else:
# falling back to OP_INSERT in case of unacknowledged op
return self.insert([document], _deadline=_deadline)\
.addCallback(lambda _: None)

# falling back to OP_INSERT in case of unacknowledged op
return self.insert([document], _deadline=_deadline).addCallback(lambda _: None)

@timeout
def insert_one(self, document, _deadline=None):
@@ -708,7 +707,7 @@ def prepare_command():
key = str(idx).encode('ascii')
value = BSON.encode(doc)

enough_size = buf.tell() + len(key)+2 + len(value) - docs_start > max_bson
enough_size = buf.tell() + len(key) + 2 + len(value) - docs_start > max_bson
enough_count = idx >= max_count
if enough_size or enough_count:
yield idx_offset, prepare_command()
@@ -846,9 +845,9 @@ def on_ok(raw_response):

return self._database.command(command, _deadline=_deadline).addCallback(on_ok)

else:
return self.update(filter, update, upsert=upsert, multi=multi,
_deadline=_deadline).addCallback(lambda _: None)
return self.update(
filter, update, upsert=upsert, multi=multi, _deadline=_deadline
).addCallback(lambda _: None)

@timeout
def update_one(self, filter, update, upsert=False, _deadline=None):
@@ -959,8 +958,8 @@ def save(self, doc, safe=None, **kwargs):
oid = doc.get("_id")
if oid:
return self.update({"_id": oid}, doc, safe=safe, upsert=True, **kwargs)
else:
return self.insert(doc, safe=safe, **kwargs)

return self.insert(doc, safe=safe, **kwargs)

@timeout
def remove(self, spec, safe=None, single=False, flags=0, **kwargs):
@@ -999,9 +998,9 @@ def on_ok(raw_response):
return raw_response
return self._database.command(command, _deadline=_deadline).addCallback(on_ok)

else:
return self.remove(filter, single=not multi, _deadline=_deadline)\
.addCallback(lambda _: None)
return self.remove(
filter, single=not multi, _deadline=_deadline
).addCallback(lambda _: None)

@timeout
def delete_one(self, filter, _deadline=None):
@@ -1048,8 +1047,8 @@ def create_index(self, sort_fields, **kwargs):
kwargs["bucketSize"] = kwargs.pop("bucket_size")

index.update(kwargs)
return self._database.system.indexes.insert(index, safe=True)\
.addCallback(lambda _: name)
return self._database.system.indexes.insert(
index, safe=True).addCallback(lambda _: name)

@timeout
def ensure_index(self, sort_fields, _deadline=None, **kwargs):
Expand Down Expand Up @@ -1081,8 +1080,8 @@ def on_ok(indexes_info):
assert indexes_info["cursor"]["id"] == 0
return indexes_info["cursor"]["firstBatch"]
codec = CodecOptions(document_class=SON)
return self._database.command("listIndexes", self.name, codec_options=codec)\
.addCallback(on_ok)
return self._database.command(
"listIndexes", self.name, codec_options=codec).addCallback(on_ok)

@timeout
def index_information(self, _deadline=None):
@@ -1100,7 +1099,6 @@ def on_ok(raw):

return self.__index_information_3_0().addErrback(on_3_0_fail).addCallback(on_ok)


@timeout
def rename(self, new_name, _deadline=None):
"""rename(new_name)"""
@@ -1126,19 +1124,21 @@ def on_ok(raw):
if full_response:
return raw
return raw.get("result")
return self._database.command("aggregate", self._collection_name, pipeline = pipeline,
_deadline = _deadline).addCallback(on_ok)
return self._database.command(
"aggregate", self._collection_name, pipeline=pipeline, _deadline=_deadline
).addCallback(on_ok)

@timeout
def map_reduce(self, map, reduce, full_response=False, **kwargs):
params = {"map": map, "reduce": reduce}
params.update(**kwargs)

def on_ok(raw):
if full_response:
return raw
return raw.get("results")
return self._database.command("mapreduce", self._collection_name, **params)\
.addCallback(on_ok)
return self._database.command(
"mapreduce", self._collection_name, **params).addCallback(on_ok)

@timeout
def find_and_modify(self, query=None, update=None, upsert=False, **kwargs):
@@ -1200,8 +1200,9 @@ def _new_find_and_modify(self, filter, projection, sort, upsert=None,

no_obj_error = "No matching object found"

return self._database.command(cmd, allowable_errors=[no_obj_error], _deadline=_deadline)\
.addCallback(lambda result: result.get("value"))
return self._database.command(
cmd, allowable_errors=[no_obj_error], _deadline=_deadline
).addCallback(lambda result: result.get("value"))

@timeout
def find_one_and_delete(self, filter, projection=None, sort=None, _deadline=None):
@@ -1297,7 +1298,6 @@ def on_all_done(_):

return iterate(iterate, on_cmd_result).addCallback(on_all_done)


def _execute_batch_command(self, command_type, documents, ordered):
assert command_type in _OP_MAP

@@ -1325,7 +1325,7 @@ def accumulate_result(reply, idx_offset):

actual_write_concern = self.write_concern
if ordered and self.write_concern.acknowledged is False:
actual_write_concern = WriteConcern(w = 1)
actual_write_concern = WriteConcern(w=1)

batches = self._generate_batch_commands(self._collection_name, _COMMANDS[command_type],
_OP_MAP[command_type], documents, ordered,
@@ -1348,20 +1348,20 @@ def iterate(iterate_func):
def on_batch_result(result):
if "writeErrors" in result:
return defer.succeed(None)
else:
return iterate_func(iterate_func)
return iterate_func(iterate_func)

return batch_result.addCallback(on_batch_result)
else:
all_responses.append(batch_result)
return iterate_func(iterate_func)
else:

all_responses.append(batch_result)
return iterate_func(iterate_func)

return iterate_func(iterate_func)

def done(_):
def on_fail(failure):
failure.trap(defer.FirstError)
failure.value.subFailure.raiseException()

if self.write_concern.acknowledged and not ordered:
return defer.gatherResults(all_responses, consumeErrors=True)\
.addErrback(on_fail)
