Skip to content

Commit

Permalink
feat: Extend ReplicationCreateSlotBuilder DSL to support temporary re…
Browse files Browse the repository at this point in the history
…plications slots

With Postgres 10 it is possible to create temporary replication slots.
The Postgres JDBC Driver provides a DSL to create replication slots, but there was no option to declare the slot as temporary.
This commit extends the DSL by a withTemporaryOption method.

BREAKING CHANGE: AbstractCreateSlotBuilder has no parameterless constructor anymore
Closes #1305
  • Loading branch information
PSanetra committed Oct 5, 2018
1 parent 265f22b commit 076c242
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,22 @@

package org.postgresql.replication.fluent;

import org.postgresql.core.BaseConnection;
import org.postgresql.core.ServerVersion;
import org.postgresql.util.GT;

import java.sql.SQLFeatureNotSupportedException;

public abstract class AbstractCreateSlotBuilder<T extends ChainedCommonCreateSlotBuilder<T>>
implements ChainedCommonCreateSlotBuilder<T> {

protected String slotName;
protected boolean temporaryOption = false;
protected BaseConnection connection;

protected AbstractCreateSlotBuilder(BaseConnection connection) {
this.connection = connection;
}

protected abstract T self();

Expand All @@ -17,4 +29,17 @@ public T withSlotName(String slotName) {
this.slotName = slotName;
return self();
}

@Override
public T withTemporaryOption() throws SQLFeatureNotSupportedException {

if (!connection.haveMinimumServerVersion(ServerVersion.v10)) {
throw new SQLFeatureNotSupportedException(
GT.tr("Server does not support temporary replication slots")
);
}

this.temporaryOption = true;
return self();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.postgresql.replication.fluent;

import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;

/**
* Fluent interface for specify common parameters for create Logical and Physical replication slot.
Expand All @@ -22,6 +23,17 @@ public interface ChainedCommonCreateSlotBuilder<T extends ChainedCommonCreateSlo
*/
T withSlotName(String slotName);

/**
* <p>Temporary slots are not saved to disk and are automatically dropped on error or when
* the session has finished.</p>
*
* <p>This feature is only supported by PostgreSQL versions &gt;= 10.</p>
*
* @return T a slot builder
* @throws SQLFeatureNotSupportedException thrown if PostgreSQL version is less than 10.
*/
T withTemporaryOption() throws SQLFeatureNotSupportedException;

/**
* Create slot with specified parameters in database.
* @throws SQLException on error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ public class LogicalCreateSlotBuilder
extends AbstractCreateSlotBuilder<ChainedLogicalCreateSlotBuilder>
implements ChainedLogicalCreateSlotBuilder {
private String outputPlugin;
private BaseConnection connection;

public LogicalCreateSlotBuilder(BaseConnection connection) {
this.connection = connection;
super(connection);
}

@Override
Expand All @@ -45,7 +44,12 @@ public void make() throws SQLException {

Statement statement = connection.createStatement();
try {
statement.execute(String.format("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin));
statement.execute(String.format(
"CREATE_REPLICATION_SLOT %s %s LOGICAL %s",
slotName,
temporaryOption ? "TEMPORARY" : "",
outputPlugin
));
} finally {
statement.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
public class PhysicalCreateSlotBuilder
extends AbstractCreateSlotBuilder<ChainedPhysicalCreateSlotBuilder>
implements ChainedPhysicalCreateSlotBuilder {
private BaseConnection connection;

public PhysicalCreateSlotBuilder(BaseConnection connection) {
this.connection = connection;
super(connection);
}

@Override
Expand All @@ -33,7 +32,11 @@ public void make() throws SQLException {

Statement statement = connection.createStatement();
try {
statement.execute(String.format("CREATE_REPLICATION_SLOT %s PHYSICAL", slotName));
statement.execute(String.format(
"CREATE_REPLICATION_SLOT %s %s PHYSICAL",
slotName,
temporaryOption ? "TEMPORARY" : ""
));
} finally {
statement.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.core.BaseConnection;
import org.postgresql.core.ServerVersion;
import org.postgresql.test.TestUtil;
import org.postgresql.test.util.rules.ServerVersionRule;
import org.postgresql.test.util.rules.annotation.HaveMinimalServerVersion;
Expand All @@ -23,6 +25,7 @@
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.Properties;

Expand Down Expand Up @@ -80,9 +83,68 @@ public void testCreatePhysicalSlot() throws Exception {

boolean result = isPhysicalSlotExists(slotName);

assertThat("Slot should exist", result, CoreMatchers.equalTo(true));

result = isSlotTemporary(slotName);

assertThat("Slot should not be temporary by default", result, CoreMatchers.equalTo(false));
}

@Test
public void testCreateTemporaryPhysicalSlot() throws Exception {
BaseConnection baseConnection = (BaseConnection) replConnection;

slotName = "pgjdbc_test_create_temporary_physical_replication_slot";

if (TestUtil.haveMinimumServerVersion(baseConnection, ServerVersion.v10)) {
createTemporaryPhysicalSlotPg10AndHigher(baseConnection, slotName);
} else {
createTemporaryPhysicalSlotPgLowerThan10(baseConnection, slotName);
}
}

private void createTemporaryPhysicalSlotPg10AndHigher(BaseConnection baseConnection, String slotName)
throws SQLException {
try {

baseConnection
.getReplicationAPI()
.createReplicationSlot()
.physical()
.withSlotName(slotName)
.withTemporaryOption()
.make();

} catch (SQLFeatureNotSupportedException e) {

fail("PostgreSQL >= 10 should support temporary replication slots");

}

boolean result = isSlotTemporary(slotName);

assertThat(result, CoreMatchers.equalTo(true));
}

private void createTemporaryPhysicalSlotPgLowerThan10(BaseConnection baseConnection, String slotName)
throws SQLException {
try {

baseConnection
.getReplicationAPI()
.createReplicationSlot()
.physical()
.withSlotName(slotName)
.withTemporaryOption()
.make();

fail("PostgreSQL < 10 does not support temporary replication slots");

} catch (SQLFeatureNotSupportedException e) {
// success
}
}

@Test
public void testDropPhysicalSlot() throws Exception {
PGConnection pgConnection = (PGConnection) replConnection;
Expand Down Expand Up @@ -151,9 +213,70 @@ public void testCreateLogicalSlot() throws Exception {

boolean result = isLogicalSlotExists(slotName);

assertThat("Slot should exist", result, CoreMatchers.equalTo(true));

result = isSlotTemporary(slotName);

assertThat("Slot should not be temporary by default", result, CoreMatchers.equalTo(false));
}

@Test
public void testCreateTemporaryLogicalSlot() throws Exception {
BaseConnection baseConnection = (BaseConnection) replConnection;

slotName = "pgjdbc_test_create_temporary_logical_replication_slot";

if (TestUtil.haveMinimumServerVersion(baseConnection, ServerVersion.v10)) {
createTemporaryLogicalSlotPg10AndHigher(baseConnection, slotName);
} else {
createTemporaryLogicalSlotPgLowerThan10(baseConnection, slotName);
}
}

private void createTemporaryLogicalSlotPg10AndHigher(BaseConnection baseConnection, String slotName)
throws SQLException {
try {

baseConnection
.getReplicationAPI()
.createReplicationSlot()
.logical()
.withSlotName(slotName)
.withOutputPlugin("test_decoding")
.withTemporaryOption()
.make();

} catch (SQLFeatureNotSupportedException e) {

fail("PostgreSQL >= 10 should support temporary replication slots");

}

boolean result = isSlotTemporary(slotName);

assertThat(result, CoreMatchers.equalTo(true));
}

private void createTemporaryLogicalSlotPgLowerThan10(BaseConnection baseConnection, String slotName)
throws SQLException {
try {

baseConnection
.getReplicationAPI()
.createReplicationSlot()
.logical()
.withSlotName(slotName)
.withOutputPlugin("test_decoding")
.withTemporaryOption()
.make();

fail("PostgreSQL < 10 does not support temporary replication slots");

} catch (SQLFeatureNotSupportedException e) {
// success
}
}

@Test
public void testDropLogicalSlot() throws Exception {
PGConnection pgConnection = (PGConnection) replConnection;
Expand Down Expand Up @@ -205,6 +328,23 @@ private boolean isLogicalSlotExists(String slotName) throws SQLException {
return result;
}

private boolean isSlotTemporary(String slotName) throws SQLException {
if (!TestUtil.haveMinimumServerVersion(sqlConnection, ServerVersion.v10)) {
return false;
}

boolean result;

Statement st = sqlConnection.createStatement();
ResultSet resultSet = st.executeQuery(
"select 1 from pg_replication_slots where slot_name = '" + slotName
+ "' and temporary = true");
result = resultSet.next();
resultSet.close();
st.close();
return result;
}

private void dropReplicationSlot() throws Exception {
if (slotName != null) {
TestUtil.dropReplicationSlot(sqlConnection, slotName);
Expand Down

0 comments on commit 076c242

Please sign in to comment.