Skip to content

Commit

Permalink
Merge 91e88f1 into dababde
Browse files Browse the repository at this point in the history
  • Loading branch information
IlyaSkriblovsky committed Oct 14, 2016
2 parents dababde + 91e88f1 commit 1bb4d54
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 30 deletions.
3 changes: 3 additions & 0 deletions docs/source/NEWS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ API Changes
^^^^^^^^^^^

- ``Database.command()`` now takes ``codec_options`` argument.
- ``watchdog_interval`` and ``watchdog_timeout`` arguments of ``ConnectionPool`` renamed
to ``ping_interval`` and ``ping_timeout`` correspondingly along with internal change of
connection aliveness checking mechanism.

Bugfixes
^^^^^^^^
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cancel.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def test_protocol_connectionReady(self):
@defer.inlineCallbacks
def test_connection_notifyReady(self):
uri = parse_uri("mongodb://localhost:27017/")
conn = _Connection(None, uri, 0, 10, 10, 10, 10)
conn = _Connection(None, uri, 0, 10, 10)
d1 = conn.notifyReady()
d2 = conn.notifyReady()
d1.cancel()
Expand Down
6 changes: 4 additions & 2 deletions tests/test_replicaset.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,11 @@ def do_query():

@defer.inlineCallbacks
def test_StaleConnection(self):
conn = MongoConnection("localhost", self.ports[0],
watchdog_interval=10, watchdog_timeout=5)
conn = MongoConnection("localhost", self.ports[0], ping_interval = 5, ping_timeout = 5)
try:
yield conn.db.coll.count()
# check that 5s pingers won't break connection if it is healthy
yield self.__sleep(6)
yield conn.db.coll.count()
self.__mongod[0].kill(signal.SIGSTOP)
yield self.__sleep(0.2)
Expand Down
168 changes: 141 additions & 27 deletions txmongo/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
from pymongo.read_preferences import ReadPreference
from pymongo.write_concern import WriteConcern
from twisted.internet import defer, reactor, task
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.internet.protocol import ReconnectingClientFactory, ClientFactory
from twisted.python import log
from twisted.python.compat import StringType
from txmongo.database import Database
from txmongo.errors import TimeExceeded
from txmongo.protocol import MongoProtocol, Query
from txmongo.utils import timeout

Expand All @@ -30,24 +29,19 @@ class _Connection(ReconnectingClientFactory):
__allnodes = None
__index = -1
__uri = None
__conf_loop = None

instance = None
protocol = MongoProtocol

def __init__(self, pool, uri, connection_id, initial_delay, max_delay, watchdog_interval,
watchdog_timeout):
def __init__(self, pool, uri, connection_id, initial_delay, max_delay):
self.__allnodes = list(uri["nodelist"])
self.__notify_ready = []
self.__pool = pool
self.__uri = uri
self.__conf_loop = task.LoopingCall(lambda: self.configure(self.instance))
self.__conf_loop.start(watchdog_interval, now=False)
self.connection_id = connection_id
self.initialDelay = initial_delay
self.maxDelay = max_delay
self.__auth_creds = {}
self.__watchdog_timeout = watchdog_timeout

def buildProtocol(self, addr):
# Build the protocol.
Expand Down Expand Up @@ -78,7 +72,7 @@ def _initializeProto(self, proto):

@staticmethod
@timeout
def __send_ismaster(proto, _deadline=None):
def __send_ismaster(proto, **kwargs):
query = Query(collection="admin.$cmd", query={"ismaster": 1})
return proto.send_QUERY(query)

Expand All @@ -94,11 +88,7 @@ def configure(self, proto):
if not proto:
defer.returnValue(None)

try:
reply = yield self.__send_ismaster(proto, timeout=self.__watchdog_timeout)
except TimeExceeded:
proto.transport.abortConnection()
defer.returnValue(None)
reply = yield self.__send_ismaster(proto, timeout=self.initialDelay)

# Handle the reply from the "ismaster" query. The reply contains
# configuration information about the peer.
Expand Down Expand Up @@ -225,10 +215,6 @@ def setInstance(self, instance=None, reason=None):
else:
df.errback(reason)

def stopTrying(self):
ReconnectingClientFactory.stopTrying(self)
self.__conf_loop.stop()

@property
def uri(self):
return self.__uri
Expand Down Expand Up @@ -257,13 +243,13 @@ class ConnectionPool(object):

__wc_possible_options = {'w', "wtimeout", 'j', "fsync"}

__pinger_discovery_interval = 10

def __init__(self, uri="mongodb://127.0.0.1:27017", pool_size=1, ssl_context_factory=None,
watchdog_interval=30, watchdog_timeout=120, **kwargs):
ping_interval=10, ping_timeout=10, **kwargs):
assert isinstance(uri, StringType)
assert isinstance(pool_size, int)
assert pool_size >= 1
assert watchdog_interval > 0
assert watchdog_timeout > 0

