From a32a694152412f0f045def071de4c20b092cc3a0 Mon Sep 17 00:00:00 2001 From: Joshua Lockerman Date: Thu, 2 Jun 2022 16:11:44 -0400 Subject: [PATCH] Add telemetry about replication This commit adds telemetry about replication status to our telemetry gatherer. It adds a new sub object `replication` containing two fields: - `is_wal_receiver` is a boolean which is true if-and-only-if the current cluster has a `wal_receiver`. - `num_wal_senders` is the number of `wal_senders` that the current cluster has. --- src/telemetry/CMakeLists.txt | 4 +- src/telemetry/replication.c | 56 ++++++++++++++++++++++++++++ src/telemetry/replication.h | 24 ++++++++++++ src/telemetry/telemetry.c | 26 +++++++++++++ test/expected/telemetry.out | 3 +- test/t/002_replication_telemetry.pl | 58 +++++++++++++++++++++++++++++ test/t/CMakeLists.txt | 3 +- 7 files changed, 171 insertions(+), 3 deletions(-) create mode 100644 src/telemetry/replication.c create mode 100644 src/telemetry/replication.h create mode 100644 test/t/002_replication_telemetry.pl diff --git a/src/telemetry/CMakeLists.txt b/src/telemetry/CMakeLists.txt index 9f0a12a83ee..aec3ec78288 100644 --- a/src/telemetry/CMakeLists.txt +++ b/src/telemetry/CMakeLists.txt @@ -1,6 +1,8 @@ # Add all *.c to sources in upperlevel directory set(SOURCES - ${CMAKE_CURRENT_SOURCE_DIR}/functions.c ${CMAKE_CURRENT_SOURCE_DIR}/stats.c + ${CMAKE_CURRENT_SOURCE_DIR}/functions.c + ${CMAKE_CURRENT_SOURCE_DIR}/replication.c + ${CMAKE_CURRENT_SOURCE_DIR}/stats.c ${CMAKE_CURRENT_SOURCE_DIR}/telemetry_metadata.c ${CMAKE_CURRENT_SOURCE_DIR}/telemetry.c) target_sources(${PROJECT_NAME} PRIVATE ${SOURCES}) diff --git a/src/telemetry/replication.c b/src/telemetry/replication.c new file mode 100644 index 00000000000..1d20a6b4dd8 --- /dev/null +++ b/src/telemetry/replication.c @@ -0,0 +1,56 @@ +/* + * This file and its contents are licensed under the Apache License 2.0. 
+ * Please see the included NOTICE for copyright information and + * LICENSE-APACHE for a copy of the license. + */ +#include + +#include "replication.h" + +#include + +ReplicationInfo +ts_telemetry_replication_info_gather(void) +{ + int res; + bool isnull; + Datum data; + ReplicationInfo info = { + .got_num_wal_senders = false, + .got_is_wal_receiver = false, + }; + + if (SPI_connect() != SPI_OK_CONNECT) + return info; + + res = SPI_execute("SELECT cast(count(pid) as int) from pg_catalog.pg_stat_get_wal_senders() " + "WHERE pid is not null", + true, /* read_only */ + 0 /*count*/ + ); + + if (res >= 0) + { + data = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull); + info.num_wal_senders = DatumGetInt32(data); + info.got_num_wal_senders = true; + } + + /* use count() > 0 in case they start having pg_stat_get_wal_receiver() + * return no rows when the DB isn't a replica */ + res = SPI_execute("SELECT count(pid) > 0 from pg_catalog.pg_stat_get_wal_receiver() WHERE pid " + "is not null", + true, /* read_only */ + 0 /*count*/ + ); + if (res >= 0) + { + data = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull); + info.is_wal_receiver = DatumGetBool(data); + info.got_is_wal_receiver = true; + } + + SPI_finish(); + + return info; +} diff --git a/src/telemetry/replication.h b/src/telemetry/replication.h new file mode 100644 index 00000000000..4d5d0e04227 --- /dev/null +++ b/src/telemetry/replication.h @@ -0,0 +1,24 @@ +/* + * This file and its contents are licensed under the Apache License 2.0. + * Please see the included NOTICE for copyright information and + * LICENSE-APACHE for a copy of the license. 
+ */ +#ifndef TIMESCALEDB_TELEMETRY_REPLICATION_H +#define TIMESCALEDB_TELEMETRY_REPLICATION_H + +#include + +#include "utils.h" + +typedef struct ReplicationInfo +{ + bool got_num_wal_senders; + int32 num_wal_senders; + + bool got_is_wal_receiver; + bool is_wal_receiver; +} ReplicationInfo; + +extern ReplicationInfo ts_telemetry_replication_info_gather(void); + +#endif /* TIMESCALEDB_TELEMETRY_REPLICATION_H */ diff --git a/src/telemetry/telemetry.c b/src/telemetry/telemetry.c index 27039597544..d8d564ac8ee 100644 --- a/src/telemetry/telemetry.c +++ b/src/telemetry/telemetry.c @@ -31,6 +31,7 @@ #include "ts_catalog/compression_chunk_size.h" #include "stats.h" #include "functions.h" +#include "replication.h" #include "cross_module_fn.h" @@ -72,6 +73,9 @@ #define REQ_INSTANCE_METADATA "instance_metadata" #define REQ_TS_TELEMETRY_CLOUD "cloud" +#define REQ_NUM_WAL_SENDERS "num_wal_senders" +#define REQ_IS_WAL_RECEIVER "is_wal_receiver" + #define PG_PROMETHEUS "pg_prometheus" #define PROMSCALE "promscale" #define POSTGIS "postgis" @@ -445,6 +449,17 @@ add_function_call_telemetry(JsonbParseState *state) pushJsonbValue(&state, WJB_END_OBJECT, NULL); } +static void +add_replication_telemetry(JsonbParseState *state) +{ + ReplicationInfo info = ts_telemetry_replication_info_gather(); + if (info.got_num_wal_senders) + ts_jsonb_add_int32(state, REQ_NUM_WAL_SENDERS, info.num_wal_senders); + + if (info.got_is_wal_receiver) + ts_jsonb_add_bool(state, REQ_IS_WAL_RECEIVER, info.is_wal_receiver); +} + #define REQ_RELS "relations" #define REQ_RELS_TABLES "tables" #define REQ_RELS_PARTITIONED_TABLES "partitioned_tables" @@ -455,6 +470,7 @@ add_function_call_telemetry(JsonbParseState *state) #define REQ_RELS_DISTRIBUTED_HYPERTABLES_DN "distributed_hypertables_data_node" #define REQ_RELS_CONTINUOUS_AGGS "continuous_aggregates" #define REQ_FUNCTIONS_USED "functions_used" +#define REQ_REPLICATION "replication" static Jsonb * build_telemetry_report() @@ -631,6 +647,16 @@ 
build_telemetry_report() pushJsonbValue(&parse_state, WJB_KEY, &key); add_function_call_telemetry(parse_state); + /* Add replication object */ + key.type = jbvString; + key.val.string.val = REQ_REPLICATION; + key.val.string.len = strlen(REQ_REPLICATION); + pushJsonbValue(&parse_state, WJB_KEY, &key); + + pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL); + add_replication_telemetry(parse_state); + pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL); + /* end of telemetry object */ result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL); diff --git a/test/expected/telemetry.out b/test/expected/telemetry.out index b2574f5c716..6243745449e 100644 --- a/test/expected/telemetry.out +++ b/test/expected/telemetry.out @@ -374,6 +374,7 @@ WHERE key != 'os_name_pretty'; os_version data_volume db_metadata + replication build_os_name functions_used install_method @@ -394,7 +395,7 @@ WHERE key != 'os_name_pretty'; num_user_defined_actions build_architecture_bit_size num_continuous_aggs_policies -(28 rows) +(29 rows) CREATE MATERIALIZED VIEW telemetry_report AS SELECT t FROM get_telemetry_report() t; diff --git a/test/t/002_replication_telemetry.pl b/test/t/002_replication_telemetry.pl new file mode 100644 index 00000000000..ed95e56d475 --- /dev/null +++ b/test/t/002_replication_telemetry.pl @@ -0,0 +1,58 @@ +# This file and its contents are licensed under the Timescale License. +# Please see the included NOTICE for copyright information and +# LICENSE-TIMESCALE for a copy of the license. + +use strict; +use warnings; +use TimescaleNode; +use TestLib; +use Data::Dumper; +use Test::More tests => 4; + +# This test checks that replication status is reported correctly +# in the telemetry report. Specifically, the `replication` object +# must contain `num_wal_senders` and `is_wal_receiver` values that +# match the role of the node the report is generated on. 
+# +# To test this, we create a primary with a single streaming standby +# attached to it and then inspect the telemetry report generated on +# each of the two nodes. +my $node_primary = TimescaleNode->create( + 'primary', + allows_streaming => 1, + auth_extra => [ '--create-role', 'repl_role' ]); +my $backup_name = 'my_backup'; + +# Take backup +$node_primary->backup($backup_name); + +# Create streaming standby linking to primary +my $node_standby = PostgresNode->get_new_node('standby_1'); +$node_standby->init_from_backup($node_primary, $backup_name, + has_streaming => 1); +$node_standby->start; + +# Wait for standby to catch up +$node_primary->wait_for_catchup($node_standby, 'replay', + $node_primary->lsn('insert')); + + +my $result = $node_primary->safe_psql('postgres', + "SELECT get_telemetry_report()->'replication'->>'num_wal_senders'"); +is($result, qq(1), 'number of wal senders on primary'); + +$result = $node_primary->safe_psql('postgres', + "SELECT get_telemetry_report()->'replication'->>'is_wal_receiver'"); +is($result, qq(false), 'primary is wal receiver'); + +$result = $node_standby->safe_psql('postgres', + "SELECT get_telemetry_report()->'replication'->>'num_wal_senders'"); +is($result, qq(0), 'number of wal senders on standby'); + +$result = $node_standby->safe_psql('postgres', + "SELECT get_telemetry_report()->'replication'->>'is_wal_receiver'"); +is($result, qq(true), 'standby is wal receiver'); + +done_testing(); + +1; diff --git a/test/t/CMakeLists.txt b/test/t/CMakeLists.txt index 729b18c1c76..2417e1b6911 100644 --- a/test/t/CMakeLists.txt +++ b/test/t/CMakeLists.txt @@ -1,6 +1,7 @@ set(PROVE_DEBUG_TEST_FILES) if(${PG_VERSION_MAJOR} GREATER "12") - list(APPEND PROVE_DEBUG_TEST_FILES 001_extension.pl) + list(APPEND PROVE_DEBUG_TEST_FILES 001_extension.pl + 002_replication_telemetry.pl) endif() if(CMAKE_BUILD_TYPE MATCHES Debug)