Skip to content

Commit

Permalink
Merge c8d1ed6 into 5d8c2fb
Browse files Browse the repository at this point in the history
  • Loading branch information
hughcapet committed Jan 9, 2024
2 parents 5d8c2fb + c8d1ed6 commit 17fb58c
Show file tree
Hide file tree
Showing 12 changed files with 358 additions and 84 deletions.
7 changes: 5 additions & 2 deletions patroni/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ def _write_status_response(self, status_code: int, response: Dict[str, Any]) ->
* ``tags``: tags that were set through Patroni configuration merged with dynamically applied tags;
* ``database_system_identifier``: ``Database system identifier`` from ``pg_controldata`` output;
* ``pending_restart``: ``True`` if PostgreSQL is pending to be restarted;
* ``pending_restart_reason``: dictionary where each key is the parameter that caused "pending restart" flag
to be set and the value is a dictionary with the old and the new value.
* ``scheduled_restart``: a dictionary with a single key ``schedule``, which is the timestamp for the
scheduled restart;
* ``watchdog_failed``: ``True`` if watchdog device is unhealthy;
Expand All @@ -196,8 +198,9 @@ def _write_status_response(self, status_code: int, response: Dict[str, Any]) ->
response['tags'] = tags
if patroni.postgresql.sysid:
response['database_system_identifier'] = patroni.postgresql.sysid
if patroni.postgresql.pending_restart:
if patroni.postgresql.pending_restart_reason:
response['pending_restart'] = True
response['pending_restart_reason'] = dict(patroni.postgresql.pending_restart_reason)
response['patroni'] = {
'version': patroni.version,
'scope': patroni.postgresql.scope,
Expand Down Expand Up @@ -634,7 +637,7 @@ def do_GET_metrics(self) -> None:
metrics.append("# HELP patroni_pending_restart Value is 1 if the node needs a restart, 0 otherwise.")
metrics.append("# TYPE patroni_pending_restart gauge")
metrics.append("patroni_pending_restart{0} {1}"
.format(labels, int(patroni.postgresql.pending_restart)))
.format(labels, int(bool(patroni.postgresql.pending_restart_reason))))

