Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Improve reconnects #8

Merged
merged 8 commits into from

2 participants

Allan Beaufour Jeff Lindsay
Allan Beaufour

This patch improves reconnect handling:

  • allow setting timeouts on connections
  • randomize connect host order
  • don't reconnect to host that just failed
  • actually make reconnect not hang :)
Jeff Lindsay progrium merged commit 5b087e0 into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on May 8, 2012
  1. remove unused import

    Allan Beaufour authored
Commits on May 22, 2012
  1. add debugging messages and a couple of todos

    Allan Beaufour authored
  2. on connection error in .recv_loop(), don't try to kill own greenlet on .reconnect(). Part of making reconnect working.

    Allan Beaufour authored
  3. shuffle the list of endpoint addresses, so all clients don't connect to the same node all the time.

    Allan Beaufour authored
  4. make .reconnect() remember last host it tried to connect to, and try the next one in the list.

    Allan Beaufour authored
  5. make it possible to specify the connection timeout

    Allan Beaufour authored
  6. Use exponential backoff for the retry times.

    Allan Beaufour authored
  7. remove the unused CONNECT_TIMEOUT variable

    Allan Beaufour authored
This page is out of date. Refresh to see the latest.
Showing with 102 additions and 21 deletions.
  1. +102 −21 doozer/client.py
