Commit 29d7233

Merge 8417b98 into 6225f42
IlyaSkriblovsky committed Oct 5, 2016
2 parents 6225f42 + 8417b98 commit 29d7233
Showing 1 changed file with 140 additions and 7 deletions.
147 changes: 140 additions & 7 deletions txmongo/connection.py
@@ -10,7 +10,7 @@
 from pymongo.read_preferences import ReadPreference
 from pymongo.write_concern import WriteConcern
 from twisted.internet import defer, reactor, task
-from twisted.internet.protocol import ReconnectingClientFactory
+from twisted.internet.protocol import ReconnectingClientFactory, ClientFactory
 from twisted.python import log
 from twisted.python.compat import StringType
 from txmongo.database import Database
@@ -254,8 +254,11 @@ class ConnectionPool(object):
 
     __wc_possible_options = {'w', "wtimeout", 'j', "fsync"}
 
+    __pinger_discovery_interval = 10
+
     def __init__(self, uri="mongodb://127.0.0.1:27017", pool_size=1, ssl_context_factory=None,
-                 watchdog_interval=30, watchdog_timeout=120, **kwargs):
+                 watchdog_interval=30, watchdog_timeout=120,
+                 ping_interval=10, ping_timeout=10, **kwargs):
         assert isinstance(uri, StringType)
         assert isinstance(pool_size, int)
         assert pool_size >= 1
@@ -289,13 +292,24 @@ def __init__(self, uri="mongodb://127.0.0.1:27017", pool_size=1, ssl_context_fac
 
         host, port = self.__uri['nodelist'][0]
 
+        self.ssl_context_factory = ssl_context_factory
+
         initial_delay = kwargs.get('retry_delay', 30)
         for factory in self.__pool:
-            if ssl_context_factory:
-                factory.connector = reactor.connectSSL(
-                    host, port, factory, ssl_context_factory, initial_delay)
-            else:
-                factory.connector = reactor.connectTCP(host, port, factory, initial_delay)
+            factory.connector = self.__tcp_or_ssl_connect(host, port, factory,
+                                                          timeout=initial_delay)
+
+        self.ping_interval = ping_interval
+        self.ping_timeout = ping_timeout
+        self.__pingers = {}
+        self.__pinger_discovery = task.LoopingCall(self.__discovery_nodes_to_ping)
+        self.__pinger_discovery.start(self.__pinger_discovery_interval, now=False)
+
+    def __tcp_or_ssl_connect(self, host, port, factory, **kwargs):
+        if self.ssl_context_factory:
+            return reactor.connectSSL(host, port, factory, self.ssl_context_factory, **kwargs)
+        else:
+            return reactor.connectTCP(host, port, factory, **kwargs)
 
     @property
     def write_concern(self):
@@ -326,6 +340,10 @@ def get_default_database(self):
         return None
 
     def disconnect(self):
+        self.__pinger_discovery.stop()
+        for pinger in self.__pingers.values():
+            pinger.connector.disconnect()
+
         for factory in self.__pool:
             factory.stopTrying()
             factory.stopFactory()
@@ -370,6 +388,121 @@ def uri(self):
         return self.__uri


+    # Pingers are persistent connections that are established to each
+    # node of the replica set to monitor its availability.
+    #
+    # Every `__pinger_discovery_interval` seconds ConnectionPool compares
+    # the actual node addresses against the running Pingers and starts/stops
+    # Pingers so that there is one Pinger per node address.
+    #
+    # Every `ping_interval` seconds each Pinger sends an ismaster command.
+    #
+    # All pool connections to the corresponding TCP address are dropped
+    # if one of the following happens:
+    #   1. the Pinger does not receive a response to ismaster within
+    #      `ping_timeout` seconds
+    #   2. the Pinger is unable to connect to the address within
+    #      `ping_timeout` seconds
+    #
+    # If a Pinger's connection is closed by the server, pool connections
+    # are not dropped. The next discovery run will recreate the Pinger.

+    def __discovery_nodes_to_ping(self):
+        existing = set(self.__pingers)
+        peers = {conn.instance.transport.getPeer()
+                 for conn in self.__pool if conn.instance}
+
+        for peer in peers - existing:
+            pinger = _Pinger(self.ping_interval, self.ping_timeout,
+                             self.__on_ping_lost, self.__on_ping_fail)
+            pinger.connector = self.__tcp_or_ssl_connect(peer.host, peer.port, pinger,
+                                                         timeout=self.ping_timeout)
+            self.__pingers[peer] = pinger
+
+        for unused_peer in existing - peers:
+            self.__pingers[unused_peer].connector.disconnect()
+            del self.__pingers[unused_peer]
+
+    def __on_ping_lost(self, addr):
+        if addr in self.__pingers:
+            self.__pingers[addr].connector.disconnect()
+            del self.__pingers[addr]
+
+    def __on_ping_fail(self, addr):
+        # Kill all pool connections to this addr
+        for connection in self.__pool:
+            if connection.instance and connection.instance.transport.getPeer() == addr:
+                connection.instance.transport.abortConnection()
+
+        self.__on_ping_lost(addr)
+
+
+class _PingerProtocol(MongoProtocol):
+
+    disconnected = False
+
+    def __init__(self, interval, timeout, fail_callback):
+        MongoProtocol.__init__(self)
+        self.interval = interval
+        self.timeout = timeout
+        self.fail_callback = fail_callback
+
+    def ping(self):
+        if self.disconnected:
+            return
+
+        def on_ok(result):
+            if timeout_call.active():
+                timeout_call.cancel()
+                reactor.callLater(self.interval, self.ping)
+
+        def on_fail(failure):
+            if timeout_call.active():
+                timeout_call.cancel()
+                on_timeout()
+
+        def on_timeout():
+            self.transport.loseConnection()
+            self.fail_callback(self.transport.getPeer())
+
+        timeout_call = reactor.callLater(self.timeout, on_timeout)
+
+        self.send_QUERY(Query(collection="admin.$cmd", query={"ismaster": 1}))\
+            .addCallbacks(on_ok, on_fail)
+
+
+    def connectionMade(self):
+        MongoProtocol.connectionMade(self)
+        self.ping()
+
+    def connectionLost(self, reason):
+        MongoProtocol.connectionLost(self, reason)
+        self.disconnected = True
+
+
+class _Pinger(ClientFactory):
+
+    def __init__(self, interval, timeout, lost_callback, fail_callback):
+        self.interval = interval
+        self.timeout = timeout
+        self.lost_callback = lost_callback
+        self.fail_callback = fail_callback
+
+    def buildProtocol(self, addr):
+        proto = _PingerProtocol(self.interval, self.timeout, self.fail_callback)
+        proto.factory = self
+        return proto
+
+    def setInstance(self, instance=None, reason=None):
+        pass
+
+    def clientConnectionLost(self, connector, reason):
+        self.lost_callback(connector.getDestination())
+
+    def clientConnectionFailed(self, connector, reason):
+        self.fail_callback(connector.getDestination())
+
+
 ###
 # Begin Legacy Wrapper
 ###
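
For context, here is a minimal usage sketch of the new `ping_interval`/`ping_timeout` keyword arguments. It is not part of this commit; it assumes a MongoDB server on localhost and the usual txmongo client API (`ConnectionPool` attribute access to databases and `Database.command()`), so treat it as an illustration of the pinger-related options rather than a canonical example.

```python
from twisted.internet import defer, reactor
from txmongo.connection import ConnectionPool


@defer.inlineCallbacks
def main():
    # Pingers send ismaster every 5 seconds; a node that does not answer
    # (or accept the pinger's connection) within 5 seconds has all of its
    # pool connections aborted, and the ReconnectingClientFactory-based
    # pool factories then try to reconnect.
    conn = ConnectionPool("mongodb://127.0.0.1:27017",
                          ping_interval=5, ping_timeout=5)

    # The same ismaster command the pingers send, issued explicitly once.
    result = yield conn.admin.command("ismaster")
    print("ismaster: %r" % result.get("ismaster"))

    yield conn.disconnect()
    reactor.stop()


reactor.callWhenRunning(main)
reactor.run()
```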