Skip to content

Commit

Permalink
Include replication origins in SQL functions for commit timestamp
Browse files Browse the repository at this point in the history
This includes two changes:
- Addition of a new function pg_xact_commit_timestamp_origin() able, for
a given transaction ID, to return the commit timestamp and replication
origin of this transaction.  An equivalent function existed in
pglogical.
- Addition of the replication origin to pg_last_committed_xact().

The commit timestamp manager includes already APIs able to return the
replication origin of a transaction on top of its commit timestamp, but
the code paths for replication origins were never stressed as those
functions have never looked for a replication origin, and the SQL
functions available have never included this information since their
introduction in 73c986a.

While on it, refactor a test of modules/commit_ts/ to use tstzrange() to
check that a transaction timestamp is within the wanted range, making
the test a bit easier to read.

Bump catalog version.

Author: Movead Li
Reviewed-by: Madan Kumar, Michael Paquier
Discussion: https://postgr.es/m/2020051116430836450630@highgo.ca
  • Loading branch information
michaelpq committed Jul 12, 2020
1 parent cd22d3c commit b1e48bb
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 18 deletions.
22 changes: 19 additions & 3 deletions doc/src/sgml/func.sgml
Expand Up @@ -23397,6 +23397,21 @@ SELECT collation for ('foo' COLLATE "de_DE");
</para></entry>
</row>

<row>
<entry role="func_table_entry"><para role="func_signature">
<indexterm>
<primary>pg_xact_commit_timestamp_origin</primary>
</indexterm>
<function>pg_xact_commit_timestamp_origin</function> ( <type>xid</type> )
<returnvalue>record</returnvalue>
( <parameter>timestamp</parameter> <type>timestamp with time zone</type>,
<parameter>roident</parameter> <type>oid</type>)
</para>
<para>
Returns the commit timestamp and replication origin of a transaction.
</para></entry>
</row>

<row>
<entry role="func_table_entry"><para role="func_signature">
<indexterm>
Expand All @@ -23405,11 +23420,12 @@ SELECT collation for ('foo' COLLATE "de_DE");
<function>pg_last_committed_xact</function> ()
<returnvalue>record</returnvalue>
( <parameter>xid</parameter> <type>xid</type>,
<parameter>timestamp</parameter> <type>timestamp with time zone</type> )
<parameter>timestamp</parameter> <type>timestamp with time zone</type>,
<parameter>roident</parameter> <type>oid</type> )
</para>
<para>
Returns the transaction ID and commit timestamp of the latest
committed transaction.
Returns the transaction ID, commit timestamp and replication origin
of the latest committed transaction.
</para></entry>
</row>
</tbody>
Expand Down
71 changes: 66 additions & 5 deletions src/backend/access/transam/commit_ts.c
Expand Up @@ -361,7 +361,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
* is concerned, anyway; it's up to the caller to ensure the value is useful
* for its purposes.)
*
* ts and extra are filled with the corresponding data; they can be passed
* ts and nodeid are filled with the corresponding data; they can be passed
* as NULL if not wanted.
*/
TransactionId
Expand Down Expand Up @@ -417,28 +417,38 @@ pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
}


/*
* pg_last_committed_xact
*
* SQL-callable wrapper to obtain some information about the latest
* committed transaction: transaction ID, timestamp and replication
* origin.
*/
Datum
pg_last_committed_xact(PG_FUNCTION_ARGS)
{
TransactionId xid;
RepOriginId nodeid;
TimestampTz ts;
Datum values[2];
bool nulls[2];
Datum values[3];
bool nulls[3];
TupleDesc tupdesc;
HeapTuple htup;

/* and construct a tuple with our data */
xid = GetLatestCommitTsData(&ts, NULL);
xid = GetLatestCommitTsData(&ts, &nodeid);

/*
* Construct a tuple descriptor for the result row. This must match this
* function's pg_proc entry!
*/
tupdesc = CreateTemplateTupleDesc(2);
tupdesc = CreateTemplateTupleDesc(3);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
XIDOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
TIMESTAMPTZOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "roident",
OIDOID, -1, 0);
tupdesc = BlessTupleDesc(tupdesc);

if (!TransactionIdIsNormal(xid))
Expand All @@ -452,13 +462,64 @@ pg_last_committed_xact(PG_FUNCTION_ARGS)

values[1] = TimestampTzGetDatum(ts);
nulls[1] = false;

values[2] = ObjectIdGetDatum((Oid) nodeid);
nulls[2] = false;
}

