Skip to content
Permalink
Browse files

feat: Extend ReplicationCreateSlotBuilder DSL to support temporary re…

…plications slots (#1306)

* feat: Extend ReplicationCreateSlotBuilder DSL to support temporary replications slots

With Postgres 10 it is possible to create temporary replication slots.
The Postgres JDBC Driver provides a DSL to create replication slots, but there was no option to declare the slot as temporary.
This commit extends the DSL by a withTemporaryOption method.

BREAKING CHANGE: AbstractCreateSlotBuilder has no parameterless constructor anymore
Closes #1305

* refactor: Write tests in a more idiomatic way
  • Loading branch information...
PSanetra authored and davecramer committed Oct 6, 2018
1 parent 265f22b commit d514ceb502e7024cb302862880a8403bcd315ba3
@@ -5,10 +5,22 @@

package org.postgresql.replication.fluent;

import org.postgresql.core.BaseConnection;
import org.postgresql.core.ServerVersion;
import org.postgresql.util.GT;

import java.sql.SQLFeatureNotSupportedException;

public abstract class AbstractCreateSlotBuilder<T extends ChainedCommonCreateSlotBuilder<T>>
implements ChainedCommonCreateSlotBuilder<T> {

protected String slotName;
protected boolean temporaryOption = false;
protected BaseConnection connection;

protected AbstractCreateSlotBuilder(BaseConnection connection) {
this.connection = connection;
}

protected abstract T self();

@@ -17,4 +29,17 @@ public T withSlotName(String slotName) {
this.slotName = slotName;
return self();
}

@Override
public T withTemporaryOption() throws SQLFeatureNotSupportedException {

if (!connection.haveMinimumServerVersion(ServerVersion.v10)) {
throw new SQLFeatureNotSupportedException(
GT.tr("Server does not support temporary replication slots")
);
}

this.temporaryOption = true;
return self();
}
}
@@ -6,6 +6,7 @@
package org.postgresql.replication.fluent;

import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;

/**
* Fluent interface for specify common parameters for create Logical and Physical replication slot.
@@ -22,6 +23,17 @@
*/
T withSlotName(String slotName);

/**
* <p>Temporary slots are not saved to disk and are automatically dropped on error or when
* the session has finished.</p>
*
* <p>This feature is only supported by PostgreSQL versions &gt;= 10.</p>
*
* @return T a slot builder
* @throws SQLFeatureNotSupportedException thrown if PostgreSQL version is less than 10.
*/
T withTemporaryOption() throws SQLFeatureNotSupportedException;

/**
* Create slot with specified parameters in database.
* @throws SQLException on error
@@ -15,10 +15,9 @@
extends AbstractCreateSlotBuilder<ChainedLogicalCreateSlotBuilder>
implements ChainedLogicalCreateSlotBuilder {
private String outputPlugin;
private BaseConnection connection;

public LogicalCreateSlotBuilder(BaseConnection connection) {
this.connection = connection;
super(connection);
}

@Override
@@ -45,7 +44,12 @@ public void make() throws SQLException {

Statement statement = connection.createStatement();
try {
statement.execute(String.format("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin));
statement.execute(String.format(
"CREATE_REPLICATION_SLOT %s %s LOGICAL %s",
slotName,
temporaryOption ? "TEMPORARY" : "",
outputPlugin
));
} finally {
statement.close();
}
@@ -14,10 +14,9 @@
public class PhysicalCreateSlotBuilder
extends AbstractCreateSlotBuilder<ChainedPhysicalCreateSlotBuilder>
implements ChainedPhysicalCreateSlotBuilder {
private BaseConnection connection;

public PhysicalCreateSlotBuilder(BaseConnection connection) {
this.connection = connection;
super(connection);
}

@Override
@@ -33,7 +32,11 @@ public void make() throws SQLException {

Statement statement = connection.createStatement();
try {
statement.execute(String.format("CREATE_REPLICATION_SLOT %s PHYSICAL", slotName));
statement.execute(String.format(
"CREATE_REPLICATION_SLOT %s %s PHYSICAL",
slotName,
temporaryOption ? "TEMPORARY" : ""
));
} finally {
statement.close();
}
@@ -7,9 +7,13 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;
import static org.junit.Assume.assumeTrue;

import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.core.BaseConnection;
import org.postgresql.core.ServerVersion;
import org.postgresql.test.TestUtil;
import org.postgresql.test.util.rules.ServerVersionRule;
import org.postgresql.test.util.rules.annotation.HaveMinimalServerVersion;
@@ -23,6 +27,7 @@
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.Properties;

@@ -80,7 +85,67 @@ public void testCreatePhysicalSlot() throws Exception {

boolean result = isPhysicalSlotExists(slotName);

assertThat(result, CoreMatchers.equalTo(true));
assertThat("Slot should exist", result, CoreMatchers.equalTo(true));

result = isSlotTemporary(slotName);

assertThat("Slot should not be temporary by default", result, CoreMatchers.equalTo(false));
}

@Test
public void testCreateTemporaryPhysicalSlotPg10AndHigher()
throws SQLException {
assumeTrue(TestUtil.haveMinimumServerVersion(replConnection, ServerVersion.v10));

BaseConnection baseConnection = (BaseConnection) replConnection;

String slotName = "pgjdbc_test_create_temporary_physical_replication_slot_pg_10_or_higher";

try {

baseConnection
.getReplicationAPI()
.createReplicationSlot()
.physical()
.withSlotName(slotName)
.withTemporaryOption()
.make();

} catch (SQLFeatureNotSupportedException e) {

fail("PostgreSQL >= 10 should support temporary replication slots");

}

boolean result = isSlotTemporary(slotName);

assertThat("Slot is not temporary", result, CoreMatchers.equalTo(true));
}

@Test
public void testCreateTemporaryPhysicalSlotPgLowerThan10()
throws SQLException {
assumeFalse(TestUtil.haveMinimumServerVersion(replConnection, ServerVersion.v10));

BaseConnection baseConnection = (BaseConnection) replConnection;

String slotName = "pgjdbc_test_create_temporary_physical_replication_slot_pg_lower_than_10";

try {

baseConnection
.getReplicationAPI()
.createReplicationSlot()
.physical()
.withSlotName(slotName)
.withTemporaryOption()
.make();

fail("PostgreSQL < 10 does not support temporary replication slots");

} catch (SQLFeatureNotSupportedException e) {
// success
}
}

@Test
@@ -151,7 +216,69 @@ public void testCreateLogicalSlot() throws Exception {

boolean result = isLogicalSlotExists(slotName);

assertThat(result, CoreMatchers.equalTo(true));
assertThat("Slot should exist", result, CoreMatchers.equalTo(true));

result = isSlotTemporary(slotName);

assertThat("Slot should not be temporary by default", result, CoreMatchers.equalTo(false));
}

@Test
public void testCreateTemporaryLogicalSlotPg10AndHigher()
throws SQLException {
assumeTrue(TestUtil.haveMinimumServerVersion(replConnection, ServerVersion.v10));

BaseConnection baseConnection = (BaseConnection) replConnection;

String slotName = "pgjdbc_test_create_temporary_logical_replication_slot_pg_10_or_higher";

try {

baseConnection
.getReplicationAPI()
.createReplicationSlot()
.logical()
.withSlotName(slotName)
.withOutputPlugin("test_decoding")
.withTemporaryOption()
.make();

} catch (SQLFeatureNotSupportedException e) {

fail("PostgreSQL >= 10 should support temporary replication slots");

}

boolean result = isSlotTemporary(slotName);

assertThat("Slot is not temporary", result, CoreMatchers.equalTo(true));
}

@Test
public void testCreateTemporaryLogicalSlotPgLowerThan10()
throws SQLException {
assumeFalse(TestUtil.haveMinimumServerVersion(replConnection, ServerVersion.v10));

BaseConnection baseConnection = (BaseConnection) replConnection;

String slotName = "pgjdbc_test_create_temporary_logical_replication_slot_pg_lower_than_10";

try {

baseConnection
.getReplicationAPI()
.createReplicationSlot()
.logical()
.withSlotName(slotName)
.withOutputPlugin("test_decoding")
.withTemporaryOption()
.make();

fail("PostgreSQL < 10 does not support temporary replication slots");

} catch (SQLFeatureNotSupportedException e) {
// success
}
}

@Test
@@ -205,6 +332,23 @@ private boolean isLogicalSlotExists(String slotName) throws SQLException {
return result;
}

private boolean isSlotTemporary(String slotName) throws SQLException {
if (!TestUtil.haveMinimumServerVersion(sqlConnection, ServerVersion.v10)) {
return false;
}

boolean result;

Statement st = sqlConnection.createStatement();
ResultSet resultSet = st.executeQuery(
"select 1 from pg_replication_slots where slot_name = '" + slotName
+ "' and temporary = true");
result = resultSet.next();
resultSet.close();
st.close();
return result;
}

private void dropReplicationSlot() throws Exception {
if (slotName != null) {
TestUtil.dropReplicationSlot(sqlConnection, slotName);

0 comments on commit d514ceb

Please sign in to comment.
You can’t perform that action at this time.