Skip to content

Commit

Permalink
Adjust session ttl if supplied value is smaller than minimum possible
Browse files Browse the repository at this point in the history
It could happen that the ttl provided in the Patroni configuration is smaller
than the minimum supported by Consul. In such a case the Consul agent fails to
create a new session and responds with 500 Internal Server Error, and the
http body contains something like: "Invalid Session TTL '3000000000',
must be between [10s=24h0m0s]". Without a session Patroni is not able to
create member and leader keys in the Consul KV store, which means that
the cluster becomes completely unhealthy.

As a workaround we will handle such an exception, adjust the ttl to the minimum
possible and retry session creation.
  • Loading branch information
Alexander Kukushkin committed Nov 3, 2017
1 parent 82c02bf commit fb8f7d5
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 13 deletions.
46 changes: 35 additions & 11 deletions patroni/dcs/consul.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ class ConsulInternalError(ConsulException):
"""An internal Consul server error occurred"""


class InvalidSessionTTL(ConsulInternalError):
"""Session TTL is too small or too big"""


class HTTPClient(object):

def __init__(self, host='127.0.0.1', port=8500, scheme='http', verify=True, cert=None, ca_cert=None):
Expand All @@ -49,6 +53,10 @@ def __init__(self, host='127.0.0.1', port=8500, scheme='http', verify=True, cert
def set_read_timeout(self, timeout):
self._read_timeout = timeout/3.0

@property
def ttl(self):
return self._ttl

def set_ttl(self, ttl):
ret = self._ttl != ttl
self._ttl = ttl
Expand All @@ -58,7 +66,11 @@ def set_ttl(self, ttl):
def response(response):
data = response.data.decode('utf-8')
if response.status == 500:
raise ConsulInternalError('{0} {1}'.format(response.status, data))
msg = '{0} {1}'.format(response.status, data)
if data.startswith('Invalid Session TTL'):
raise InvalidSessionTTL(msg)
else:
raise ConsulInternalError(msg)
return base.Response(response.status, response.headers, data)

def uri(self, path, params=None):
Expand Down Expand Up @@ -169,17 +181,23 @@ def create_session(self):
time.sleep(5)

def set_ttl(self, ttl):
ttl_ = ttl/2.0 # Consul multiplies the TTL by 2x
if ttl_ < 10.0: # Minimal TTL for consul = 10, else gives "Error 500"
ttl_ = 10.0
if self._client.http.set_ttl(ttl_):
if self._client.http.set_ttl(ttl/2.0): # Consul multiplies the TTL by 2x
self._session = None
self.__do_not_watch = True

def set_retry_timeout(self, retry_timeout):
self._retry.deadline = retry_timeout
self._client.http.set_read_timeout(retry_timeout)

def adjust_ttl(self):
try:
settings = self._client.agent.self()
min_ttl = (settings['Config']['SessionTTLMin'] or 10000000000)/1000000000.0
logger.warning('Changing Session TTL from %s to %s', self._client.http.ttl, min_ttl)
self._client.http.set_ttl(min_ttl)
except Exception:
logger.exception('adjust_ttl')

def _do_refresh_session(self):
""":returns: `!True` if it had to create new session"""
if self._session and self._last_session_refresh + self._loop_wait > time.time():
Expand All @@ -192,9 +210,15 @@ def _do_refresh_session(self):
self._session = None
ret = not self._session
if ret:
self._session = self._client.session.create(name=self._scope + '-' + self._name,
checks=self.__session_checks,
lock_delay=0.001, behavior='delete')
try:
self._session = self._client.session.create(name=self._scope + '-' + self._name,
checks=self.__session_checks,
lock_delay=0.001, behavior='delete')
except InvalidSessionTTL:
logger.exception('session.create')
self.adjust_ttl()
raise

self._last_session_refresh = time.time()
return ret

Expand Down Expand Up @@ -269,10 +293,10 @@ def _load_cluster(self):
logger.exception('get_cluster')
raise ConsulError('Consul is not responding properly')

def touch_member(self, data, **kwargs):
def touch_member(self, data, ttl=None, permanent=False):
cluster = self.cluster
member = cluster and cluster.get_member(self._name, fallback_to_leader=False)
create_member = self.refresh_session()
create_member = not permanent and self.refresh_session()

if member and (create_member or member.session != self._session):
try:
Expand All @@ -285,7 +309,7 @@ def touch_member(self, data, **kwargs):
return True

try:
args = {} if kwargs.get('permanent', False) else {'acquire': self._session}
args = {} if permanent else {'acquire': self._session}
self._client.kv.put(self.member_path, data, **args)
self._my_member_data = data
return True
Expand Down
10 changes: 8 additions & 2 deletions tests/test_consul.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

from consul import ConsulException, NotFound
from mock import Mock, patch
from patroni.dcs.consul import AbstractDCS, Cluster, Consul, ConsulInternalError, ConsulError, HTTPClient
from patroni.dcs.consul import AbstractDCS, Cluster, Consul, ConsulInternalError, \
ConsulError, HTTPClient, InvalidSessionTTL
from test_etcd import SleepException


Expand Down Expand Up @@ -47,7 +48,10 @@ def test_get(self):
self.client.get(Mock(), '')
self.client.get(Mock(), '', {'wait': '1s', 'index': 1, 'token': 'foo'})
self.client.http.request.return_value.status = 500
self.client.http.request.return_value.data = 'Foo'
self.assertRaises(ConsulInternalError, self.client.get, Mock(), '')
self.client.http.request.return_value.data = "Invalid Session TTL '3000000000', must be between [10s=24h0m0s]"
self.assertRaises(InvalidSessionTTL, self.client.get, Mock(), '')

def test_unknown_method(self):
try:
Expand Down Expand Up @@ -84,7 +88,9 @@ def test_create_session(self):
self.assertRaises(SleepException, self.c.create_session)

@patch.object(consul.Consul.Session, 'renew', Mock(side_effect=NotFound))
@patch.object(consul.Consul.Session, 'create', Mock(side_effect=ConsulException))
@patch.object(consul.Consul.Session, 'create', Mock(side_effect=[InvalidSessionTTL, ConsulException]))
@patch.object(consul.Consul.Agent, 'self', Mock(return_value={'Config': {'SessionTTLMin': 0}}))
@patch.object(HTTPClient, 'set_ttl', Mock(side_effect=ValueError))
def test_referesh_session(self):
self.c._session = '1'
self.assertFalse(self.c.refresh_session())
Expand Down

0 comments on commit fb8f7d5

Please sign in to comment.