Permalink
Browse files

test: fix drop replication slot on 9.4 for tests (#696)

Postgresql 9.4 doesn't have PID in pg_replication_slots view, so we can't
terminate replication session. Instead of it, we wait until postgresql
after `replicationConnection.close()` change replication slot status to
not active.

```
testReplicationRestartFromLastFeedbackPosition(org.postgresql.replication.LogicalReplicationTest)  Time elapsed: 0.038 sec  <<< ERROR!
org.postgresql.util.PSQLException: ERROR: column "active_pid" does not exist
  Position: 29
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2480)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2180)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:294)
	at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:430)
	at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:356)
	at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:303)
	at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:289)
	at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:266)
	at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:262)
	at org.postgresql.replication.LogicalReplicationTest.dropReplicationSlot(LogicalReplicationTest.java:97)
	at org.postgresql.replication.LogicalReplicationTest.tearDown(LogicalReplicationTest.java:84)
```

Current solution it's WA on bug describe in
https://www.postgresql.org/message-id/CAFgjRd3hdYOa33m69TbeOfNNer2BZbwa8FFjt2V5VFzTBvUU3w%40mail.gmail.com

* Add timeout on wait close replication slot
  • Loading branch information...
Gordiychuk authored and vlsi committed Nov 26, 2016
1 parent f48c6bb commit c1c48bd705f04e8d1b1cb8b998dda0a6893f891d
@@ -15,8 +15,6 @@
import org.postgresql.test.TestUtil;
import org.postgresql.test.util.rules.ServerVersionRule;
import org.postgresql.test.util.rules.annotation.HaveMinimalServerVersion;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;

import org.junit.After;
import org.junit.Before;
@@ -51,41 +49,18 @@ public void setUp() throws Exception {
replicationConnection = openReplicationConnection();
TestUtil.createTable(sqlConnection, "test_logic_table",
"pk serial primary key, name varchar(100)");
Statement st = sqlConnection.createStatement();
st.execute(
"SELECT * FROM pg_create_logical_replication_slot('" + SLOT_NAME + "', 'test_decoding')");
st.close();

TestUtil.recreateLogicalReplicationSlot(sqlConnection, SLOT_NAME, "test_decoding");
}

@After
public void tearDown() throws Exception {
replicationConnection.close();
TestUtil.dropTable(sqlConnection, "test_logic_table");

dropReplicationSlot();
TestUtil.dropReplicationSlot(sqlConnection, SLOT_NAME);
sqlConnection.close();
}

private void dropReplicationSlot() throws SQLException {
try {
Statement dropStatement = sqlConnection.createStatement();
dropStatement.execute("select pg_drop_replication_slot('" + SLOT_NAME + "')");
dropStatement.close();
} catch (PSQLException e) {
//slot is active
if (PSQLState.OBJECT_IN_USE.equals(new PSQLState(e.getSQLState()))) {
Statement terminateStatement = sqlConnection.createStatement();
terminateStatement.execute("select pg_terminate_backend(active_pid) from pg_replication_slots "
+ "where active = true and slot_name='" + SLOT_NAME + "'"
);
terminateStatement.close();
dropReplicationSlot();
} else {
throw e;
}
}
}

@Test()
public void testSentLocationEqualToLastReceiveLSN() throws Exception {
PGConnection pgConnection = (PGConnection) replicationConnection;
@@ -70,42 +70,18 @@ public void setUp() throws Exception {
replConnection = openReplicationConnection();
TestUtil.createTable(sqlConnection, "test_logic_table",
"pk serial primary key, name varchar(100)");
Statement st = sqlConnection.createStatement();
st.execute(
"SELECT * FROM pg_create_logical_replication_slot('" + SLOT_NAME + "', 'test_decoding')");
st.close();

TestUtil.recreateLogicalReplicationSlot(sqlConnection, SLOT_NAME, "test_decoding");
}

@After
public void tearDown() throws Exception {
replConnection.close();
TestUtil.dropTable(sqlConnection, "test_logic_table");

dropReplicationSlot();
TestUtil.dropReplicationSlot(sqlConnection, SLOT_NAME);
sqlConnection.close();
}

private void dropReplicationSlot() throws SQLException {
try {
Statement dropStatement = sqlConnection.createStatement();
dropStatement.execute("select pg_drop_replication_slot('" + SLOT_NAME + "')");
dropStatement.close();
} catch (PSQLException e) {
//slot is active
if (PSQLState.OBJECT_IN_USE.equals(new PSQLState(e.getSQLState()))) {
Statement terminateStatement = sqlConnection.createStatement();
terminateStatement.execute(
"select pg_terminate_backend(active_pid) from pg_replication_slots "
+ "where active = true and slot_name='" + SLOT_NAME + "'"
);
terminateStatement.close();
dropReplicationSlot();
} else {
throw e;
}
}
}

@Test(timeout = 1000)
public void testNotAvailableStartNotExistReplicationSlot() throws Exception {
PGConnection pgConnection = (PGConnection) replConnection;
@@ -14,8 +14,6 @@
import org.postgresql.test.TestUtil;
import org.postgresql.test.util.rules.ServerVersionRule;
import org.postgresql.test.util.rules.annotation.HaveMinimalServerVersion;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;

import org.hamcrest.CoreMatchers;
import org.junit.After;
@@ -50,41 +48,17 @@ public void setUp() throws Exception {
replConnection = openReplicationConnection();
TestUtil.createTable(sqlConnection, "test_physic_table",
"pk serial primary key, name varchar(100)");

Statement st = sqlConnection.createStatement();
st.execute("SELECT * FROM pg_create_physical_replication_slot('" + SLOT_NAME + "')");
st.close();
TestUtil.recreatePhysicalReplicationSlot(sqlConnection, SLOT_NAME);
}

@After
public void tearDown() throws Exception {
replConnection.close();
TestUtil.dropTable(sqlConnection, "test_physic_table");
dropReplicationSlot();

TestUtil.dropReplicationSlot(sqlConnection, SLOT_NAME);
sqlConnection.close();
}

private void dropReplicationSlot() throws SQLException {
try {
Statement dropStatement = sqlConnection.createStatement();
dropStatement.execute("select pg_drop_replication_slot('" + SLOT_NAME + "')");
dropStatement.close();
} catch (PSQLException e) {
//slot is active
if (PSQLState.OBJECT_IN_USE.equals(new PSQLState(e.getSQLState()))) {
Statement terminateStatement = sqlConnection.createStatement();
terminateStatement.execute("select pg_terminate_backend(active_pid) from pg_replication_slots "
+ "where active = true and slot_name='" + SLOT_NAME + "'"
);
terminateStatement.close();
dropReplicationSlot();
} else {
throw e;
}
}
}

@Test
public void testReceiveChangesWithoutReplicationSlot() throws Exception {
PGConnection pgConnection = (PGConnection) replConnection;
@@ -206,11 +206,9 @@ private boolean isLogicalSlotExists(String slotName) throws SQLException {
return result;
}

private void dropReplicationSlot() throws SQLException {
private void dropReplicationSlot() throws Exception {
if (slotName != null) {
Statement dropStatement = sqlConnection.createStatement();
dropStatement.execute("select pg_drop_replication_slot('" + slotName + "')");
dropStatement.close();
TestUtil.dropReplicationSlot(sqlConnection, slotName);
}
}

@@ -6,6 +6,7 @@
package org.postgresql.test;

import org.postgresql.PGProperty;
import org.postgresql.core.ServerVersion;
import org.postgresql.core.Version;
import org.postgresql.jdbc.PgConnection;

@@ -22,6 +23,8 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Utility class for JDBC tests
@@ -722,4 +725,100 @@ public static void closeQuietly(ResultSet rs) {
}
}
}

public static void recreateLogicalReplicationSlot(Connection connection, String slotName, String outputPlugin)
throws SQLException, InterruptedException, TimeoutException {
//drop previos slot
dropReplicationSlot(connection, slotName);

PreparedStatement stm = null;
try {
stm = connection.prepareStatement("SELECT * FROM pg_create_logical_replication_slot(?, ?)");
stm.setString(1, slotName);
stm.setString(2, outputPlugin);
stm.execute();
} finally {
closeQuietly(stm);
}
}

public static void recreatePhysicalReplicationSlot(Connection connection, String slotName)
throws SQLException, InterruptedException, TimeoutException {
//drop previos slot
dropReplicationSlot(connection, slotName);

PreparedStatement stm = null;
try {
stm = connection.prepareStatement("SELECT * FROM pg_create_physical_replication_slot(?)");
stm.setString(1, slotName);
stm.execute();
} finally {
closeQuietly(stm);
}
}

public static void dropReplicationSlot(Connection connection, String slotName)
throws SQLException, InterruptedException, TimeoutException {
if (haveMinimumServerVersion(connection, ServerVersion.v9_5)) {
PreparedStatement stm = null;
try {
stm = connection.prepareStatement(
"select pg_terminate_backend(active_pid) from pg_replication_slots "
+ "where active = true and slot_name = ?");
stm.setString(1, slotName);
stm.execute();
} finally {
closeQuietly(stm);
}
}

waitStopReplicationSlot(connection, slotName);

PreparedStatement stm = null;
try {
stm = connection.prepareStatement(
"select pg_drop_replication_slot(slot_name) "
+ "from pg_replication_slots where slot_name = ?");
stm.setString(1, slotName);
stm.execute();
} finally {
closeQuietly(stm);
}
}

public static boolean isReplicationSlotActive(Connection connection, String slotName)
throws SQLException {
PreparedStatement stm = null;
ResultSet rs = null;

try {
stm =
connection.prepareStatement("select active from pg_replication_slots where slot_name = ?");
stm.setString(1, slotName);
rs = stm.executeQuery();
return rs.next() && rs.getBoolean(1);
} finally {
closeQuietly(rs);
closeQuietly(stm);
}
}

private static void waitStopReplicationSlot(Connection connection, String slotName)
throws InterruptedException, TimeoutException, SQLException {
long startWaitTime = System.currentTimeMillis();
boolean stillActive;
long timeInWait = 0;

do {
stillActive = isReplicationSlotActive(connection, slotName);
if (stillActive) {
TimeUnit.MILLISECONDS.sleep(100L);
timeInWait = System.currentTimeMillis() - startWaitTime;
}
} while (stillActive && timeInWait <= 30000);

if (stillActive) {
throw new TimeoutException("Wait stop replication slot " + timeInWait + " timeout occurs");
}
}
}

0 comments on commit c1c48bd

Please sign in to comment.