Skip to content

Commit

Permalink
Compatibility with postgresql 12 (#1068)
Browse files Browse the repository at this point in the history
* use `SHOW primary_conninfo` instead of parsing config file on pg12
* strip out standby and recovery parameters from postgresql.auto.conf before starting the postgres 12

Patroni config remains backward compatible.
Despite for example `restore_command` converted to a GUC starting from postgresql 12, in the Patroni configuration you can still keep it in the `postgresql.recovery_conf` section.
If you put it into `postgresql.parameters.restore_command`, that will also work, but it is important not to mix both ways:
```yaml
# is OK
postgresql:
  parameters:
    restore_command: my_restore_command
    archive_cleanup_command: my_archive_cleanup_command

# is OK
postgresql:
  recovery_conf:
    restore_command: my_restore_command
    archive_cleanup_command: my_archive_cleanup_command

# is NOT ok
postgresql:
  parameters:
    restore_command: my_restore_command
  recovery_conf:
    archive_cleanup_command: my_archive_cleanup_command
```
  • Loading branch information
CyberDem0n committed Aug 2, 2019
1 parent 4a24b79 commit 0b1b1e3
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 21 deletions.
3 changes: 2 additions & 1 deletion patroni/postgresql/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ def _post_restore(self):
self._postgresql.configure_server_parameters()

# make sure there is no trigger file or postgres will be automatically promoted
trigger_file = self._postgresql.config.get('recovery_conf', {}).get('trigger_file') or 'promote'
trigger_file = 'promote_trigger_file' if self._postgresql.major_version >= 120000 else 'trigger_file'
trigger_file = self._postgresql.config.get('recovery_conf', {}).get(trigger_file) or 'promote'
trigger_file = os.path.abspath(os.path.join(self._postgresql.data_dir, trigger_file))
if os.path.exists(trigger_file):
os.unlink(trigger_file)
Expand Down
154 changes: 140 additions & 14 deletions patroni/postgresql/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,26 +186,53 @@ class ConfigHandler(object):
'wal_log_hints': ('on', lambda _: False, 90400)
})

_RECOVERY_PARAMETERS = {
'archive_cleanup_command',
'restore_command',
'recovery_end_command',
'recovery_target',
'recovery_target_name',
'recovery_target_time',
'recovery_target_xid',
'recovery_target_lsn',
'recovery_target_inclusive',
'recovery_target_timeline',
'recovery_target_action',
'recovery_min_apply_delay',
'primary_conninfo',
'primary_slot_name',
'promote_trigger_file',
'trigger_file'
}

_CONFIG_WARNING_HEADER = '# Do not edit this file manually!\n# It will be overwritten by Patroni!\n'

def __init__(self, postgresql, config):
self._postgresql = postgresql
self._config_dir = os.path.abspath(config.get('config_dir') or postgresql.data_dir)
config_base_name = config.get('config_base_name', 'postgresql')
self._postgresql_conf = os.path.join(self._config_dir, config_base_name + '.conf')
self._postgresql_conf_mtime = None
self._postgresql_base_conf_name = config_base_name + '.base.conf'
self._postgresql_base_conf = os.path.join(self._config_dir, self._postgresql_base_conf_name)
self._pg_hba_conf = os.path.join(self._config_dir, 'pg_hba.conf')
self._pg_ident_conf = os.path.join(self._config_dir, 'pg_ident.conf')
self._recovery_conf = os.path.join(postgresql.data_dir, 'recovery.conf')
self._recovery_conf_mtime = None
self._recovery_signal = os.path.join(postgresql.data_dir, 'recovery.signal')
self._standby_signal = os.path.join(postgresql.data_dir, 'standby.signal')
self._auto_conf = os.path.join(postgresql.data_dir, 'postgresql.auto.conf')
self._auto_conf_mtime = None
self._synchronous_standby_names = None
self._postmaster_ctime = None
self._primary_conninfo = None
self._config = {}
self._recovery_params = {}
self.reload_config(config)

def setup_server_parameters(self):
self._server_parameters = self.get_server_parameters(self._config)
self._adjust_recovery_parameters()

@property
def _configuration_to_save(self):
Expand Down Expand Up @@ -256,10 +283,12 @@ def write_postgresql_conf(self, configuration=None):
os.rename(self._postgresql_conf, self._postgresql_base_conf)