123 doozer/client.py
View
@@ -1,16 +1,20 @@
+import logging
import os
+import random
import struct
import gevent
import gevent.event
import gevent.socket
-import google.protobuf.message
from msg_pb2 import Response
from msg_pb2 import Request
-CONNECT_TIMEOUT = 5.0
REQUEST_TIMEOUT = 2.0
+
+DEFAULT_RETRY_WAIT = 2.0
+"""Default connection retry waiting time (seconds)"""
+
DEFAULT_URI = "doozer:?%s" % "&".join([
"ca=127.0.0.1:8046",
"ca=127.0.0.1:8041",
@@ -74,51 +78,112 @@ def parse_uri(uri):
else:
raise ValueError("invalid doozerd uri")
-def connect(uri=None):
- """Start a Doozer client connection"""
+def connect(uri=None, timeout=None):
+ """
+ Start a Doozer client connection
+
+ @param uri: str|None, Doozer URI
+ @param timeout: float|None, connection timeout in seconds (per address)
+ """
+
uri = uri or os.environ.get("DOOZER_URI", DEFAULT_URI)
addrs = parse_uri(uri)
if not addrs:
raise ValueError("there were no addrs supplied in the uri (%s)" % uri)
- return Client(addrs)
+ return Client(addrs, timeout)
class Connection(object):
- def __init__(self, addrs=None):
+ def __init__(self, addrs=None, timeout=None):
+ """
+ @param timeout: float|None, connection timeout in seconds (per address)
+ """
+ self._logger = logging.getLogger('pydoozer.Connection')
+ self._logger.debug('__init__(%s)', addrs)
+
if addrs is None:
addrs = []
self.addrs = addrs
+ self.addrs_index = 0
+ """Next address to connect to in self.addrs"""
+
self.pending = {}
self.loop = None
self.sock = None
self.address = None
+ self.timeout = timeout
self.ready = gevent.event.Event()
+
+ # Shuffle the addresses so all clients don't connect to the
+ # same node in the cluster.
+ random.shuffle(addrs)
def connect(self):
self.reconnect()
- def reconnect(self):
- self.disconnect()
+ def reconnect(self, kill_loop=True):
+ """
+ Reconnect to the cluster.
+
+ @param kill_loop: bool, kill the current receive loop
+ """
+
+ self._logger.debug('reconnect()')
+
+ self.disconnect(kill_loop)
+
+ # Default to the socket timeout
+ retry_wait = self.timeout or gevent.socket.getdefaulttimeout() or DEFAULT_RETRY_WAIT
for retry in range(5):
- addrs = list(self.addrs)
- while len(addrs):
+ addrs_left = len(self.addrs)
+ while addrs_left:
try:
- host, port = addrs.pop(0).split(':')
+ host, port = self.addrs[self.addrs_index].split(':')
+ self.addrs_index = (self.addrs_index + 1) % len(self.addrs)
self.address = "%s:%s" % (host, port)
- self.sock = gevent.socket.create_connection((host, int(port)))
+ self._logger.debug('Connecting to %s...', self.address)
+ self.sock = gevent.socket.create_connection((host, int(port)),
+ timeout=self.timeout)
+ self._logger.debug('Connection successful')
+
+ # Reset the timeout on the connection so it
+ # doesn't make .recv() and .send() timeout.
+ self.sock.settimeout(None)
self.ready.set()
self.loop = _spawner(self._recv_loop)
return
- except IOError:
+
+ except IOError, e:
+ self._logger.warning('Failed to connect to %s (%s)', self.address, e)
pass
- gevent.sleep(retry * 2)
+ addrs_left -= 1
+
+ self._logger.debug('Waiting %d seconds to reconnect', retry_wait)
+ gevent.sleep(retry_wait)
+ retry_wait *= 2
+
+ self._logger.error('Could not connect to any of the defined addresses')
raise ConnectError("Can't connect to any of the addresses: %s" % self.addrs)
- def disconnect(self):
- if self.loop:
+ def disconnect(self, kill_loop=True):
+ """
+ Disconnect current connection.
+
+ @param kill_loop: bool, Kill the current receive loop
+ """
+ self._logger.debug('disconnect()')
+
+ if kill_loop and self.loop:
+ self._logger.debug('killing loop')
self.loop.kill()
+ self.loop = None
if self.sock:
+ self._logger.debug('closing connection')
self.sock.close()
+ self.sock = None
+
+ self._logger.debug('clearing ready signal')
self.ready.clear()
+ self.address = None
def send(self, request, retry=True):
request.tag = 0
@@ -127,17 +192,22 @@ def send(self, request, retry=True):
request.tag %= 2**31
self.pending[request.tag] = gevent.event.AsyncResult()
data = request.SerializeToString()
- head = struct.pack(">I", len(data))
+ data_len = len(data)
+ head = struct.pack(">I", data_len)
packet = ''.join([head, data])
+ self._logger.debug('Sending packet, tag: %d, len: %d', request.tag, data_len)
try:
self.ready.wait(timeout=2)
self.sock.send(packet)
except IOError, e:
+ self._logger.warning('Error sending packet (%s)', e)
self.reconnect()
if retry:
+ self._logger.debug('Retrying sending packet')
self.ready.wait()
self.sock.send(packet)
else:
+ self._logger.warning('Failed retrying to send packet')
raise e
response = self.pending[request.tag].get(timeout=REQUEST_TIMEOUT)
del self.pending[request.tag]
@@ -147,6 +217,8 @@ def send(self, request, retry=True):
return response
def _recv_loop(self):
+ self._logger.debug('_recv_loop(%s)', self.address)
+
while True:
try:
head = self.sock.recv(4)
@@ -154,22 +226,31 @@ def _recv_loop(self):
data = self.sock.recv(length)
response = Response()
response.ParseFromString(data)
+ self._logger.debug('Received packet, tag: %d, len: %d', response.tag, length)
if response.tag in self.pending:
self.pending[response.tag].set(response)
except struct.error, e:
+ self._logger.warning('Got invalid packet from server (%s)', e)
# If some extra bytes are sent, just reconnect.
# This is related to this bug:
# https://github.com/ha/doozerd/issues/5
- self.reconnect()
+ break
except IOError, e:
- self.reconnect()
+ self._logger.warning('Lost connection? (%s)', e)
+ break
+
+ # Note: .reconnect() will spawn a new loop
+ self.reconnect(kill_loop=False)
class Client(object):
- def __init__(self, addrs=None):
+ def __init__(self, addrs=None, timeout=None):
+ """
+ @param timeout: float|None, connection timeout in seconds (per address)
+ """
if addrs is None:
addrs = []
- self.connection = Connection(addrs)
+ self.connection = Connection(addrs, timeout)
self.connect()
def rev(self):
Something went wrong with that request. Please try again.