Skip to content

Commit

Permalink
Add telemetry about replication
Browse files Browse the repository at this point in the history
This commit adds telemetry about replication status to our telemetry
gatherer. It adds a new sub-object `replication` containing two fields:
  - `is_wal_receiver` is a boolean that is true if and only if the
     current cluster has a `wal_receiver`.
  - `num_wal_senders` is the number of `wal_senders` that the current
     cluster has.
  • Loading branch information
JLockerman committed Jun 13, 2022
1 parent 8f9975d commit 97ab772
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 3 deletions.
4 changes: 3 additions & 1 deletion src/telemetry/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Add all *.c to sources in upperlevel directory
set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/functions.c ${CMAKE_CURRENT_SOURCE_DIR}/stats.c
${CMAKE_CURRENT_SOURCE_DIR}/functions.c
${CMAKE_CURRENT_SOURCE_DIR}/replication.c
${CMAKE_CURRENT_SOURCE_DIR}/stats.c
${CMAKE_CURRENT_SOURCE_DIR}/telemetry_metadata.c
${CMAKE_CURRENT_SOURCE_DIR}/telemetry.c)
target_sources(${PROJECT_NAME} PRIVATE ${SOURCES})
56 changes: 56 additions & 0 deletions src/telemetry/replication.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#include <postgres.h>

#include "replication.h"

#include <executor/spi.h>

/*
 * Run a read-only query through SPI and fetch the first column of its first
 * row into *value.
 *
 * Returns true only when the query succeeded as a SELECT, produced at least
 * one row, and the fetched datum is not NULL. On any other outcome *value is
 * left untouched and false is returned, so the caller never reads a missing
 * row or a NULL datum.
 *
 * Must be called between SPI_connect() and SPI_finish(). The callers below
 * only fetch pass-by-value types (int4 and bool), so the returned Datum does
 * not point into SPI-owned memory.
 */
static bool
replication_fetch_first_datum(const char *query, Datum *value)
{
	bool isnull = true;
	Datum datum;
	int res;

	res = SPI_execute(query, true /* read_only */, 0 /* count */);

	/* Do not touch SPI_tuptable unless a SELECT actually produced a row. */
	if (res != SPI_OK_SELECT || SPI_processed == 0 || SPI_tuptable == NULL)
		return false;

	datum = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull);

	if (isnull)
		return false;

	*value = datum;
	return true;
}

/*
 * Gather replication telemetry for the current cluster.
 *
 * Returns a ReplicationInfo in which each value is paired with a `got_*`
 * flag. A flag is set to true only when the corresponding query could be
 * executed and returned a non-NULL value; otherwise the flag stays false and
 * the value must be ignored. If the SPI connection cannot be established,
 * both flags are false.
 */
ReplicationInfo
ts_telemetry_replication_info_gather(void)
{
	Datum data;
	ReplicationInfo info = {
		.got_num_wal_senders = false,
		.got_is_wal_receiver = false,
	};

	if (SPI_connect() != SPI_OK_CONNECT)
		return info;

	if (replication_fetch_first_datum(
			"SELECT cast(count(pid) as int) from pg_catalog.pg_stat_get_wal_senders() "
			"WHERE pid is not null",
			&data))
	{
		info.num_wal_senders = DatumGetInt32(data);
		info.got_num_wal_senders = true;
	}

	/* use count() > 0 in case they start having pg_stat_get_wal_receiver()
	 * return no rows when the DB isn't a replica */
	if (replication_fetch_first_datum(
			"SELECT count(pid) > 0 from pg_catalog.pg_stat_get_wal_receiver() WHERE pid "
			"is not null",
			&data))
	{
		info.is_wal_receiver = DatumGetBool(data);
		info.got_is_wal_receiver = true;
	}

	SPI_finish();

	return info;
}
24 changes: 24 additions & 0 deletions src/telemetry/replication.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#ifndef TIMESCALEDB_TELEMETRY_REPLICATION_H
#define TIMESCALEDB_TELEMETRY_REPLICATION_H

#include <postgres.h>

#include "utils.h"

/*
 * Replication telemetry for the current cluster.
 *
 * Each value is paired with a `got_*` flag. The telemetry reporter emits a
 * value only when its flag is true.
 */
typedef struct ReplicationInfo
{
/* true when num_wal_senders has been filled in */
bool got_num_wal_senders;
/* number of WAL senders that the current cluster has */
int32 num_wal_senders;

/* true when is_wal_receiver has been filled in */
bool got_is_wal_receiver;
/* whether the current cluster has a WAL receiver */
bool is_wal_receiver;
} ReplicationInfo;

/*
 * Gather replication telemetry for the current cluster. Callers must check
 * the `got_*` flags of the returned struct before using the paired values.
 */
extern ReplicationInfo ts_telemetry_replication_info_gather(void);

#endif /* TIMESCALEDB_TELEMETRY_REPLICATION_H */
26 changes: 26 additions & 0 deletions src/telemetry/telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "ts_catalog/compression_chunk_size.h"
#include "stats.h"
#include "functions.h"
#include "replication.h"

#include "cross_module_fn.h"

Expand Down Expand Up @@ -72,6 +73,9 @@
#define REQ_INSTANCE_METADATA "instance_metadata"
#define REQ_TS_TELEMETRY_CLOUD "cloud"

#define REQ_NUM_WAL_SENDERS "num_wal_senders"
#define REQ_IS_WAL_RECEIVER "is_wal_receiver"

