Permalink
Browse files

feat: add replication protocol API (#550)

The replication protocol is managed by PGReplicationStream. It hides low level
replication protocol details and enables end user deal with just payload data.

The entry point is `PGConnection#getReplicationAPI`.

Current PostgreSQL backend has issues with terminating of replication connection (e.g. "stop decode" message might be ignored for a while, so termination would take some time).
Relevant hacker's thread is https://www.postgresql.org/message-id/CAFgjRd3hdYOa33m69TbeOfNNer2BZbwa8FFjt2V5VFzTBvUU3w%40mail.gmail.com

Locgical replication API:

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .logical()
            .withSlotName("test_decoding")
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .start();
    while (true) {
      ByteBuffer buffer = stream.read();
      //process logical changes
    }

Physical replication API:

    LogSequenceNumber lsn = getCurrentLSN();

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .physical()
            .withStartPosition(lsn)
            .start();

    while (true) {
      ByteBuffer read = stream.read();
      //process binary WAL logs
    }

The main purpose for supporting of the replication protocol at the driver level is to provide an ability to create realtime time integration with external systems (e.g. Kafka+ElasticSearch)
  • Loading branch information...
Gordiychuk authored and vlsi committed Nov 25, 2016
1 parent d32b077 commit f48c6bb7e726479bbf4be4ef95e4c138db5cac96
Showing with 4,502 additions and 27 deletions.
  1. +9 −0 .travis.yml
  2. +24 −0 .travis/travis_configure_replication.sh
  3. +1 −1 .travis/travis_start_postgres.sh
  4. +6 −0 pgjdbc/src/main/java/org/postgresql/PGConnection.java
  5. +21 −1 pgjdbc/src/main/java/org/postgresql/PGProperty.java
  6. +16 −0 pgjdbc/src/main/java/org/postgresql/copy/CopyDual.java
  7. +11 −0 pgjdbc/src/main/java/org/postgresql/copy/CopyManager.java
  8. +17 −0 pgjdbc/src/main/java/org/postgresql/copy/CopyOut.java
  9. +5 −0 pgjdbc/src/main/java/org/postgresql/copy/PGCopyInputStream.java
  10. +8 −0 pgjdbc/src/main/java/org/postgresql/core/BaseConnection.java
  11. +5 −0 pgjdbc/src/main/java/org/postgresql/core/QueryExecutor.java
  12. +31 −0 pgjdbc/src/main/java/org/postgresql/core/ReplicationProtocol.java
  13. +28 −0 pgjdbc/src/main/java/org/postgresql/core/v2/V2ReplicationProtocol.java
  14. +10 −3 pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java
  15. +55 −0 pgjdbc/src/main/java/org/postgresql/core/v3/CopyDualImpl.java
  16. +8 −0 pgjdbc/src/main/java/org/postgresql/core/v3/CopyInImpl.java
  17. +9 −1 pgjdbc/src/main/java/org/postgresql/core/v3/CopyOperationImpl.java
  18. +7 −2 pgjdbc/src/main/java/org/postgresql/core/v3/CopyOutImpl.java
  19. +44 −14 pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
  20. +268 −0 pgjdbc/src/main/java/org/postgresql/core/v3/replication/V3PGReplicationStream.java
  21. +135 −0 pgjdbc/src/main/java/org/postgresql/core/v3/replication/V3ReplicationProtocol.java
  22. +15 −0 pgjdbc/src/main/java/org/postgresql/ds/common/BaseDataSource.java
  23. +29 −4 pgjdbc/src/main/java/org/postgresql/jdbc/PgConnection.java
  24. +109 −0 pgjdbc/src/main/java/org/postgresql/replication/LogSequenceNumber.java
  25. +44 −0 pgjdbc/src/main/java/org/postgresql/replication/PGReplicationConnection.java
  26. +48 −0 pgjdbc/src/main/java/org/postgresql/replication/PGReplicationConnectionImpl.java
  27. +127 −0 pgjdbc/src/main/java/org/postgresql/replication/PGReplicationStream.java
  28. +20 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/AbstractCreateSlotBuilder.java
  29. +39 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/AbstractStreamBuilder.java
  30. +28 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/ChainedCommonCreateSlotBuilder.java
  31. +48 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/ChainedCommonStreamBuilder.java
  32. +81 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/ChainedCreateReplicationSlotBuilder.java
  33. +76 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/ChainedStreamBuilder.java
  34. +35 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/CommonOptions.java
  35. +31 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/ReplicationCreateSlotBuilder.java
  36. +54 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/ReplicationStreamBuilder.java
  37. +26 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/logical/ChainedLogicalCreateSlotBuilder.java
  38. +32 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/logical/ChainedLogicalStreamBuilder.java
  39. +53 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/logical/LogicalCreateSlotBuilder.java
  40. +28 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/logical/LogicalReplicationOptions.java
  41. +93 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/logical/LogicalStreamBuilder.java
  42. +15 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/logical/StartLogicalReplicationCallback.java
  43. +15 −0 ...bc/src/main/java/org/postgresql/replication/fluent/physical/ChainedPhysicalCreateSlotBuilder.java
  44. +22 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/physical/ChainedPhysicalStreamBuilder.java
  45. +42 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/physical/PhysicalCreateSlotBuilder.java
  46. +11 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/physical/PhysicalReplicationOptions.java
  47. +52 −0 pgjdbc/src/main/java/org/postgresql/replication/fluent/physical/PhysicalStreamBuilder.java
  48. +14 −0 ...bc/src/main/java/org/postgresql/replication/fluent/physical/StartPhysicalReplicationCallback.java
  49. +2 −0 pgjdbc/src/main/java/org/postgresql/util/PSQLState.java
  50. +100 −0 pgjdbc/src/test/java/org/postgresql/replication/LogSequenceNumberTest.java
  51. +560 −0 pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationStatusTest.java
  52. +891 −0 pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationTest.java
  53. +277 −0 pgjdbc/src/test/java/org/postgresql/replication/PhysicalReplicationTest.java
  54. +121 −0 pgjdbc/src/test/java/org/postgresql/replication/ReplicationConnectionTest.java
  55. +226 −0 pgjdbc/src/test/java/org/postgresql/replication/ReplicationSlotTest.java
  56. +80 −0 pgjdbc/src/test/java/org/postgresql/replication/ReplicationTestSuite.java
  57. +5 −1 pgjdbc/src/test/java/org/postgresql/test/TestUtil.java
  58. +204 −0 pgjdbc/src/test/java/org/postgresql/test/jdbc2/CopyBothResponseTest.java
  59. +1 −0 pgjdbc/src/test/java/org/postgresql/test/jdbc2/Jdbc2TestSuite.java
  60. +104 −0 pgjdbc/src/test/java/org/postgresql/test/util/rules/ServerVersionRule.java
  61. +26 −0 pgjdbc/src/test/java/org/postgresql/test/util/rules/annotation/HaveMinimalServerVersion.java
@@ -5,8 +5,10 @@ before_script:
- export PG_DATADIR="/etc/postgresql/${PG_VERSION}/main"
- ./.travis/travis_install_postgres.sh
- test "x$XA" == 'x' || ./.travis/travis_configure_xa.sh
- test "x$REPLICATION" == 'x' || ./.travis/travis_configure_replication.sh
- ./.travis/travis_start_postgres.sh
- psql -U postgres -c "create user test with password 'test';"
- test "x$REPLICATION" == 'x' || psql -U postgres -c "alter user test with replication;"
- psql -c 'create database test owner test;' -U postgres
- echo "MAVEN_OPTS='-Xmx1g -Dgpg.skip=true'" > ~/.mavenrc
- test "x$PG_VERSION" == 'x' || test "x$NO_HSTORE" == 'xY' || psql test -c 'CREATE EXTENSION hstore;' -U postgres
@@ -69,6 +71,7 @@ matrix:
env:
- PG_VERSION=9.6
- XA=true
- REPLICATION=Y
- COVERAGE=Y
- jdk: oraclejdk8
sudo: required
@@ -78,6 +81,7 @@ matrix:
env:
- PG_VERSION=9.5
- XA=true
- REPLICATION=Y
- COVERAGE=Y
- jdk: oraclejdk8
sudo: required
@@ -106,8 +110,12 @@ matrix:
apt:
packages:
- oracle-java8-installer
sudo: required
dist: trusty
env:
- PG_VERSION=9.4
- XA=true
- REPLICATION=Y
- COVERAGE=Y
- MCENTRAL=Y
- JDOC=Y
@@ -117,6 +125,7 @@ matrix:
env:
- PG_VERSION=HEAD
- XA=true
- REPLICATION=Y
- jdk: openjdk7
addons:
postgresql: "9.4"
@@ -0,0 +1,24 @@
#!/usr/bin/env bash
set -x -e
if [ "${REPLICATION}" = "Y" ]
then
if [ "${PG_VERSION}" = "HEAD" ]
then
PG_VERSION="9.5"
fi
if (( $(echo "${PG_VERSION} >= 9.1" | bc -l)))
then
sudo sed -i -e 's/#max_wal_senders = 0/max_wal_senders = 4/g' ${PG_DATADIR}/postgresql.conf
sudo sed -i -e 's/#wal_keep_segments = 0/wal_keep_segments = 4/g' ${PG_DATADIR}/postgresql.conf
sudo sed -i -e 's/#wal_sender_timeout = .*/wal_sender_timeout = 2000/g' ${PG_DATADIR}/postgresql.conf
sudo sed -i -e 's/^#local\s\+replication\s\+postgres\s\+\(.*\)/local replication all \1/g' ${PG_DATADIR}/pg_hba.conf
sudo sed -i -e 's/^#host\s\+replication\s\+postgres\s\+\(.*\)\s\+\(.*\)/host replication all \1 \2/g' ${PG_DATADIR}/pg_hba.conf
if (( $(echo "${PG_VERSION} >= 9.4" | bc -l)))
then
sudo sed -i -e 's/#wal_level = minimal/wal_level = logical/g' ${PG_DATADIR}/postgresql.conf
sudo sed -i -e 's/#max_replication_slots = 0/max_replication_slots = 4/g' ${PG_DATADIR}/postgresql.conf
fi
fi
fi
@@ -9,7 +9,7 @@ then
#Start head postgres
sudo su postgres -c "/usr/local/pgsql/bin/pg_ctl -D ${PG_DATADIR} -w -t 300 -o '-p 5432' -l /tmp/postgres.log start"
sudo tail /tmp/postgres.log
elif [ "$XA" = "true" ]
elif [ "$XA" = "true" ] || [ "${REPLICATION}" = "Y" ]
then
sudo service postgresql stop
sudo service postgresql start ${PG_VERSION}
@@ -10,6 +10,7 @@
import org.postgresql.jdbc.AutoSave;
import org.postgresql.jdbc.PreferQueryMode;
import org.postgresql.largeobject.LargeObjectManager;
import org.postgresql.replication.PGReplicationConnection;
import org.postgresql.util.PGobject;
import java.sql.SQLException;
@@ -192,4 +193,9 @@
* @param autoSave connection configuration regarding automatic per-query savepoints
*/
void setAutosave(AutoSave autoSave);
/**
* @return replication API for the current connection
*/
PGReplicationConnection getReplicationAPI();
}
@@ -377,7 +377,27 @@
* Configure optimization to enable batch insert re-writing.
*/
REWRITE_BATCHED_INSERTS ("reWriteBatchedInserts", "false",
"Enable optimization to rewrite and collapse compatible INSERT statements that are batched.");
"Enable optimization to rewrite and collapse compatible INSERT statements that are batched."),
/**
* <p>Connection parameter for startup message Available value(true, database). A Boolean value of
* true tells the backend to go into walsender mode, wherein a small set of replication commands
* can be issued instead of SQL statements. Only the simple query protocol can be used in
* walsender mode. Passing database as the value instructs walsender to connect to the database
* specified in the dbname parameter, which will allow the connection to be used for logical
* replication from that database. <p>Parameter should be use together with {@link
* PGProperty#ASSUME_MIN_SERVER_VERSION} with parameter &gt;= 9.4 (backend &gt;= 9.4)
*/
REPLICATION("replication", null,
"Connection parameter for startup message Available value(true, database). "
+ "A Boolean value of true tells the backend to go into walsender mode, "
+ "wherein a small set of replication commands can be issued instead of SQL statements. "
+ "Only the simple query protocol can be used in walsender mode. "
+ "Passing database as the value instructs walsender to connect "
+ "to the database specified in the dbname parameter, "
+ "which will allow the connection to be used for logical replication "
+ "from that database. "
+ "(backend >= 9.4)");
private String _name;
private String _defaultValue;
@@ -0,0 +1,16 @@
/*
* Copyright (c) 2016, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/
package org.postgresql.copy;
/**
* Bidirectional via copy stream protocol. Via bidirectional copy protocol work PostgreSQL
* replication.
*
* @see CopyIn
* @see CopyOut
*/
public interface CopyDual extends CopyIn, CopyOut {
}
@@ -66,6 +66,17 @@ public CopyOut copyOut(String sql) throws SQLException {
}
}
public CopyDual copyDual(String sql) throws SQLException {
CopyOperation op = queryExecutor.startCopy(sql, connection.getAutoCommit());
if (op == null || op instanceof CopyDual) {
return (CopyDual) op;
} else {
op.cancelCopy();
throw new PSQLException(GT.tr("Requested CopyDual but got {0}", op.getClass().getName()),
PSQLState.WRONG_OBJECT_TYPE);
}
}
/**
* Pass results of a COPY TO STDOUT query from database into a Writer.
*
@@ -8,5 +8,22 @@
import java.sql.SQLException;
public interface CopyOut extends CopyOperation {
/**
* Blocks wait for a row of data to be received from server on an active copy operation.
*
* @return byte array received from server, null if server complete copy operation
* @throws SQLException if something goes wrong for example socket timeout
*/
byte[] readFromCopy() throws SQLException;
/**
* Wait for a row of data to be received from server on an active copy operation.
*
* @param block {@code true} if need wait data from server otherwise {@code false} and will read
* pending message from server
* @return byte array received from server, if pending message from server absent and use no
* blocking mode return null
* @throws SQLException if something goes wrong for example socket timeout
*/
byte[] readFromCopy(boolean block) throws SQLException;
}
@@ -112,6 +112,11 @@ public int read(byte[] buf, int off, int siz) throws IOException {
return result;
}
@Override
public byte[] readFromCopy(boolean block) throws SQLException {
return readFromCopy();
}
public void close() throws IOException {
// Don't complain about a double close.
if (op == null) {
@@ -55,6 +55,14 @@ ResultSet execSQLQuery(String s, int resultSetType, int resultSetConcurrency)
*/
QueryExecutor getQueryExecutor();
/**
* Internal protocol for work with physical and logical replication. Physical replication available
* only since PostgreSQL version 9.1. Logical replication available only since PostgreSQL version 9.4.
*
* @return not null replication protocol
*/
ReplicationProtocol getReplicationProtocol();
/**
* Construct and return an appropriate object for the given type and value. This only considers
* the types registered via {@link org.postgresql.PGConnection#addDataType(String, Class)} and
@@ -421,4 +421,9 @@ Object createQueryKey(String sql, boolean escapeProcessing, boolean isParameteri
* @param flushCacheOnDeallocate true if statement cache should be reset when "deallocate/discard" message observed
*/
void setFlushCacheOnDeallocate(boolean flushCacheOnDeallocate);
/**
* @return the ReplicationProtocol instance for this connection.
*/
ReplicationProtocol getReplicationProtocol();
}
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2016, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/
package org.postgresql.core;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.fluent.logical.LogicalReplicationOptions;
import org.postgresql.replication.fluent.physical.PhysicalReplicationOptions;
import java.sql.SQLException;
/**
* <p>Abstracts the protocol-specific details of physic and logic replication. <p>With each
* connection open with replication options associate own instance ReplicationProtocol.
*/
public interface ReplicationProtocol {
/**
* @param options not null options for logical replication stream
* @return not null stream instance from which available fetch wal logs that was decode by output
* plugin
*/
PGReplicationStream startLogical(LogicalReplicationOptions options) throws SQLException;
/**
* @param options not null options for physical replication stream
* @return not null stream instance from which available fetch wal logs
*/
PGReplicationStream startPhysical(PhysicalReplicationOptions options) throws SQLException;
}
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2016, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/
package org.postgresql.core.v2;
import org.postgresql.core.ReplicationProtocol;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.fluent.logical.LogicalReplicationOptions;
import org.postgresql.replication.fluent.physical.PhysicalReplicationOptions;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
public class V2ReplicationProtocol implements ReplicationProtocol {
public PGReplicationStream startLogical(LogicalReplicationOptions options) throws PSQLException {
throw new PSQLException(GT.tr("ReplicationProtocol not implemented for protocol version 2"),
PSQLState.NOT_IMPLEMENTED);
}
public PGReplicationStream startPhysical(PhysicalReplicationOptions options)
throws PSQLException {
throw new PSQLException(GT.tr("ReplicationProtocol not implemented for protocol version 2"),
PSQLState.NOT_IMPLEMENTED);
}
}
@@ -15,6 +15,7 @@
import org.postgresql.core.SetupQueryRunner;
import org.postgresql.core.SocketFactoryFactory;
import org.postgresql.core.Utils;
import org.postgresql.core.Version;
import org.postgresql.hostchooser.GlobalHostStatusTracker;
import org.postgresql.hostchooser.HostChooser;
import org.postgresql.hostchooser.HostChooserFactory;
@@ -192,8 +193,10 @@ public QueryExecutor openConnectionImpl(HostSpec[] hostSpecs, String user, Strin
paramList.add(new String[]{"client_encoding", "UTF8"});
paramList.add(new String[]{"DateStyle", "ISO"});
paramList.add(new String[]{"TimeZone", createPostgresTimeZone()});
String assumeMinServerVersion = PGProperty.ASSUME_MIN_SERVER_VERSION.get(info);
if (Utils.parseServerVersionStr(assumeMinServerVersion)
Version assumeVersion = ServerVersion.from(PGProperty.ASSUME_MIN_SERVER_VERSION.get(info));
if (assumeVersion.getVersionNum()
>= ServerVersion.v9_0.getVersionNum()) {
// User is explicitly telling us this is a 9.0+ server so set properties here:
paramList.add(new String[]{"extra_float_digits", "3"});
@@ -206,11 +209,15 @@ public QueryExecutor openConnectionImpl(HostSpec[] hostSpecs, String user, Strin
paramList.add(new String[]{"extra_float_digits", "2"});
}
String replication = PGProperty.REPLICATION.get(info);
if (replication != null && assumeVersion.getVersionNum() >= ServerVersion.v9_4.getVersionNum()) {
paramList.add(new String[]{"replication", replication});
}
String currentSchema = PGProperty.CURRENT_SCHEMA.get(info);
if (currentSchema != null) {
paramList.add(new String[]{"search_path", currentSchema});
}
sendStartupPacket(newStream, paramList, logger);
// Do authentication (until AuthenticationOk).
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2016, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/
package org.postgresql.core.v3;
import org.postgresql.copy.CopyDual;
import org.postgresql.util.PSQLException;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.Queue;
public class CopyDualImpl extends CopyOperationImpl implements CopyDual {
private Queue<byte[]> received = new LinkedList<byte[]>();
public void writeToCopy(byte[] data, int off, int siz) throws SQLException {
queryExecutor.writeToCopy(this, data, off, siz);
}
public void flushCopy() throws SQLException {
queryExecutor.flushCopy(this);
}
public long endCopy() throws SQLException {
return queryExecutor.endCopy(this);
}
public byte[] readFromCopy() throws SQLException {
if (received.isEmpty()) {
queryExecutor.readFromCopy(this, true);
}
return received.poll();
}
@Override
public byte[] readFromCopy(boolean block) throws SQLException {
if (received.isEmpty()) {
queryExecutor.readFromCopy(this, block);
}
return received.poll();
}
@Override
public void handleCommandStatus(String status) throws PSQLException {
}
protected void handleCopydata(byte[] data) {
received.add(data);
}
}
@@ -6,6 +6,9 @@
package org.postgresql.core.v3;
import org.postgresql.copy.CopyIn;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
import java.sql.SQLException;
@@ -39,4 +42,9 @@ public void flushCopy() throws SQLException {
public long endCopy() throws SQLException {
return queryExecutor.endCopy(this);
}
protected void handleCopydata(byte[] data) throws PSQLException {
throw new PSQLException(GT.tr("CopyIn copy direction can't receive data"),
PSQLState.PROTOCOL_VIOLATION);
}
}
Oops, something went wrong.

0 comments on commit f48c6bb

Please sign in to comment.