Skip to content

Commit

Permalink
Add worker type to pg_stat_subscription.
Browse files Browse the repository at this point in the history
Thanks to commit 2a8b40e, the logical replication worker type is
easily determined.  The worker type could already be deduced via
other columns such as leader_pid and relid, but that is unnecessary
complexity for users.

Bumps catversion.

Author: Peter Smith
Reviewed-by: Michael Paquier, Maxim Orlov, Amit Kapila
Discussion: https://postgr.es/m/CAHut%2BPtmbSMfErSk0S7xxVdZJ9XVE3xVLhqBTmT91kf57BeKDQ%40mail.gmail.com
  • Loading branch information
nathan-bossart committed Sep 25, 2023
1 parent 849d367 commit 13aeaf0
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 8 deletions.
13 changes: 12 additions & 1 deletion doc/src/sgml/monitoring.sgml
Expand Up @@ -1993,6 +1993,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
</para></entry>
</row>

<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>worker_type</structfield> <type>text</type>
</para>
<para>
Type of the subscription worker process. Possible types are
<literal>apply</literal>, <literal>parallel apply</literal>, and
<literal>table synchronization</literal>.
</para></entry>
</row>

<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>pid</structfield> <type>integer</type>
Expand All @@ -2008,7 +2019,7 @@ description | Waiting for a newly initialized WAL file to reach durable storage
</para>
<para>
Process ID of the leader apply worker if this process is a parallel
apply worker; NULL if this process is a leader apply worker or a
apply worker; NULL if this process is a leader apply worker or a table
synchronization worker
</para></entry>
</row>
Expand Down
1 change: 1 addition & 0 deletions src/backend/catalog/system_views.sql
Expand Up @@ -949,6 +949,7 @@ CREATE VIEW pg_stat_subscription AS
SELECT
su.oid AS subid,
su.subname,
st.worker_type,
st.pid,
st.leader_pid,
st.relid,
Expand Down
18 changes: 17 additions & 1 deletion src/backend/replication/logical/launcher.c
Expand Up @@ -1278,7 +1278,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
Datum
pg_stat_get_subscription(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_SUBSCRIPTION_COLS 9
#define PG_STAT_GET_SUBSCRIPTION_COLS 10
Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
int i;
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
Expand Down Expand Up @@ -1339,6 +1339,22 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
else
values[8] = TimestampTzGetDatum(worker.reply_time);

switch (worker.type)
{
case WORKERTYPE_APPLY:
values[9] = CStringGetTextDatum("apply");
break;
case WORKERTYPE_PARALLEL_APPLY:
values[9] = CStringGetTextDatum("parallel apply");
break;
case WORKERTYPE_TABLESYNC:
values[9] = CStringGetTextDatum("table synchronization");
break;
case WORKERTYPE_UNKNOWN:
/* Should never happen. */
elog(ERROR, "unknown worker type");
}

tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
values, nulls);

Expand Down
2 changes: 1 addition & 1 deletion src/include/catalog/catversion.h
Expand Up @@ -57,6 +57,6 @@
*/

/* yyyymmddN */
#define CATALOG_VERSION_NO 202309221
#define CATALOG_VERSION_NO 202309251

#endif
6 changes: 3 additions & 3 deletions src/include/catalog/pg_proc.dat
Expand Up @@ -5484,9 +5484,9 @@
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => 'oid',
proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}',
proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}',
prosrc => 'pg_stat_get_subscription' },
{ oid => '2026', descr => 'statistics: current backend PID',
proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
Expand Down
3 changes: 2 additions & 1 deletion src/test/regress/expected/rules.out
Expand Up @@ -2118,6 +2118,7 @@ pg_stat_ssl| SELECT pid,
WHERE (client_port IS NOT NULL);
pg_stat_subscription| SELECT su.oid AS subid,
su.subname,
st.worker_type,
st.pid,
st.leader_pid,
st.relid,
Expand All @@ -2127,7 +2128,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
st.latest_end_lsn,
st.latest_end_time
FROM (pg_subscription su
LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid)));
pg_stat_subscription_stats| SELECT ss.subid,
s.subname,
ss.apply_error_count,
Expand Down
2 changes: 1 addition & 1 deletion src/test/subscription/t/004_sync.pl
Expand Up @@ -80,7 +80,7 @@

# wait for it to start
$node_subscriber->poll_query_until('postgres',
"SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL"
"SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND worker_type = 'apply'"
) or die "Timed out while waiting for subscriber to start";

# and drop both subscriptions
Expand Down

0 comments on commit 13aeaf0

Please sign in to comment.