Skip to content

Commit

Permalink
Merge 7de99f0 into 2074262
Browse files Browse the repository at this point in the history
  • Loading branch information
IlyaSkriblovsky committed Jul 6, 2018
2 parents 2074262 + 7de99f0 commit 69e4824
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 12 deletions.
1 change: 1 addition & 0 deletions docs/source/NEWS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Bugfixes

- In combination with PyMongo 3.6.0 `bulk_write` might sometimes raise
KeyError when bulk operation was interrupted (by failover, for example)
- Compatibility with PyMongo 3.7


Release 18.1.0 (2018-03-21)
Expand Down
1 change: 0 additions & 1 deletion tests/mongod.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ def start(self):
b"--noprealloc", b"--nojournal",
b"--smallfiles", b"--nssize", b"1",
b"--oplogSize", b"1",
b"--nohttpinterface",
]
if self.auth: args.append(b"--auth")
if self.replset: args.extend([b"--replSet", self.replset])
Expand Down
4 changes: 2 additions & 2 deletions tests/test_bulk.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from bson import BSON
from pymongo import InsertOne
from pymongo.errors import BulkWriteError, OperationFailure
from pymongo.errors import BulkWriteError, OperationFailure, NotMasterError
from pymongo.operations import UpdateOne, DeleteOne, UpdateMany, ReplaceOne
from pymongo.results import BulkWriteResult
from pymongo.write_concern import WriteConcern
Expand Down Expand Up @@ -242,4 +242,4 @@ def fake_send_query(*args):
with patch('txmongo.protocol.MongoProtocol.send_QUERY', side_effect=fake_send_query):
yield self.assertFailure(
self.coll.bulk_write([UpdateOne({}, {'$set': {'x': 42}}, upsert=True)], ordered=True),
OperationFailure)
OperationFailure, NotMasterError)
11 changes: 9 additions & 2 deletions tests/test_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ def test_TextIndex(self):
def __3_2_or_higher(self):
return self.db.command("buildInfo").addCallback(lambda info: info["versionArray"] >= [3, 2])

def __3_6_or_higher(self):
return self.db.command("buildInfo").addCallback(lambda info: info["versionArray"] >= [3, 6])

@defer.inlineCallbacks
def __test_simple_filter(self, filter, optionname, optionvalue):
# Checking that `optionname` appears in profiler log with specified value
Expand All @@ -88,7 +91,9 @@ def __test_simple_filter(self, filter, optionname, optionvalue):
yield self.coll.find({}, filter=filter)
yield self.db.command("profile", 0)

if (yield self.__3_2_or_higher()):
if (yield self.__3_6_or_higher()):
profile_filter = {"command." + optionname: optionvalue}
elif (yield self.__3_2_or_higher()):
# query options format in system.profile have changed in MongoDB 3.2
profile_filter = {"query." + optionname: optionvalue}
else:
Expand Down Expand Up @@ -123,7 +128,9 @@ def test_FilterMerge(self):
yield self.coll.find({}, filter=qf.sort(qf.ASCENDING('x')) + qf.comment(comment))
yield self.db.command("profile", 0)

if (yield self.__3_2_or_higher()):
if (yield self.__3_6_or_higher()):
profile_filter = {"command.sort.x": 1, "command.comment": comment}
elif (yield self.__3_2_or_higher()):
profile_filter = {"query.sort.x": 1, "query.comment": comment}
else:
profile_filter = {"query.$orderby.x": 1, "query.$comment": comment}
Expand Down
3 changes: 2 additions & 1 deletion tests/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,8 @@ def test_CheckResult(self):
"delete", "mycol", check=True,
allowable_errors=[
"missing deletes field",
"The deletes option is required to the delete command."
"The deletes option is required to the delete command.",
"BSON field 'delete.deletes' is missing but a required field"
]
)
self.assertFalse(result["ok"])
Expand Down
46 changes: 44 additions & 2 deletions txmongo/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
from bson.code import Code
from bson.son import SON
from bson.codec_options import CodecOptions
from pymongo.bulk import _Bulk, _COMMANDS, _merge_command
from pymongo.bulk import _Bulk, _COMMANDS, _UOP
from pymongo.errors import InvalidName, BulkWriteError, InvalidOperation, OperationFailure, DuplicateKeyError, \
WriteError, WTimeoutError, WriteConcernError
from pymongo.helpers import _check_command_response
from pymongo.message import _OP_MAP, _INSERT
from pymongo.message import _OP_MAP, _INSERT, _DELETE, _UPDATE
from pymongo.results import InsertOneResult, InsertManyResult, UpdateResult, \
DeleteResult, BulkWriteResult
from pymongo.common import validate_ok_for_update, validate_ok_for_replace, \
Expand Down Expand Up @@ -62,6 +62,48 @@ def _check_write_command_response(result):
_raise_write_concern_error(error)


