Skip to content

Commit

Permalink
Code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Murat Kabilov committed Aug 9, 2016
1 parent 45d9932 commit a32b8bf
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 94 deletions.
2 changes: 1 addition & 1 deletion features/patroni_api.feature
Expand Up @@ -57,7 +57,7 @@ Scenario: check API requests for the primary-replica pair
And I receive a response output "Success: reinitialize for member postgres1"
When I run patronictl.py restart batman postgres0 --force
Then I receive a response returncode 0
And I receive a response output "Successful restart on member postgres0"
And I receive a response output "Success: restart on member postgres0"
And postgres0 role is the primary after 5 seconds
When I sleep for 10 seconds
Then postgres1 role is the secondary after 15 seconds
Expand Down
5 changes: 2 additions & 3 deletions patroni/api.py
Expand Up @@ -7,10 +7,9 @@
import dateutil.parser
import datetime
import pytz
import re

from patroni.exceptions import PostgresConnectionException
from patroni.utils import deep_compare, patch_config, Retry, RetryFailedError
from patroni.utils import deep_compare, patch_config, Retry, RetryFailedError, is_valid_pg_version
from six.moves.BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from six.moves.socketserver import ThreadingMixIn
from threading import Thread
Expand Down Expand Up @@ -212,7 +211,7 @@ def do_POST_restart(self):
data = "PostgreSQL role should be either master or replica"
break
elif k == 'postgres_version':
if not re.match(r'[1-9][0-9]?(\.(0|([1-9][0-9]?))){2}$', request[k]):
if not is_valid_pg_version(request[k]):
status_code = 400
data = "PostgreSQL version should be in the first.major.minor format"
break
Expand Down
115 changes: 46 additions & 69 deletions patroni/ctl.py
Expand Up @@ -16,13 +16,13 @@
import time
import tzlocal
import yaml
import re

from click import ClickException
from patroni.config import Config
from patroni.dcs import get_dcs as _get_dcs
from patroni.exceptions import PatroniException
from patroni.postgresql import get_conn_kwargs
from patroni.utils import is_valid_pg_version
from prettytable import PrettyTable
from six.moves.urllib_parse import urlparse

Expand Down Expand Up @@ -120,7 +120,7 @@ def auth_header(config):
return {'Authorization': 'Basic ' + base64.b64encode(config['restapi']['auth'].encode('utf-8')).decode('utf-8')}


def request_patroni(request_type, member, endpoint, content=None, headers=None):
def request_patroni(member, request_type, endpoint, content=None, headers=None):
headers = headers or {}
url_parts = urlparse(member.api_url)
logging.debug(url_parts)
Expand All @@ -129,12 +129,8 @@ def request_patroni(request_type, member, endpoint, content=None, headers=None):

url = '{0}://{1}/{2}'.format(url_parts.scheme, url_parts.netloc, endpoint)

if request_type == 'get':
return requests.get(url, headers=headers, timeout=60)
elif request_type == 'post':
return requests.post(url, headers=headers, data=json.dumps(content) if content else None, timeout=60)
elif request_type == 'delete':
return requests.delete(url, headers=headers, timeout=60)
return getattr(requests, request_type)(url, headers=headers,
data=json.dumps(content) if content else None, timeout=60)


def print_output(columns, rows=None, alignment=None, fmt='pretty', header=True, delimiter='\t'):
Expand Down Expand Up @@ -270,11 +266,7 @@ def get_members(cluster, cluster_name, member_names, role, force, action):
if not confirm:
raise PatroniCtlException('Aborted {0}'.format(action))

result = {}
for mn in member_names:
result[mn] = candidates[mn]

return result
return [candidates[n] for n in member_names]


@ctl.command('dsn', help='Generate a dsn for the provided member, defaults to a dsn of the master')
Expand Down Expand Up @@ -405,7 +397,7 @@ def query_member(cluster, cursor, member, role, command, connect_parameters):
def remove(config_file, cluster_name, fmt, dcs):
_, dcs, cluster = ctl_load_config(cluster_name, config_file, dcs)

output_members(cluster, cluster_name, None, fmt)
output_members(cluster, cluster_name, fmt=fmt)

confirm = click.prompt('Please confirm the cluster name to remove', type=str)
if confirm != cluster_name:
Expand Down Expand Up @@ -448,13 +440,6 @@ def ctl_load_config(cluster_name, config_file, dcs):
return config, dcs, cluster


def patroni_status(member, headers):
r = request_patroni('get', member, 'patroni', None, headers)
check_response(r, member.name, 'get status', True)

return json.loads(r.text)


def check_response(response, member_name, action_name, silent_success=False):
if response.status_code >= 400:
click.echo('Failed: {0} for member {1}, status code={2}, ({3})'.format(
Expand Down Expand Up @@ -496,10 +481,9 @@ def restart(cluster_name, member_names, config_file, dcs, force, role, p_any, sc
config, dcs, cluster = ctl_load_config(cluster_name, config_file, dcs)

members = get_members(cluster, cluster_name, member_names, role, force, 'restart')

if p_any:
k = random.choice(list(members))
members = {k: members[k]}
random.shuffle(members)
members = members[:1]

if version is None and not force:
version = click.prompt('Restart if the PostgreSQL version is less than provided (e.g. 9.5.2) ',
Expand All @@ -510,7 +494,7 @@ def restart(cluster_name, member_names, config_file, dcs, force, role, p_any, sc
content['restart_pending'] = True

if version:
if not re.match(r'[1-9][0-9]?(\.(0|([1-9][0-9]?))){2}$', version):
if not is_valid_pg_version(version):
message = 'PostgreSQL version should be in the first.major.minor format'
raise PatroniCtlException(message)
else:
Expand All @@ -523,19 +507,23 @@ def restart(cluster_name, member_names, config_file, dcs, force, role, p_any, sc
if scheduled_at:
content['schedule'] = scheduled_at.isoformat()

for mn, member in members.items():
for member in members:
if 'schedule' in content:
if force and 'scheduled_restart' in patroni_status(member, auth_header(config)):
r = request_patroni('delete', member, 'restart', None, auth_header(config))
check_response(r, mn, 'flush scheduled restart')
if force and member.data.get('scheduled_restart'):
r = request_patroni(member, 'delete', 'restart', headers=auth_header(config))
check_response(r, member.name, 'flush scheduled restart', True)

r = request_patroni('post', member, 'restart', content, auth_header(config))
r = request_patroni(member, 'post', 'restart', content, auth_header(config))
if r.status_code == 200:
click.echo('Successful restart on member {0}'.format(mn))
elif 200 < r.status_code < 300:
click.echo('{0} on member {1}'.format(r.text, mn))
click.echo('Success: restart on member {0}'.format(member.name))
elif r.status_code == 202:
click.echo('Success: restart scheduled on member {0}'.format(member.name))
elif r.status_code == 409:
click.echo('Failed: another restart is already scheduled on member {0}'.format(member.name))
else:
click.echo('Restart failed for member {0}, status code={1}, ({2})'.format(mn, r.status_code, r.text))
click.echo('Failed: restart for member {0}, status code={1}, ({2})'.format(
member.name, r.status_code, r.text)
)


@ctl.command('reinit', help='Reinitialize cluster member')
Expand All @@ -548,9 +536,9 @@ def reinit(cluster_name, member_names, config_file, dcs, force):
config, dcs, cluster = ctl_load_config(cluster_name, config_file, dcs)
members = get_members(cluster, cluster_name, member_names, None, force, 'reinitialize')

for mn, member in members.items():
r = request_patroni('post', member, 'reinitialize', None, auth_header(config))
check_response(r, mn, 'reinitialize')
for member in members:
r = request_patroni(member, 'post', 'reinitialize', headers=auth_header(config))
check_response(r, member.name, 'reinitialize')


@ctl.command('failover', help='Failover to a replica')
Expand Down Expand Up @@ -625,7 +613,7 @@ def failover(config_file, cluster_name, master, candidate, force, dcs, scheduled

r = None
try:
r = request_patroni('post', cluster.leader.member, 'failover', failover_value, auth_header(config))
r = request_patroni(cluster.leader.member, 'post', 'failover', failover_value, auth_header(config))
if r.status_code in (200, 202):
logging.debug(r)
cluster = dcs.get_cluster()
Expand All @@ -644,7 +632,7 @@ def failover(config_file, cluster_name, master, candidate, force, dcs, scheduled
output_members(cluster, cluster_name)


def output_members(cluster, name, extra=None, fmt='pretty'):
def output_members(cluster, name, extended=False, fmt='pretty'):
rows = []
logging.debug(cluster)
leader_name = None
Expand All @@ -653,8 +641,6 @@ def output_members(cluster, name, extra=None, fmt='pretty'):

xlog_location_cluster = cluster.last_leader_operation or 0

extra_columns = extra.keys() if extra else []

# Mainly for consistent pretty printing and watching we sort the output
cluster.members.sort(key=lambda x: x.name)
for m in cluster.members:
Expand All @@ -679,9 +665,15 @@ def output_members(cluster, name, extra=None, fmt='pretty'):
m.data.get('state', ''),
lag,
]
if extra:
for extra_col in extra_columns:
row.append(extra[extra_col][m.name])
if extended:
value = ''
scheduled_restart = m.data.get('scheduled_restart')
if scheduled_restart:
value = scheduled_restart['schedule']
if 'postgres_version' in scheduled_restart:
value += ' if version < {0}'.format(scheduled_restart['postgres_version'])

row.append(value)

rows.append(row)

Expand All @@ -693,10 +685,11 @@ def output_members(cluster, name, extra=None, fmt='pretty'):
'State',
'Lag in MB',
]
columns += extra_columns

alignment = {'Cluster': 'l', 'Member': 'l', 'Host': 'l', 'Lag in MB': 'r'}
alignment.update(dict([(col, 'l') for col in extra_columns]))

if extended:
columns.append('Scheduled restart')
alignment['Scheduled restart'] = 'l'

print_output(columns, rows, alignment, fmt)

Expand All @@ -719,23 +712,8 @@ def members(config_file, cluster_names, fmt, watch, w, dcs, extended):
dcs = get_dcs(config, cluster_name)

for _ in watching(w, watch):
extra = dict()
cluster = dcs.get_cluster()
if extended:
extra = {
'Scheduled restart': dict()
}
for member in cluster.members:
value = ''
status = patroni_status(member, auth_header(config))
if 'scheduled_restart' in status:
value = status['scheduled_restart']['schedule']
if 'postgres_version' in status['scheduled_restart']:
value += ' if version < {0}'.format(status['scheduled_restart']['postgres_version'])

extra['Scheduled restart'][member.name] = value

output_members(cluster, cluster_name, extra, fmt)
output_members(cluster, cluster_name, extended, fmt)


def timestamp(precision=6):
Expand Down Expand Up @@ -766,11 +744,10 @@ def flush(cluster_name, member_names, config_file, dcs, force, role, target):
config, dcs, cluster = ctl_load_config(cluster_name, config_file, dcs)

members = get_members(cluster, cluster_name, member_names, role, force, 'flush')
for mn, member in members.items():
status = patroni_status(member, auth_header(config))
for member in members:
if target == 'restart':
if 'scheduled_restart' in status:
r = request_patroni('delete', member, 'restart', None, auth_header(config))
check_response(r, mn, 'flush scheduled restart')
if member.data.get('scheduled_restart'):
r = request_patroni(member, 'delete', 'restart', None, auth_header(config))
check_response(r, member.name, 'flush scheduled restart')
else:
click.echo('No scheduled restart for member {0}'.format(mn))
click.echo('No scheduled restart for member {0}'.format(member.name))
2 changes: 2 additions & 0 deletions patroni/ha.py
Expand Up @@ -431,6 +431,7 @@ def schedule_future_restart(self, restart_data):
with self._async_executor:
if not self.patroni.scheduled_restart:
self.patroni.scheduled_restart = restart_data
self.touch_member()
return True
return False

Expand All @@ -439,6 +440,7 @@ def delete_future_restart(self):
with self._async_executor:
if self.patroni.scheduled_restart:
self.patroni.scheduled_restart = {}
self.touch_member()
ret = True
return ret

Expand Down
5 changes: 5 additions & 0 deletions patroni/utils.py
Expand Up @@ -2,6 +2,7 @@
import random
import sys
import time
import re

from patroni.exceptions import PatroniException

Expand Down Expand Up @@ -226,6 +227,10 @@ def reap_children():
__reap_children = False


def is_valid_pg_version(version):
return re.match(r'[1-9][0-9]?(\.(0|([1-9][0-9]?))){2}$', version)


class RetryFailedError(PatroniException):

"""Raised when retrying an operation ultimately failed, after retrying the maximum number of attempts."""
Expand Down

0 comments on commit a32b8bf

Please sign in to comment.