htup = heap_form_tuple(tupdesc, values, nulls);

PG_RETURN_DATUM(HeapTupleGetDatum(htup));
}

/*
* pg_xact_commit_timestamp_origin
*
* SQL-callable wrapper to obtain commit timestamp and replication origin
* of a given transaction.
*/
Datum
pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
{
TransactionId xid = PG_GETARG_UINT32(0);
RepOriginId nodeid;
TimestampTz ts;
Datum values[2];
bool nulls[2];
TupleDesc tupdesc;
HeapTuple htup;
bool found;

found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);

/*
* Construct a tuple descriptor for the result row. This must match this
* function's pg_proc entry!
*/
tupdesc = CreateTemplateTupleDesc(2);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "timestamp",
TIMESTAMPTZOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "roident",
OIDOID, -1, 0);
tupdesc = BlessTupleDesc(tupdesc);

if (!found)
{
memset(nulls, true, sizeof(nulls));
}
else
{
values[0] = TimestampTzGetDatum(ts);
nulls[0] = false;

values[1] = ObjectIdGetDatum((Oid) nodeid);
nulls[1] = false;
}

htup = heap_form_tuple(tupdesc, values, nulls);

PG_RETURN_DATUM(HeapTupleGetDatum(htup));
}

/*
* Number of shared CommitTS buffers.
Expand Down
2 changes: 1 addition & 1 deletion src/include/catalog/catversion.h
Expand Up @@ -53,6 +53,6 @@
*/

/* yyyymmddN */
#define CATALOG_VERSION_NO 202007072
#define CATALOG_VERSION_NO 202007121

#endif
15 changes: 12 additions & 3 deletions src/include/catalog/pg_proc.dat
Expand Up @@ -5946,12 +5946,21 @@
prorettype => 'timestamptz', proargtypes => 'xid',
prosrc => 'pg_xact_commit_timestamp' },

{ oid => '8456',
descr => 'get commit timestamp and replication origin of a transaction',
proname => 'pg_xact_commit_timestamp_origin', provolatile => 'v',
prorettype => 'record', proargtypes => 'xid',
proallargtypes => '{xid,timestamptz,oid}', proargmodes => '{i,o,o}',
proargnames => '{xid,timestamp,roident}',
prosrc => 'pg_xact_commit_timestamp_origin' },

{ oid => '3583',
descr => 'get transaction Id and commit timestamp of latest transaction commit',
descr => 'get transaction Id, commit timestamp and replication origin of latest transaction commit',
proname => 'pg_last_committed_xact', provolatile => 'v',
prorettype => 'record', proargtypes => '',
proallargtypes => '{xid,timestamptz}', proargmodes => '{o,o}',
proargnames => '{xid,timestamp}', prosrc => 'pg_last_committed_xact' },
proallargtypes => '{xid,timestamptz,oid}', proargmodes => '{o,o,o}',
proargnames => '{xid,timestamp,roident}',
prosrc => 'pg_last_committed_xact' },

