Skip to content
Permalink
Browse files

feat: return info on create slot of replication (#1335)

The CREATE_REPLICATION_SLOT replication command returns the following
info: slot_name, consistent_point, snapshot_name, output_plugin.

This info can be valuable, in particular snapshot_name is exported
to allow a consistent snapshot in some uses cases that require it.

A new class ReplicationSlotInfo is being returned now to provide easy
access to the values.
  • Loading branch information...
jorsol authored and davecramer committed Nov 22, 2018
1 parent 7f0e200 commit 84e8d90b4bbeecbdccbe7ec4d165cfaf3ef30bf4
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2018, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/

package org.postgresql.replication;

/**
* Information returned on replication slot creation.
*
* <p>Returned keys of CREATE_REPLICATION_SLOT:
* <ol>
* <li><b>slot_name</b> String {@code =>} the slot name
* <li><b>consistent_point</b> String {@code =>} LSN at which we became consistent
* <li><b>snapshot_name</b> String {@code =>} exported snapshot's name (may be <code>null</code>)
* <li><b>output_plugin</b> String {@code =>} output plugin (may be <code>null</code>)
* </ol>
*/
public final class ReplicationSlotInfo {

private final String slotName;
private final ReplicationType replicationType;
private final LogSequenceNumber consistentPoint;
private final String snapshotName;
private final String outputPlugin;

public ReplicationSlotInfo(String slotName, ReplicationType replicationType,
LogSequenceNumber consistentPoint, String snapshotName, String outputPlugin) {
this.slotName = slotName;
this.replicationType = replicationType;
this.consistentPoint = consistentPoint;
this.snapshotName = snapshotName;
this.outputPlugin = outputPlugin;
}

/**
* Replication slot name.
*
* @return the slot name
*/
public String getSlotName() {
return slotName;
}

/**
* Replication type of the slot created, might be PHYSICAL or LOGICAL.
*
* @return ReplicationType, PHYSICAL or LOGICAL
*/
public ReplicationType getReplicationType() {
return replicationType;
}

/**
* LSN at which we became consistent.
*
* @return LogSequenceNumber with the consistent_point
*/
public LogSequenceNumber getConsistentPoint() {
return consistentPoint;
}

/**
* Exported snapshot name at the point of replication slot creation.
*
* <p>As long as the exporting transaction remains open, other transactions can import its snapshot,
* and thereby be guaranteed that they see exactly the same view of the database that the first
* transaction sees.
*
* @return exported snapshot_name (may be <code>null</code>)
*/
public String getSnapshotName() {
return snapshotName;
}

/**
* Output Plugin used on slot creation.
*
* @return output_plugin (may be <code>null</code>)
*/
public String getOutputPlugin() {
return outputPlugin;
}

}
@@ -5,13 +5,15 @@

package org.postgresql.replication.fluent;

import org.postgresql.replication.ReplicationSlotInfo;

import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;

/**
* Fluent interface for specify common parameters for create Logical and Physical replication slot.
*/
public interface ChainedCommonCreateSlotBuilder<T extends ChainedCommonCreateSlotBuilder<T>> {
public interface ChainedCommonCreateSlotBuilder<T extends ChainedCommonCreateSlotBuilder<T>> {

/**
* Replication slots provide an automated way to ensure that the master does not remove WAL
@@ -36,7 +38,9 @@

/**
* Create slot with specified parameters in database.
*
* @return ReplicationSlotInfo with the information of the created slot.
* @throws SQLException on error
*/
void make() throws SQLException;
ReplicationSlotInfo make() throws SQLException;
}
@@ -6,14 +6,19 @@
package org.postgresql.replication.fluent.logical;

import org.postgresql.core.BaseConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.ReplicationSlotInfo;
import org.postgresql.replication.ReplicationType;
import org.postgresql.replication.fluent.AbstractCreateSlotBuilder;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class LogicalCreateSlotBuilder
extends AbstractCreateSlotBuilder<ChainedLogicalCreateSlotBuilder>
implements ChainedLogicalCreateSlotBuilder {

private String outputPlugin;

public LogicalCreateSlotBuilder(BaseConnection connection) {
@@ -32,7 +37,7 @@ public ChainedLogicalCreateSlotBuilder withOutputPlugin(String outputPlugin) {
}

@Override
public void make() throws SQLException {
public ReplicationSlotInfo make() throws SQLException {
if (outputPlugin == null || outputPlugin.isEmpty()) {
throw new IllegalArgumentException(
"OutputPlugin required parameter for logical replication slot");
@@ -43,15 +48,30 @@ public void make() throws SQLException {
}

Statement statement = connection.createStatement();
ResultSet result = null;
ReplicationSlotInfo slotInfo = null;
try {
statement.execute(String.format(
"CREATE_REPLICATION_SLOT %s %s LOGICAL %s",
slotName,
temporaryOption ? "TEMPORARY" : "",
outputPlugin
"CREATE_REPLICATION_SLOT %s %s LOGICAL %s",
slotName,
temporaryOption ? "TEMPORARY" : "",
outputPlugin
));
result = statement.getResultSet();
if (result != null && result.next()) {
slotInfo = new ReplicationSlotInfo(
result.getString("slot_name"),
ReplicationType.LOGICAL,
LogSequenceNumber.valueOf(result.getString("consistent_point")),
result.getString("snapshot_name"),
result.getString("output_plugin"));
}
} finally {
if (result != null) {
result.close();
}
statement.close();
}
return slotInfo;
}
}
@@ -6,8 +6,12 @@
package org.postgresql.replication.fluent.physical;

import org.postgresql.core.BaseConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.ReplicationSlotInfo;
import org.postgresql.replication.ReplicationType;
import org.postgresql.replication.fluent.AbstractCreateSlotBuilder;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

@@ -25,20 +29,35 @@ protected ChainedPhysicalCreateSlotBuilder self() {
}

@Override
public void make() throws SQLException {
public ReplicationSlotInfo make() throws SQLException {
if (slotName == null || slotName.isEmpty()) {
throw new IllegalArgumentException("Replication slotName can't be null");
}

Statement statement = connection.createStatement();
ResultSet result = null;
ReplicationSlotInfo slotInfo = null;
try {
statement.execute(String.format(
"CREATE_REPLICATION_SLOT %s %s PHYSICAL",
slotName,
temporaryOption ? "TEMPORARY" : ""
"CREATE_REPLICATION_SLOT %s %s PHYSICAL",
slotName,
temporaryOption ? "TEMPORARY" : ""
));
result = statement.getResultSet();
if (result != null && result.next()) {
slotInfo = new ReplicationSlotInfo(
result.getString("slot_name"),
ReplicationType.PHYSICAL,
LogSequenceNumber.valueOf(result.getString("consistent_point")),
result.getString("snapshot_name"),
result.getString("output_plugin"));
}
} finally {
if (result != null) {
result.close();
}
statement.close();
}
return slotInfo;
}
}
@@ -6,6 +6,9 @@
package org.postgresql.replication;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;
import static org.junit.Assume.assumeTrue;
@@ -223,6 +226,47 @@ public void testCreateLogicalSlot() throws Exception {
assertThat("Slot should not be temporary by default", result, CoreMatchers.equalTo(false));
}

@Test
public void testCreateLogicalSlotReturnedInfo() throws Exception {
PGConnection pgConnection = (PGConnection) replConnection;

slotName = "pgjdbc_test_create_logical_replication_slot_info";

ReplicationSlotInfo info = pgConnection
.getReplicationAPI()
.createReplicationSlot()
.logical()
.withSlotName(slotName)
.withOutputPlugin("test_decoding")
.make();

assertEquals(slotName, info.getSlotName());
assertEquals(ReplicationType.LOGICAL, info.getReplicationType());
assertNotNull(info.getConsistentPoint());
assertNotNull(info.getSnapshotName());
assertEquals("test_decoding", info.getOutputPlugin());
}

@Test
public void testCreatePhysicalSlotReturnedInfo() throws Exception {
PGConnection pgConnection = (PGConnection) replConnection;

slotName = "pgjdbc_test_create_physical_replication_slot_info";

ReplicationSlotInfo info = pgConnection
.getReplicationAPI()
.createReplicationSlot()
.physical()
.withSlotName(slotName)
.make();

assertEquals(slotName, info.getSlotName());
assertEquals(ReplicationType.PHYSICAL, info.getReplicationType());
assertNotNull(info.getConsistentPoint());
assertNull(info.getSnapshotName());
assertNull(info.getOutputPlugin());
}

@Test
public void testCreateTemporaryLogicalSlotPg10AndHigher()
throws SQLException {

0 comments on commit 84e8d90

Please sign in to comment.
You can’t perform that action at this time.