Skip to content

Commit

Permalink
Flush logical slots to disk during a shutdown checkpoint if required.
Browse files Browse the repository at this point in the history
It's entirely possible for a logical slot to have a confirmed_flush LSN
higher than the last value saved on disk while not being marked as dirty.
Currently, it is not a major problem but a later patch adding support for
the upgrade of slots relies on that value being properly flushed to disk.

It can also help avoid processing the same transactions again in some
boundary cases after the clean shutdown and restart.  Say, we process
some transactions for which we didn't send anything downstream (the
changes got filtered), but the confirmed_flush LSN is updated due to
keepalives.  As we don't flush the latest value of the confirmed_flush
LSN, it may lead to processing the same changes again without this patch.

The approach taken by this patch has been suggested by Ashutosh Bapat.

Author: Vignesh C, Julien Rouhaud, Kuroda Hayato
Reviewed-by: Amit Kapila, Dilip Kumar, Michael Paquier, Ashutosh Bapat, Peter Smith, Hou Zhijie
Discussion: http://postgr.es/m/CAA4eK1JzJagMmb_E8D4au=GYQkxox0AfNBm1FbP7sy7t4YWXPQ@mail.gmail.com
Discussion: http://postgr.es/m/TYAPR01MB58664C81887B3AF2EB6B16E3F5939@TYAPR01MB5866.jpnprd01.prod.outlook.com
  • Loading branch information
Amit Kapila committed Sep 14, 2023
1 parent a2e0d5e commit e0b2eed
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/backend/access/transam/xlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -7039,7 +7039,7 @@ static void
CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
{
CheckPointRelationMap();
CheckPointReplicationSlots();
CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN);
CheckPointSnapBuild();
CheckPointLogicalRewriteHeap();
CheckPointReplicationOrigin();
Expand Down
37 changes: 33 additions & 4 deletions src/backend/replication/slot.c
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
slot->candidate_xmin_lsn = InvalidXLogRecPtr;
slot->candidate_restart_valid = InvalidXLogRecPtr;
slot->candidate_restart_lsn = InvalidXLogRecPtr;
slot->last_saved_confirmed_flush = InvalidXLogRecPtr;

/*
* Create the slot on disk. We haven't actually marked the slot allocated
Expand Down Expand Up @@ -1572,11 +1573,13 @@ InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
/*
* Flush all replication slots to disk.
*
* This needn't actually be part of a checkpoint, but it's a convenient
* location.
* It is convenient to flush dirty replication slots at the time of checkpoint.
* Additionally, in case of a shutdown checkpoint, we also identify the slots
* for which the confirmed_flush LSN has been updated since the last time it
* was saved and flush them.
*/
void
CheckPointReplicationSlots(void)
CheckPointReplicationSlots(bool is_shutdown)
{
int i;

Expand All @@ -1601,6 +1604,30 @@ CheckPointReplicationSlots(void)

/* save the slot to disk, locking is handled in SaveSlotToPath() */
sprintf(path, "pg_replslot/%s", NameStr(s->data.name));

/*
* Slot's data is not flushed each time the confirmed_flush LSN is
* updated as that could lead to frequent writes. However, we decide
* to force a flush of all logical slots' data at the time of shutdown
* if the confirmed_flush LSN is changed since we last flushed it to
* disk. This helps in avoiding an unnecessary retreat of the
* confirmed_flush LSN after restart.
*/
if (is_shutdown && SlotIsLogical(s))
{
SpinLockAcquire(&s->mutex);

Assert(s->data.confirmed_flush >= s->last_saved_confirmed_flush);

if (s->data.invalidated == RS_INVAL_NONE &&
s->data.confirmed_flush != s->last_saved_confirmed_flush)
{
s->just_dirtied = true;
s->dirty = true;
}
SpinLockRelease(&s->mutex);
}

SaveSlotToPath(s, path, LOG);
}
LWLockRelease(ReplicationSlotAllocationLock);
Expand Down Expand Up @@ -1873,11 +1900,12 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)

/*
* Successfully wrote, unset dirty bit, unless somebody dirtied again
* already.
* already and remember the confirmed_flush LSN value.
*/
SpinLockAcquire(&slot->mutex);
if (!slot->just_dirtied)
slot->dirty = false;
slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
SpinLockRelease(&slot->mutex);

LWLockRelease(&slot->io_in_progress_lock);
Expand Down Expand Up @@ -2074,6 +2102,7 @@ RestoreSlotFromDisk(const char *name)
/* initialize in memory state */
slot->effective_xmin = cp.slotdata.xmin;
slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;