with open(self._postgresql_conf, 'w') as f:
os.chmod(self._postgresql_conf, stat.S_IWRITE | stat.S_IREAD)
f.write(self._CONFIG_WARNING_HEADER)
f.write("include '{0}'\n\n".format(self._config.get('custom_conf') or self._postgresql_base_conf_name))
for name, value in sorted((configuration or self._server_parameters).items()):
if not self._postgresql.bootstrap.running_custom_bootstrap or name != 'hba_file':
if (not self._postgresql.bootstrap.running_custom_bootstrap or name != 'hba_file') \
and name not in self._RECOVERY_PARAMETERS:
f.write("{0} = '{1}'\n".format(name, value))
# when we are doing custom bootstrap we assume that we don't know superuser password
# and in order to be able to change it, we are opening trust access from a certain address
Expand All @@ -270,6 +299,15 @@ def write_postgresql_conf(self, configuration=None):
if 'ident_file' not in self._server_parameters:
f.write("ident_file = '{0}'\n".format(self._pg_ident_conf.replace('\\', '\\\\')))

if self._postgresql.major_version >= 120000:
if self._recovery_params:
f.write('\n# recovery.conf\n')
for name, value in sorted(self._recovery_params.items()):
f.write("{0} = '{1}'\n".format(name, value))

if not self._postgresql.bootstrap.keep_existing_recovery_conf:
self._sanitize_auto_conf()

def append_pg_hba(self, config):
if not self.hba_file and not self._config.get('pg_hba'):
with open(self._pg_hba_conf, 'a') as f:
Expand Down Expand Up @@ -342,10 +380,31 @@ def primary_conninfo(self, member):
return ' '.join('{0}={{{0}}}'.format(kw) for kw in keywords if r.get(kw)).format(**r)

def recovery_conf_exists(self):
if self._postgresql.major_version >= 120000:
return os.path.exists(self._standby_signal) or os.path.exists(self._recovery_signal)
return os.path.exists(self._recovery_conf)

def _read_primary_conninfo(self):
# TODO: recovery.conf could be stale, would be nice to detect that.
pg_conf_mtime = mtime(self._postgresql_conf)
auto_conf_mtime = mtime(self._auto_conf)
postmaster_ctime = self._postgresql.is_running()
if postmaster_ctime:
postmaster_ctime = postmaster_ctime.create_time()

if self._postgresql_conf_mtime == pg_conf_mtime and self._auto_conf_mtime == auto_conf_mtime \
and self._postmaster_ctime == postmaster_ctime:
return None, False

try:
primary_conninfo = self._postgresql.query('SHOW primary_conninfo').fetchone()[0]
self._postgresql_conf_mtime = pg_conf_mtime
self._auto_conf_mtime = auto_conf_mtime
self._postmaster_ctime = postmaster_ctime
except Exception:
primary_conninfo = None
return primary_conninfo, True

def _read_primary_conninfo_pre_v12(self):
recovery_conf_mtime = mtime(self._recovery_conf)
if recovery_conf_mtime == self._recovery_conf_mtime:
return None, False
Expand All @@ -363,13 +422,22 @@ def _read_primary_conninfo(self):
self._recovery_conf_mtime = recovery_conf_mtime
return primary_conninfo, True

def check_recovery_conf(self, member):
# Name is confusing. In fact it checks the value of primary_conninfo
if not self.recovery_conf_exists():
return False
def check_recovery_conf(self, member): # Name is confusing. In fact it checks the value of primary_conninfo
# TODO: recovery.conf could be stale, would be nice to detect that.
if self._postgresql.major_version >= 120000:
if not os.path.exists(self._standby_signal):
return False

_read_primary_conninfo = self._read_primary_conninfo
else:
if not self.recovery_conf_exists():
return False

primary_conninfo, updated = self._read_primary_conninfo()
# updated indicates that mtime of recovery.conf was changed and the primary_conninfo value was read from file
_read_primary_conninfo = self._read_primary_conninfo_pre_v12

primary_conninfo, updated = _read_primary_conninfo()
# updated indicates that mtime of postgresql.conf, postgresql.auto.conf, or recovery.conf was changed
# and the primary_conninfo value was read either from config or from the database connection.
if updated:
# primary_conninfo is one of:
# - None (exception or unparsable config)
Expand Down Expand Up @@ -397,15 +465,69 @@ def check_recovery_conf(self, member):

return all(self._primary_conninfo.get(p) == str(v) for p, v in wanted_primary_conninfo.items())

@staticmethod
def _remove_file_if_exists(name):
if os.path.isfile(name) or os.path.islink(name):
os.unlink(name)

