Skip to content

Commit

Permalink
Register service in consul (#802)
Browse files Browse the repository at this point in the history
Кegister service 'scope_name' with tag 'master' or 'replica'

example with scope 'pgsql-pgpi'
```[root@pgpi1 ~]# host -t SRV pgsql-pgpi.service.consul. 127.0.0.1
Using domain server:
Name: 127.0.0.1
Address: 127.0.0.1#53
Aliases:

pgsql-pgpi.service.consul has SRV record 1 1 5432 pgpi1.node.dc.consul.
pgsql-pgpi.service.consul has SRV record 1 1 5432 pgpi2.node.dc.consul.
[root@pgpi1 ~]# host -t SRV master.pgsql-pgpi.service.consul. 127.0.0.1
Using domain server:
Name: 127.0.0.1
Address: 127.0.0.1#53
Aliases:

master.pgsql-pgpi.service.consul has SRV record 1 1 5432 pgpi2.node.dc.consul.
[root@pgpi1 ~]# host -t SRV replica.pgsql-pgpi.service.consul. 127.0.0.1
Using domain server:
Name: 127.0.0.1
Address: 127.0.0.1#53
Aliases:

replica.pgsql-pgpi.service.consul has SRV record 1 1 5432 pgpi1.node.dc.consul.```

Fixes: #771
  • Loading branch information
Isonami authored and CyberDem0n committed Sep 7, 2018
1 parent dd7c3c3 commit 2e9cb41
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 5 deletions.
90 changes: 89 additions & 1 deletion patroni/dcs/consul.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import logging
import os
import re
import socket
import ssl
import time
Expand All @@ -12,7 +13,7 @@
from patroni.exceptions import DCSError
from patroni.utils import deep_compare, parse_bool, Retry, RetryFailedError, split_host_port
from urllib3.exceptions import HTTPError
from six.moves.urllib.parse import urlencode, urlparse
from six.moves.urllib.parse import urlencode, urlparse, quote
from six.moves.http_client import HTTPException

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -133,6 +134,31 @@ def wrapper(*args, **kwargs):
return wrapper


def force_if_last_failed(func):
def wrapper(*args, **kwargs):
if wrapper.last_result is False:
kwargs['force'] = True
wrapper.last_result = func(*args, **kwargs)
return wrapper.last_result

wrapper.last_result = None
return wrapper


def service_name_from_scope_name(scope_name):
"""Translate scope name to service name which can be used in dns.
230 = 253 - len('replica.') - len('.service.consul')
"""

def replace_char(match):
c = match.group(0)
return '-' if c in '. _' else "u{:04d}".format(ord(c))

service_name = re.sub(r'[^a-z0-9\-]', replace_char, scope_name.lower())
return service_name[0:230]


class Consul(AbstractDCS):

def __init__(self, config):
Expand Down Expand Up @@ -174,6 +200,13 @@ def __init__(self, config):
self.set_ttl(config.get('ttl') or 30)
self._last_session_refresh = 0
self.__session_checks = config.get('checks')
self._register_service = config.get('register_service', False)
if self._register_service:
self._service_name = service_name_from_scope_name(self._scope)
if self._scope != self._service_name:
logger.warning('Using %s as consul service name instead of scope name %s', self._service_name,
self._scope)
self._service_check_interval = config.get('service_check_interval', '5s')
if not self._ctl:
self.create_session()

Expand Down Expand Up @@ -323,11 +356,65 @@ def touch_member(self, data, ttl=None, permanent=False):
try:
args = {} if permanent else {'acquire': self._session}
self._client.kv.put(self.member_path, json.dumps(data, separators=(',', ':')), **args)
if self._register_service:
self.update_service(not create_member and member and member.data or {}, data)
return True
except Exception:
logger.exception('touch_member')
return False

@catch_consul_errors
def register_service(self, service_name, **kwargs):
logger.info('Register service %s, params %s', service_name, kwargs)
return self._client.agent.service.register(service_name, **kwargs)

@catch_consul_errors
def deregister_service(self, service_id):
logger.info('Deregister service %s', service_id)
# service_id can contain special characters, but is used as part of uri in deregister request
service_id = quote(service_id)
return self._client.agent.service.deregister(service_id)

def _update_service(self, data):
service_name = self._service_name
role = data['role']
state = data['state']
api_parts = urlparse(data['api_url'])
api_parts = api_parts._replace(path='/{0}'.format(role))
conn_parts = urlparse(data['conn_url'])
check = base.Check.http(api_parts.geturl(), self._service_check_interval, deregister=self._client.http.ttl * 10)
params = {
'service_id': '{0}/{1}'.format(self._scope, self._name),
'address': conn_parts.hostname,
'port': conn_parts.port,
'check': check,
'tags': [role]
}

if state == 'stopped':
return self.deregister_service(params['service_id'])

if role in ['master', 'replica']:
if state != 'running':
return
return self.register_service(service_name, **params)

logger.warning('Could not register service: unknown role type %s', role)

@force_if_last_failed
def update_service(self, old_data, new_data, force=False):
update = False

for key in ['role', 'api_url', 'conn_url', 'state']:
if key not in new_data:
logger.warning('Could not register service: not enough params in member data')
return
if old_data.get(key) != new_data[key]:
update = True

if force or update:
return self._update_service(new_data)

@catch_consul_errors
def _do_attempt_to_acquire_leader(self, kwargs):
return self.retry(self._client.kv.put, self.leader_path, self._name, **kwargs)
Expand All @@ -339,6 +426,7 @@ def attempt_to_acquire_leader(self, permanent=False):
ret = self._do_attempt_to_acquire_leader({} if permanent else {'acquire': self._session})
if not ret:
logger.info('Could not take out TTL lock')

return ret

def take_leader(self):
Expand Down
27 changes: 23 additions & 4 deletions tests/test_consul.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ class TestConsul(unittest.TestCase):
@patch.object(consul.Consul.KV, 'delete', Mock())
def setUp(self):
Consul({'ttl': 30, 'scope': 't', 'name': 'p', 'url': 'https://l:1', 'retry_timeout': 10,
'verify': 'on', 'key': 'foo', 'cert': 'bar', 'cacert': 'buz', 'token': 'asd', 'dc': 'dc1'})
Consul({'ttl': 30, 'scope': 't', 'name': 'p', 'url': 'https://l:1', 'retry_timeout': 10,
'verify': 'on', 'cert': 'bar', 'cacert': 'buz'})
self.c = Consul({'ttl': 30, 'scope': 'test', 'name': 'postgresql1', 'host': 'localhost:1', 'retry_timeout': 10})
'verify': 'on', 'key': 'foo', 'cert': 'bar', 'cacert': 'buz', 'token': 'asd', 'dc': 'dc1',
'register_service': True})
Consul({'ttl': 30, 'scope': 't_', 'name': 'p', 'url': 'https://l:1', 'retry_timeout': 10,
'verify': 'on', 'cert': 'bar', 'cacert': 'buz', 'register_service': True})
self.c = Consul({'ttl': 30, 'scope': 'test', 'name': 'postgresql1', 'host': 'localhost:1', 'retry_timeout': 10,
'register_service': True})
self.c._base_path = '/service/good'
self.c._load_cluster()

