Skip to content

Commit

Permalink
Merge branch 'master' of github.com:zalando/patroni into feature/sighup
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Kukushkin committed May 19, 2016
2 parents 98c505a + 082b6f8 commit 6104d68
Show file tree
Hide file tree
Showing 13 changed files with 134 additions and 139 deletions.
2 changes: 1 addition & 1 deletion .zappr.yml
Expand Up @@ -2,7 +2,7 @@ approvals:
# PR needs at least 4 approvals
minimum: 1
# approval = comment that matches this regex
pattern: "^:?\\+1:?$"
pattern: "^\\s*(:?\\+1:?|👍)\\s*$"
from:
# commenter must be either one of:
# a public zalando org member
Expand Down
17 changes: 2 additions & 15 deletions patroni/__init__.py
Expand Up @@ -6,7 +6,7 @@
import yaml

from patroni.api import RestApiServer
from patroni.exceptions import PatroniException
from patroni.dcs import get_dcs
from patroni.ha import Ha
from patroni.postgresql import Postgresql
from patroni.utils import reap_children, set_ignore_sigterm, setup_signal_handlers
Expand All @@ -25,7 +25,7 @@ def __init__(self, config_file=None, config_env=None):
self.nap_time = config['loop_wait']
self.tags = self.get_tags(config)
self.postgresql = Postgresql(config['postgresql'])
self.dcs = self.get_dcs(self.postgresql.name, config)
self.dcs = get_dcs(self.postgresql.name, config)
self.version = __version__
self.api = RestApiServer(self, config['restapi'])
self.ha = Ha(self)
Expand Down Expand Up @@ -69,19 +69,6 @@ def nofailover(self):
def replicatefrom(self):
return self.tags.get('replicatefrom')

@staticmethod
def get_dcs(name, config):
if 'etcd' in config:
from patroni.etcd import Etcd
return Etcd(name, config['etcd'])
if 'zookeeper' in config:
from patroni.zookeeper import ZooKeeper
return ZooKeeper(name, config['zookeeper'])
if 'consul' in config:
from patroni.consul import Consul
return Consul(name, config['consul'])
raise PatroniException('Can not find suitable configuration of distributed configuration store')

def schedule_next_run(self):
self.next_run += self.nap_time
current_time = time.time()
Expand Down
8 changes: 4 additions & 4 deletions patroni/ctl.py
Expand Up @@ -16,7 +16,8 @@
import yaml

from click import ClickException
from patroni import Patroni, PatroniException
from patroni.dcs import get_dcs as _get_dcs
from patroni.exceptions import PatroniException
from patroni.postgresql import parseurl
from prettytable import PrettyTable
from six.moves.urllib_parse import urlparse
Expand Down Expand Up @@ -93,10 +94,9 @@ def ctl(ctx):


def get_dcs(config, scope):
for k in set(DCS_DEFAULTS.keys()) & set(config.keys()):
config[k].setdefault('scope', scope)
config.setdefault('scope', scope)
try:
return Patroni.get_dcs(scope, config)
return _get_dcs(scope, config)
except PatroniException as e:
raise PatroniCtlException(str(e))

Expand Down
24 changes: 24 additions & 0 deletions patroni/dcs.py → patroni/dcs/__init__.py
@@ -1,9 +1,13 @@
import abc
import dateutil
import importlib
import inspect
import json
import os
import six

from collections import namedtuple
from patroni.exceptions import PatroniException
from random import randint
from six.moves.urllib_parse import urlparse, urlunparse, parse_qsl
from threading import Event, Lock
Expand All @@ -26,6 +30,26 @@ def parse_connection_string(value):
return conn_url, api_url


def get_dcs(node_name, config):
available_implementations = []
for name in os.listdir(os.path.dirname(__file__)):
if name.endswith('.py') and not name.startswith('__'): # find module
module = importlib.import_module(__package__ + '.' + name[:-3])
for name in dir(module): # iterate through module content
if not name.startswith('__'): # skip internal stuff
value = getattr(module, name)
name = name.lower()
# try to find implementation of AbstractDCS interface
if inspect.isclass(value) and issubclass(value, AbstractDCS):
available_implementations.append(name)
if name in config: # which has configuration section in the config file
# propagate some parameters
config[name].update({p: config[p] for p in ('namespace', 'scope', 'ttl') if p in config})
return value(node_name, config[name])
raise PatroniException("""Can not find suitable configuration of distributed configuration store
Available implementations: """ + ', '.join(available_implementations))


class Member(namedtuple('Member', 'index,name,session,data')):