def write_recovery_conf(self, recovery_params):
with open(self._recovery_conf, 'w') as f:
os.chmod(self._recovery_conf, stat.S_IWRITE | stat.S_IREAD)
for name, value in recovery_params.items():
f.write("{0} = '{1}'\n".format(name, value))
if self._postgresql.major_version >= 120000:
if parse_bool(recovery_params.pop('standby_mode', None)):
open(self._standby_signal, 'w').close()
else:
self._remove_file_if_exists(self._standby_signal)
open(self._recovery_signal, 'w').close()
self._recovery_params = recovery_params
else:
with open(self._recovery_conf, 'w') as f:
os.chmod(self._recovery_conf, stat.S_IWRITE | stat.S_IREAD)
for name, value in recovery_params.items():
f.write("{0} = '{1}'\n".format(name, value))

def remove_recovery_conf(self):
if os.path.isfile(self._recovery_conf) or os.path.islink(self._recovery_conf):
os.unlink(self._recovery_conf)
for name in (self._recovery_conf, self._standby_signal, self._recovery_signal):
self._remove_file_if_exists(name)
self._recovery_params = {}

def _sanitize_auto_conf(self):
overwrite = False
lines = []

if os.path.exists(self._auto_conf):
try:
with open(self._auto_conf) as f:
for raw_line in f:
line = raw_line.strip()
match = PARAMETER_RE.match(line)
if match and match.group(1).lower() in self._RECOVERY_PARAMETERS:
overwrite = True
else:
lines.append(raw_line)
except Exception:
logger.info('Failed to read %s', self._auto_conf)

if overwrite:
try:
with open(self._auto_conf, 'w') as f:
for raw_line in lines:
f.write(raw_line)
except Exception:
logger.exception('Failed to remove some unwanted parameters from %s', self._auto_conf)

def _adjust_recovery_parameters(self):
# It is not strictly necessary, but we can make patroni configs crossi-compatible with all postgres versions.
recovery_conf = {n: v for n, v in self._server_parameters.items() if n.lower() in self._RECOVERY_PARAMETERS}
if recovery_conf:
self._config['recovery_conf'] = recovery_conf

if self.get('recovery_conf'):
good_name, bad_name = 'trigger_file', 'promote_trigger_file'
if self._postgresql.major_version >= 120000:
good_name, bad_name = bad_name, good_name

value = self._config['recovery_conf'].pop(bad_name, None)
if good_name not in self._config['recovery_conf'] and value:
self._config['recovery_conf'][good_name] = value

def get_server_parameters(self, config):
parameters = config['parameters'].copy()
Expand Down Expand Up @@ -534,6 +656,7 @@ def reload_config(self, config):
self._config = config
self._postgresql.set_pending_restart(pending_restart)
self._server_parameters = server_parameters
self._adjust_recovery_parameters()
self._connect_address = config.get('connect_address')
self._krbsrvname = config.get('krbsrvname')

Expand Down Expand Up @@ -596,6 +719,9 @@ def effective_configuration(self):
if self._postgresql.major_version >= 90400:
options_mapping['max_worker_processes'] = 'max_worker_processes setting'

if self._postgresql.major_version >= 120000:
options_mapping['max_wal_senders'] = 'max_wal_senders setting'

data = self._postgresql.controldata()
effective_configuration = self._server_parameters.copy()

Expand Down
2 changes: 1 addition & 1 deletion tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class PostgresInit(unittest.TestCase):
'search_path': 'public', 'hot_standby': 'on', 'max_wal_senders': 5,
'wal_keep_segments': 8, 'wal_log_hints': 'on', 'max_locks_per_transaction': 64,
'max_worker_processes': 8, 'max_connections': 100, 'max_prepared_transactions': 0,
'track_commit_timestamp': 'off', 'unix_socket_directories': '/tmp'}
'track_commit_timestamp': 'off', 'unix_socket_directories': '/tmp', 'trigger_file': 'bla'}

@patch('psycopg2.connect', psycopg2_connect)
@patch.object(ConfigHandler, 'write_postgresql_conf', Mock())
Expand Down
70 changes: 65 additions & 5 deletions tests/test_postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import psycopg2
import subprocess
import time

from mock import Mock, MagicMock, PropertyMock, patch, mock_open
from patroni.async_executor import CriticalTask
Expand All @@ -17,6 +18,17 @@
from . import BaseTestPostgresql, MockCursor, MockPostmaster, psycopg2_connect


mtime_ret = {}


def mock_mtime(filename):
if filename not in mtime_ret:
mtime_ret[filename] = time.time()
else:
mtime_ret[filename] += 1
return mtime_ret[filename]


