Skip to content

Commit

Permalink
Logical replication support for initial data copy
Browse files Browse the repository at this point in the history
Add functionality for a new subscription to copy the initial data in the
tables and then sync with the ongoing apply process.

For the copying, add a new internal COPY option to have the COPY source
data provided by a callback function.  The initial data copy works on
the subscriber by receiving COPY data from the publisher and then
providing it locally into a COPY that writes to the destination table.

A WAL receiver can now execute full SQL commands.  This is used here to
obtain information about tables and publications.

Several new options were added to CREATE and ALTER SUBSCRIPTION to
control whether and when initial table syncing happens.

Change pg_dump option --no-create-subscription-slots to
--no-subscription-connect and use the new CREATE SUBSCRIPTION
... NOCONNECT option for that.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Tested-by: Erik Rijkers <er@xs4all.nl>
  • Loading branch information
petere committed Mar 23, 2017
1 parent 707576b commit 7c4f524
Show file tree
Hide file tree
Showing 62 changed files with 2,966 additions and 341 deletions.
5 changes: 4 additions & 1 deletion contrib/file_fdw/file_fdw.c
Expand Up @@ -662,6 +662,7 @@ fileBeginForeignScan(ForeignScanState *node, int eflags)
node->ss.ss_currentRelation,
filename,
is_program,
NULL,
NIL,
options);

