Skip to content

Commit

Permalink
Merge pull request #269 from IlyaSkriblovsky/267-and-268
Browse files Browse the repository at this point in the history
#267, #268, #270 and #271 combined
  • Loading branch information
IlyaSkriblovsky committed Aug 24, 2020
2 parents 1a1ee6e + fb684f4 commit 7226996
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 151 deletions.
37 changes: 6 additions & 31 deletions .travis.yml
@@ -1,15 +1,14 @@
language: python
python:
- "2.7"
- "3.5"
- "pypy"
- "3.8"
- "pypy3"


env:
- TOX_ENV=tw166
- TOX_ENV=tw175
- TOX_ENV=tw179
- TOX_ENV=tw189
- TOX_ENV=tw1910
- TOX_ENV=tw203
- TOX_ENV=twlatest
- TOX_ENV=twtrunk

Expand All @@ -18,38 +17,14 @@ matrix:
fast_finish: true
allow_failures:
- env: TOX_ENV=pyflakes
- python: pypy
env: TOX_ENV=twtrunk
- python: pypy3
env: TOX_ENV=twtrunk

exclude:
- python: 2.7
env: TOX_ENV=twlatest
- python: 2.7
env: TOX_ENV=twtrunk

include:
- python: 3.5
- python: 3.8
env: TOX_ENV=pyflakes
- python: 3.5
- python: 3.8
env: TOX_ENV=manifest
- python: 3.4
env: TOX_ENV=tw179
- python: 3.4
env: TOX_ENV=tw166
- python: 2.7
env: TOX_ENV=tw140
- python: 2.7
env: TOX_ENV=tw150
- python: 2.7
env: TOX_ENV=tw155
- python: pypy
env: TOX_ENV=tw140
- python: pypy
env: TOX_ENV=tw150
- python: pypy
env: TOX_ENV=tw155



Expand Down
4 changes: 2 additions & 2 deletions MANIFEST.in
Expand Up @@ -6,11 +6,11 @@ include tox.ini
include *.spec
include AUTHORS
include CONTRIBUTORS
prune Makefile
include Makefile
prune debian
prune docs
recursive-include examples *.py
recursive-include examples *.sh
recursive-include examples *.tac
recursive-include tests *.py
recursive-include txmongo *.py
recursive-include txmongo *.py
6 changes: 2 additions & 4 deletions tests/mongod.py
Expand Up @@ -20,7 +20,6 @@

from twisted.python.compat import intToBytes
from twisted.python.filepath import FilePath
from twisted.python.compat import _PY3
from twisted.internet import defer, reactor
from twisted.internet.error import ProcessDone

Expand Down Expand Up @@ -53,9 +52,8 @@ def __init__(self, port=27017, auth=False, replset=None, dbpath=None, args=()):
self.__datadir = dbpath
self.__rmdatadir = False

if _PY3:
# Ensure it is always bytes
self.__datadir = FilePath(self.__datadir).asBytesMode().path
# Ensure it is always bytes
self.__datadir = FilePath(self.__datadir).asBytesMode().path


def start(self):
Expand Down
6 changes: 1 addition & 5 deletions tests/test_objects.py
Expand Up @@ -28,7 +28,6 @@
from txmongo._gridfs.errors import NoFile
from twisted.trial import unittest
from twisted.internet import defer
from twisted.python.compat import _PY3
try:
from twisted._version import version as twisted_version
except ImportError:
Expand All @@ -45,10 +44,7 @@ def test_MongoObjects(self):
conn = yield txmongo.MongoConnection(mongo_host, mongo_port)
mydb = conn.mydb
self.assertEqual(isinstance(mydb, database.Database), True)
if _PY3:
self.assertEqual(repr(mydb), "Database(Connection('127.0.0.1', 27017), 'mydb')")
else:
self.assertEqual(repr(mydb), "Database(Connection('127.0.0.1', 27017), u'mydb')")
self.assertEqual(repr(mydb), "Database(Connection('127.0.0.1', 27017), 'mydb')")
self.assertEqual(repr(mydb("mydb2")), repr(mydb.__call__("mydb2")))
mycol = mydb.mycol
self.assertEqual(isinstance(mycol, collection.Collection), True)
Expand Down
31 changes: 13 additions & 18 deletions tests/test_replicaset.py
Expand Up @@ -143,15 +143,13 @@ def test_SwitchToMasterOnConnect(self):

@defer.inlineCallbacks
def test_AutoReconnect(self):
self.patch(_Connection, 'maxDelay', 5)