def pg_controldata_string(*args, **kwargs):
return b"""
pg_control version number: 942
Expand Down Expand Up @@ -78,7 +90,7 @@ class TestPostgresql(BaseTestPostgresql):

@patch('subprocess.call', Mock(return_value=0))
@patch('os.rename', Mock())
@patch.object(Postgresql, 'get_major_version', Mock(return_value=90600))
@patch.object(Postgresql, 'get_major_version', Mock(return_value=120000))
@patch.object(Postgresql, 'is_running', Mock(return_value=True))
def setUp(self):
super(TestPostgresql, self).setUp()
Expand Down Expand Up @@ -186,11 +198,58 @@ def test_checkpoint(self):
self.assertIsNone(self.p.checkpoint())
self.assertEqual(self.p.checkpoint(), 'not accessible or not healty')

@patch('patroni.postgresql.config.mtime', mock_mtime)
@patch.object(MockCursor, 'fetchone', Mock(side_effect=[('foo=bar',), ('',), ('',), ('a=b',)]))
def test_check_recovery_conf(self):
self.p.config.write_recovery_conf({'primary_conninfo': 'foo'})
self.assertFalse(self.p.config.check_recovery_conf(None))
self.p.config.write_recovery_conf({})
for version in (120000, 100000):
with patch.object(Postgresql, 'major_version', PropertyMock(return_value=version)):
self.p.config.write_recovery_conf({'standby_mode': 'on', 'primary_conninfo': 'foo'})
self.assertFalse(self.p.config.check_recovery_conf(None))
self.p.config.write_recovery_conf({'primary_conninfo': 'foo'})
self.p.config.write_postgresql_conf()
self.assertFalse(self.p.config.check_recovery_conf(None))
self.p.config.write_recovery_conf({'standby_mode': 'on'})
self.assertTrue(self.p.config.check_recovery_conf(None))
with patch('patroni.postgresql.config.ConfigHandler.primary_conninfo_params',
Mock(return_value={'a': 'b'})):
self.assertFalse(self.p.config.check_recovery_conf(None))
self.p.config.write_recovery_conf({'standby_mode': 'on', 'primary_conninfo': 'a=b'})
self.assertTrue(self.p.config.check_recovery_conf(None))

@patch.object(Postgresql, 'major_version', PropertyMock(return_value=120000))
@patch.object(Postgresql, 'is_running', MockPostmaster)
@patch.object(MockPostmaster, 'create_time', Mock(return_value=1234567), create=True)
@patch.object(MockCursor, 'fetchone', Mock(return_value=('',)))
def test__read_primary_conninfo(self):
self.p.config.write_recovery_conf({'standby_mode': 'on'})
self.p.config.write_postgresql_conf()
self.assertTrue(self.p.config.check_recovery_conf(None))
self.assertTrue(self.p.config.check_recovery_conf(None))
with patch.object(Postgresql, 'query', Mock(side_effect=Exception)),\
patch('patroni.postgresql.config.mtime', mock_mtime):
self.assertFalse(self.p.config.check_recovery_conf(None))

@patch.object(Postgresql, 'major_version', PropertyMock(return_value=100000))
def test__read_primary_conninfo_pre_v12(self):
self.p.config.write_recovery_conf({'standby_mode': 'on'})
self.assertTrue(self.p.config.check_recovery_conf(None))
self.assertTrue(self.p.config.check_recovery_conf(None))

def test_write_postgresql_and_sanitize_auto_conf(self):
read_data = 'primary_conninfo = foo\nfoo = bar\n'
with open(os.path.join(self.p.data_dir, 'postgresql.auto.conf'), 'w') as f:
f.write(read_data)

mock_read_auto = mock_open(read_data=read_data)
mock_read_auto.return_value.__iter__ = lambda o: iter(o.readline, '')
with patch.object(builtins, 'open', Mock(side_effect=[mock_open()(), mock_read_auto(), IOError])),\
patch('os.chmod', Mock()):
self.p.config.write_postgresql_conf()

with patch.object(builtins, 'open', Mock(side_effect=[mock_open()(), IOError])), patch('os.chmod', Mock()):
self.p.config.write_postgresql_conf()
self.p.config.write_recovery_conf({'foo': 'bar'})
self.p.config.write_postgresql_conf()

@patch.object(Postgresql, 'is_running', Mock(return_value=False))
@patch.object(Postgresql, 'start', Mock())
Expand Down Expand Up @@ -604,7 +663,8 @@ def test__build_effective_configuration(self):
Mock(return_value={'max_connections setting': '200',
'max_worker_processes setting': '20',
'max_prepared_xacts setting': '100',
'max_locks_per_xact setting': '100'})):
'max_locks_per_xact setting': '100',
'max_wal_senders setting': 10})):
self.p.cancellable.cancel()
self.assertFalse(self.p.start())
self.assertTrue(self.p.pending_restart)

0 comments on commit 0b1b1e3

Please sign in to comment.