"""Immutable object (namedtuple) which represents single member of PostgreSQL cluster.
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
3 changes: 1 addition & 2 deletions tests/test_consul.py
@@ -1,9 +1,8 @@
import consul
import unittest

from patroni.dcs import AbstractDCS
from mock import Mock, patch
from patroni.consul import Cluster, Consul, ConsulError, ConsulException, HTTPClient, NotFound
from patroni.dcs.consul import AbstractDCS, Cluster, Consul, ConsulError, ConsulException, HTTPClient, NotFound
from test_etcd import SleepException


Expand Down
166 changes: 82 additions & 84 deletions tests/test_ctl.py
Expand Up @@ -8,7 +8,6 @@
from mock import patch, Mock
from patroni.ctl import ctl, members, store_config, load_config, output_members, post_patroni, get_dcs, parse_dcs, \
wait_for_leader, get_all_members, get_any_member, get_cursor, query_member, configure, PatroniCtlException
from patroni.etcd import Etcd, Client
from psycopg2 import OperationalError
from test_etcd import etcd_read, requests_get, socket_getaddrinfo, MockResponse
from test_ha import get_cluster_initialized_without_leader, get_cluster_initialized_with_leader, \
Expand Down Expand Up @@ -50,9 +49,9 @@ class TestCtl(unittest.TestCase):
@patch('socket.getaddrinfo', socket_getaddrinfo)
def setUp(self):
self.runner = CliRunner()
with patch.object(Client, 'machines') as mock_machines:
with patch.object(etcd.Client, 'machines') as mock_machines:
mock_machines.__get__ = Mock(return_value=['http://remotehost:2379'])
self.e = Etcd('foo', {'ttl': 30, 'host': 'ok:2379', 'scope': 'test'})
self.e = get_dcs({'etcd': {'ttl': 30, 'host': 'ok:2379', 'scope': 'test'}}, 'foo')

@patch('psycopg2.connect', psycopg2_connect)
def test_get_cursor(self):
Expand Down Expand Up @@ -81,29 +80,30 @@ def test_output_members(self):
self.assertIsNone(output_members(cluster, name='abc', fmt='json'))
self.assertIsNone(output_members(cluster, name='abc', fmt='tsv'))

@patch('patroni.etcd.Etcd.get_cluster', Mock(return_value=get_cluster_initialized_with_leader()))
@patch('patroni.etcd.Etcd.get_etcd_client', Mock(return_value=None))
@patch('patroni.ctl.get_dcs')
@patch('patroni.ctl.post_patroni', Mock(return_value=MockResponse()))
def test_failover(self):
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='''leader\nother\n\ny''')
def test_failover(self, mock_get_dcs):
mock_get_dcs.return_value = self.e
mock_get_dcs.return_value.get_cluster = get_cluster_initialized_with_leader
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='leader\nother\n\ny')
assert 'leader' in result.output

result = self.runner.invoke(ctl, ['failover', 'dummy'], input='''leader\nother\n2100-01-01T12:23:00\ny''')
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='leader\nother\n2100-01-01T12:23:00\ny')
assert result.exit_code == 0

result = self.runner.invoke(ctl, ['failover', 'dummy'], input='''leader\nother\n2030-01-01T12:23:00\ny''')
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='leader\nother\n2030-01-01T12:23:00\ny')
assert result.exit_code == 0

# Aborting failover,as we anser NO to the confirmation
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='''leader\nother\n\nN''')
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='leader\nother\n\nN')
assert result.exit_code == 1

# Target and source are equal
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='''leader\nleader\n\ny''')
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='leader\nleader\n\ny')
assert result.exit_code == 1

# Reality is not part of this cluster
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='''leader\nReality\n\ny''')
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='leader\nReality\n\ny')
assert result.exit_code == 1

result = self.runner.invoke(ctl, ['failover', 'dummy', '--force'])
Expand All @@ -124,63 +124,62 @@ def test_failover(self):
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='dummy')
assert result.exit_code == 1

with patch('patroni.etcd.Etcd.get_cluster', Mock(return_value=get_cluster_initialized_with_only_leader())):
# No members available
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='''leader\nother\n\ny''')
assert result.exit_code == 1

with patch('patroni.etcd.Etcd.get_cluster', Mock(return_value=get_cluster_initialized_without_leader())):
# No master available
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='''leader\nother\n\ny''')
assert result.exit_code == 1

with patch('patroni.ctl.post_patroni', Mock(side_effect=Exception)):
# Non-responding patroni
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='''leader\nother\n\ny''')
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='leader\nother\n\ny')
assert 'falling back to DCS' in result.output

with patch('patroni.ctl.post_patroni') as mocked:
mocked.return_value.status_code = 500
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='''leader\nother\n\ny''')
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='leader\nother\n\ny')
assert 'Failover failed' in result.output

# No members available
mock_get_dcs.return_value.get_cluster = get_cluster_initialized_with_only_leader
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='leader\nother\n\ny')
assert result.exit_code == 1

# No master available
mock_get_dcs.return_value.get_cluster = get_cluster_initialized_without_leader
result = self.runner.invoke(ctl, ['failover', 'dummy'], input='leader\nother\n\ny')
assert result.exit_code == 1

def test_get_dcs(self):
self.assertRaises(PatroniCtlException, get_dcs, {'dummy': {}}, 'dummy')
with patch('patroni.Patroni.get_dcs', Mock(return_value=self.e)):
assert get_dcs({'etcd': {'host': 'none'}}, 'dummy').client_path('') == '/service/test/'

@patch('psycopg2.connect', psycopg2_connect)
@patch('patroni.ctl.query_member', Mock(return_value=([['mock column']], None)))
@patch('patroni.ctl.get_dcs')
@patch.object(etcd.Client, 'read', etcd_read)
def test_query(self):
with patch('patroni.ctl.get_dcs', Mock(return_value=self.e)):
# Mutually exclusive
result = self.runner.invoke(ctl, ['query', 'alpha', '--member', 'abc', '--role', 'master'])
assert result.exit_code == 1
def test_query(self, mock_get_dcs):
mock_get_dcs.return_value = self.e
# Mutually exclusive
result = self.runner.invoke(ctl, ['query', 'alpha', '--member', 'abc', '--role', 'master'])
assert result.exit_code == 1

with self.runner.isolated_filesystem():
with open('dummy', 'w') as dummy_file:
dummy_file.write('SELECT 1')
with self.runner.isolated_filesystem():
with open('dummy', 'w') as dummy_file:
dummy_file.write('SELECT 1')

# Mutually exclusive
result = self.runner.invoke(ctl, ['query', 'alpha', '--file', 'dummy', '--command', 'dummy'])
assert result.exit_code == 1
# Mutually exclusive
result = self.runner.invoke(ctl, ['query', 'alpha', '--file', 'dummy', '--command', 'dummy'])
assert result.exit_code == 1

result = self.runner.invoke(ctl, ['query', 'alpha', '--file', 'dummy'])
assert result.exit_code == 0
result = self.runner.invoke(ctl, ['query', 'alpha', '--file', 'dummy'])
assert result.exit_code == 0

os.remove('dummy')
os.remove('dummy')

result = self.runner.invoke(ctl, ['query', 'alpha', '--command', 'SELECT 1'])
assert 'mock column' in result.output
result = self.runner.invoke(ctl, ['query', 'alpha', '--command', 'SELECT 1'])
assert 'mock column' in result.output

# --command or --file is mandatory
result = self.runner.invoke(ctl, ['query', 'alpha'])
assert result.exit_code == 1
# --command or --file is mandatory
result = self.runner.invoke(ctl, ['query', 'alpha'])
assert result.exit_code == 1

result = self.runner.invoke(ctl, ['query', 'alpha', '--command', 'SELECT 1', '--username', 'root',
'--password', '--dbname', 'postgres'], input='ab\nab')
assert 'mock column' in result.output
result = self.runner.invoke(ctl, ['query', 'alpha', '--command', 'SELECT 1', '--username', 'root',
'--password', '--dbname', 'postgres'], input='ab\nab')
assert 'mock column' in result.output

def test_query_member(self):
with patch('patroni.ctl.get_cursor', Mock(return_value=MockConnect().cursor())):
Expand All @@ -203,24 +202,24 @@ def test_query_member(self):
with patch('patroni.ctl.get_cursor', Mock(side_effect=OperationalError('bla'))):
rows = query_member(None, None, None, 'replica', 'SELECT pg_is_in_recovery()')

@patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=get_cluster_initialized_with_leader()))
def test_dsn(self):
with patch('patroni.ctl.get_dcs', Mock(return_value=self.e)):
result = self.runner.invoke(ctl, ['dsn', 'alpha'])
assert 'host=127.0.0.1 port=5435' in result.output
@patch('patroni.ctl.get_dcs')
def test_dsn(self, mock_get_dcs):
mock_get_dcs.return_value.get_cluster = get_cluster_initialized_with_leader
result = self.runner.invoke(ctl, ['dsn', 'alpha'])
assert 'host=127.0.0.1 port=5435' in result.output

# Mutually exclusive options
result = self.runner.invoke(ctl, ['dsn', 'alpha', '--role', 'master', '--member', 'dummy'])
assert result.exit_code == 1
# Mutually exclusive options
result = self.runner.invoke(ctl, ['dsn', 'alpha', '--role', 'master', '--member', 'dummy'])
assert result.exit_code == 1

# Non-existing member
result = self.runner.invoke(ctl, ['dsn', 'alpha', '--member', 'dummy'])
assert result.exit_code == 1
# Non-existing member
result = self.runner.invoke(ctl, ['dsn', 'alpha', '--member', 'dummy'])
assert result.exit_code == 1

@patch('patroni.etcd.Etcd.get_cluster', Mock(return_value=get_cluster_initialized_with_leader()))
@patch('patroni.etcd.Etcd.get_etcd_client', Mock(return_value=None))
@patch('requests.post', requests_get)
def test_restart_reinit(self):
@patch('patroni.ctl.get_dcs')
def test_restart_reinit(self, mock_get_dcs):
mock_get_dcs.return_value.get_cluster = get_cluster_initialized_with_leader
result = self.runner.invoke(ctl, ['restart', 'alpha'], input='y')
assert 'restart failed for' in result.output
assert result.exit_code == 0
Expand All @@ -240,29 +239,28 @@ def test_restart_reinit(self):
result = self.runner.invoke(ctl, ['restart', 'alpha'], input='y')
assert result.exit_code == 0

@patch('patroni.etcd.Etcd.get_cluster', Mock(return_value=get_cluster_initialized_with_leader()))
@patch.object(etcd.Client, 'delete', Mock(side_effect=etcd.EtcdException))
def test_remove(self):
with patch('patroni.ctl.get_dcs', Mock(return_value=self.e)):
result = self.runner.invoke(ctl, ['remove', 'alpha'], input='alpha\nslave')
assert 'Please confirm' in result.output
assert 'You are about to remove all' in result.output
# Not typing an exact confirmation
assert result.exit_code == 1
@patch('patroni.ctl.get_dcs')
def test_remove(self, mock_get_dcs):
mock_get_dcs.return_value.get_cluster = get_cluster_initialized_with_leader
result = self.runner.invoke(ctl, ['remove', 'alpha'], input='alpha\nslave')
assert 'Please confirm' in result.output
assert 'You are about to remove all' in result.output
# Not typing an exact confirmation
assert result.exit_code == 1

# master specified does not match master of cluster
result = self.runner.invoke(ctl, ['remove', 'alpha'], input='''alpha\nYes I am aware\nslave''')
assert result.exit_code == 1
# master specified does not match master of cluster
result = self.runner.invoke(ctl, ['remove', 'alpha'], input='alpha\nYes I am aware\nslave')
assert result.exit_code == 1

# cluster specified on cmdline does not match verification prompt
result = self.runner.invoke(ctl, ['remove', 'alpha'], input='beta\nleader')
assert result.exit_code == 1
# cluster specified on cmdline does not match verification prompt
result = self.runner.invoke(ctl, ['remove', 'alpha'], input='beta\nleader')
assert result.exit_code == 1

result = self.runner.invoke(ctl, ['remove', 'alpha'], input='''alpha\nYes I am aware\nleader''')
assert result.exit_code == 0
result = self.runner.invoke(ctl, ['remove', 'alpha'], input='alpha\nYes I am aware\nleader')
assert result.exit_code == 0

@patch('patroni.etcd.Etcd.watch', Mock(return_value=None))
@patch('patroni.etcd.Etcd.get_cluster', Mock(return_value=get_cluster_initialized_with_leader()))
@patch('patroni.dcs.AbstractDCS.watch', Mock(return_value=None))
@patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=get_cluster_initialized_with_leader()))
def test_wait_for_leader(self):
self.assertRaises(PatroniCtlException, wait_for_leader, self.e, 0)

Expand Down Expand Up @@ -299,9 +297,9 @@ def test_get_all_members(self):

self.assertEquals(len(list(get_all_members(get_cluster_initialized_without_leader(), role='replica'))), 2)

@patch('patroni.etcd.Etcd.get_cluster', Mock(return_value=get_cluster_initialized_with_leader()))
@patch('patroni.etcd.Etcd.get_etcd_client', Mock(return_value=None))
def test_members(self):
@patch('patroni.ctl.get_dcs')
def test_members(self, mock_get_dcs):
mock_get_dcs.return_value.get_cluster = get_cluster_initialized_with_leader
result = self.runner.invoke(members, ['alpha'])
assert '127.0.0.1' in result.output
assert result.exit_code == 0
Expand Down

0 comments on commit 6104d68

Please sign in to comment.