def _merge_command(run, full_result, results):
"""Merge a group of results from write commands into the full result.
"""
for offset, result in results:

affected = result.get("n", 0)

if run.op_type == _INSERT:
full_result["nInserted"] += affected

elif run.op_type == _DELETE:
full_result["nRemoved"] += affected

elif run.op_type == _UPDATE:
upserted = result.get("upserted")
if upserted:
n_upserted = len(upserted)
for doc in upserted:
doc["index"] = run.index(doc["index"] + offset)
full_result["upserted"].extend(upserted)
full_result["nUpserted"] += n_upserted
full_result["nMatched"] += (affected - n_upserted)
else:
full_result["nMatched"] += affected
full_result["nModified"] += result["nModified"]

write_errors = result.get("writeErrors")
if write_errors:
for doc in write_errors:
# Leave the server response intact for APM.
replacement = doc.copy()
idx = doc["index"] + offset
replacement["index"] = run.index(idx)
# Add the failed operation to the error document.
replacement[_UOP] = run.ops[idx]
full_result["writeErrors"].append(replacement)

wc_error = result.get("writeConcernError")
if wc_error:
full_result["writeConcernErrors"].append(wc_error)


@comparable
class Collection(object):
"""Creates new :class:`Collection` object
Expand Down
57 changes: 53 additions & 4 deletions txmongo/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

from __future__ import absolute_import, division
import base64
from bson import BSON, SON, Binary
import hashlib

from bson import BSON, SON, Binary, PY3
from collections import namedtuple
from hashlib import sha1
import hmac
Expand All @@ -30,6 +32,53 @@
from twisted.python.compat import unicode


if PY3:
_from_bytes = int.from_bytes
_to_bytes = int.to_bytes
else:
from binascii import (hexlify as _hexlify, unhexlify as _unhexlify)

def _from_bytes(value, dummy, _int=int, _hexlify=_hexlify):
"""An implementation of int.from_bytes for python 2.x."""
return _int(_hexlify(value), 16)


def _to_bytes(value, length, dummy, _unhexlify=_unhexlify):
"""An implementation of int.to_bytes for python 2.x."""
fmt = '%%0%dx' % (2 * length,)
return _unhexlify(fmt % value)


try:
# The fastest option, if it's been compiled to use OpenSSL's HMAC.
from backports.pbkdf2 import pbkdf2_hmac as _hi
except ImportError:
try:
# Python 2.7.8+, or Python 3.4+.
from hashlib import pbkdf2_hmac as _hi
except ImportError:

def _hi(hash_name, data, salt, iterations):
"""A simple implementation of PBKDF2-HMAC."""
mac = hmac.HMAC(data, None, getattr(hashlib, hash_name))

def _digest(msg, mac=mac):
"""Get a digest for msg."""
_mac = mac.copy()
_mac.update(msg)
return _mac.digest()

from_bytes = _from_bytes
to_bytes = _to_bytes

_u1 = _digest(salt + b'\x00\x00\x00\x01')
_ui = from_bytes(_u1, 'big')
for _ in range(iterations - 1):
_u1 = _digest(_u1)
_ui ^= from_bytes(_u1, 'big')
return to_bytes(_ui, mac.digest_size, 'big')


INT_MAX = 2147483647

OP_REPLY = 1
Expand Down Expand Up @@ -467,9 +516,9 @@ def authenticate_scram_sha1(self, database_name, username, password):
raise MongoAuthenticationError("TxMongo: server returned an invalid nonce.")

without_proof = b"c=biws,r=" + rnonce
salted_pass = auth._hi(auth._password_digest(username, password).encode("utf-8"),
base64.standard_b64decode(salt),
iterations)
salted_pass = _hi('sha1', auth._password_digest(username, password).encode("utf-8"),
base64.standard_b64decode(salt),
iterations)
client_key = hmac.HMAC(salted_pass, b"Client Key", sha1).digest()
stored_key = sha1(client_key).digest()
auth_msg = b','.join((first_bare, server_first, without_proof))
Expand Down

0 comments on commit 69e4824

Please sign in to comment.