try:
uri = "mongodb://localhost:{0}/?w={1}".format(self.ports[0], len(self.ports))
conn = ConnectionPool(uri)
conn = ConnectionPool(uri, max_delay=5)

yield conn.db.coll.insert({'x': 42}, safe=True)

yield self.__mongod[0].stop()
self.__mongod[0].kill(signal.SIGSTOP)

while True:
try:
Expand All @@ -162,14 +160,14 @@ def test_AutoReconnect(self):
pass

finally:
self.__mongod[0].kill(signal.SIGCONT)
yield conn.disconnect()
self.flushLoggedErrors(AutoReconnect)

@defer.inlineCallbacks
def test_AutoReconnect_from_primary_step_down(self):
self.patch(_Connection, 'maxDelay', 5)
uri = "mongodb://localhost:{0}/?w={1}".format(self.ports[0], len(self.ports))
conn = ConnectionPool(uri)
conn = ConnectionPool(uri, max_delay=5)

# this will force primary to step down, triggering an AutoReconnect that bubbles up
# through the connection pool to the client
Expand All @@ -180,15 +178,13 @@ def test_AutoReconnect_from_primary_step_down(self):

@defer.inlineCallbacks
def test_find_with_timeout(self):
self.patch(_Connection, 'maxDelay', 5)

try:
uri = "mongodb://localhost:{0}/?w={1}".format(self.ports[0], len(self.ports))
conn = ConnectionPool(uri, initial_delay=3)
conn = ConnectionPool(uri, retry_delay=3, max_delay=5)

yield conn.db.coll.insert({'x': 42}, safe=True)

yield self.__mongod[0].stop()
self.__mongod[0].kill(signal.SIGSTOP)

while True:
try:
Expand All @@ -200,20 +196,19 @@ def test_find_with_timeout(self):
pass

finally:
self.__mongod[0].kill(signal.SIGCONT)
yield conn.disconnect()
self.flushLoggedErrors(AutoReconnect)

@defer.inlineCallbacks
def test_find_with_deadline(self):
self.patch(_Connection, 'maxDelay', 5)

try:
uri = "mongodb://localhost:{0}/?w={1}".format(self.ports[0], len(self.ports))
conn = ConnectionPool(uri, initial_delay=3)
conn = ConnectionPool(uri, retry_delay=3, max_delay=5)

yield conn.db.coll.insert({'x': 42}, safe=True)

yield self.__mongod[0].stop()
self.__mongod[0].kill(signal.SIGSTOP)

while True:
try:
Expand All @@ -225,20 +220,19 @@ def test_find_with_deadline(self):
pass

finally:
self.__mongod[0].kill(signal.SIGCONT)
yield conn.disconnect()
self.flushLoggedErrors(AutoReconnect)

@defer.inlineCallbacks
def test_TimeExceeded_insert(self):
self.patch(_Connection, 'maxDelay', 5)

try:
uri = "mongodb://localhost:{0}/?w={1}".format(self.ports[0], len(self.ports))
conn = ConnectionPool(uri, initial_delay=3)
conn = ConnectionPool(uri, retry_delay=3, max_delay=5)

yield conn.db.coll.insert({'x': 42}, safe=True)

yield self.__mongod[0].stop()
self.__mongod[0].kill(signal.SIGSTOP)

while True:
try:
Expand All @@ -250,6 +244,7 @@ def test_TimeExceeded_insert(self):
pass

finally:
self.__mongod[0].kill(signal.SIGCONT)
yield conn.disconnect()
self.flushLoggedErrors(AutoReconnect)

Expand Down
12 changes: 4 additions & 8 deletions tox.ini
@@ -1,7 +1,6 @@
[tox]
envlist =
{tw155,tw150,tw140},
{tw166,tw175,tw179,twtrunk,twlatest},
{tw189,tw1910,tw203,twtrunk,twlatest},
pyflakes, manifest


