Add job execution statistics to telemetry
This patch adds two new fields to the telemetry report,
`stats_by_job_type` and `errors_by_sqlerrcode`. Both report results
grouped by job type (the different types of policies, or
user-defined actions).
The patch also adds a new field to the `bgw_job_stat` table,
`total_duration_failures`, to separate the duration of failed runs
from the duration of successful ones.
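
For context: once the patch is applied, the two new report sections can be inspected locally. A minimal sketch (not part of the commit), assuming the existing get_telemetry_report() function that builds the report as jsonb:

    -- illustrative only: pull the new sections out of the report
    SELECT jsonb_pretty(get_telemetry_report() -> 'stats_by_job_type');
    SELECT jsonb_pretty(get_telemetry_report() -> 'errors_by_sqlerrcode');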
konskov committed Nov 2, 2022
1 parent 2475c1b commit 0745da2
Showing 13 changed files with 535 additions and 83 deletions.
1 change: 1 addition & 0 deletions sql/pre_install/tables.sql
@@ -307,6 +307,7 @@ CREATE TABLE _timescaledb_internal.bgw_job_stat (
last_run_success bool NOT NULL,
total_runs bigint NOT NULL,
total_duration interval NOT NULL,
total_duration_failures interval NOT NULL,
total_successes bigint NOT NULL,
total_failures bigint NOT NULL,
total_crashes bigint NOT NULL,
3 changes: 2 additions & 1 deletion sql/updates/latest-dev.sql
@@ -57,6 +57,7 @@ CREATE TABLE _timescaledb_internal.bgw_job_stat (
last_run_success bool NOT NULL,
total_runs bigint NOT NULL,
total_duration interval NOT NULL,
total_duration_failures interval NOT NULL,
total_successes bigint NOT NULL,
total_failures bigint NOT NULL,
total_crashes bigint NOT NULL,
@@ -69,7 +70,7 @@ CREATE TABLE _timescaledb_internal.bgw_job_stat (
);

INSERT INTO _timescaledb_internal.bgw_job_stat SELECT
job_id, last_start, last_finish, next_start, last_successful_finish, last_run_success, total_runs, total_duration, total_successes, total_failures, total_crashes, consecutive_failures, consecutive_crashes, 0
job_id, last_start, last_finish, next_start, last_successful_finish, last_run_success, total_runs, total_duration, '00:00:00'::interval, total_successes, total_failures, total_crashes, consecutive_failures, consecutive_crashes, 0
FROM _timescaledb_internal._tmp_bgw_job_stat;
DROP TABLE _timescaledb_internal._tmp_bgw_job_stat;

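The '00:00:00'::interval in the INSERT backfills the new column: failure time cannot be reconstructed for pre-existing rows, so they start from a zero failure duration. An illustrative sanity check after the update (not part of the commit):

    SELECT count(*)
    FROM _timescaledb_internal.bgw_job_stat
    WHERE total_duration_failures = interval '0';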
14 changes: 10 additions & 4 deletions src/bgw/job_stat.c
@@ -433,10 +433,6 @@ bgw_job_stat_tuple_mark_end(TupleInfo *ti, void *const data)
duration = DatumGetIntervalP(DirectFunctionCall2(timestamp_mi,
TimestampTzGetDatum(fd->last_finish),
TimestampTzGetDatum(fd->last_start)));
fd->total_duration =
*DatumGetIntervalP(DirectFunctionCall2(interval_pl,
IntervalPGetDatum(&fd->total_duration),
IntervalPGetDatum(duration)));

/* undo marking created by start marks */
fd->last_run_success = result_ctx->result == JOB_SUCCESS ? true : false;
@@ -449,6 +445,10 @@ bgw_job_stat_tuple_mark_end(TupleInfo *ti, void *const data)
fd->total_success++;
fd->consecutive_failures = 0;
fd->last_successful_finish = fd->last_finish;
fd->total_duration =
*DatumGetIntervalP(DirectFunctionCall2(interval_pl,
IntervalPGetDatum(&fd->total_duration),
IntervalPGetDatum(duration)));
/* Mark the next start at the end if the job itself hasn't */
if (!bgw_job_stat_next_start_was_set(fd))
fd->next_start = calculate_next_start_on_success(fd->last_finish, result_ctx->job);
@@ -457,6 +457,10 @@ bgw_job_stat_tuple_mark_end(TupleInfo *ti, void *const data)
{
fd->total_failures++;
fd->consecutive_failures++;
fd->total_duration_failures =
*DatumGetIntervalP(DirectFunctionCall2(interval_pl,
IntervalPGetDatum(&fd->total_duration_failures),
IntervalPGetDatum(duration)));

/*
* Mark the next start at the end if the job itself hasn't (this may
@@ -543,6 +547,8 @@ bgw_job_stat_insert_relation(Relation rel, int32 bgw_job_id, bool mark_start,
Int64GetDatum((mark_start ? 1 : 0));
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_duration)] =
IntervalPGetDatum(&zero_ival);
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_duration_failures)] =
IntervalPGetDatum(&zero_ival);
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_success)] = Int64GetDatum(0);
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_failures)] = Int64GetDatum(0);
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_consecutive_failures)] = Int32GetDatum(0);
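With the accumulation split as above, total_duration now grows only on successful runs, so per-job averages no longer mix in failures. An illustrative query against the updated table (not part of the commit; GREATEST guards against division by zero):

    SELECT job_id,
           total_duration / GREATEST(total_successes, 1)::float8 AS avg_success_duration,
           total_duration_failures / GREATEST(total_failures, 1)::float8 AS avg_failure_duration
    FROM _timescaledb_internal.bgw_job_stat;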
12 changes: 12 additions & 0 deletions src/telemetry/stats.h
@@ -90,6 +90,18 @@ typedef struct TelemetryStats
BaseStats views;
} TelemetryStats;

typedef struct TelemetryJobStats
{
int64 total_runs;
int64 total_successes;
int64 total_failures;
int64 total_crashes;
int32 max_consecutive_failures;
int32 max_consecutive_crashes;
Interval *total_duration;
Interval *total_duration_failures;
} TelemetryJobStats;

extern void ts_telemetry_stats_gather(TelemetryStats *stats);

#endif /* TIMESCALEDB_TELEMETRY_STATS_H */
278 changes: 278 additions & 0 deletions src/telemetry/telemetry.c
@@ -35,6 +35,8 @@

#include "cross_module_fn.h"

#include <executor/spi.h>

#define TS_TELEMETRY_VERSION 2
#define TS_VERSION_JSON_FIELD "current_timescaledb_version"
#define TS_IS_UPTODATE_JSON_FIELD "is_up_to_date"
@@ -87,6 +89,9 @@
#define TIMESCALE_ANALYTICS "timescale_analytics"
#define TIMESCALEDB_TOOLKIT "timescaledb_toolkit"

#define REQ_JOB_STATS_BY_JOB_TYPE "stats_by_job_type"
#define REQ_NUM_ERR_BY_SQLERRCODE "errors_by_sqlerrcode"

static const char *related_extensions[] = {
PG_PROMETHEUS, PROMSCALE, POSTGIS, TIMESCALE_ANALYTICS, TIMESCALEDB_TOOLKIT,
};
@@ -264,6 +269,259 @@ add_job_counts(JsonbParseState *state)
ts_jsonb_add_int32(state, REQ_NUM_USER_DEFINED_ACTIONS_FIXED, counts.user_defined_action_fixed);
}

static JsonbValue *
add_errors_by_sqlerrcode_internal(JsonbParseState *parse_state, const char *job_type,
Jsonb *sqlerrs_jsonb)
{
JsonbIterator *it;
JsonbIteratorToken type;
JsonbValue val;
JsonbValue *ret;
JsonbValue key = {
.type = jbvString,
.val.string.val = pstrdup(job_type),
.val.string.len = strlen(job_type),
};

ret = pushJsonbValue(&parse_state, WJB_KEY, &key);
ret = pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);

/* iterate through the jsonb fields and add them; we don't expect nested
 * values here */
it = JsonbIteratorInit(&sqlerrs_jsonb->root);
type = JsonbIteratorNext(&it, &val, true /*skip_nested*/);
if (type != WJB_BEGIN_OBJECT)
elog(ERROR, "invalid JSON format");
while ((type = JsonbIteratorNext(&it, &val, true)))
{
const char *errcode;
int64 errcnt;

if (type == WJB_END_OBJECT)
break;
else if (type == WJB_KEY)
{
errcode = pnstrdup(val.val.string.val, val.val.string.len);
/* now get the value for this key */
type = JsonbIteratorNext(&it, &val, true);
if (type != WJB_VALUE)
elog(ERROR, "unexpected jsonb type");
errcnt =
DatumGetInt64(DirectFunctionCall1(numeric_int8, NumericGetDatum(val.val.numeric)));
/* now that we have both key and value, push them */
ts_jsonb_add_int64(parse_state, errcode, errcnt);
}
else
/* we are not expecting anything else here */
elog(ERROR, "unexpected jsonb type");
}
/* close the jsonb that corresponds to this job_type */
ret = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
return ret;
}

/*
 * This function queries the database through SPI and gets back a set of
 * records that look like (job_type TEXT, jsonb_object_agg JSONB), for
 * example (user_defined_action, {"P0001": 2, "42883": 5}). We expect about
 * six rows, depending on how the query is written and whether any jobs are
 * excluded. For each returned row, a new key-value pair is added to the
 * jsonb, which looks like "job_type": {"errtype1": errcnt1, ...}
 */
static void
add_errors_by_sqlerrcode(JsonbParseState *parse_state)
{
int res;
StringInfo command;
MemoryContext old_context = CurrentMemoryContext, spi_context;

const char *command_string =
"SELECT "
"job_type, pg_catalog.jsonb_object_agg(sqlerrcode, count) "
"FROM"
"("
" SELECT ("
" CASE "
" WHEN error_data OPERATOR(pg_catalog.->>) \'proc_schema\' "
"OPERATOR(pg_catalog.=) "
"\'_timescaledb_internal\' AND error_data OPERATOR(pg_catalog.->>) \'proc_name\' "
"OPERATOR(pg_catalog.~) "
"\'^policy_(retention|compression|reorder|refresh_continuous_"
"aggregate|telemetry|job_error_retention)$\' "
" THEN error_data OPERATOR(pg_catalog.->>) \'proc_name\' "
" ELSE \'user_defined_action\'"
" END"
" ) as job_type, "
" error_data OPERATOR(pg_catalog.->>) \'sqlerrcode\' as sqlerrcode, "
" pg_catalog.COUNT(*) "
" FROM "
" _timescaledb_internal.job_errors "
" WHERE error_data OPERATOR(pg_catalog.->>) \'sqlerrcode\' IS NOT NULL "
" GROUP BY job_type, error_data->> \'sqlerrcode\' "
" ORDER BY job_type"
") q "
"GROUP BY q.job_type";

if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI");

command = makeStringInfo();

appendStringInfoString(command, command_string);
res = SPI_execute(command->data, true /*read only*/, 0 /* count */);
if (res < 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
(errmsg("could not get errors by sqlerrcode and job type"))));

/* we expect about 6 rows returned, each row is a record (TEXT, JSONB) */
for (uint64_t i = 0; i < SPI_processed; i++)
{
/* jobtype is text */
Datum record_jobtype, record_jsonb;
bool isnull_jobtype, isnull_jsonb;

record_jobtype =
SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 1, &isnull_jobtype);
if (isnull_jobtype)
elog(ERROR, "null job type returned");
record_jsonb =
SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 2, &isnull_jsonb);
/* this jsonb looks like {"P0001": 32, "42883": 6} */
Jsonb *sqlerrs_jsonb = isnull_jsonb ? NULL : DatumGetJsonbP(record_jsonb);

if (sqlerrs_jsonb == NULL)
continue;
/* the jsonb object cannot be created in the SPI context or it will be lost */
spi_context = MemoryContextSwitchTo(old_context);
add_errors_by_sqlerrcode_internal(parse_state,
TextDatumGetCString(record_jobtype),
sqlerrs_jsonb);
old_context = MemoryContextSwitchTo(spi_context);
}

res = SPI_finish();

Assert(res == SPI_OK_FINISH);
}

static JsonbValue *
add_job_stats_internal(JsonbParseState *state, const char *job_type, TelemetryJobStats *stats)
{
JsonbValue key = {
.type = jbvString,
.val.string.val = pstrdup(job_type),
.val.string.len = strlen(job_type),
};
pushJsonbValue(&state, WJB_KEY, &key);
pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);

ts_jsonb_add_int64(state, "total_runs", stats->total_runs);
ts_jsonb_add_int64(state, "total_successes", stats->total_successes);
ts_jsonb_add_int64(state, "total_failures", stats->total_failures);
ts_jsonb_add_int64(state, "total_crashes", stats->total_crashes);
ts_jsonb_add_int32(state, "max_consecutive_failures", stats->max_consecutive_failures);
ts_jsonb_add_int32(state, "max_consecutive_crashes", stats->max_consecutive_crashes);
ts_jsonb_add_interval(state, "total_duration", stats->total_duration);
ts_jsonb_add_interval(state, "total_duration_failures", stats->total_duration_failures);

return pushJsonbValue(&state, WJB_END_OBJECT, NULL);
}

static void
add_job_stats_by_job_type(JsonbParseState *parse_state)
{
StringInfo command;
int res;
MemoryContext old_context = CurrentMemoryContext, spi_context;
SPITupleTable *tuptable = NULL;

const char *command_string =
"SELECT ("
" CASE "
" WHEN j.proc_schema OPERATOR(pg_catalog.=) \'_timescaledb_internal\' AND "
"j.proc_name OPERATOR(pg_catalog.~) "
"\'^policy_(retention|compression|reorder|refresh_continuous_aggregate|telemetry|job_error_"
"retention)$\' "
" THEN CAST(j.proc_name AS pg_catalog.text) "
" ELSE \'user_defined_action\' "
" END"
") AS job_type, "
" CAST(pg_catalog.sum(total_runs) AS pg_catalog.int8) as total_runs, "
" CAST(pg_catalog.sum(total_successes) AS pg_catalog.int8) as total_successes, "
" CAST(pg_catalog.sum(total_failures) AS pg_catalog.int8) as total_failures, "
" CAST(pg_catalog.sum(total_crashes) AS pg_catalog.int8) as total_crashes, "
" pg_catalog.sum(total_duration) as total_duration, "
" pg_catalog.sum(total_duration_failures) as total_duration_failures, "
" pg_catalog.max(consecutive_failures) as max_consecutive_failures, "
" pg_catalog.max(consecutive_crashes) as max_consecutive_crashes "
"FROM "
" _timescaledb_internal.bgw_job_stat s "
" JOIN _timescaledb_config.bgw_job j on j.id OPERATOR(pg_catalog.=) s.job_id "
"GROUP BY "
"job_type";

if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI");

command = makeStringInfo();

appendStringInfoString(command, command_string);
res = SPI_execute(command->data, true /* read_only */, 0 /*count*/);
if (res < 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
(errmsg("could not get job statistics by job type"))));
/* a row returned looks like this:
 * ("policy_telemetry", 12, 10, 1, 1, 00:00:11, 00:00:01, 1, 1)
 */
for (uint64_t i = 0; i < SPI_processed; i++)
{
tuptable = SPI_tuptable;
TupleDesc tupdesc = tuptable->tupdesc;
Datum jobtype_datum;
Datum total_runs, total_successes, total_failures, total_crashes;
Datum total_duration, total_duration_failures, max_consec_crashes, max_consec_fails;

bool isnull_jobtype, isnull_runs, isnull_successes, isnull_failures, isnull_crashes;
bool isnull_duration, isnull_duration_failures, isnull_consec_crashes, isnull_consec_fails;

jobtype_datum =
SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 1, &isnull_jobtype);
if (isnull_jobtype)
elog(ERROR, "null job type returned");
total_runs = SPI_getbinval(tuptable->vals[i], tupdesc, 2, &isnull_runs);
total_successes = SPI_getbinval(tuptable->vals[i], tupdesc, 3, &isnull_successes);
total_failures = SPI_getbinval(tuptable->vals[i], tupdesc, 4, &isnull_failures);
total_crashes = SPI_getbinval(tuptable->vals[i], tupdesc, 5, &isnull_crashes);
total_duration = SPI_getbinval(tuptable->vals[i], tupdesc, 6, &isnull_duration);
total_duration_failures =
SPI_getbinval(tuptable->vals[i], tupdesc, 7, &isnull_duration_failures);
max_consec_fails = SPI_getbinval(tuptable->vals[i], tupdesc, 8, &isnull_consec_fails);
max_consec_crashes = SPI_getbinval(tuptable->vals[i], tupdesc, 9, &isnull_consec_crashes);

if (isnull_jobtype || isnull_runs || isnull_successes || isnull_failures ||
isnull_crashes || isnull_duration || isnull_duration_failures ||
isnull_consec_crashes || isnull_consec_fails)
{
elog(ERROR, "null record field returned");
}

spi_context = MemoryContextSwitchTo(old_context);
TelemetryJobStats stats = { .total_runs = DatumGetInt64(total_runs),
.total_successes = DatumGetInt64(total_successes),
.total_failures = DatumGetInt64(total_failures),
.total_crashes = DatumGetInt64(total_crashes),
.max_consecutive_failures = DatumGetInt32(max_consec_fails),
.max_consecutive_crashes = DatumGetInt32(max_consec_crashes),
.total_duration = DatumGetIntervalP(total_duration),
.total_duration_failures =
DatumGetIntervalP(total_duration_failures) };
add_job_stats_internal(parse_state, TextDatumGetCString(jobtype_datum), &stats);
old_context = MemoryContextSwitchTo(spi_context);
}

res = SPI_finish();
Assert(res == SPI_OK_FINISH);
}
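
/*
 * Illustrative sketch, not part of this diff: the two SPI-reading functions
 * above share one memory-context rule. SPI_connect() switches into a context
 * that SPI_finish() destroys, so anything that must end up in the
 * JsonbParseState has to be built in the caller's context.
 */
static void
spi_jsonb_pattern_sketch(JsonbParseState *parse_state)
{
	MemoryContext caller_ctx = CurrentMemoryContext;

	if (SPI_connect() != SPI_OK_CONNECT)
		elog(ERROR, "could not connect to SPI");

	/* ... SPI_execute() and SPI_getbinval() as in the functions above ... */

	/* switch back before building anything that must outlive SPI_finish() */
	MemoryContext spi_ctx = MemoryContextSwitchTo(caller_ctx);
	/* ... push keys/values onto parse_state here ... */
	MemoryContextSwitchTo(spi_ctx);

	if (SPI_finish() != SPI_OK_FINISH)
		elog(ERROR, "SPI_finish failed");
}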

static int64
get_database_size()
{
@@ -545,6 +803,26 @@ build_telemetry_report()
ts_jsonb_add_str(parse_state, REQ_BUILD_ARCHITECTURE, BUILD_PROCESSOR);
ts_jsonb_add_int32(parse_state, REQ_BUILD_ARCHITECTURE_BIT_SIZE, get_architecture_bit_size());
ts_jsonb_add_int64(parse_state, REQ_DATA_VOLUME, get_database_size());
/* add job execution stats */
key.type = jbvString;
key.val.string.val = REQ_NUM_ERR_BY_SQLERRCODE;
key.val.string.len = strlen(REQ_NUM_ERR_BY_SQLERRCODE);
pushJsonbValue(&parse_state, WJB_KEY, &key);
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);

add_errors_by_sqlerrcode(parse_state);

pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);

key.type = jbvString;
key.val.string.val = REQ_JOB_STATS_BY_JOB_TYPE;
key.val.string.len = strlen(REQ_JOB_STATS_BY_JOB_TYPE);
pushJsonbValue(&parse_state, WJB_KEY, &key);
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);

add_job_stats_by_job_type(parse_state);

pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);

/* Add relation stats */
ts_telemetry_stats_gather(&relstats);
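Taken together, the two blocks added to build_telemetry_report() produce report fragments of roughly this shape (keys as defined above; values purely illustrative):

    "errors_by_sqlerrcode": {
        "policy_retention": {"P0001": 2},
        "user_defined_action": {"42883": 5}
    },
    "stats_by_job_type": {
        "policy_telemetry": {
            "total_runs": 12,
            "total_successes": 10,
            "total_failures": 1,
            "total_crashes": 1,
            "max_consecutive_failures": 1,
            "max_consecutive_crashes": 1,
            "total_duration": "00:00:11",
            "total_duration_failures": "00:00:01"
        }
    }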