Expand Down Expand Up @@ -111,6 +113,7 @@ def test_get_cluster(self):
@patch.object(consul.Consul.KV, 'delete', Mock(side_effect=[ConsulException, True, True]))
@patch.object(consul.Consul.KV, 'put', Mock(side_effect=[True, ConsulException]))
def test_touch_member(self):
self.c._register_service = True
self.c.refresh_session = Mock(return_value=True)
self.c.touch_member({'balbla': 'blabla'})
self.c.touch_member({'balbla': 'blabla'})
Expand Down Expand Up @@ -177,3 +180,19 @@ def test_sync_state(self):
@patch.object(consul.Consul.KV, 'put', Mock(return_value=True))
def test_set_history_value(self):
self.assertTrue(self.c.set_history_value('{}'))

@patch.object(consul.Consul.Agent.Service, 'register', Mock(side_effect=(False, True)))
@patch.object(consul.Consul.Agent.Service, 'deregister', Mock(return_value=True))
def test_update_service(self):
d = {'role': 'replica', 'api_url': 'http://a/t', 'conn_url': 'pg://c:1', 'state': 'running'}
self.assertIsNone(self.c.update_service({}, {}))
self.assertFalse(self.c.update_service({}, d))
self.assertTrue(self.c.update_service(d, d))
self.assertIsNone(self.c.update_service(d, d))
d['state'] = 'stopped'
self.assertTrue(self.c.update_service(d, d, force=True))
d['state'] = 'unknown'
self.assertIsNone(self.c.update_service({}, d))
d['state'] = 'running'
d['role'] = 'bla'
self.assertIsNone(self.c.update_service({}, d))

0 comments on commit 2e9cb41

Please sign in to comment.