Skip to content

Commit

Permalink
[#22040] YSQL: Add yb_restart_commit_ht column in pg_replication_slot…
Browse files Browse the repository at this point in the history
…s view

Summary:
Summary
This is related to the project to support Replication slot API in YSQL (#18724).
(https://phorge.dev.yugabyte.com/D29194).
This is also related to the PG Compatible Logical Replication Consumption project.

The schema of the pg_replication_slots view has been modified by adding an extra yb-specific
column yb_restart_commit_ht which is a int8.

The value of this column is a uint64 representation of the commit Hybrid Time corresponding
to the restart_lsn. This can be used by the client (like YB-PG Connector) to perform a
consistent snapshot (as of the consistent_point) in the case when a replication slot already
exists.

UPGRADE/ROLLBACK SAFETY:
These changes are protected via the preview flag: ysql_yb_enable_replication_commands
Jira: DB-10956

Test Plan:
Manual Testing
./yb_build.sh --java-test 'org.yb.pgsql.TestPgReplicationSlot'
./yb_build.sh --java-test 'org.yb.pgsql.TestPgRegressReplicationSlot'
./yb_build.sh --java-test 'org.yb.pgsql.TestYsqlUpgrade#upgradeIsIdempotent'
./yb_build.sh --java-test 'org.yb.pgsql.TestYsqlUpgrade#upgradeIsIdempotentSingleConn'

Reviewers: stiwary, skumar

Reviewed By: stiwary

Subscribers: yql, ycdcxcluster

Differential Revision: https://phorge.dev.yugabyte.com/D34279
  • Loading branch information
asrinivasanyb committed Apr 24, 2024
1 parent 54ceff9 commit 30b1982
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 28 deletions.
3 changes: 2 additions & 1 deletion src/postgres/src/backend/catalog/yb_system_views.sql
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,8 @@ CREATE VIEW pg_replication_slots AS
L.catalog_xmin,
L.restart_lsn,
L.confirmed_flush_lsn,
L.yb_stream_id
L.yb_stream_id,
L.yb_restart_commit_ht
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);

Expand Down
10 changes: 9 additions & 1 deletion src/postgres/src/backend/replication/slotfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
{
#define PG_GET_REPLICATION_SLOTS_COLS 11
/* YB specific fields in pg_get_replication_slots */
#define YB_PG_GET_REPLICATION_SLOTS_COLS 1
#define YB_PG_GET_REPLICATION_SLOTS_COLS 2

if (IsYugaByteEnabled() && !yb_enable_replication_commands)
ereport(ERROR,
Expand Down Expand Up @@ -355,6 +355,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)

const char *yb_stream_id;
bool yb_stream_active;
uint64 yb_restart_commit_ht;

if (IsYugaByteEnabled())
{
Expand All @@ -368,6 +369,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)

restart_lsn = slot->restart_lsn;
confirmed_flush_lsn = slot->confirmed_flush;
yb_restart_commit_ht = slot->record_id_commit_time_ht;
xmin = slot->xmin;
/*
* Set catalog_xmin as xmin to make the PG Debezium connector work.
Expand Down Expand Up @@ -452,9 +454,15 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
nulls[i++] = true;

if (IsYugaByteEnabled())
{
values[i++] = CStringGetTextDatum(yb_stream_id);
values[i++] = Int64GetDatum(yb_restart_commit_ht);
}
else
{
nulls[i++] = true;
nulls[i++] = true;
}

tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
Expand Down
6 changes: 3 additions & 3 deletions src/postgres/src/include/catalog/pg_proc.dat
Original file line number Diff line number Diff line change
Expand Up @@ -9927,9 +9927,9 @@
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', prorettype => 'record',
proargtypes => '',
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text}',
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,yb_stream_id}',
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,yb_stream_id,yb_restart_commit_ht}',
prosrc => 'pg_get_replication_slots' },
{ oid => '3786', descr => 'set up a logical replication slot',
proname => 'pg_create_logical_replication_slot', provolatile => 'v',
Expand Down
4 changes: 2 additions & 2 deletions src/postgres/src/include/catalog/pg_yb_migration.dat
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
[

# For better version control conflict detection, list latest migration filename
# here: V49__19132__yb_local_tablets.sql
{ major => '49', minor => '0', name => '<baseline>', time_applied => '_null_' }
# here: V50__22040__yb_restart_commit_ht_in_pg_replication_slots.sql
{ major => '50', minor => '0', name => '<baseline>', time_applied => '_null_' }

]
5 changes: 3 additions & 2 deletions src/postgres/src/test/regress/expected/yb_pg_rules.out
Original file line number Diff line number Diff line change
Expand Up @@ -1453,8 +1453,9 @@ pg_replication_slots| SELECT l.slot_name,
l.catalog_xmin,
l.restart_lsn,
l.confirmed_flush_lsn,
l.yb_stream_id
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, yb_stream_id)
l.yb_stream_id,
l.yb_restart_commit_ht
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, yb_stream_id, yb_restart_commit_ht)
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ SELECT * FROM pg_create_logical_replication_slot('testslot2', 'pgoutput', false)
testslot2 | 0/2
(1 row)

-- Cannot do SELECT * since yb_stream_id changes across runs.
-- Cannot do SELECT * since yb_stream_id, yb_restart_commit_ht changes across runs.
SELECT slot_name, plugin, slot_type, database, temporary, active,
active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots;
Expand Down
2 changes: 1 addition & 1 deletion src/postgres/src/test/regress/sql/yb_replication_slot.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ SET SESSION AUTHORIZATION 'regress_replicationslot_user';
SELECT * FROM pg_create_logical_replication_slot('testslot1', 'pgoutput', false);
SELECT * FROM pg_create_logical_replication_slot('testslot2', 'pgoutput', false);

-- Cannot do SELECT * since yb_stream_id changes across runs.
-- Cannot do SELECT * since yb_stream_id, yb_restart_commit_ht changes across runs.
SELECT slot_name, plugin, slot_type, database, temporary, active,
active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,32 @@ COMMIT;

-- Recreating system views that use pg_get_replication_slots to update their corresponding
-- pg_rewrite entries.
CREATE OR REPLACE VIEW pg_catalog.pg_replication_slots WITH (use_initdb_acl = true) AS
SELECT
L.slot_name,
L.plugin,
L.slot_type,
L.datoid,
D.datname AS database,
L.temporary,
L.active,
L.active_pid,
L.xmin,
L.catalog_xmin,
L.restart_lsn,
L.confirmed_flush_lsn,
L.yb_stream_id
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);
DO $$
BEGIN
IF NOT EXISTS (
SELECT TRUE FROM pg_attribute
WHERE attrelid = 'pg_catalog.pg_replication_slots'::regclass
AND attname = 'yb_stream_id'
AND NOT attisdropped
) THEN
CREATE OR REPLACE VIEW pg_catalog.pg_replication_slots
WITH (use_initdb_acl = true)
AS
SELECT
L.slot_name,
L.plugin,
L.slot_type,
L.datoid,
D.datname AS database,
L.temporary,
L.active,
L.active_pid,
L.xmin,
L.catalog_xmin,
L.restart_lsn,
L.confirmed_flush_lsn,
L.yb_stream_id
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);
END IF;
END $$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
BEGIN;
SET LOCAL yb_non_ddl_txn_for_sys_tables_allowed TO true;

-- Add a column for restart commit ht in pg_get_replication_slots
-- TODO: As a workaround for GHI #13500, we perform a delete + insert instead
-- of an update into pg_proc. Restore to UPDATE once fixed.
DELETE FROM pg_catalog.pg_proc WHERE proname = 'pg_get_replication_slots' AND
pronamespace = 'pg_catalog'::regnamespace;
INSERT INTO pg_catalog.pg_proc (
oid, proname, pronamespace, proowner, prolang, procost, prorows, provariadic, protransform,
prokind, prosecdef, proleakproof, proisstrict, proretset, provolatile, proparallel,
pronargs, pronargdefaults, prorettype, proargtypes, proallargtypes, proargmodes,
proargnames, proargdefaults, protrftypes, prosrc, probin, proconfig, proacl
) VALUES (
3781, 'pg_get_replication_slots', 11, 10, 12, 1, 10, 0, '-', 'f', false, false, false,
true, 's', 's', 0, 0, 2249, '', '{19,19,25,26,16,16,23,28,28,3220,3220,25,20}',
'{o,o,o,o,o,o,o,o,o,o,o,o,o}', '{slot_name,plugin,slot_type,datoid,temporary,active,
active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,yb_stream_id,yb_restart_commit_ht}',
NULL, NULL, 'pg_get_replication_slots', NULL, NULL, NULL)
ON CONFLICT DO NOTHING;
COMMIT;

-- Recreating system views that use pg_get_replication_slots to update their corresponding
-- pg_rewrite entries.
DO $$
BEGIN
IF NOT EXISTS (
SELECT TRUE FROM pg_attribute
WHERE attrelid = 'pg_catalog.pg_replication_slots'::regclass
AND attname = 'yb_restart_commit_ht'
AND NOT attisdropped
) THEN
CREATE OR REPLACE VIEW pg_catalog.pg_replication_slots
WITH (use_initdb_acl = true)
AS
SELECT
L.slot_name,
L.plugin,
L.slot_type,
L.datoid,
D.datname AS database,
L.temporary,
L.active,
L.active_pid,
L.xmin,
L.catalog_xmin,
L.restart_lsn,
L.confirmed_flush_lsn,
L.yb_stream_id,
L.yb_restart_commit_ht
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);
END IF;
END $$;

0 comments on commit 30b1982

Please sign in to comment.