From 0096b6b06fdb76d9b616fe08dcfa7e00f79b2c57 Mon Sep 17 00:00:00 2001 From: Alexander Kukushkin Date: Wed, 21 Oct 2015 10:56:43 +0200 Subject: [PATCH 1/3] Schedule update of machines cache when api_execute call has failed Such situation could happen if we replaced all etcd nodes except one which was used by patroni. After replacing the last node patroni will try to execute request on all other nodes from machines_cache but non of them are available. Michines cache would became empty and patroni will stick to the latest node which was available in the machines_cache and will never try to refresh machines_cache from dns for example. Currently machines cache is refreshed only when one request to the etcd cluster has failed, but probably it should be done periodically, for example every minute... --- patroni/etcd.py | 6 +++++- tests/test_etcd.py | 14 ++++++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/patroni/etcd.py b/patroni/etcd.py index 79379700b..622eb7475 100644 --- a/patroni/etcd.py +++ b/patroni/etcd.py @@ -52,7 +52,11 @@ def machines(self): def api_execute(self, path, method, **kwargs): # Update machines_cache if previous attempt of update has failed self._update_machines_cache and self._load_machines_cache() - return super(Client, self).api_execute(path, method, **kwargs) + try: + return super(Client, self).api_execute(path, method, **kwargs) + except etcd.EtcdConnectionFailed: + self._update_machines_cache = True + raise @staticmethod def get_srv_record(host): diff --git a/tests/test_etcd.py b/tests/test_etcd.py index 53c054e56..6cd5441e2 100644 --- a/tests/test_etcd.py +++ b/tests/test_etcd.py @@ -106,13 +106,13 @@ def etcd_read(key, **kwargs): "modifiedIndex": 20437, "createdIndex": 20437}, {"key": "/service/batman5/members", "dir": True, "nodes": [ {"key": "/service/batman5/members/postgresql1", - "value": "postgres://replicator:rep-pass@127.0.0.1:5434/postgres" - + "?application_name=http://127.0.0.1:8009/patroni", + "value": "postgres://replicator:rep-pass@127.0.0.1:5434/postgres" + + "?application_name=http://127.0.0.1:8009/patroni", "expiration": "2015-05-15T09:10:59.949384522Z", "ttl": 21, "modifiedIndex": 20727, "createdIndex": 20727}, {"key": "/service/batman5/members/postgresql0", - "value": "postgres://replicator:rep-pass@127.0.0.1:5433/postgres" - + "?application_name=http://127.0.0.1:8008/patroni", + "value": "postgres://replicator:rep-pass@127.0.0.1:5433/postgres" + + "?application_name=http://127.0.0.1:8008/patroni", "expiration": "2015-05-15T09:11:09.611860899Z", "ttl": 30, "modifiedIndex": 20730, "createdIndex": 20730}], "modifiedIndex": 1581, "createdIndex": 1581}], "modifiedIndex": 1581, "createdIndex": 1581}} @@ -143,6 +143,7 @@ def socket_getaddrinfo(*args): def http_request(method, url, **kwargs): + print('http_request', method, url, kwargs) if url == 'http://localhost:2379/': return MockResponse() raise socket.error @@ -165,6 +166,11 @@ def test_api_execute(self): self.client._base_uri = 'http://localhost:4001' self.client._machines_cache = ['http://localhost:2379'] self.client.api_execute('/', 'GET') + self.client._update_machines_cache = False + self.client._base_uri = 'http://localhost:4001' + self.client._machines_cache = [] + self.assertRaises(etcd.EtcdConnectionFailed, self.client.api_execute, '/', 'GET') + self.assertTrue(self.client._update_machines_cache) def test_get_srv_record(self): self.assertEquals(self.client.get_srv_record('blabla'), []) From 8bd28507a93310cb8b37d830822900ce1d1417f2 Mon Sep 17 00:00:00 2001 From: Alexander Kukushkin Date: Wed, 21 Oct 2015 11:08:06 +0200 Subject: [PATCH 2/3] format tests according to the latest pep8 standards --- tests/test_postgresql.py | 15 +++++---------- tests/test_utils.py | 4 +++- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/tests/test_postgresql.py b/tests/test_postgresql.py index 9a02ece69..cc781fc85 100644 --- a/tests/test_postgresql.py +++ b/tests/test_postgresql.py @@ -4,12 +4,7 @@ import shutil import unittest -from sys import version_info -if version_info.major == 2: - import __builtin__ as builtins -else: - import builtins - +from six.moves import builtins from mock import Mock, MagicMock, PropertyMock, patch, mock_open from patroni.dcs import Cluster, Leader, Member from patroni.exceptions import PostgresException, PostgresConnectionException @@ -141,10 +136,10 @@ def pg_controldata_string(*args, **kwargs): def postmaster_opts_string(*args, **kwargs): - return '/usr/local/pgsql/bin/postgres "-D" "data/postgresql0" "--listen_addresses=127.0.0.1" "--port=5432"'\ - ' "--hot_standby=on" "--wal_keep_segments=8" "--wal_level=hot_standby" "--archive_command=mkdir -p ../wal_archive \n'\ - '&& cp %p ../wal_archive/%f" "--wal_log_hints=on" "--max_wal_senders=5" "--archive_timeout=1800s" "--archive_mode=on"'\ - ' "--max_replication_slots=5"\n' + return '/usr/local/pgsql/bin/postgres "-D" "data/postgresql0" "--listen_addresses=127.0.0.1" \ +"--port=5432" "--hot_standby=on" "--wal_keep_segments=8" "--wal_level=hot_standby" \ +"--archive_command=mkdir -p ../wal_archive && cp %p ../wal_archive/%f" "--wal_log_hints=on" \ +"--max_wal_senders=5" "--archive_timeout=1800s" "--archive_mode=on" "--max_replication_slots=5"\n' def psycopg2_connect(*args, **kwargs): diff --git a/tests/test_utils.py b/tests/test_utils.py index 45b194dcd..265f98ab0 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -67,7 +67,9 @@ def test_deadline(self): self.assertRaises(RetryFailedError, retry, self._fail(times=100)) def test_copy(self): - _sleep = lambda t: None + def _sleep(t): + None + retry = self._makeOne(sleep_func=_sleep) rcopy = retry.copy() self.assertTrue(rcopy.sleep_func is _sleep) From c4a6dd48d34b490971edc77e87bd466ad95d8988 Mon Sep 17 00:00:00 2001 From: Alexander Kukushkin Date: Wed, 21 Oct 2015 11:09:37 +0200 Subject: [PATCH 3/3] remove debug print statement --- tests/test_etcd.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_etcd.py b/tests/test_etcd.py index 6cd5441e2..d0d01d71f 100644 --- a/tests/test_etcd.py +++ b/tests/test_etcd.py @@ -143,7 +143,6 @@ def socket_getaddrinfo(*args): def http_request(method, url, **kwargs): - print('http_request', method, url, kwargs) if url == 'http://localhost:2379/': return MockResponse() raise socket.error