slot->candidate_catalog_xmin = InvalidTransactionId;
slot->candidate_xmin_lsn = InvalidXLogRecPtr;
Expand Down
9 changes: 8 additions & 1 deletion src/include/replication/slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,13 @@ typedef struct ReplicationSlot
XLogRecPtr candidate_xmin_lsn;
XLogRecPtr candidate_restart_valid;
XLogRecPtr candidate_restart_lsn;

/*
* This value tracks the last confirmed_flush LSN flushed, which is used
* during a shutdown checkpoint to decide whether a logical slot's data
* should be forcibly flushed or not.
*/
XLogRecPtr last_saved_confirmed_flush;
} ReplicationSlot;

#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
Expand Down Expand Up @@ -241,7 +248,7 @@ extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslo
extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);

extern void StartupReplicationSlots(void);
extern void CheckPointReplicationSlots(void);
extern void CheckPointReplicationSlots(bool is_shutdown);

extern void CheckSlotRequirements(void);
extern void CheckSlotPermissions(void);
Expand Down
1 change: 1 addition & 0 deletions src/test/recovery/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ tests += {
't/035_standby_logical_decoding.pl',
't/036_truncated_dropped.pl',
't/037_invalid_database.pl',
't/038_save_logical_slots_shutdown.pl',
],
},
}
102 changes: 102 additions & 0 deletions src/test/recovery/t/038_save_logical_slots_shutdown.pl
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@

# Copyright (c) 2023, PostgreSQL Global Development Group

# Test logical replication slots are always flushed to disk during a shutdown
# checkpoint.

use strict;
use warnings;

use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Check that the slot's confirmed_flush LSN (as read from the server log)
# matches the "Latest checkpoint location" reported by pg_controldata for
# $node's data directory.  Emits one ok() result; dies if pg_controldata's
# output does not contain the expected field.
sub compare_confirmed_flush
{
	my ($node, $confirmed_flush_from_log) = @_;

	# Fetch the latest checkpoint location from the control file.
	my ($stdout, $stderr) =
	  run_command([ 'pg_controldata', $node->data_dir ]);

	# Match once against the whole output; /m anchors ^ at line starts, so
	# there is no need to split into lines and loop.  An LSN contains no
	# whitespace, hence \S+ — unlike (.*)$, it cannot pick up trailing
	# spaces or a stray CR.
	my ($latest_checkpoint) =
	  ($stdout =~ /^Latest checkpoint location:\s*(\S+)/m);
	die "Latest checkpoint location not found in control file\n"
	  unless defined $latest_checkpoint;

	# Is it same as the value read from log?
	ok( $latest_checkpoint eq $confirmed_flush_from_log,
		"Check that the slot's confirmed_flush LSN is the same as the latest_checkpoint location"
	);

	return;
}

# Initialize publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('pub');
$node_publisher->init(allows_streaming => 'logical');

# Avoid checkpoint during the test, otherwise, the latest checkpoint location
# will change.
$node_publisher->append_conf(
	'postgresql.conf', q{
checkpoint_timeout = 1h
autovacuum = off
});
$node_publisher->start;

# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('sub');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;

# Create tables
$node_publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");

# Insert some data
$node_publisher->safe_psql('postgres',
	"INSERT INTO test_tbl VALUES (generate_series(1, 5));");

# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
	"CREATE PUBLICATION pub FOR ALL TABLES");
$node_subscriber->safe_psql('postgres',
	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub"
);

$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub');

my $result =
  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl");

is($result, qq(5), "check initial copy was done");

my $offset = -s $node_publisher->logfile;

# Restart the publisher to ensure that the slot will be flushed if required
$node_publisher->restart();

# The walsender reports the slot's confirmed_flush LSN when it creates the
# logical decoding context.  Define the pattern once and reuse it at both
# call sites below so they cannot drift apart.  Note the escaped trailing
# dot: an unescaped '.' would match any character.
my $decoding_ctx_re =
  qr{Streaming transactions committing after (?<confirmed_flush>[A-F0-9]+/[A-F0-9]+), reading WAL from [A-F0-9]+/[A-F0-9]+\.};

# Wait until the walsender creates decoding context
$node_publisher->wait_for_log($decoding_ctx_re, $offset);

# Extract confirmed_flush from the logfile, binding the capture to a named
# variable immediately instead of relying on $1 several statements later.
my $log_contents = slurp_file($node_publisher->logfile, $offset);
$log_contents =~ $decoding_ctx_re
  or die "could not get confirmed_flush_lsn";
my $confirmed_flush_from_log = $+{confirmed_flush};

# Ensure that the slot's confirmed_flush LSN is the same as the
# latest_checkpoint location.
compare_confirmed_flush($node_publisher, $confirmed_flush_from_log);

done_testing();

0 comments on commit e0b2eed

Please sign in to comment.