metrics.append("# HELP patroni_is_paused Value is 1 if auto failover is disabled, 0 otherwise.")
metrics.append("# TYPE patroni_is_paused gauge")
Expand Down
12 changes: 10 additions & 2 deletions patroni/ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1558,7 +1558,7 @@ def output_members(cluster: Cluster, name: str, extended: bool = False,

all_members = [m for c in clusters.values() for m in c['members'] if 'host' in m]

for c in ('Pending restart', 'Scheduled restart', 'Tags'):
for c in ('Pending restart', 'Pending restart reason', 'Scheduled restart', 'Tags'):
if extended or any(m.get(c.lower().replace(' ', '_')) for m in all_members):
columns.append(c)

Expand All @@ -1572,11 +1572,19 @@ def output_members(cluster: Cluster, name: str, extended: bool = False,
logging.debug(member)

lag = member.get('lag', '')

def format_diff(param: str, values: Dict[str, str], hide_long: bool):
full_diff = param + ': ' + values['old_value'] + '->' + values['new_value']
return full_diff if not hide_long or len(full_diff) <= 50 else param + ': [hidden - too long]'
restart_reason = '\n'.join(
[format_diff(k, v, fmt == 'pretty') for k, v in member.get('pending_restart_reason', {}).items()]) or ''

member.update(cluster=name, member=member['name'], group=g,
host=member.get('host', ''), tl=member.get('timeline', ''),
role=member['role'].replace('_', ' ').title(),
lag_in_mb=round(lag / 1024 / 1024) if isinstance(lag, int) else lag,
pending_restart='*' if member.get('pending_restart') else '')
pending_restart='*' if member.get('pending_restart') else '',
pending_restart_reason=restart_reason)

if append_port and member['host'] and member.get('port'):
member['host'] = ':'.join([member['host'], str(member['port'])])
Expand Down
5 changes: 3 additions & 2 deletions patroni/ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,9 @@ def touch_member(self) -> bool:
tags = self.get_effective_tags()
if tags:
data['tags'] = tags
if self.state_handler.pending_restart:
if self.state_handler.pending_restart_reason:
data['pending_restart'] = True
data['pending_restart_reason'] = dict(self.state_handler.pending_restart_reason)
if self._async_executor.scheduled_action in (None, 'promote') \
and data['state'] in ['running', 'restarting', 'starting']:
try:
Expand Down Expand Up @@ -1485,7 +1486,7 @@ def restart_matches(self, role: Optional[str], postgres_version: Optional[str],
if postgres_version and postgres_version_to_int(postgres_version) <= int(self.state_handler.server_version):
reason_to_cancel = "postgres version mismatch"

if pending_restart and not self.state_handler.pending_restart:
if pending_restart and not self.state_handler.pending_restart_reason:
reason_to_cancel = "pending restart flag is not set"

if not reason_to_cancel:
Expand Down
28 changes: 21 additions & 7 deletions patroni/postgresql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from .sync import SyncHandler
from .. import global_config, psycopg
from ..async_executor import CriticalTask
from ..collections import CaseInsensitiveSet
from ..collections import CaseInsensitiveSet, CaseInsensitiveDict
from ..dcs import Cluster, Leader, Member, SLOT_ADVANCE_AVAILABLE_VERSION
from ..exceptions import PostgresConnectionException
from ..utils import Retry, RetryFailedError, polling_loop, data_directory_is_empty, parse_int
Expand Down Expand Up @@ -77,7 +77,7 @@ def __init__(self, config: Dict[str, Any], mpp: AbstractMPP) -> None:
self._state_lock = Lock()
self.set_state('stopped')

self._pending_restart = False
self._pending_restart_reason = CaseInsensitiveDict()
self.connection_pool = ConnectionPool()
self._connection = self.connection_pool.get('heartbeat')
self.citus_handler = mpp.get_handler_impl(self)
Expand Down Expand Up @@ -321,11 +321,25 @@ def reload_config(self, config: Dict[str, Any], sighup: bool = False) -> None:
self._is_leader_retry.deadline = self.retry.deadline = config['retry_timeout'] / 2.0

@property
def pending_restart(self) -> bool:
return self._pending_restart
def pending_restart_reason(self) -> CaseInsensitiveDict:
"""Get :attr:`_pending_restart_reason` value.
def set_pending_restart(self, value: bool) -> None:
self._pending_restart = value
:attr:`_pending_restart_reason` is a :class:`CaseInsensitiveDict` object of the PG parameters that are
causing pending restart state. Every key is a parameter name, value - :class:`ParamDiff` object.
"""
return self._pending_restart_reason

def set_pending_restart_reason(self, diff_dict: CaseInsensitiveDict, update: bool = False) -> None:
"""Set new or update current :attr:`_pending_restart_reason`.
:param diff_dict: :class:``CaseInsensitiveDict`` object with the parameters that are causing pending restart
state with the diff of their values. Used to reset/update the :attr:`_pending_restart_reason`.
:param update: bool, indicates if :attr:`_pending_restart_reason` should be updated or reset with *diff_dict*.
"""
if update:
self._pending_restart_reason.update(diff_dict)
else:
self._pending_restart_reason = diff_dict

@property
def sysid(self) -> str:
Expand Down Expand Up @@ -727,7 +741,7 @@ def start(self, timeout: Optional[float] = None, task: Optional[CriticalTask] =
self.set_role(role or self.get_postgres_role_from_data_directory())

self.set_state('starting')
self._pending_restart = False
self.set_pending_restart_reason(CaseInsensitiveDict())

try:
if not self.ensure_major_version_is_known():
Expand Down
2 changes: 1 addition & 1 deletion patroni/postgresql/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ def post_bootstrap(self, config: Dict[str, Any], task: CriticalTask) -> Optional
postgresql.restart()
else:
postgresql.config.replace_pg_hba()
if postgresql.pending_restart:
if postgresql.pending_restart_reason:
postgresql.restart()
else:
postgresql.reload()
Expand Down
74 changes: 57 additions & 17 deletions patroni/postgresql/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@
from contextlib import contextmanager
from urllib.parse import urlparse, parse_qsl, unquote
from types import TracebackType
from typing import Any, Collection, Dict, Iterator, List, Optional, Union, Tuple, Type, TYPE_CHECKING
from typing import Any, Callable, Collection, Dict, Iterator, List, Optional, Union, Tuple, Type, TYPE_CHECKING

from .validator import recovery_parameters, transform_postgresql_parameter_value, transform_recovery_parameter_value
from .. import global_config
from ..collections import CaseInsensitiveDict, CaseInsensitiveSet
from ..dcs import Leader, Member, RemoteMember, slot_name_from_member_name
from ..exceptions import PatroniFatalException, PostgresConnectionException
from ..file_perm import pg_perm
from ..utils import compare_values, parse_bool, parse_int, split_host_port, uri, validate_directory, is_subpath
from ..utils import (compare_values, maybe_convert_base_unit, parse_bool, parse_int,
split_host_port, uri, validate_directory, is_subpath)
from ..validator import IntValidator, EnumValidator

if TYPE_CHECKING: # pragma: no cover
Expand Down Expand Up @@ -270,6 +271,25 @@ def _bool_is_true_validator(value: Any) -> bool:
return parse_bool(value) is True


class ParamDiff(Dict[str, str]):
"""Single PG parameter's value diff to be stored in :attr:`~patroni.postgresql.Postgresql._pending_restart_reason`.
Extends :class:`dict` to only have the two specific attributes: ``old_value`` and ``new_value`` and their values
casted to :class:`str`.
"""

def __init__(self, old_value: Any, new_value: Any):
"""Create a new instance of :class:`ParamDiff` with the given *old_value* and *new_value*.
:param old_value: current :class:`str` parameter value.
:param new_value: :class:`str` value of the paramater after a restart.
"""
super().__init__()
str_value: Callable[[Any], str] = lambda x: '' if x is None else str(x)
self['old_value'] = str_value(old_value)
self['new_value'] = str_value(new_value)


class ConfigHandler(object):

# List of parameters which must be always passed to postmaster as command line options
Expand Down Expand Up @@ -1078,7 +1098,8 @@ def reload_config(self, config: Dict[str, Any], sighup: bool = False) -> None:
self._superuser = config['authentication'].get('superuser', {})
server_parameters = self.get_server_parameters(config)

conf_changed = hba_changed = ident_changed = local_connection_address_changed = pending_restart = False
conf_changed = hba_changed = ident_changed = local_connection_address_changed = False
param_diff = CaseInsensitiveDict()
if self._postgresql.state == 'running':
changes = CaseInsensitiveDict({p: v for p, v in server_parameters.items()
if p.lower() not in self._RECOVERY_PARAMETERS})
Expand All @@ -1101,15 +1122,16 @@ def reload_config(self, config: Dict[str, Any], sighup: bool = False) -> None:
new_value = changes.pop(r[0])
if new_value is None or not compare_values(r[3], r[2], r[1], new_value):
conf_changed = True
old_value = maybe_convert_base_unit(r[1], r[3], r[2])
if r[4] == 'postmaster':
pending_restart = True
logger.info('Changed %s from %s to %s (restart might be required)',
r[0], r[1], new_value)
param_diff[r[0]] = ParamDiff(old_value, new_value)
logger.info("Changed %s from '%s' to '%s' (restart might be required)",
r[0], old_value, new_value)
if config.get('use_unix_socket') and r[0] == 'unix_socket_directories'\
or r[0] in ('listen_addresses', 'port'):
local_connection_address_changed = True
else:
logger.info('Changed %s from %s to %s', r[0], r[1], new_value)
logger.info("Changed %s from '%s' to '%s'", r[0], old_value, new_value)
elif r[0] in self._server_parameters \
and not compare_values(r[3], r[2], r[1], self._server_parameters[r[0]]):
# Check if any parameter was set back to the current pg_settings value
Expand All @@ -1118,10 +1140,11 @@ def reload_config(self, config: Dict[str, Any], sighup: bool = False) -> None:
conf_changed = True
for param, value in changes.items():
if '.' in param:
# Check that user-defined-paramters have changed (parameters with period in name)
# Check that user-defined-parameters have changed (parameters with period in name)
if value is None or param not in self._server_parameters \
or str(value) != str(self._server_parameters[param]):
logger.info('Changed %s from %s to %s', param, self._server_parameters.get(param), value)
logger.info("Changed %s from '%s' to '%s'",
param, self._server_parameters.get(param), value)
conf_changed = True
elif param in server_parameters:
logger.warning('Removing invalid parameter `%s` from postgresql.parameters', param)
Expand All @@ -1136,7 +1159,6 @@ def reload_config(self, config: Dict[str, Any], sighup: bool = False) -> None:
ident_changed = self._config.get('pg_ident', []) != config['pg_ident']

self._config = config
self._postgresql.set_pending_restart(pending_restart)
self._server_parameters = server_parameters
self._adjust_recovery_parameters()
self._krbsrvname = config.get('krbsrvname')
Expand Down Expand Up @@ -1166,16 +1188,29 @@ def reload_config(self, config: Dict[str, Any], sighup: bool = False) -> None:
if self._postgresql.major_version >= 90500:
time.sleep(1)
try:
pending_restart = self._postgresql.query(
'SELECT COUNT(*) FROM pg_catalog.pg_settings'
' WHERE pg_catalog.lower(name) != ALL(%s) AND pending_restart',
[n.lower() for n in self._RECOVERY_PARAMETERS])[0][0] > 0
self._postgresql.set_pending_restart(pending_restart)
settings_diff: CaseInsensitiveDict = CaseInsensitiveDict()
for param, value, unit, vartype in self._postgresql.query(
'SELECT name, pg_catalog.current_setting(name), unit, vartype FROM pg_catalog.pg_settings'
' WHERE pg_catalog.lower(name) != ALL(%s) AND pending_restart',
[n.lower() for n in self._RECOVERY_PARAMETERS]):
new_value = (
lambda v: '?' if v is None
else maybe_convert_base_unit(v, vartype, unit))(self._postgresql.get_guc_value(param))
settings_diff[param] = ParamDiff(value, new_value)
external_change = {param: value for param, value in settings_diff.items()
if param not in param_diff or value != param_diff[param]}
if external_change:
logger.info("PostgreSQL configuration parameters requiring restart"
" (%s) seem to be changed bypassing Patroni config."
" Setting 'Pending restart' flag", ', '.join(external_change))
param_diff = settings_diff
except Exception as e:
logger.warning('Exception %r when running query', e)
else:
logger.info('No PostgreSQL configuration items changed, nothing to reload.')

self._postgresql.set_pending_restart_reason(param_diff)

def set_synchronous_standby_names(self, value: Optional[str]) -> Optional[bool]:
"""Updates synchronous_standby_names and reloads if necessary.
:returns: True if value was updated."""
Expand Down Expand Up @@ -1227,7 +1262,9 @@ def effective_configuration(self) -> CaseInsensitiveDict:
cvalue = parse_int(data[cname])
if cvalue is not None and value is not None and cvalue > value:
effective_configuration[name] = cvalue
self._postgresql.set_pending_restart(True)
logger.info("%s value in pg_controldata: %d, in the global configuration: %d."
" pg_controldata value will be used. Setting 'Pending restart' flag", name, cvalue, value)
self._postgresql.set_pending_restart_reason(CaseInsensitiveDict({name: ParamDiff(cvalue, value)}), True)

# If we are using custom bootstrap with PITR it could fail when values like max_connections
# are increased, therefore we disable hot_standby if recovery_target_action == 'promote'.
Expand All @@ -1244,7 +1281,10 @@ def effective_configuration(self) -> CaseInsensitiveDict:

if disable_hot_standby:
effective_configuration['hot_standby'] = 'off'
self._postgresql.set_pending_restart(True)
logger.info("'hot_standby' parameter is set to 'off' during the custom bootstrap."
" Setting 'Pending restart' flag")
self._postgresql.set_pending_restart_reason(
CaseInsensitiveDict({'hot_standby': ParamDiff('off', 'on')}), True)

return effective_configuration

Expand Down

0 comments on commit 17fb58c

Please sign in to comment.