diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index ffd00df5bd0..8ed880a5fd0 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -43,6 +43,7 @@ #include #include #include +#include #include "compat/compat.h" @@ -69,6 +70,35 @@ static const CompressionAlgorithmDefinition definitions[_END_COMPRESSION_ALGORIT [COMPRESSION_ALGORITHM_DELTADELTA] = DELTA_DELTA_ALGORITHM_DEFINITION, }; +#if PG14_GE +/* The prefix of a logical replication message which is inserted into the + * replication stream right before decompression inserts are happening + */ +#define DECOMPRESSION_MARKER_START "::timescaledb-decompression-start" +/* The prefix of a logical replication message which is inserted into the + * replication stream right after all decompression inserts have finished + */ +#define DECOMPRESSION_MARKER_END "::timescaledb-decompression-end" + +static inline void +write_logical_replication_msg_decompression_start() +{ + if (XLogLogicalInfoActive()) + { + LogLogicalMessage(DECOMPRESSION_MARKER_START, "", 0, true); + } +} + +static inline void +write_logical_replication_msg_decompression_end() +{ + if (XLogLogicalInfoActive()) + { + LogLogicalMessage(DECOMPRESSION_MARKER_END, "", 0, true); + } +} +#endif + static Compressor * compressor_for_algorithm_and_type(CompressionAlgorithms algorithm, Oid type) { @@ -2033,7 +2063,13 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo decompressor.compressed_datums, decompressor.compressed_is_nulls); +#if PG14_GE + write_logical_replication_msg_decompression_start(); +#endif row_decompressor_decompress_row(&decompressor, NULL); +#if PG14_GE + write_logical_replication_msg_decompression_end(); +#endif TM_FailureData tmfd; TM_Result result pg_attribute_unused(); @@ -3161,6 +3197,9 @@ decompress_batches_for_update_delete(Chunk *chunk, List *predicates, EState *est comp_chunk_rel = table_open(comp_chunk->table_id, RowExclusiveLock); decompressor = build_decompressor(comp_chunk_rel, chunk_rel); +#if PG14_GE + write_logical_replication_msg_decompression_start(); +#endif if (filters) { scankeys = @@ -3199,6 +3238,10 @@ decompress_batches_for_update_delete(Chunk *chunk, List *predicates, EState *est is_null, &chunk_status_changed); } +#if PG14_GE + write_logical_replication_msg_decompression_end(); +#endif + /* * tuples from compressed chunk has been decompressed and moved * to staging area, thus mark this chunk as partially compressed diff --git a/tsl/test/t/009_logrepl_decomp_marker.pl b/tsl/test/t/009_logrepl_decomp_marker.pl new file mode 100644 index 00000000000..98cb731656b --- /dev/null +++ b/tsl/test/t/009_logrepl_decomp_marker.pl @@ -0,0 +1,173 @@ +# This file and its contents are licensed under the Timescale License. +# Please see the included NOTICE for copyright information and +# LICENSE-TIMESCALE for a copy of the license. + +if ($ENV{PG_VERSION_MAJOR} < 14) +{ + done_testing(); + return; +} + +use strict; +use warnings; +use TimescaleNode; +use Test::More; + +# This test checks the creation of logical replication messages +# used to mark the start and end of inserts happening as a result +# of a (partial) decompression. + +# Publishing node +my $publisher = + TimescaleNode->create('publisher', allows_streaming => 'logical'); + +# Subscribing node +my $subscriber = + TimescaleNode->create('subscriber', allows_streaming => 'logical'); + +# Setup test structures +$publisher->safe_psql( + 'postgres', + qq( + CREATE TABLE test (ts timestamptz NOT NULL PRIMARY KEY , val INT); + SELECT create_hypertable('test', 'ts', chunk_time_interval := INTERVAL '1day'); + ) +); + +# To kick off replication we need to fake the setup of a hypertable +$subscriber->safe_psql('postgres', + "CREATE TABLE _timescaledb_internal._hyper_1_1_chunk (ts timestamptz NOT NULL PRIMARY KEY , val INT)" +); + +# Initial data insert and preparation of the internal chunk tables +$publisher->safe_psql( + 'postgres', + qq( + INSERT INTO test + SELECT s.s, (random() * 100)::INT + FROM generate_series('2023-01-01'::timestamptz, '2023-01-02'::timestamptz, INTERVAL '3 hour') s; + ) +); + +# Setup logical replication +my $publisher_connstr = $publisher->connstr . ' dbname=postgres'; +$publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE _timescaledb_internal._hyper_1_1_chunk" +); +$subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (binary = true)" +); + +# Wait for catchup and disable consumption of additional messages +$publisher->wait_for_catchup('tap_sub'); +$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); +$publisher->poll_query_until( + 'postgres', + "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'tap_sub' AND active='f'", + 1); + +# Compress chunks and consume replication stream explicitly +$publisher->safe_psql( + 'postgres', + qq( + ALTER TABLE test SET (timescaledb.compress); + SELECT compress_chunk((c.schema_name || '.' || c.table_name)::regclass, TRUE) + FROM _timescaledb_catalog.hypertable h + LEFT JOIN _timescaledb_catalog.chunk c ON c.hypertable_id = h.id WHERE h.table_name = 'test'; + ) +); +$publisher->safe_psql( + 'postgres', + qq( + SELECT pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true' + ); + ) +); + +# Create a new entry which forces a decompression to happen +$publisher->safe_psql('postgres', + "INSERT INTO test VALUES ('2023-01-01 00:10:00', 5555)"); + +# Retrieve the replication log messages +my $result = $publisher->safe_psql( + 'postgres', + qq( + SELECT get_byte(data, 0) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true' + ); + ) +); + +# Test: BEGIN, MESSAGE (start marker), RELATION, ... INSERT (decompression inserts x6) ..., MESSAGE (end marker), INSERT, COMMIT +is( $result, + qq(66 +77 +82 +73 +73 +73 +73 +73 +73 +77 +73 +67), + 'messages on slot meet expectation <>' +); + +# Get initial message entry +$result = $publisher->safe_psql( + 'postgres', + qq( + SELECT get_byte(data, 1), encode(substr(data, 11, 33), 'escape') + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true' + ) + OFFSET 1 LIMIT 1; + ) +); +is( $result, + qq(1|::timescaledb-decompression-start), + 'first entry is decompression marker start message'); + +# Get second message entry +$result = $publisher->safe_psql( + 'postgres', + qq( + SELECT get_byte(data, 1), encode(substr(data, 11, 31), 'escape') + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true' + ) + OFFSET 9 LIMIT 1; + ) +); +is( $result, + qq(1|::timescaledb-decompression-end), + '10th entry is decompression marker end message'); + +# Get last insert entry to check it is the user executed insert (and value is 5555 or 35353535 in hex) +$result = $publisher->safe_psql( + 'postgres', + qq( + SELECT get_byte(data, 0), encode(substring(data from 41 for 44), 'hex') + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true' + ) + OFFSET 10 LIMIT 1; + ) +); +is($result, qq(73|35353535), '11th entry is an insert message'); + +done_testing(); diff --git a/tsl/test/t/CMakeLists.txt b/tsl/test/t/CMakeLists.txt index e905cd362b1..9782cd13cf3 100644 --- a/tsl/test/t/CMakeLists.txt +++ b/tsl/test/t/CMakeLists.txt @@ -1,4 +1,5 @@ -set(PROVE_TEST_FILES 001_simple_multinode.pl 003_connections_privs.pl) +set(PROVE_TEST_FILES 001_simple_multinode.pl 003_connections_privs.pl + 009_logrepl_decomp_marker.pl) set(PROVE_DEBUG_TEST_FILES 002_chunk_copy_move.pl 004_multinode_rdwr_1pc.pl 005_add_data_node.pl 006_job_crash_log.pl 007_healthcheck.pl 008_mvcc_cagg.pl)