Skip to content

Commit

Permalink
bug: lastReceiveLSN not valid for logical replication
Browse files Browse the repository at this point in the history
Algorithm of calculate lastReceiveLSN are different
for logical and physical replication. For physical
replication it startLsn from XLogData plus payload
size. For logical replication as lastReceiveLSN
should be use startLSN from XLogData. Add to result
payload size not available, because from WAL reads not
RAW data - as a result logical decoding message size
can change and we get lsn from the future random
future transaction.
  • Loading branch information
Gordiychuk committed Apr 7, 2017
1 parent 941fa78 commit 44c808b
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 15 deletions.
Expand Up @@ -8,6 +8,7 @@
import org.postgresql.copy.CopyDual;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.ReplicationType;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
Expand All @@ -26,6 +27,7 @@ public class V3PGReplicationStream implements PGReplicationStream {
public static final long POSTGRES_EPOCH_2000_01_01 = 946684800000L;
private final CopyDual copyDual;
private final long updateInterval;
private final ReplicationType replicationType;
private long lastStatusUpdate;
private boolean closeFlag = false;

Expand All @@ -44,11 +46,14 @@ public class V3PGReplicationStream implements PGReplicationStream {
* completely, although an update will still be sent when requested by the
* server, to avoid timeout disconnect.
*/
public V3PGReplicationStream(CopyDual copyDual, LogSequenceNumber startLSN, long updateIntervalMs) {
public V3PGReplicationStream(CopyDual copyDual, LogSequenceNumber startLSN, long updateIntervalMs,
ReplicationType replicationType
) {
this.copyDual = copyDual;
this.updateInterval = updateIntervalMs;
this.lastStatusUpdate = System.currentTimeMillis() - updateIntervalMs;
this.lastReceiveLSN = startLSN;
this.replicationType = replicationType;
}

@Override
Expand Down Expand Up @@ -229,8 +234,15 @@ private ByteBuffer processXLogData(ByteBuffer buffer) {
lastServerLSN = LogSequenceNumber.valueOf(buffer.getLong());
long systemClock = buffer.getLong();

int payloadSize = buffer.limit() - buffer.position();
lastReceiveLSN = LogSequenceNumber.valueOf(startLsn + payloadSize);
switch (replicationType) {
case LOGICAL:
lastReceiveLSN = LogSequenceNumber.valueOf(startLsn);
break;
case PHYSICAL:
int payloadSize = buffer.limit() - buffer.position();
lastReceiveLSN = LogSequenceNumber.valueOf(startLsn + payloadSize);
break;
}

if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.log(Level.FINEST, " <=BE XLogData(currWal: {0}, lastServerWal: {1}, clock: {2})",
Expand Down
Expand Up @@ -10,6 +10,7 @@
import org.postgresql.core.QueryExecutor;
import org.postgresql.core.ReplicationProtocol;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.ReplicationType;
import org.postgresql.replication.fluent.CommonOptions;
import org.postgresql.replication.fluent.logical.LogicalReplicationOptions;
import org.postgresql.replication.fluent.physical.PhysicalReplicationOptions;
Expand Down Expand Up @@ -38,24 +39,30 @@ public PGReplicationStream startLogical(LogicalReplicationOptions options)
throws SQLException {

String query = createStartLogicalQuery(options);
return initializeReplication(query, options);
return initializeReplication(query, options, ReplicationType.LOGICAL);
}

public PGReplicationStream startPhysical(PhysicalReplicationOptions options)
throws SQLException {

String query = createStartPhysicalQuery(options);
return initializeReplication(query, options);
return initializeReplication(query, options, ReplicationType.PHYSICAL);
}

private PGReplicationStream initializeReplication(String query, CommonOptions options)
private PGReplicationStream initializeReplication(String query, CommonOptions options,
ReplicationType replicationType)
throws SQLException {
LOGGER.log(Level.FINEST, " FE=> StartReplication(query: {0})", query);

configureSocketTimeout(options);
CopyDual copyDual = (CopyDual) queryExecutor.startCopy(query, true);

return new V3PGReplicationStream(copyDual, options.getStartLSNPosition(), options.getStatusInterval());
return new V3PGReplicationStream(
copyDual,
options.getStartLSNPosition(),
options.getStatusInterval(),
replicationType
);
}

/**
Expand Down
@@ -0,0 +1,11 @@
/*
* Copyright (c) 2017, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/

package org.postgresql.replication;

public enum ReplicationType {
LOGICAL,
PHYSICAL
}
Expand Up @@ -87,18 +87,11 @@ public void testSentLocationEqualToLastReceiveLSN() throws Exception {
LogSequenceNumber lastReceivedLSN = stream.getLastReceiveLSN();
stream.forceUpdateStatus();

int lastPayloadSize =
received
.get(countMessage - 1)
.getBytes()
.length;

LogSequenceNumber waitLsn = LogSequenceNumber.valueOf(lastReceivedLSN.asLong() - lastPayloadSize);
LogSequenceNumber sentByServer = getSentLocationOnView();

assertThat("When changes absent on server last receive by stream LSN "
+ "should be equal to last sent by server LSN",
sentByServer, equalTo(waitLsn)
sentByServer, equalTo(lastReceivedLSN)
);
}

Expand Down
Expand Up @@ -204,6 +204,55 @@ public void testWalRecordCanBeRepeatBeRestartReplication() throws Exception {
);
}

@Test
public void restartPhysicalReplicationWithoutRepeatMessage() throws Exception {
PGConnection pgConnection = (PGConnection) replConnection;

LogSequenceNumber lsn = getCurrentLSN();

Statement st = sqlConnection.createStatement();
st.execute("insert into test_physic_table(name) values('first value')");
st.close();

PGReplicationStream stream =
pgConnection
.getReplicationAPI()
.replicationStream()
.physical()
.withSlotName(SLOT_NAME)
.withStartPosition(lsn)
.start();

byte[] streamOneFirstPart = toByteArray(stream.read());
LogSequenceNumber restartLSN = stream.getLastReceiveLSN();

st = sqlConnection.createStatement();
st.execute("insert into test_physic_table(name) values('second value')");
st.close();

byte[] streamOneSecondPart = toByteArray(stream.read());
stream.close();

//reopen stream
stream =
pgConnection
.getReplicationAPI()
.replicationStream()
.physical()
.withSlotName(SLOT_NAME)
.withStartPosition(restartLSN)
.start();

byte[] streamTwoFirstPart = toByteArray(stream.read());
stream.close();

boolean arrayEquals = Arrays.equals(streamOneSecondPart, streamTwoFirstPart);
assertThat("Interrupt physical replication and restart from lastReceiveLSN should not "
+ "lead to repeat messages skip part of them",
arrayEquals, CoreMatchers.equalTo(true)
);
}

private boolean isActiveOnView() throws SQLException {
boolean result = false;
Statement st = sqlConnection.createStatement();
Expand Down

0 comments on commit 44c808b

Please sign in to comment.