Skip to content

Commit

Permalink
feat: Use KeepAlive to confirm LSNs (#2941)
Browse files Browse the repository at this point in the history
* Advance flushLSN from KeepAlive if client has confirmed most recent XLogData

* Add test

* Create 2nd database in appveyor

* Reduce StatusInterval and add small wait

* Refactor test method

---------

Co-authored-by: Declan Murphy <declan.murphy@zalando.ie>
  • Loading branch information
decmurphy and Declan Murphy committed Aug 30, 2023
1 parent 5c9928d commit 650767d
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 2 deletions.
1 change: 1 addition & 0 deletions appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ before_build:
- psql -U postgres -c "alter user test with replication" postgres
- createuser -U postgres testsspi
- createdb -U postgres -O test test
- createdb -U postgres -O test test_2
- del %APPDATA%\postgresql\pgpass.conf

build_script:
Expand Down
4 changes: 4 additions & 0 deletions docker/postgres-server/scripts/post-startup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ main () {
psql_super "${POSTGRES_DB}" "
CREATE DATABASE test OWNER test
"
# Create additional database for some LSN testing
psql_super "${POSTGRES_DB}" "
CREATE DATABASE test_2 OWNER test
"

if ! is_pg_version_at_least "9.0"; then
# Older versions do not have plpgsql so we explicitly install it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class V3PGReplicationStream implements PGReplicationStream {
private volatile LogSequenceNumber lastReceiveLSN = LogSequenceNumber.INVALID_LSN;
private volatile LogSequenceNumber lastAppliedLSN = LogSequenceNumber.INVALID_LSN;
private volatile LogSequenceNumber lastFlushedLSN = LogSequenceNumber.INVALID_LSN;
private volatile LogSequenceNumber startOfLastMessageLSN = LogSequenceNumber.INVALID_LSN;
private volatile LogSequenceNumber explicitlyFlushedLSN = LogSequenceNumber.INVALID_LSN;

/**
* @param copyDual bidirectional copy protocol
Expand Down Expand Up @@ -194,6 +196,7 @@ private void updateStatusInternal(
copyDual.writeToCopy(reply, 0, reply.length);
copyDual.flushCopy();

explicitlyFlushedLSN = flushed;
lastStatusUpdate = System.nanoTime();
}

Expand Down Expand Up @@ -230,6 +233,13 @@ private boolean processKeepAliveMessage(ByteBuffer buffer) {
if (lastServerLSN.asLong() > lastReceiveLSN.asLong()) {
lastReceiveLSN = lastServerLSN;
}
// if the client has confirmed flush of last XLogData msg and KeepAlive shows ServerLSN is still
// advancing, we can safely advance FlushLSN to ServerLSN
if (explicitlyFlushedLSN.asLong() >= startOfLastMessageLSN.asLong()
&& lastServerLSN.asLong() > explicitlyFlushedLSN.asLong()
&& lastServerLSN.asLong() > lastFlushedLSN.asLong()) {
lastFlushedLSN = lastServerLSN;
}

long lastServerClock = buffer.getLong();

Expand All @@ -248,6 +258,7 @@ private boolean processKeepAliveMessage(ByteBuffer buffer) {

private ByteBuffer processXLogData(ByteBuffer buffer) {
long startLsn = buffer.getLong();
startOfLastMessageLSN = LogSequenceNumber.valueOf(startLsn);
lastServerLSN = LogSequenceNumber.valueOf(buffer.getLong());
long systemClock = buffer.getLong();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,19 @@ public class LogicalReplicationStatusTest {

private Connection replicationConnection;
private Connection sqlConnection;
private Connection secondSqlConnection;

@Before
public void setUp() throws Exception {
//statistic available only for privileged user
sqlConnection = TestUtil.openPrivilegedDB();
secondSqlConnection = TestUtil.openPrivilegedDB("test_2");
//DriverManager.setLogWriter(new PrintWriter(System.out));
replicationConnection = TestUtil.openReplicationConnection();
TestUtil.createTable(sqlConnection, "test_logic_table",
"pk serial primary key, name varchar(100)");
TestUtil.createTable(secondSqlConnection, "test_logic_table",
"pk serial primary key, name varchar(100)");

TestUtil.recreateLogicalReplicationSlot(sqlConnection, SLOT_NAME, "test_decoding");
}
Expand All @@ -59,7 +63,9 @@ public void setUp() throws Exception {
public void tearDown() throws Exception {
replicationConnection.close();
TestUtil.dropTable(sqlConnection, "test_logic_table");
TestUtil.dropTable(secondSqlConnection, "test_logic_table");
TestUtil.dropReplicationSlot(sqlConnection, SLOT_NAME);
secondSqlConnection.close();
sqlConnection.close();
}

Expand Down Expand Up @@ -423,6 +429,59 @@ flushLSN, equalTo(waitLSN)
);
}

@Test()
public void testKeepAliveServerLSNCanBeUsedToAdvanceFlushLSN() throws Exception {
PGConnection pgConnection = (PGConnection) replicationConnection;

LogSequenceNumber startLSN = getCurrentLSN();

PGReplicationStream stream =
pgConnection
.getReplicationAPI()
.replicationStream()
.logical()
.withSlotName(SLOT_NAME)
.withStartPosition(startLSN)
.withStatusInterval(1, TimeUnit.SECONDS)
.start();

// create replication changes and poll for messages
Statement st = sqlConnection.createStatement();
st.execute("insert into test_logic_table(name) values('previous changes')");
st.close();

receiveMessageWithoutBlock(stream, 3);

// client confirms flush of these changes. At this point we're in sync with server
LogSequenceNumber confirmedClientFlushLSN = stream.getLastReceiveLSN();
stream.setFlushedLSN(confirmedClientFlushLSN);
stream.forceUpdateStatus();

// now insert something into other DB (without replication) to generate WAL
System.out.println("Writing to quiet table");
Statement st2 = secondSqlConnection.createStatement();
st2.execute("insert into test_logic_table(name) values('previous changes')");
st2.close();

TimeUnit.SECONDS.sleep(1);

// read KeepAlive messages - lastServerLSN will have advanced and we can safely confirm it
stream.readPending();

LogSequenceNumber lastFlushedLSN = stream.getLastFlushedLSN();
LogSequenceNumber lastReceivedLSN = stream.getLastReceiveLSN();

assertThat("Activity in other database will generate WAL but no XLogData "
+ " messages. Received LSN will begin to advance beyond of confirmed flushLSN",
confirmedClientFlushLSN, not(equalTo(lastReceivedLSN))
);

assertThat("When all XLogData messages have been processed, we can confirm "
+ " flush of Server LSNs in the KeepAlive messages",
lastFlushedLSN, equalTo(lastReceivedLSN)
);
}

private LogSequenceNumber getSentLocationOnView() throws Exception {
return getLSNFromView((((BaseConnection) sqlConnection).haveMinimumServerVersion(ServerVersion.v10)
? "sent_lsn" : "sent_location"));
Expand Down
12 changes: 10 additions & 2 deletions pgjdbc/src/test/java/org/postgresql/test/TestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ public class TestUtil {
* Returns the Test database JDBC URL
*/
public static String getURL() {
return getURL(getServer(), + getPort());
return getURL(getServer(), getPort());
}

public static String getURL(String database) {
return getURL(getServer() + ":" + getPort(), database);
}

public static String getURL(String server, int port) {
Expand Down Expand Up @@ -309,14 +313,18 @@ public static File getFile(String name) {
* functions now as of 4/14
*/
public static Connection openPrivilegedDB() throws SQLException {
return openPrivilegedDB(getDatabase());
}

public static Connection openPrivilegedDB(String databaseName) throws SQLException {
initDriver();
Properties properties = new Properties();

PGProperty.GSS_ENC_MODE.set(properties,getGSSEncMode().value);
PGProperty.USER.set(properties, getPrivilegedUser());
PGProperty.PASSWORD.set(properties, getPrivilegedPassword());
PGProperty.OPTIONS.set(properties, "-c synchronous_commit=on");
return DriverManager.getConnection(getURL(), properties);
return DriverManager.getConnection(getURL(databaseName), properties);

}

Expand Down

0 comments on commit 650767d

Please sign in to comment.