{ oid => '3537', descr => 'get identification of SQL object',
proname => 'pg_describe_object', provolatile => 's', prorettype => 'text',
Expand Down
93 changes: 89 additions & 4 deletions src/test/modules/commit_ts/expected/commit_timestamp.out
Expand Up @@ -39,9 +39,94 @@ SELECT pg_xact_commit_timestamp('2'::xid);

(1 row)

SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp <= now() FROM pg_last_committed_xact() x;
?column? | ?column? | ?column?
----------+----------+----------
t | t | t
SELECT x.xid::text::bigint > 0 as xid_valid,
x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
roident != 0 AS valid_roident
FROM pg_last_committed_xact() x;
xid_valid | ts_in_range | valid_roident
-----------+-------------+---------------
t | t | f
(1 row)

-- Test non-normal transaction ids.
SELECT * FROM pg_xact_commit_timestamp_origin(NULL); -- ok, NULL
timestamp | roident
-----------+---------
|
(1 row)

SELECT * FROM pg_xact_commit_timestamp_origin('0'::xid); -- error
ERROR: cannot retrieve commit timestamp for transaction 0
SELECT * FROM pg_xact_commit_timestamp_origin('1'::xid); -- ok, NULL
timestamp | roident
-----------+---------
|
(1 row)

SELECT * FROM pg_xact_commit_timestamp_origin('2'::xid); -- ok, NULL
timestamp | roident
-----------+---------
|
(1 row)

-- Test transaction without replication origin
SELECT txid_current() as txid_no_origin \gset
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
roident != 0 AS valid_roident
FROM pg_last_committed_xact() x;
ts_in_range | valid_roident
-------------+---------------
t | f
(1 row)

SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
roident != 0 AS valid_roident
FROM pg_xact_commit_timestamp_origin(:'txid_no_origin') x;
ts_in_range | valid_roident
-------------+---------------
t | f
(1 row)

-- Test transaction with replication origin
SELECT pg_replication_origin_create('test_commit_ts: get_origin') != 0
AS valid_roident;
valid_roident
---------------
t
(1 row)

SELECT pg_replication_origin_session_setup('test_commit_ts: get_origin');
pg_replication_origin_session_setup
-------------------------------------

(1 row)

SELECT txid_current() as txid_with_origin \gset
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range, r.roname
FROM pg_last_committed_xact() x, pg_replication_origin r
WHERE r.roident = x.roident;
ts_in_range | roname
-------------+----------------------------
t | test_commit_ts: get_origin
(1 row)

SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range, r.roname
FROM pg_xact_commit_timestamp_origin(:'txid_with_origin') x, pg_replication_origin r
WHERE r.roident = x.roident;
ts_in_range | roname
-------------+----------------------------
t | test_commit_ts: get_origin
(1 row)

SELECT pg_replication_origin_session_reset();
pg_replication_origin_session_reset
-------------------------------------

(1 row)

SELECT pg_replication_origin_drop('test_commit_ts: get_origin');
pg_replication_origin_drop
----------------------------

(1 row)

75 changes: 74 additions & 1 deletion src/test/modules/commit_ts/expected/commit_timestamp_1.out
Expand Up @@ -34,6 +34,79 @@ SELECT pg_xact_commit_timestamp('2'::xid);

(1 row)

SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp <= now() FROM pg_last_committed_xact() x;
SELECT x.xid::text::bigint > 0 as xid_valid,
x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
roident != 0 AS valid_roident
FROM pg_last_committed_xact() x;
ERROR: could not get commit timestamp data
HINT: Make sure the configuration parameter "track_commit_timestamp" is set.
-- Test non-normal transaction ids.
SELECT * FROM pg_xact_commit_timestamp_origin(NULL); -- ok, NULL
timestamp | roident
-----------+---------
|
(1 row)

SELECT * FROM pg_xact_commit_timestamp_origin('0'::xid); -- error
ERROR: cannot retrieve commit timestamp for transaction 0
SELECT * FROM pg_xact_commit_timestamp_origin('1'::xid); -- ok, NULL
timestamp | roident
-----------+---------
|
(1 row)

SELECT * FROM pg_xact_commit_timestamp_origin('2'::xid); -- ok, NULL
timestamp | roident
-----------+---------
|
(1 row)

-- Test transaction without replication origin
SELECT txid_current() as txid_no_origin \gset
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
roident != 0 AS valid_roident
FROM pg_last_committed_xact() x;
ERROR: could not get commit timestamp data
HINT: Make sure the configuration parameter "track_commit_timestamp" is set.
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
roident != 0 AS valid_roident
FROM pg_xact_commit_timestamp_origin(:'txid_no_origin') x;
ERROR: could not get commit timestamp data
HINT: Make sure the configuration parameter "track_commit_timestamp" is set.
-- Test transaction with replication origin
SELECT pg_replication_origin_create('test_commit_ts: get_origin') != 0
AS valid_roident;
valid_roident
---------------
t
(1 row)

SELECT pg_replication_origin_session_setup('test_commit_ts: get_origin');
pg_replication_origin_session_setup
-------------------------------------

(1 row)

SELECT txid_current() as txid_with_origin \gset
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range, r.roname
FROM pg_last_committed_xact() x, pg_replication_origin r
WHERE r.roident = x.roident;
ERROR: could not get commit timestamp data
HINT: Make sure the configuration parameter "track_commit_timestamp" is set.
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range, r.roname
FROM pg_xact_commit_timestamp_origin(:'txid_with_origin') x, pg_replication_origin r
WHERE r.roident = x.roident;
ERROR: could not get commit timestamp data
HINT: Make sure the configuration parameter "track_commit_timestamp" is set.
SELECT pg_replication_origin_session_reset();
pg_replication_origin_session_reset
-------------------------------------

(1 row)

SELECT pg_replication_origin_drop('test_commit_ts: get_origin');
pg_replication_origin_drop
----------------------------

(1 row)

0 comments on commit b1e48bb

Please sign in to comment.