Expand All @@ -14,12 +13,9 @@ deps =
pycrypto
twlatest: Twisted
twtrunk: https://github.com/twisted/twisted/archive/trunk.zip
tw179: Twisted==17.9.0
tw175: Twisted==17.5.0
tw166: Twisted==16.6.0
tw155: Twisted==15.5
tw150: Twisted==15.0
tw140: Twisted==14.0
tw203: Twisted==20.3.0
tw1910: Twisted==19.10.0
tw189: Twisted==18.9.0
setenv =
PYTHONPATH = {toxinidir}
commands =
Expand Down
86 changes: 4 additions & 82 deletions txmongo/collection.py
Expand Up @@ -11,11 +11,9 @@
from bson.code import Code
from bson.son import SON
from bson.codec_options import CodecOptions
from pymongo.bulk import _Bulk, _COMMANDS, _UOP
from pymongo.errors import InvalidName, BulkWriteError, InvalidOperation, OperationFailure, DuplicateKeyError, \
WriteError, WTimeoutError, WriteConcernError
from pymongo.helpers import _check_command_response
from pymongo.message import _OP_MAP, _INSERT, _DELETE, _UPDATE
from pymongo.bulk import _Bulk, _COMMANDS
from pymongo.errors import InvalidName, BulkWriteError, InvalidOperation, OperationFailure
from pymongo.message import _OP_MAP, _INSERT
from pymongo.results import InsertOneResult, InsertManyResult, UpdateResult, \
DeleteResult, BulkWriteResult
from pymongo.common import validate_ok_for_update, validate_ok_for_replace, \
Expand All @@ -25,89 +23,13 @@
from txmongo.filter import _QueryFilter
from txmongo.protocol import DELETE_SINGLE_REMOVE, UPDATE_UPSERT, UPDATE_MULTI, \
Query, Getmore, Insert, Update, Delete, KillCursors
from txmongo.pymongo_internals import _check_write_command_response, _merge_command, _check_command_response
from txmongo.utils import check_deadline, timeout
from txmongo import filter as qf
from twisted.internet import defer
from twisted.python.compat import unicode, comparable


# Copied from pymongo/helpers.py:193 at commit 47b0d8ebfd6cefca80c1e4521b47aec7cf8f529d
def _raise_last_write_error(write_errors):
# If the last batch had multiple errors only report
# the last error to emulate continue_on_error.
error = write_errors[-1]
if error.get("code") == 11000:
raise DuplicateKeyError(error.get("errmsg"), 11000, error)
raise WriteError(error.get("errmsg"), error.get("code"), error)


# Copied from pymongo/helpers.py:202 at commit 47b0d8ebfd6cefca80c1e4521b47aec7cf8f529d
def _raise_write_concern_error(error):
if "errInfo" in error and error["errInfo"].get('wtimeout'):
# Make sure we raise WTimeoutError
raise WTimeoutError(
error.get("errmsg"), error.get("code"), error)
raise WriteConcernError(
error.get("errmsg"), error.get("code"), error)


# Copied from pymongo/helpers.py:211 at commit 47b0d8ebfd6cefca80c1e4521b47aec7cf8f529d
def _check_write_command_response(result):
"""Backward compatibility helper for write command error handling.
"""
# Prefer write errors over write concern errors
write_errors = result.get("writeErrors")
if write_errors:
_raise_last_write_error(write_errors)

error = result.get("writeConcernError")
if error:
_raise_write_concern_error(error)


# Copied from pymongo/bulk.py:93 at commit 96aaf2f5279fb9eee5d0c1a2ce53d243b2772eee
def _merge_command(run, full_result, results):
"""Merge a group of results from write commands into the full result.
"""
for offset, result in results:

affected = result.get("n", 0)

if run.op_type == _INSERT:
full_result["nInserted"] += affected

elif run.op_type == _DELETE:
full_result["nRemoved"] += affected

elif run.op_type == _UPDATE:
upserted = result.get("upserted")
if upserted:
n_upserted = len(upserted)
for doc in upserted:
doc["index"] = run.index(doc["index"] + offset)
full_result["upserted"].extend(upserted)
full_result["nUpserted"] += n_upserted
full_result["nMatched"] += (affected - n_upserted)
else:
full_result["nMatched"] += affected
full_result["nModified"] += result["nModified"]

write_errors = result.get("writeErrors")
if write_errors:
for doc in write_errors:
# Leave the server response intact for APM.
replacement = doc.copy()
idx = doc["index"] + offset
replacement["index"] = run.index(idx)
# Add the failed operation to the error document.
replacement[_UOP] = run.ops[idx]
full_result["writeErrors"].append(replacement)

wc_error = result.get("writeConcernError")
if wc_error:
full_result["writeConcernErrors"].append(wc_error)


@comparable
class Collection(object):
"""Creates new :class:`Collection` object
Expand Down
2 changes: 1 addition & 1 deletion txmongo/database.py
Expand Up @@ -4,9 +4,9 @@

from bson.son import SON
from bson.codec_options import DEFAULT_CODEC_OPTIONS
from pymongo.helpers import _check_command_response
from twisted.python.compat import unicode
from txmongo.collection import Collection
from txmongo.pymongo_internals import _check_command_response
from txmongo.utils import timeout


Expand Down

0 comments on commit 7226996

Please sign in to comment.