if not uri.startswith("mongodb://"):
uri = "mongodb://" + uri
Expand All @@ -281,7 +267,7 @@ def __init__(self, uri="mongodb://127.0.0.1:27017", pool_size=1, ssl_context_fac
max_delay = kwargs.get('max_delay', 60.0)
self.__pool_size = pool_size
self.__pool = [
_Connection(self, self.__uri, i, retry_delay, max_delay, watchdog_interval, watchdog_timeout)
_Connection(self, self.__uri, i, retry_delay, max_delay)
for i in range(pool_size)
]

Expand All @@ -292,13 +278,24 @@ def __init__(self, uri="mongodb://127.0.0.1:27017", pool_size=1, ssl_context_fac

host, port = self.__uri['nodelist'][0]

self.ssl_context_factory = ssl_context_factory

initial_delay = kwargs.get('retry_delay', 30)
for factory in self.__pool:
if ssl_context_factory:
factory.connector = reactor.connectSSL(
host, port, factory, ssl_context_factory, initial_delay)
else:
factory.connector = reactor.connectTCP(host, port, factory, initial_delay)
factory.connector = self.__tcp_or_ssl_connect(host, port, factory,
timeout=initial_delay)

self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
self.__pingers = {}
self.__pinger_discovery = task.LoopingCall(self.__discovery_nodes_to_ping)
self.__pinger_discovery.start(self.__pinger_discovery_interval, now=False)

def __tcp_or_ssl_connect(self, host, port, factory, **kwargs):
if self.ssl_context_factory:
return reactor.connectSSL(host, port, factory, self.ssl_context_factory, **kwargs)
else:
return reactor.connectTCP(host, port, factory, **kwargs)

@property
def write_concern(self):
Expand Down Expand Up @@ -329,6 +326,10 @@ def get_default_database(self):
return None

def disconnect(self):
self.__pinger_discovery.stop()
for pinger in self.__pingers.values():
pinger.connector.disconnect()

for factory in self.__pool:
factory.stopTrying()
factory.stopFactory()
Expand Down Expand Up @@ -370,6 +371,119 @@ def uri(self):
return self.__uri


# Pingers are persistent connections that are established to each
# node of the replicaset to monitor their availability.
#
# Every `__pinger_discovery_interval` seconds ConnectionPool compares
# actual nodes addresses and starts/stops Pingers to ensure that
# Pinger is started for every node address.
#
# Every `ping_interval` seconds pingers send ismaster commands.
#
# All pool connections to corresponding TCP address are dropped
# if one of following happens:
# 1. Pinger is unable to receive response to ismaster within
# `ping_timeout` seconds
# 2. Pinger is unable to connect to address within `ping_timeout`
# seconds
#
# If Pinger's connection is closed by server, pool connections are not
# dropped. Next discovery procedure will recreate the Pinger.

def __discovery_nodes_to_ping(self):
existing = set(self.__pingers)
peers = {conn.instance.transport.getPeer()
for conn in self.__pool if conn.instance}

for peer in peers - existing:
pinger = _Pinger(self.ping_interval, self.ping_timeout,
self.__on_ping_lost, self.__on_ping_fail)
pinger.connector = self.__tcp_or_ssl_connect(peer.host, peer.port, pinger,
timeout=self.ping_timeout)
self.__pingers[peer] = pinger

for unused_peer in existing - peers:
self.__pingers[unused_peer].connector.disconnect()
del self.__pingers[unused_peer]

def __on_ping_lost(self, addr):
if addr in self.__pingers:
self.__pingers[addr].connector.disconnect()
del self.__pingers[addr]

def __on_ping_fail(self, addr):
# Kill all pool connections to this addr
for connection in self.__pool:
if connection.instance and connection.instance.transport.getPeer() == addr:
connection.instance.transport.abortConnection()

self.__on_ping_lost(addr)


class _PingerProtocol(MongoProtocol):

__next_call = None

def __init__(self, interval, timeout, fail_callback):
MongoProtocol.__init__(self)
self.interval = interval
self.timeout = timeout
self.fail_callback = fail_callback

def ping(self):
def on_ok(result):
if timeout_call.active():
timeout_call.cancel()
self.__next_call = reactor.callLater(self.interval, self.ping)

def on_fail(failure):
if timeout_call.active():
timeout_call.cancel()
on_timeout()

def on_timeout():
self.transport.loseConnection()
self.fail_callback(self.transport.getPeer())

timeout_call = reactor.callLater(self.timeout, on_timeout)

self.send_QUERY(Query(collection="admin.$cmd", query={"ismaster": 1}))\
.addCallbacks(on_ok, on_fail)


def connectionMade(self):
MongoProtocol.connectionMade(self)
self.ping()

def connectionLost(self, reason):
MongoProtocol.connectionLost(self, reason)
if self.__next_call and self.__next_call.active():
self.__next_call.cancel()


class _Pinger(ClientFactory):

def __init__(self, interval, timeout, lost_callback, fail_callback):
self.interval = interval
self.timeout = timeout
self.lost_callback = lost_callback
self.fail_callback = fail_callback

def buildProtocol(self, addr):
proto = _PingerProtocol(self.interval, self.timeout, self.fail_callback)
proto.factory = self
return proto

def setInstance(self, instance=None, reason=None):
pass

def clientConnectionLost(self, connector, reason):
self.lost_callback(connector.getDestination())

def clientConnectionFailed(self, connector, reason):
self.fail_callback(connector.getDestination())


###
# Begin Legacy Wrapper
###
Expand Down

0 comments on commit 1bb4d54

Please sign in to comment.