#define PG_PROMETHEUS "pg_prometheus"
#define PROMSCALE "promscale"
#define POSTGIS "postgis"
Expand Down Expand Up @@ -445,6 +449,17 @@ add_function_call_telemetry(JsonbParseState *state)
pushJsonbValue(&state, WJB_END_OBJECT, NULL);
}

/*
 * Write the replication telemetry fields into the JSON object that is
 * currently open in `state`.
 *
 * A field is added only when the gatherer flags it as obtained, so the object
 * may end up with both, one, or neither of the fields.
 */
static void
add_replication_telemetry(JsonbParseState *state)
{
	const ReplicationInfo repl = ts_telemetry_replication_info_gather();

	if (repl.got_num_wal_senders)
	{
		ts_jsonb_add_int32(state, REQ_NUM_WAL_SENDERS, repl.num_wal_senders);
	}

	if (repl.got_is_wal_receiver)
	{
		ts_jsonb_add_bool(state, REQ_IS_WAL_RECEIVER, repl.is_wal_receiver);
	}
}

#define REQ_RELS "relations"
#define REQ_RELS_TABLES "tables"
#define REQ_RELS_PARTITIONED_TABLES "partitioned_tables"
Expand All @@ -455,6 +470,7 @@ add_function_call_telemetry(JsonbParseState *state)
#define REQ_RELS_DISTRIBUTED_HYPERTABLES_DN "distributed_hypertables_data_node"
#define REQ_RELS_CONTINUOUS_AGGS "continuous_aggregates"
#define REQ_FUNCTIONS_USED "functions_used"
#define REQ_REPLICATION "replication"

static Jsonb *
build_telemetry_report()
Expand Down Expand Up @@ -631,6 +647,16 @@ build_telemetry_report()
pushJsonbValue(&parse_state, WJB_KEY, &key);
add_function_call_telemetry(parse_state);

/* Add replication object */
key.type = jbvString;
key.val.string.val = REQ_REPLICATION;
key.val.string.len = strlen(REQ_REPLICATION);
pushJsonbValue(&parse_state, WJB_KEY, &key);

pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);
add_replication_telemetry(parse_state);
pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);

/* end of telemetry object */
result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);

Expand Down
3 changes: 2 additions & 1 deletion test/expected/telemetry.out
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ WHERE key != 'os_name_pretty';
os_version
data_volume
db_metadata
replication
build_os_name
functions_used
install_method
Expand All @@ -394,7 +395,7 @@ WHERE key != 'os_name_pretty';
num_user_defined_actions
build_architecture_bit_size
num_continuous_aggs_policies
(28 rows)
(29 rows)

CREATE MATERIALIZED VIEW telemetry_report AS
SELECT t FROM get_telemetry_report() t;
Expand Down
58 changes: 58 additions & 0 deletions test/t/002_replication_telemetry.pl
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# This file and its contents are licensed under the Timescale License.
# Please see the included NOTICE for copyright information and
# LICENSE-TIMESCALE for a copy of the license.

use strict;
use warnings;
use TimescaleNode;
use TestLib;
use Data::Dumper;
use Test::More tests => 4;

# This test checks that the telemetry report describes the replication
# state of the node it is generated on.
#
# To test this, we create a primary that allows streaming replication,
# take a backup of it, and start a streaming standby from that backup.
# Once the standby has caught up, we check the `num_wal_senders` and
# `is_wal_receiver` fields of the `replication` object in the telemetry
# report on both the primary and the standby.
# Primary node, configured so that a standby can stream from it.
my $primary = TimescaleNode->create(
	'primary',
	allows_streaming => 1,
	auth_extra       => [ '--create-role', 'repl_role' ]);

# Take the backup the standby is initialized from.
my $backup = 'my_backup';
$primary->backup($backup);

# Streaming standby attached to the primary.
my $standby = PostgresNode->get_new_node('standby_1');
$standby->init_from_backup($primary, $backup, has_streaming => 1);
$standby->start;

# Do not query telemetry until the standby has replayed up to the
# primary's current insert position.
$primary->wait_for_catchup($standby, 'replay', $primary->lsn('insert'));

# Each entry: node to query, field of the `replication` object, expected
# text value, and test description. Order matches the order of the checks.
my @checks = (
	[ $primary, 'num_wal_senders', qq(1),     'number of wal senders on primary' ],
	[ $primary, 'is_wal_receiver', qq(false), 'primary is wal receiver' ],
	[ $standby, 'num_wal_senders', qq(0),     'number of wal senders on standby' ],
	[ $standby, 'is_wal_receiver', qq(true),  'standby is wal receiver' ],
);

for my $check (@checks)
{
	my ($node, $field, $expected, $description) = @{$check};

	my $got = $node->safe_psql('postgres',
		"SELECT get_telemetry_report()->'replication'->>'$field'");
	is($got, $expected, $description);
}

done_testing();

1;
3 changes: 2 additions & 1 deletion test/t/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
set(PROVE_DEBUG_TEST_FILES)
if(${PG_VERSION_MAJOR} GREATER "12")
list(APPEND PROVE_DEBUG_TEST_FILES 001_extension.pl)
list(APPEND PROVE_DEBUG_TEST_FILES 001_extension.pl
002_replication_telemetry.pl)
endif()

if(CMAKE_BUILD_TYPE MATCHES Debug)
Expand Down

0 comments on commit 97ab772

Please sign in to comment.