Failover logical slots (#1820)
Effectively, this PR consists of a few changes:

1. The easy part:
  If permanent logical slots are defined in the global configuration, Patroni on the primary will not only create them, but also periodically update the DCS with the current `confirmed_flush_lsn` values of all these slots.
  To reduce the number of interactions with the DCS, a new `/status` key was introduced. It contains a JSON object with `optime` and `slots` keys. For backward compatibility, `/optime/leader` is still updated if there are members running an old version of Patroni in the cluster.
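For illustration, the `/status` payload could look like the sketch below. Only the `optime` and `slots` keys are stated above; the concrete values and nesting shown here are hypothetical:

```python
import json

# Hypothetical example of the JSON object stored under the /status key:
# "optime" is the leader's WAL position, and "slots" maps permanent slot
# names to the last confirmed_flush_lsn values reported by the primary.
status = {
    "optime": 100663296,
    "slots": {
        "test_logical": 100663296,
    },
}

# Serialized form as it would be written to the DCS.
payload = json.dumps(status)
print(payload)
```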

2. The tricky part:
  On replicas that are eligible for failover, Patroni creates the logical replication slot by copying the slot file from the primary and restarting the replica. To copy the slot file, Patroni opens a connection to the primary with `rewind` or `superuser` credentials and calls the `pg_read_binary_file()` function.
  Once the logical slot exists on the replica, Patroni periodically calls the `pg_replication_slot_advance()` function to move the slot forward.
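The periodic advance step can be sketched as follows. This is a minimal illustration, not Patroni's actual code; the `FakeCursor` stand-in only exists so the sketch runs without a live server:

```python
# Minimal sketch of the periodic advance step: once the copied slot exists
# on the replica, move it forward with pg_replication_slot_advance() using
# the position the primary reported to the DCS.

def advance_slot(cursor, slot_name, target_lsn):
    """Advance a logical slot to target_lsn via a psycopg2-style cursor."""
    cursor.execute(
        "SELECT pg_replication_slot_advance(%s, %s)",
        (slot_name, target_lsn),
    )

class FakeCursor:
    """Stand-in for a database cursor, recording executed statements."""
    def __init__(self):
        self.calls = []

    def execute(self, query, params=None):
        self.calls.append((query, params))

cur = FakeCursor()
advance_slot(cur, "test_logical", "0/6000060")
print(cur.calls[0])
```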

3. Additional requirements:
  To ensure that the primary doesn't clean up tuples from pg_catalog that are required for logical decoding, Patroni enables `hot_standby_feedback` on replicas with logical slots and on cascading replicas that serve as streaming sources for replicas with logical slots.
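The topology rule above can be sketched as a small helper. This is a hypothetical illustration, not Patroni's implementation; representing the topology as an upstream map is an assumption made here:

```python
# hot_standby_feedback is needed on every member that holds a logical slot,
# plus every upstream standby such a member streams from (transitively, to
# cover cascading replication).

def needs_feedback(members_with_logical_slots, upstream_of):
    """Return the set of member names that must enable hot_standby_feedback.

    upstream_of maps a member name to the standby it streams from,
    or None when it streams directly from the primary.
    """
    result = set()
    for member in members_with_logical_slots:
        node = member
        while node is not None:
            result.add(node)
            node = upstream_of.get(node)
    return result

# replica1 streams from cascade1; cascade1 and replica2 stream from the primary.
topology = {"replica1": "cascade1", "cascade1": None, "replica2": None}
print(sorted(needs_feedback({"replica1"}, topology)))
```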

4. When logical slots are copied to a replica, there is a time window during which it may not be safe to use them after promotion. Right now there is no protection against promoting such a replica, but Patroni will log a warning with the names of the slots that might not be safe to use.
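The warning check described above amounts to comparing slot positions. A hypothetical sketch (function and data are illustrative, not Patroni's code):

```python
# Compare the replica's slot positions with the last positions the primary
# reported, and flag slots that are still behind as potentially unsafe.

def parse_lsn(lsn):
    """Convert a pg_lsn string like '0/6000060' to an integer."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) + int(lo, 16)

def possibly_unsafe_slots(primary_slots, replica_slots):
    """Return names of slots whose replica position lags the primary's."""
    unsafe = []
    for name, lsn in primary_slots.items():
        replica_lsn = replica_slots.get(name)
        if replica_lsn is None or parse_lsn(replica_lsn) < parse_lsn(lsn):
            unsafe.append(name)
    return unsafe

print(possibly_unsafe_slots(
    {"test_logical": "0/6000060"},
    {"test_logical": "0/5000000"},
))
```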

Compatibility:
The `pg_replication_slot_advance()` function is only available starting from PostgreSQL 11. On older Postgres versions Patroni will refuse to create logical slots on the primary.
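The version gate boils down to a single comparison against `server_version_num` (a sketch; the function name is made up here):

```python
# pg_replication_slot_advance() exists only in PostgreSQL 11+, so failover
# logical slots are refused on anything older. server_version_num 110000
# corresponds to PostgreSQL 11.0.

def can_failover_logical_slots(server_version_num):
    return server_version_num >= 110000

print(can_failover_logical_slots(110005))  # PostgreSQL 11.5
print(can_failover_logical_slots(100016))  # PostgreSQL 10.16
```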

The old "permanent slots" feature, which creates logical slots right after promotion and before allowing connections, was removed.

Closes #1749
CyberDem0n committed Mar 25, 2021
1 parent 09f2f57 commit c7173aa
Showing 34 changed files with 933 additions and 320 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/install_deps.py
@@ -39,7 +39,7 @@ def install_packages(what):
}
packages['exhibitor'] = packages['zookeeper']
packages = packages.get(what, [])
-    ver = str({'etcd': '9.6', 'etcd3': '9.6', 'consul': 10, 'exhibitor': 11, 'kubernetes': 12, 'raft': 13}.get(what))
+    ver = str({'etcd': '9.6', 'etcd3': '13', 'consul': 12, 'exhibitor': 11, 'kubernetes': 13, 'raft': 12}.get(what))
subprocess.call(['sudo', 'apt-get', 'update', '-y'])
return subprocess.call(['sudo', 'apt-get', 'install', '-y', 'postgresql-' + ver, 'expect-dev', 'wget'] + packages)

7 changes: 4 additions & 3 deletions .github/workflows/run_tests.py
@@ -23,18 +23,19 @@ def main():

env = os.environ.copy()
if sys.platform.startswith('linux'):
-        version = {'etcd': '9.6', 'etcd3': '9.6', 'consul': 10, 'exhibitor': 11, 'kubernetes': 12, 'raft': 13}.get(what)
+        version = {'etcd': '9.6', 'etcd3': '13', 'consul': 12, 'exhibitor': 11, 'kubernetes': 13, 'raft': 12}.get(what)
path = '/usr/lib/postgresql/{0}/bin:.'.format(version)
unbuffer = ['timeout', '600', 'unbuffer']
args = ['--tags=-skip'] if what == 'etcd' else []
else:
path = os.path.abspath(os.path.join('pgsql', 'bin'))
if sys.platform == 'darwin':
path += ':.'
-        unbuffer = []
+        args = unbuffer = []
env['PATH'] = path + os.pathsep + env['PATH']
env['DCS'] = what

-    ret = subprocess.call(unbuffer + [sys.executable, '-m', 'behave'], env=env)
+    ret = subprocess.call(unbuffer + [sys.executable, '-m', 'behave'] + args, env=env)

if ret != 0:
if subprocess.call('grep . features/output/*_failed/*postgres?.*', shell=True) != 0:
2 changes: 1 addition & 1 deletion docs/SETTINGS.rst
@@ -34,7 +34,7 @@ Dynamic configuration is stored in the DCS (Distributed Configuration Store) and
- **restore\_command**: command to restore WAL records from the remote master to standby leader, can be different from the list defined in :ref:`postgresql_settings`
- **archive\_cleanup\_command**: cleanup command for standby leader
- **recovery\_min\_apply\_delay**: how long to wait before actually apply WAL records on a standby leader
- - **slots**: define permanent replication slots. These slots will be preserved during switchover/failover. Patroni will try to create slots before opening connections to the cluster.
+ - **slots**: define permanent replication slots. These slots will be preserved during switchover/failover. Logical slots are copied from the primary to a standby with a restart, and after that their position is advanced every **loop_wait** seconds (if necessary). Copying logical slot files is performed over a ``libpq`` connection using either rewind or superuser credentials (see the **postgresql.authentication** section). There is always a chance that the logical slot position on the replica is slightly behind the former primary, so applications should be prepared to receive some messages a second time after a failover. The easiest way to handle this is to track ``confirmed_flush_lsn``. Enabling permanent logical replication slots requires **postgresql.use_slots** to be set and will also automatically enable ``hot_standby_feedback``. Since failover of logical replication slots is unsafe on PostgreSQL 9.6 and older, and PostgreSQL 10 is missing some important functions, the feature only works with PostgreSQL 11+.
- **my_slot_name**: the name of the replication slot. If a permanent slot name matches the name of the current primary, it will not be created. Beyond that, it is the operator's responsibility to make sure there are no name clashes between replication slots automatically created by Patroni for members and permanent replication slots.
- **type**: slot type. Could be ``physical`` or ``logical``. If the slot is logical, you have to additionally define ``database`` and ``plugin``.
- **database**: the database name where logical slots should be created.
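The ``confirmed_flush_lsn`` tracking recommended in the documentation above can be sketched as follows. This is a hypothetical consumer-side helper, not part of Patroni:

```python
# Consumer-side deduplication: the application stores the last
# confirmed_flush_lsn it fully processed, and after a failover replays only
# messages with an LSN above that watermark.

def parse_lsn(lsn):
    """Convert a pg_lsn string like '0/6000060' to an integer."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) + int(lo, 16)

def filter_new_changes(changes, last_processed_lsn):
    """changes: iterable of (lsn, payload) pairs read from the logical slot."""
    watermark = parse_lsn(last_processed_lsn)
    return [(lsn, data) for lsn, data in changes if parse_lsn(lsn) > watermark]

changes = [("0/5000000", "old row"), ("0/6000060", "new row")]
print(filter_new_changes(changes, "0/5000000"))
```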
17 changes: 0 additions & 17 deletions features/callback.py

This file was deleted.

16 changes: 13 additions & 3 deletions features/standby_cluster.feature
@@ -1,5 +1,5 @@
Feature: standby cluster
-Scenario: check permanent logical slots are preserved on failover/switchover
+Scenario: prepare the cluster with logical slots
Given I start postgres1
Then postgres1 is a leader after 10 seconds
And there is a non empty initialize key in DCS after 15 seconds
@@ -10,15 +10,24 @@ Feature: standby cluster
When I issue a PATCH request to http://127.0.0.1:8009/config with {"slots": {"test_logical": {"type": "logical", "database": "postgres", "plugin": "test_decoding"}}}
Then I receive a response code 200
And I do a backup of postgres1
-When I start postgres0 with callback configured
+When I start postgres0
Then "members/postgres0" key in DCS has state=running after 10 seconds
And replication works from postgres1 to postgres0 after 15 seconds

@skip
Scenario: check permanent logical slots are synced to the replica
Given I run patronictl.py restart batman postgres1 --force
Then Logical slot test_logical is in sync between postgres0 and postgres1 after 10 seconds
When I add the table replicate_me to postgres1
And I get all changes from logical slot test_logical on postgres1
Then Logical slot test_logical is in sync between postgres0 and postgres1 after 10 seconds

Scenario: Detach exiting node from the cluster
When I shut down postgres1
Then postgres0 is a leader after 10 seconds
And "members/postgres0" key in DCS has role=master after 3 seconds
When I issue a GET request to http://127.0.0.1:8008/
Then I receive a response code 200
And there is a label with "test_logical" in postgres0 data directory

Scenario: check replication of a single table in a standby cluster
Given I start postgres1 in a standby cluster batman1 as a clone of postgres0
@@ -35,6 +44,7 @@
When I start postgres2 in a cluster batman1
Then postgres2 role is the replica after 24 seconds
And table foo is present on postgres2 after 20 seconds
And postgres1 does not have a logical replication slot named test_logical

Scenario: check failover
When I kill postgres1
32 changes: 28 additions & 4 deletions features/steps/slots.py
@@ -1,5 +1,7 @@
 import time
+import psycopg2

 from behave import step, then
-import psycopg2 as pg


@step('I create a logical replication slot {slot_name} on {pg_name:w} with the {plugin:w} plugin')
@@ -8,7 +10,7 @@ def create_logical_replication_slot(context, slot_name, pg_name, plugin):
output = context.pctl.query(pg_name, ("SELECT pg_create_logical_replication_slot('{0}', '{1}'),"
" current_database()").format(slot_name, plugin))
print(output.fetchone())
-    except pg.Error as e:
+    except psycopg2.Error as e:
print(e)
assert False, "Error creating slot {0} on {1} with plugin {2}".format(slot_name, pg_name, plugin)

@@ -22,7 +24,7 @@ def has_logical_replication_slot(context, pg_name, slot_name, plugin):
assert row[0] == "logical", "Found replication slot named {0} but wasn't a logical slot".format(slot_name)
assert row[1] == plugin, ("Found replication slot named {0} but was using plugin "
"{1} rather than {2}").format(slot_name, row[1], plugin)
-    except pg.Error:
+    except psycopg2.Error:
assert False, "Error looking for slot {0} on {1} with plugin {2}".format(slot_name, pg_name, plugin)


@@ -32,5 +34,27 @@ def does_not_have_logical_replication_slot(context, pg_name, slot_name):
row = context.pctl.query(pg_name, ("SELECT 1 FROM pg_replication_slots"
" WHERE slot_name = '{0}'").format(slot_name)).fetchone()
assert not row, "Found unexpected replication slot named {0}".format(slot_name)
-    except pg.Error:
+    except psycopg2.Error:
assert False, "Error looking for slot {0} on {1}".format(slot_name, pg_name)


@step('Logical slot {slot_name:w} is in sync between {pg_name1:w} and {pg_name2:w} after {time_limit:d} seconds')
def logical_slots_in_sync(context, slot_name, pg_name1, pg_name2, time_limit):
time_limit *= context.timeout_multiplier
max_time = time.time() + int(time_limit)
while time.time() < max_time:
try:
query = "SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = '{0}'".format(slot_name)
slot1 = context.pctl.query(pg_name1, query).fetchone()
slot2 = context.pctl.query(pg_name2, query).fetchone()
if slot1[0] == slot2[0]:
return
except Exception:
pass
time.sleep(1)
assert False, "Logical slot {0} is not in sync between {1} and {2}".format(slot_name, pg_name1, pg_name2)


@step('I get all changes from logical slot {slot_name:w} on {pg_name:w}')
def logical_slot_get_changes(context, slot_name, pg_name):
context.pctl.query(pg_name, "SELECT * FROM pg_logical_slot_get_changes('{0}', NULL, NULL)".format(slot_name))
14 changes: 2 additions & 12 deletions features/steps/standby_cluster.py
@@ -14,17 +14,6 @@
callback = executable + " features/callback2.py "


@step('I start {name:w} with callback configured')
def start_patroni_with_callbacks(context, name):
return context.pctl.start(name, custom_config={
"postgresql": {
"callbacks": {
"on_role_change": executable + " features/callback.py"
}
}
})


@step('I start {name:w} in a cluster {cluster_name:w}')
def start_patroni(context, name, cluster_name):
return context.pctl.start(name, custom_config={
@@ -55,7 +44,8 @@ def start_patroni_standby_cluster(context, name, cluster_name, name2):
"port": port,
"primary_slot_name": "pm_1",
"create_replica_methods": ["backup_restore", "basebackup"]
-            }
+            },
+            "postgresql": {"parameters": {"wal_level": "logical"}}
}
},
"postgresql": {
2 changes: 1 addition & 1 deletion patroni/api.py
@@ -97,7 +97,7 @@ def do_GET(self, write_status_code_only=False):
patroni = self.server.patroni
cluster = patroni.dcs.cluster

-        leader_optime = cluster and cluster.last_leader_operation or 0
+        leader_optime = cluster and cluster.last_lsn or 0
replayed_location = response.get('xlog', {}).get('replayed_location', 0)
max_replica_lag = parse_int(self.path_query.get('lag', [sys.maxsize])[0], 'B')
if max_replica_lag is None:
