Permalink
Browse files

Bug: Invalid calculation of lastReceiveLSN for logical replication (#801)

* Bug: Invalid receiveLSN that leads to lost parallel transactions

Add a test that reproduces the issue from
https://www.postgresql.org/message-id/CAHHbV7V4XvdHGw_jpR9Xyq3fz%3Df%2BO4oa%2B73sbizGTv_AvmDXhQ%40mail.gmail.com

* bug: lastReceiveLSN not valid for logical replication

Logical and physical replication use different algorithms
to calculate the lastReceiveLSN.
For physical replication the calculation is:
startLsn from XLogData plus the payload size; this is correct because we have the raw data.
For logical replication the lastReceiveLSN uses the startLSN from XLogData without adding the payload size, because the real payload size is not available: the size of a logical decoding message
can change, and adding it would give us an LSN from some random future transaction.
  • Loading branch information...
Gordiychuk authored and davecramer committed Apr 7, 2017
1 parent 3883a84 commit 170d9c27810349456e56aff1c36a0ad6584b9e28
@@ -8,6 +8,7 @@
import org.postgresql.copy.CopyDual;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.ReplicationType;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
@@ -26,6 +27,7 @@
public static final long POSTGRES_EPOCH_2000_01_01 = 946684800000L;
private final CopyDual copyDual;
private final long updateInterval;
private final ReplicationType replicationType;
private long lastStatusUpdate;
private boolean closeFlag = false;
@@ -44,11 +46,14 @@
* completely, although an update will still be sent when requested by the
* server, to avoid timeout disconnect.
*/
public V3PGReplicationStream(CopyDual copyDual, LogSequenceNumber startLSN, long updateIntervalMs) {
public V3PGReplicationStream(CopyDual copyDual, LogSequenceNumber startLSN, long updateIntervalMs,
ReplicationType replicationType
) {
this.copyDual = copyDual;
this.updateInterval = updateIntervalMs;
this.lastStatusUpdate = System.currentTimeMillis() - updateIntervalMs;
this.lastReceiveLSN = startLSN;
this.replicationType = replicationType;
}
@Override
@@ -229,8 +234,15 @@ private ByteBuffer processXLogData(ByteBuffer buffer) {
lastServerLSN = LogSequenceNumber.valueOf(buffer.getLong());
long systemClock = buffer.getLong();
int payloadSize = buffer.limit() - buffer.position();
lastReceiveLSN = LogSequenceNumber.valueOf(startLsn + payloadSize);
switch (replicationType) {
case LOGICAL:
lastReceiveLSN = LogSequenceNumber.valueOf(startLsn);
break;
case PHYSICAL:
int payloadSize = buffer.limit() - buffer.position();
lastReceiveLSN = LogSequenceNumber.valueOf(startLsn + payloadSize);
break;
}
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.log(Level.FINEST, " <=BE XLogData(currWal: {0}, lastServerWal: {1}, clock: {2})",
@@ -10,6 +10,7 @@
import org.postgresql.core.QueryExecutor;
import org.postgresql.core.ReplicationProtocol;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.ReplicationType;
import org.postgresql.replication.fluent.CommonOptions;
import org.postgresql.replication.fluent.logical.LogicalReplicationOptions;
import org.postgresql.replication.fluent.physical.PhysicalReplicationOptions;
@@ -38,24 +39,30 @@ public PGReplicationStream startLogical(LogicalReplicationOptions options)
throws SQLException {
String query = createStartLogicalQuery(options);
return initializeReplication(query, options);
return initializeReplication(query, options, ReplicationType.LOGICAL);
}
public PGReplicationStream startPhysical(PhysicalReplicationOptions options)
throws SQLException {
String query = createStartPhysicalQuery(options);
return initializeReplication(query, options);
return initializeReplication(query, options, ReplicationType.PHYSICAL);
}
private PGReplicationStream initializeReplication(String query, CommonOptions options)
private PGReplicationStream initializeReplication(String query, CommonOptions options,
ReplicationType replicationType)
throws SQLException {
LOGGER.log(Level.FINEST, " FE=> StartReplication(query: {0})", query);
configureSocketTimeout(options);
CopyDual copyDual = (CopyDual) queryExecutor.startCopy(query, true);
return new V3PGReplicationStream(copyDual, options.getStartLSNPosition(), options.getStatusInterval());
return new V3PGReplicationStream(
copyDual,
options.getStartLSNPosition(),
options.getStatusInterval(),
replicationType
);
}
/**
@@ -0,0 +1,11 @@
/*
* Copyright (c) 2017, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/
package org.postgresql.replication;
/**
 * Kind of replication a stream was started with.
 *
 * <p>The replication stream receives this value when it is created and uses it to choose how
 * the last received LSN is derived from an incoming XLogData message.</p>
 */
public enum ReplicationType {

  /**
   * Logical replication: the last received LSN is the start LSN of the XLogData message,
   * without adding the payload size.
   */
  LOGICAL,

  /**
   * Physical replication: the last received LSN is the start LSN of the XLogData message
   * plus the size of its payload.
   */
  PHYSICAL
}
@@ -87,18 +87,11 @@ public void testSentLocationEqualToLastReceiveLSN() throws Exception {
LogSequenceNumber lastReceivedLSN = stream.getLastReceiveLSN();
stream.forceUpdateStatus();
int lastPayloadSize =
received
.get(countMessage - 1)
.getBytes()
.length;
LogSequenceNumber waitLsn = LogSequenceNumber.valueOf(lastReceivedLSN.asLong() - lastPayloadSize);
LogSequenceNumber sentByServer = getSentLocationOnView();
assertThat("When changes absent on server last receive by stream LSN "
+ "should be equal to last sent by server LSN",
sentByServer, equalTo(waitLsn)
sentByServer, equalTo(lastReceivedLSN)
);
}
@@ -746,6 +746,91 @@ public void testReplicationRestartFromLastFeedbackPosition() throws Exception {
result, equalTo(wait));
}
/**
 * Two transactions are opened in parallel and committed one after another. Only the first
 * three messages are consumed before feedback is sent and the replication connection is
 * dropped. After reconnecting on the same slot, the next three messages are read and the
 * combined output must contain both transactions exactly once.
 */
@Test
public void testReplicationRestartFromLastFeedbackPositionParallelTransaction() throws Exception {
  PGConnection initialConnection = (PGConnection) replConnection;

  LogSequenceNumber initialLSN = getCurrentLSN();

  PGReplicationStream firstStream =
      initialConnection
          .getReplicationAPI()
          .replicationStream()
          .logical()
          .withSlotName(SLOT_NAME)
          .withStartPosition(initialLSN)
          .withSlotOption("include-xids", false)
          .withSlotOption("skip-empty-xacts", true)
          .start();

  // Two independent connections so that both transactions are open at the same time.
  Connection firstTxConnection = TestUtil.openDB();
  firstTxConnection.setAutoCommit(false);

  Connection secondTxConnection = TestUtil.openDB();
  secondTxConnection.setAutoCommit(false);

  Statement firstTxStatement = firstTxConnection.createStatement();
  Statement secondTxStatement = secondTxConnection.createStatement();

  firstTxStatement.execute("BEGIN");
  secondTxStatement.execute("BEGIN");

  // Interleave the inserts: both transactions have changes before either one commits.
  firstTxStatement.execute("insert into test_logic_table(name) values('first tx changes')");
  secondTxStatement.execute("insert into test_logic_table(name) values('second tx changes')");

  firstTxConnection.commit();
  secondTxConnection.commit();

  firstTxConnection.close();
  secondTxConnection.close();

  // Consume only three messages (the expected output below is BEGIN / INSERT / COMMIT per
  // transaction), then acknowledge everything received so far.
  List<String> received = new ArrayList<String>(receiveMessageWithoutBlock(firstStream, 3));

  firstStream.setFlushedLSN(firstStream.getLastReceiveLSN());
  firstStream.setAppliedLSN(firstStream.getLastReceiveLSN());

  firstStream.forceUpdateStatus();

  // Emulate a replication break: drop the connection and wait for the slot to be released.
  replConnection.close();
  waitStopReplicationSlot();

  replConnection = openReplicationConnection();
  PGConnection restartedConnection = (PGConnection) replConnection;

  PGReplicationStream restartedStream =
      restartedConnection
          .getReplicationAPI()
          .replicationStream()
          .logical()
          .withSlotName(SLOT_NAME)
          /* An invalid LSN means: start from the slot's restart LSN. */
          .withStartPosition(LogSequenceNumber.INVALID_LSN)
          .withSlotOption("include-xids", false)
          .withSlotOption("skip-empty-xacts", true)
          .start();

  // NOTE(review): the third transaction is not part of the expected output; presumably it
  // only produces fresh activity on the server after the restart - confirm.
  Statement thirdTxStatement = sqlConnection.createStatement();
  thirdTxStatement.execute("insert into test_logic_table(name) values('third tx changes')");
  thirdTxStatement.close();

  received.addAll(receiveMessageWithoutBlock(restartedStream, 3));

  String actual = group(received);

  String expected = group(Arrays.asList(
      "BEGIN",
      "table public.test_logic_table: INSERT: pk[integer]:1 name[character varying]:'first tx changes'",
      "COMMIT",
      "BEGIN",
      "table public.test_logic_table: INSERT: pk[integer]:2 name[character varying]:'second tx changes'",
      "COMMIT"
  ));

  assertThat(
      "When we add feedback about applied lsn to replication stream(in this case it's force update status)"
          + "after restart consume changes via this slot should be started from last success lsn that "
          + "we send before via force status update, that why we wait consume both transaction without duplicates",
      actual, equalTo(expected));
}
private void waitStopReplicationSlot() throws SQLException, InterruptedException {
while (true) {
PreparedStatement statement =
@@ -204,6 +204,55 @@ public void testWalRecordCanBeRepeatBeRestartReplication() throws Exception {
);
}
/**
 * Reads two chunks from a physical replication stream, remembering the last received LSN
 * after the first chunk. A second stream is then started from that remembered LSN, and its
 * first chunk must be byte-for-byte equal to the second chunk of the original stream.
 */
@Test
public void restartPhysicalReplicationWithoutRepeatMessage() throws Exception {
  PGConnection pgConnection = (PGConnection) replConnection;

  LogSequenceNumber startPosition = getCurrentLSN();

  Statement firstInsert = sqlConnection.createStatement();
  firstInsert.execute("insert into test_physic_table(name) values('first value')");
  firstInsert.close();

  PGReplicationStream originalStream =
      pgConnection
          .getReplicationAPI()
          .replicationStream()
          .physical()
          .withSlotName(SLOT_NAME)
          .withStartPosition(startPosition)
          .start();

  // The first chunk itself is not compared; it is read so that the stream's last received
  // LSN can be captured right after it.
  toByteArray(originalStream.read());
  LogSequenceNumber resumePosition = originalStream.getLastReceiveLSN();

  Statement secondInsert = sqlConnection.createStatement();
  secondInsert.execute("insert into test_physic_table(name) values('second value')");
  secondInsert.close();

  byte[] secondChunkOfOriginal = toByteArray(originalStream.read());
  originalStream.close();

  // Reopen the stream from the position captured after the first chunk.
  PGReplicationStream reopenedStream =
      pgConnection
          .getReplicationAPI()
          .replicationStream()
          .physical()
          .withSlotName(SLOT_NAME)
          .withStartPosition(resumePosition)
          .start();

  byte[] firstChunkOfReopened = toByteArray(reopenedStream.read());
  reopenedStream.close();

  assertThat("Interrupt physical replication and restart from lastReceiveLSN should not "
          + "lead to repeat messages skip part of them",
      Arrays.equals(secondChunkOfOriginal, firstChunkOfReopened), CoreMatchers.equalTo(true)
  );
}
private boolean isActiveOnView() throws SQLException {
boolean result = false;
Statement st = sqlConnection.createStatement();

0 comments on commit 170d9c2

Please sign in to comment.