Expand Down Expand Up @@ -737,6 +738,7 @@ fileReScanForeignScan(ForeignScanState *node)
node->ss.ss_currentRelation,
festate->filename,
festate->is_program,
NULL,
NIL,
festate->options);
}
Expand Down Expand Up @@ -1100,7 +1102,8 @@ file_acquire_sample_rows(Relation onerel, int elevel,
/*
* Create CopyState from FDW options.
*/
cstate = BeginCopyFrom(NULL, onerel, filename, is_program, NIL, options);
cstate = BeginCopyFrom(NULL, onerel, filename, is_program, NULL, NIL,
options);

/*
* Use per-tuple memory context to prevent leak of memory used to read
Expand Down
78 changes: 78 additions & 0 deletions doc/src/sgml/catalogs.sgml
Expand Up @@ -300,6 +300,11 @@
<entry>logical replication subscriptions</entry>
</row>

<row>
<entry><link linkend="catalog-pg-subscription-rel"><structname>pg_subscription_rel</structname></link></entry>
<entry>relation state for subscriptions</entry>
</row>

<row>
<entry><link linkend="catalog-pg-tablespace"><structname>pg_tablespace</structname></link></entry>
<entry>tablespaces within this database cluster</entry>
Expand Down Expand Up @@ -6418,6 +6423,79 @@
</table>
</sect1>

<sect1 id="catalog-pg-subscription-rel">
<title><structname>pg_subscription_rel</structname></title>

<indexterm zone="catalog-pg-subscription-rel">
<primary>pg_subscription_rel</primary>
</indexterm>

<para>
The catalog <structname>pg_subscription_rel</structname> contains the
state for each replicated relation in each subscription. This is a
many-to-many mapping.
</para>

<para>
This catalog only contains tables known to the subscription after running
either <command>CREATE SUBSCRIPTION</command> or
<command>ALTER SUBSCRIPTION ... REFRESH</command>.
</para>

<table>
<title><structname>pg_subscription_rel</structname> Columns</title>

<tgroup cols="4">
<thead>
<row>
<entry>Name</entry>
<entry>Type</entry>
<entry>References</entry>
<entry>Description</entry>
</row>
</thead>

<tbody>
<row>
<entry><structfield>srsubid</structfield></entry>
<entry><type>oid</type></entry>
<entry><literal><link linkend="catalog-pg-subscription"><structname>pg_subscription</structname></link>.oid</literal></entry>
<entry>Reference to subscription</entry>
</row>

<row>
<entry><structfield>srrelid</structfield></entry>
<entry><type>oid</type></entry>
<entry><literal><link linkend="catalog-pg-class"><structname>pg_class</structname></link>.oid</literal></entry>
<entry>Reference to relation</entry>
</row>

<row>
<entry><structfield>srsubstate</structfield></entry>
<entry><type>char</type></entry>
<entry></entry>
<entry>
State code:
<literal>i</> = initialize,
<literal>d</> = data is being copied,
<literal>s</> = synchronized,
<literal>r</> = ready (normal replication)
</entry>
</row>

<row>
<entry><structfield>srsublsn</structfield></entry>
<entry><type>pg_lsn</type></entry>
<entry></entry>
<entry>
End LSN for <literal>s</> and <literal>r</> states.
</entry>
</row>
</tbody>
</tgroup>
</table>
</sect1>

<sect1 id="catalog-pg-tablespace">
<title><structname>pg_tablespace</structname></title>

Expand Down
25 changes: 25 additions & 0 deletions doc/src/sgml/config.sgml
Expand Up @@ -3449,6 +3449,31 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
</listitem>
</varlistentry>

<varlistentry id="guc-max-sync-workers-per-subscription" xreflabel="max_sync_workers_per_subscription">
<term><varname>max_sync_workers_per_subscription</varname> (<type>integer</type>)
<indexterm>
<primary><varname>max_sync_workers_per_subscription</> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Maximum number of synchronization workers per subscription. This
parameter controls the amount of paralelism of the initial data copy
during the subscription initialization or when new tables are added.
</para>
<para>
Currently, there can be only one synchronization worker per table.
</para>
<para>
The synchronization workers are taken from the pool defined by
<varname>max_logical_replication_workers</varname>.
</para>
<para>
The default value is 2.
</para>
</listitem>
</varlistentry>

</variablelist>
</sect2>

Expand Down
55 changes: 40 additions & 15 deletions doc/src/sgml/logical-replication.sgml
Expand Up @@ -24,9 +24,11 @@
</para>

<para>
Logical replication sends changes on the publisher to the subscriber as
they occur in real-time. The subscriber applies the data in the same order
as the publisher so that transactional consistency is guaranteed for
Logical replication of a table typically starts with a taking a snapshot
of the data on the publisher database and copying that to the subscriber.
Once that is done, the changes on the publisher are sent to the subscriber
as they occur in real-time. The subscriber applies the data in the same
order as the publisher so that transactional consistency is guaranteed for
publications within a single subscription. This method of data replication
is sometimes referred to as transactional replication.
</para>
Expand Down Expand Up @@ -159,7 +161,9 @@

<para>
Each subscription will receive changes via one replication slot (see
<xref linkend="streaming-replication-slots">).
<xref linkend="streaming-replication-slots">). Additional temporary
replication slots may be required for the initial data synchronization
of pre-existing table data.
</para>

<para>
Expand Down Expand Up @@ -264,9 +268,25 @@
to <literal>replica</literal>, which produces the usual effects on triggers
and constraints.
</para>

<sect2 id="logical-replication-snapshot">
<title>Initial Snapshot</title>
<para>
The initial data in existing subscribed tables are snapshotted and
copied in a parallel instance of a special kind of apply process.
This process will create its own temporary replication slot and
copy the existing data. Once existing data is copied, the worker
enters synchronization mode, which ensures that the table is brought
up to a synchronized state with the main apply process by streaming
any changes that happened during the initial data copy using standard
logical replication. Once the synchronization is done, the control
of the replication of the table is given back to the main apply
process where the replication continues as normal.
</para>
</sect2>
</sect1>

<sect1 id="logical-replication-monitoring">
<sect1 id="logical-replication-monitoring">
<title>Monitoring</title>

<para>
Expand All @@ -287,7 +307,9 @@
<para>
Normally, there is a single apply process running for an enabled
subscription. A disabled subscription or a crashed subscription will have
zero rows in this view.
zero rows in this view. If the initial data synchronization of any
table is in progress, there will be additional workers for the tables
being synchronized.
</para>
</sect1>

Expand Down Expand Up @@ -337,20 +359,21 @@
<para>
On the publisher side, <varname>wal_level</varname> must be set to
<literal>logical</literal>, and <varname>max_replication_slots</varname>
must be set to at least the number of subscriptions expected to connect.
And <varname>max_wal_senders</varname> should be set to at least the same
as <varname>max_replication_slots</varname> plus the number of physical replicas
that are connected at the same time.
must be set to at least the number of subscriptions expected to connect,
plus some reserve for table synchronization. And
<varname>max_wal_senders</varname> should be set to at least the same as
<varname>max_replication_slots</varname> plus the number of physical
replicas that are connected at the same time.
</para>

<para>
The subscriber also requires the <varname>max_replication_slots</varname>
to be set. In this case it should be set to at least the number of
subscriptions that will be added to the subscriber.
<varname>max_logical_replication_workers</varname> must be set to at
least the number of subscriptions. Additionally the
<varname>max_worker_processes</varname> may need to be adjusted to
accommodate for replication workers, at least
least the number of subscriptions, again plus some reserve for the table
synchronization. Additionally the <varname>max_worker_processes</varname>
may need to be adjusted to accommodate for replication workers, at least
(<varname>max_logical_replication_workers</varname>
+ <literal>1</literal>). Note that some extensions and parallel queries
also take worker slots from <varname>max_worker_processes</varname>.
Expand Down Expand Up @@ -393,8 +416,10 @@ CREATE SUBSCRIPTION mysub CONNECTION 'dbname=foo host=bar user=repuser' PUBLICAT
</para>

<para>
The above will start the replication process of changes to
<literal>users</literal> and <literal>departments</literal> tables.
The above will start the replication process, which synchronizes the
initial table contents of the tables <literal>users</literal> and
<literal>departments</literal> and then starts replicating
incremental changes to those tables.
</para>
</sect1>
</chapter>
9 changes: 8 additions & 1 deletion doc/src/sgml/monitoring.sgml
Expand Up @@ -1863,6 +1863,12 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
<entry><type>integer</></entry>
<entry>Process ID of the subscription worker process</entry>
</row>
<row>
<entry><structfield>relid</></entry>
<entry><type>Oid</></entry>
<entry>OID of the relation that the worker is synchronizing; null for the
main apply worker</entry>
</row>
<row>
<entry><structfield>received_lsn</></entry>
<entry><type>pg_lsn</></entry>
Expand Down Expand Up @@ -1899,7 +1905,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
<para>
The <structname>pg_stat_subscription</structname> view will contain one
row per subscription for main worker (with null PID if the worker is
not running).
not running), and additional rows for workers handling the initial data
copy of the subscribed tables.
</para>

<table id="pg-stat-ssl-view" xreflabel="pg_stat_ssl">
Expand Down
9 changes: 7 additions & 2 deletions doc/src/sgml/protocol.sgml
Expand Up @@ -1487,7 +1487,7 @@ The commands accepted in walsender mode are:
</varlistentry>

<varlistentry id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT">
<term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> [ <literal>TEMPORARY</> ] { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> [ <literal>EXPORT_SNAPSHOT</> | <literal>NOEXPORT_SNAPSHOT</> ] }
<term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> [ <literal>TEMPORARY</> ] { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> [ <literal>EXPORT_SNAPSHOT</> | <literal>NOEXPORT_SNAPSHOT</> | <literal>USE_SNAPSHOT</> ] }
<indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
</term>
<listitem>
Expand Down Expand Up @@ -1542,12 +1542,17 @@ The commands accepted in walsender mode are:
<varlistentry>
<term><literal>EXPORT_SNAPSHOT</></term>
<term><literal>NOEXPORT_SNAPSHOT</></term>
<term><literal>USE_SNAPSHOT</></term>
<listitem>
<para>
Decides what to do with the snapshot created during logical slot
initialization. <literal>EXPORT_SNAPSHOT</>, which is the default,
will export the snapshot for use in other sessions. This option can't
be used inside a transaction. <literal>NOEXPORT_SNAPSHOT</> will
be used inside a transaction. <literal>USE_SNAPSHOT</> will use the
snapshot for the current transaction executing the command. This
option must be used in a transaction, and
<literal>CREATE_REPLICATION_SLOT</literal> must be the first command
run in that transaction. Finally, <literal>NOEXPORT_SNAPSHOT</> will
just use the snapshot for logical decoding as normal but won't do
anything else with it.
</para>
Expand Down
50 changes: 45 additions & 5 deletions doc/src/sgml/ref/alter_subscription.sgml
Expand Up @@ -21,15 +21,21 @@ PostgreSQL documentation

<refsynopsisdiv>
<synopsis>
ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> WITH ( <replaceable class="PARAMETER">option</replaceable> [, ... ] ) ]
ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> WITH ( <replaceable class="PARAMETER">suboption</replaceable> [, ... ] ) ]

<phrase>where <replaceable class="PARAMETER">option</replaceable> can be:</phrase>
<phrase>where <replaceable class="PARAMETER">suboption</replaceable> can be:</phrase>

SLOT NAME = <replaceable class="PARAMETER">slot_name</replaceable>
SLOT NAME = <replaceable class="PARAMETER">slot_name</replaceable>

ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> SET PUBLICATION <replaceable class="PARAMETER">publication_name</replaceable> [, ...] { REFRESH WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] ) | NOREFRESH }
ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> REFRESH PUBLICATION WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] )

<phrase>where <replaceable class="PARAMETER">puboption</replaceable> can be:</phrase>

COPY DATA | NOCOPY DATA

ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_USER | SESSION_USER }
ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>'
ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> SET PUBLICATION <replaceable>publication_name</replaceable> [, ...]
ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> ENABLE
ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> DISABLE
</synopsis>
Expand Down Expand Up @@ -65,7 +71,6 @@ ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> DISABLE

<varlistentry>
<term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
<term><literal>SET PUBLICATION <replaceable class="parameter">publication_name</replaceable></literal></term>
<term><literal>SLOT NAME = <replaceable class="parameter">slot_name</replaceable></literal></term>
<listitem>
<para>
Expand All @@ -76,6 +81,40 @@ ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> DISABLE
</listitem>
</varlistentry>

<varlistentry>
<term><literal>SET PUBLICATION <replaceable class="parameter">publication_name</replaceable></literal></term>
<listitem>
<para>
Changes list of subscribed publications. See
<xref linkend="SQL-CREATESUBSCRIPTION"> for more information.
</para>
<para>
When <literal>REFRESH</literal> is specified, this command will also
act like <literal>REFRESH PUBLICATION</literal>. When
<literal>NOREFRESH</literal> is specified, the comamnd will not try to
refresh table information.
</para>
</listitem>
</varlistentry>

<varlistentry>
<term>REFRESH PUBLICATION</term>
<listitem>
<para>
Fetch missing table information from publisher. This will start
replication of tables that were added to the subscribed-to publications
since the last invocation of <command>REFRESH PUBLICATION</command> or
since <command>CREATE SUBSCRIPTION</command>.
</para>
<para>
The <literal>COPY DATA</literal> and <literal>NOCOPY DATA</literal>
options specify if the existing data in the publications that are being
subscribed to should be copied. <literal>COPY DATA</literal> is the
default.
</para>
</listitem>
</varlistentry>

<varlistentry>
<term><literal>ENABLE</literal></term>
<listitem>
Expand All @@ -95,6 +134,7 @@ ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> DISABLE
</para>
</listitem>
</varlistentry>

</variablelist>
</refsect1>

Expand Down

0 comments on commit 7c4f524

Please sign in to comment.