Skip to content

Commit

Permalink
Merge pull request #170 from IlyaSkriblovsky/stale-connection
Browse files Browse the repository at this point in the history
Handling stale connection
  • Loading branch information
psi29a committed Jun 15, 2016
2 parents 97933a0 + 54c3761 commit 89627a9
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 9 deletions.
3 changes: 3 additions & 0 deletions tests/mongod.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ def stop(self):
else:
return defer.fail(self.__end_reason)

def kill(self, signal):
self.__proc.signalProcess(signal)

def makeConnection(self, process): pass
def childConnectionLost(self, child_fd): pass
def processExited(self, reason): pass
Expand Down
18 changes: 18 additions & 0 deletions tests/test_replicaset.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from __future__ import absolute_import, division

import signal
from bson import SON
from pymongo.errors import OperationFailure, AutoReconnect, ConfigurationError
from time import time
Expand Down Expand Up @@ -275,3 +276,20 @@ def do_query():
fireOnOneCallback=True,
fireOnOneErrback=True)
self.flushLoggedErrors(AutoReconnect)

@defer.inlineCallbacks
def test_StaleConnection(self):
conn = MongoConnection("localhost", self.ports[0])
try:
yield conn.db.coll.count()
self.__mongod[0].kill(signal.SIGSTOP)
yield self.__sleep(0.2)
while True:
try:
yield conn.db.coll.count()
break
except AutoReconnect:
pass
finally:
self.__mongod[0].kill(signal.SIGCONT)
yield conn.disconnect()
33 changes: 24 additions & 9 deletions txmongo/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
from twisted.python import log
from twisted.python.compat import StringType
from txmongo.database import Database
from txmongo.errors import TimeExceeded
from txmongo.protocol import MongoProtocol, Query

from txmongo.utils import timeout

DEFAULT_MAX_BSON_SIZE = 16777216
DEFAULT_MAX_WRITE_BATCH_SIZE = 1000
Expand All @@ -29,22 +30,23 @@ class _Connection(ReconnectingClientFactory):
__index = -1
__uri = None
__conf_loop = None
__conf_loop_seconds = 300.0

instance = None
protocol = MongoProtocol

def __init__(self, pool, uri, connection_id, initial_delay, max_delay):
def __init__(self, pool, uri, connection_id, initial_delay, max_delay, watchdog_interval,
watchdog_timeout):
self.__allnodes = list(uri["nodelist"])
self.__notify_ready = []
self.__pool = pool
self.__uri = uri
self.__conf_loop = task.LoopingCall(lambda: self.configure(self.instance))
self.__conf_loop.start(self.__conf_loop_seconds, now=False)
self.__conf_loop.start(watchdog_interval, now=False)
self.connection_id = connection_id
self.initialDelay = initial_delay
self.maxDelay = max_delay
self.__auth_creds = {}
self.__watchdog_timeout = watchdog_timeout

def buildProtocol(self, addr):
# Build the protocol.
Expand Down Expand Up @@ -73,6 +75,12 @@ def _initializeProto(self, proto):
except Exception as e:
proto.fail(e)

@staticmethod
@timeout
def __send_ismaster(proto, _deadline=None):
query = Query(collection="admin.$cmd", query={"ismaster": 1})
return proto.send_QUERY(query)

@defer.inlineCallbacks
def configure(self, proto):
"""
Expand All @@ -85,8 +93,11 @@ def configure(self, proto):
if not proto:
defer.returnValue(None)

query = Query(collection="admin.$cmd", query={"ismaster": 1})
reply = yield proto.send_QUERY(query)
try:
reply = yield self.__send_ismaster(proto, timeout=self.__watchdog_timeout)
except TimeExceeded:
proto.transport.abortConnection()
defer.returnValue(None)

# Handle the reply from the "ismaster" query. The reply contains
# configuration information about the peer.
Expand Down Expand Up @@ -243,10 +254,12 @@ class ConnectionPool(object):
__wc_possible_options = {'w', "wtimeout", 'j', "fsync"}

def __init__(self, uri="mongodb://127.0.0.1:27017", pool_size=1, ssl_context_factory=None,
**kwargs):
watchdog_interval=15, watchdog_timeout=5, **kwargs):
assert isinstance(uri, StringType)
assert isinstance(pool_size, int)
assert pool_size >= 1
assert watchdog_interval > 0
assert watchdog_timeout > 0

if not uri.startswith("mongodb://"):
uri = "mongodb://" + uri
Expand All @@ -261,8 +274,10 @@ def __init__(self, uri="mongodb://127.0.0.1:27017", pool_size=1, ssl_context_fac
retry_delay = kwargs.get('retry_delay', 1.0)
max_delay = kwargs.get('max_delay', 60.0)
self.__pool_size = pool_size
self.__pool = [_Connection(
self, self.__uri, i, retry_delay, max_delay) for i in range(pool_size)]
self.__pool = [
_Connection(self, self.__uri, i, retry_delay, max_delay, watchdog_interval, watchdog_timeout)
for i in range(pool_size)
]

if self.__uri['database'] and self.__uri['username'] and self.__uri['password']:
self.authenticate(self.__uri['database'], self.__uri['username'],
Expand Down

0 comments on commit 89627a9

Please sign in to comment.