Add job execution stats to telemetry
konskov committed Oct 20, 2022
1 parent f55aaf0 commit bffe462
Showing 13 changed files with 531 additions and 83 deletions.
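
For orientation: the commit adds two new sections to the telemetry report, "errors_by_sqlerrcode" (failure counts per SQL error code, bucketed by job type) and "stats_by_job_type" (aggregate execution statistics per job type). A minimal sketch of the resulting report fragment, built with ordinary jsonb functions; the key names are taken from the diff below and all values are purely illustrative:

SELECT pg_catalog.jsonb_pretty(pg_catalog.jsonb_build_object(
    'errors_by_sqlerrcode', pg_catalog.jsonb_build_object(
        'policy_retention', '{"P0001": 2}'::jsonb,
        'user_defined_action', '{"P0001": 2, "42883": 5}'::jsonb),
    'stats_by_job_type', pg_catalog.jsonb_build_object(
        'policy_telemetry', '{"total_runs": 12, "total_successes": 10,
                              "total_failures": 1, "total_crashes": 1,
                              "max_consecutive_failures": 1,
                              "max_consecutive_crashes": 1,
                              "total_duration": "00:00:11",
                              "total_duration_failures": "00:00:01"}'::jsonb)
));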
1 change: 1 addition & 0 deletions sql/pre_install/tables.sql
@@ -303,6 +303,7 @@ CREATE TABLE _timescaledb_internal.bgw_job_stat (
last_run_success bool NOT NULL,
total_runs bigint NOT NULL,
total_duration interval NOT NULL,
total_duration_failures interval NOT NULL,
total_successes bigint NOT NULL,
total_failures bigint NOT NULL,
total_crashes bigint NOT NULL,
3 changes: 2 additions & 1 deletion sql/updates/latest-dev.sql
@@ -57,6 +57,7 @@ CREATE TABLE _timescaledb_internal.bgw_job_stat (
last_run_success bool NOT NULL,
total_runs bigint NOT NULL,
total_duration interval NOT NULL,
total_duration_failures interval NOT NULL,
total_successes bigint NOT NULL,
total_failures bigint NOT NULL,
total_crashes bigint NOT NULL,
@@ -69,7 +70,7 @@ CREATE TABLE _timescaledb_internal.bgw_job_stat (
);

INSERT INTO _timescaledb_internal.bgw_job_stat SELECT
job_id, last_start, last_finish, next_start, last_successful_finish, last_run_success, total_runs, total_duration, total_successes, total_failures, total_crashes, consecutive_failures, consecutive_crashes, 0
job_id, last_start, last_finish, next_start, last_successful_finish, last_run_success, total_runs, total_duration, '00:00:00'::interval, total_successes, total_failures, total_crashes, consecutive_failures, consecutive_crashes, 0
FROM _timescaledb_internal._tmp_bgw_job_stat;
DROP TABLE _timescaledb_internal._tmp_bgw_job_stat;
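
Of the two INSERT ... SELECT lines above, the first is the removed version and the second is its replacement: existing rows are backfilled with a zero interval ('00:00:00'::interval) for the new total_duration_failures column, since past failure durations were never recorded. A quick read-only sanity check one could run after the update (a sketch, not part of the commit):

SELECT job_id, total_duration, total_duration_failures
FROM _timescaledb_internal.bgw_job_stat
ORDER BY job_id;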

14 changes: 10 additions & 4 deletions src/bgw/job_stat.c
@@ -426,10 +426,6 @@ bgw_job_stat_tuple_mark_end(TupleInfo *ti, void *const data)
duration = DatumGetIntervalP(DirectFunctionCall2(timestamp_mi,
TimestampTzGetDatum(fd->last_finish),
TimestampTzGetDatum(fd->last_start)));
fd->total_duration =
*DatumGetIntervalP(DirectFunctionCall2(interval_pl,
IntervalPGetDatum(&fd->total_duration),
IntervalPGetDatum(duration)));

/* undo marking created by start marks */
fd->last_run_success = result_ctx->result == JOB_SUCCESS ? true : false;
@@ -442,6 +438,10 @@ bgw_job_stat_tuple_mark_end(TupleInfo *ti, void *const data)
fd->total_success++;
fd->consecutive_failures = 0;
fd->last_successful_finish = fd->last_finish;
fd->total_duration =
*DatumGetIntervalP(DirectFunctionCall2(interval_pl,
IntervalPGetDatum(&fd->total_duration),
IntervalPGetDatum(duration)));
/* Mark the next start at the end if the job itself hasn't */
if (!bgw_job_stat_next_start_was_set(fd))
fd->next_start = calculate_next_start_on_success(fd->last_finish, result_ctx->job);
@@ -450,6 +450,10 @@ bgw_job_stat_tuple_mark_end(TupleInfo *ti, void *const data)
{
fd->total_failures++;
fd->consecutive_failures++;
fd->total_duration_failures =
*DatumGetIntervalP(DirectFunctionCall2(interval_pl,
IntervalPGetDatum(&fd->total_duration_failures),
IntervalPGetDatum(duration)));

/*
* Mark the next start at the end if the job itself hasn't (this may
@@ -536,6 +540,8 @@ bgw_job_stat_insert_relation(Relation rel, int32 bgw_job_id, bool mark_start,
Int64GetDatum((mark_start ? 1 : 0));
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_duration)] =
IntervalPGetDatum(&zero_ival);
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_duration_failures)] =
IntervalPGetDatum(&zero_ival);
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_success)] = Int64GetDatum(0);
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_failures)] = Int64GetDatum(0);
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_consecutive_failures)] = Int32GetDatum(0);
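
In the first two hunks above, the four-line total_duration accumulation is removed from the common path and re-added inside the success branch, with a parallel accumulator added in the failure branch. After this change total_duration counts successful runs only and total_duration_failures counts failed runs, so per-outcome averages fall out directly. A sketch of such a query (not part of the commit; GREATEST guards against division by zero):

SELECT job_id,
       total_duration / GREATEST(total_successes, 1) AS avg_success_duration,
       total_duration_failures / GREATEST(total_failures, 1) AS avg_failure_duration
FROM _timescaledb_internal.bgw_job_stat;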
12 changes: 12 additions & 0 deletions src/telemetry/stats.h
@@ -90,6 +90,18 @@ typedef struct TelemetryStats
BaseStats views;
} TelemetryStats;

typedef struct TelemetryJobStats
{
int64 total_runs;
int64 total_successes;
int64 total_failures;
int64 total_crashes;
int32 max_consecutive_failures;
int32 max_consecutive_crashes;
Interval *total_duration;
Interval *total_duration_failures;
} TelemetryJobStats;

extern void ts_telemetry_stats_gather(TelemetryStats *stats);

#endif /* TIMESCALEDB_TELEMETRY_STATS_H */
274 changes: 274 additions & 0 deletions src/telemetry/telemetry.c
@@ -35,6 +35,8 @@

#include "cross_module_fn.h"

#include <executor/spi.h>

#define TS_TELEMETRY_VERSION 2
#define TS_VERSION_JSON_FIELD "current_timescaledb_version"
#define TS_IS_UPTODATE_JSON_FIELD "is_up_to_date"
@@ -82,6 +84,9 @@
#define TIMESCALE_ANALYTICS "timescale_analytics"
#define TIMESCALEDB_TOOLKIT "timescaledb_toolkit"

#define REQ_JOB_STATS_BY_JOB_TYPE "stats_by_job_type"
#define REQ_NUM_ERR_BY_SQLERRCODE "errors_by_sqlerrcode"

static const char *related_extensions[] = {
PG_PROMETHEUS, PROMSCALE, POSTGIS, TIMESCALE_ANALYTICS, TIMESCALEDB_TOOLKIT,
};
@@ -232,6 +237,255 @@ add_job_counts(JsonbParseState *state)
ts_jsonb_add_int32(state, REQ_NUM_USER_DEFINED_ACTIONS, counts.user_defined_action);
}

static JsonbValue *
add_errors_by_sqlerrcode_internal(JsonbParseState *parse_state, const char *job_type,
Jsonb *sqlerrs_jsonb)
{
JsonbIterator *it;
JsonbIteratorToken type;
JsonbValue val;
JsonbValue *ret;
JsonbValue key = {
.type = jbvString,
.val.string.val = pstrdup(job_type),
.val.string.len = strlen(job_type),
};

ret = pushJsonbValue(&parse_state, WJB_KEY, &key);
ret = pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);

/*
 * Iterate through the jsonb fields and add them;
 * we don't expect nested values here.
 */
it = JsonbIteratorInit(&sqlerrs_jsonb->root);
type = JsonbIteratorNext(&it, &val, true /*skip_nested*/);
if (type != WJB_BEGIN_OBJECT)
elog(ERROR, "invalid JSON format");
while ((type = JsonbIteratorNext(&it, &val, true)))
{
const char *errcode;
int64 errcnt;
if (type == WJB_END_OBJECT)
break;
else if (type == WJB_KEY)
{
errcode = pnstrdup(val.val.string.val, val.val.string.len);
/* now get the value for this key */
type = JsonbIteratorNext(&it, &val, true);
if (type != WJB_VALUE)
elog(ERROR, "unexpected jsonb type");
errcnt =
DatumGetInt64(DirectFunctionCall1(numeric_int8, NumericGetDatum(val.val.numeric)));
/* now that we have both key and value, push them */
ts_jsonb_add_int64(parse_state, errcode, errcnt);
}
else
/* we are not expecting anything else here */
elog(ERROR, "unexpected jsonb type");
}
/* close the jsonb that corresponds to this job_type */
ret = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
return ret;
}

/*
 * This function queries the database through SPI and gets back a set of
 * records that look like (job_type TEXT, jsonb_object_agg JSONB), for
 * example (user_defined_action, {"P0001": 2, "42883": 5}). We expect about
 * six rows, depending on how we write the query and whether we exclude any
 * jobs. For each returned row, it adds a new key-value pair to the jsonb,
 * which looks like "job_type": {"errtype1": errcnt1, ...}.
 */
static void
add_errors_by_sqlerrcode(JsonbParseState *parse_state)
{
int res;
StringInfo command;
MemoryContext old_context = CurrentMemoryContext, spi_context;

const char *command_string = "SELECT "
"job_type, pg_catalog.jsonb_object_agg(sqlerrcode, count) "
"FROM"
"("
" SELECT ("
" CASE "
" WHEN error_data OPERATOR(pg_catalog.->>) \'proc_schema\' OPERATOR(pg_catalog.=) "
"\'_timescaledb_internal\' AND error_data OPERATOR(pg_catalog.->>) \'proc_name\' OPERATOR(pg_catalog.~) "
"\'^policy_(retention|compression|reorder|refresh_continuous_"
"aggregate|telemetry|job_error_retention)$\' "
" THEN error_data OPERATOR(pg_catalog.->>) \'proc_name\' "
" ELSE \'user_defined_action\'"
" END"
" ) as job_type, "
" error_data OPERATOR(pg_catalog.->>) \'sqlerrcode\' as sqlerrcode, "
" pg_catalog.COUNT(*) "
" FROM "
" _timescaledb_internal.job_errors "
" WHERE error_data OPERATOR(pg_catalog.->>) \'sqlerrcode\' IS NOT NULL "
" GROUP BY job_type, error_data->> \'sqlerrcode\' "
" ORDER BY job_type"
") q "
"GROUP BY q.job_type";

if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI");

command = makeStringInfo();

appendStringInfoString(command, command_string);
res = SPI_execute(command->data, true /*read only*/, 0 /* count */);
if (res < 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
(errmsg("could not get errors by sqlerrcode and job type"))));

/* we expect about 6 rows returned, each row is a record (TEXT, JSONB) */
for (int i = 0; i < SPI_processed; i++)
{
/* jobtype is text */
Datum record_jobtype, record_jsonb;
bool isnull_jobtype, isnull_jsonb;

record_jobtype =
SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 1, &isnull_jobtype);
if (isnull_jobtype)
elog(ERROR, "null job type returned");
record_jsonb =
SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 2, &isnull_jsonb);
/* this jsonb looks like {"P0001": 32, "42883": 6} */
Jsonb *sqlerrs_jsonb = isnull_jsonb ? NULL : DatumGetJsonbP(record_jsonb);

if (sqlerrs_jsonb == NULL)
continue;
/* the jsonb object cannot be created in the SPI context or it will be lost */
spi_context = MemoryContextSwitchTo(old_context);
add_errors_by_sqlerrcode_internal(parse_state,
TextDatumGetCString(record_jobtype),
sqlerrs_jsonb);
old_context = MemoryContextSwitchTo(spi_context);
}

res = SPI_finish();

Assert(res == SPI_OK_FINISH);
}
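
For reference, the error_data payload inspected by the query above is a jsonb blob. A row that the query would bucket under policy_retention might look like this (illustrative values; field names taken from the predicates in the command string):

SELECT '{"proc_schema": "_timescaledb_internal",
         "proc_name": "policy_retention",
         "sqlerrcode": "P0001"}'::jsonb AS error_data;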

static JsonbValue *
add_job_stats_internal(JsonbParseState *state, const char *job_type, TelemetryJobStats *stats)
{
JsonbValue key = {
.type = jbvString,
.val.string.val = pstrdup(job_type),
.val.string.len = strlen(job_type),
};
pushJsonbValue(&state, WJB_KEY, &key);
pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);

ts_jsonb_add_int64(state, "total_runs", stats->total_runs);
ts_jsonb_add_int64(state, "total_successes", stats->total_successes);
ts_jsonb_add_int64(state, "total_failures", stats->total_failures);
ts_jsonb_add_int64(state, "total_crashes", stats->total_crashes);
ts_jsonb_add_int32(state, "max_consecutive_failures", stats->max_consecutive_failures);
ts_jsonb_add_int32(state, "max_consecutive_crashes", stats->max_consecutive_crashes);
ts_jsonb_add_interval(state, "total_duration", stats->total_duration);
ts_jsonb_add_interval(state, "total_duration_failures", stats->total_duration_failures);

return pushJsonbValue(&state, WJB_END_OBJECT, NULL);
}

static void
add_job_stats_by_job_type(JsonbParseState *parse_state)
{
StringInfo command;
int res;
MemoryContext old_context = CurrentMemoryContext, spi_context;
SPITupleTable *tuptable = NULL;

const char *command_string =
"SELECT ("
" CASE "
" WHEN j.proc_schema OPERATOR(pg_catalog.=) \'_timescaledb_internal\' AND j.proc_name OPERATOR(pg_catalog.~) "
"\'^policy_(retention|compression|reorder|refresh_continuous_aggregate|telemetry|job_error_"
"retention)$\' "
" THEN CAST(j.proc_name AS pg_catalog.text) "
" ELSE \'user_defined_action\' "
" END"
") AS job_type, "
" CAST(pg_catalog.sum(total_runs) AS pg_catalog.int8) as total_runs, "
" CAST(pg_catalog.sum(total_successes) AS pg_catalog.int8) as total_successes, "
" CAST(pg_catalog.sum(total_failures) AS pg_catalog.int8) as total_failures, "
" CAST(pg_catalog.sum(total_crashes) AS pg_catalog.int8) as total_crashes, "
" pg_catalog.sum(total_duration) as total_duration, "
" pg_catalog.sum(total_duration_failures) as total_duration_failures, "
" pg_catalog.max(consecutive_failures) as max_consecutive_failures, "
" pg_catalog.max(consecutive_crashes) as max_consecutive_crashes "
"FROM "
" _timescaledb_internal.bgw_job_stat s "
" JOIN _timescaledb_config.bgw_job j on j.id OPERATOR(pg_catalog.=) s.job_id "
"GROUP BY "
"job_type";

if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI");

command = makeStringInfo();

appendStringInfoString(command, command_string);
res = SPI_execute(command->data, true /* read_only */, 0 /*count*/);
if (res < 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
(errmsg("could not get job statistics by job type"))));
/*
 * A returned row looks like this (nine columns, matching the query above;
 * values illustrative):
 * ("policy_telemetry", 12, 10, 1, 1, 00:00:11, 00:00:01, 1, 1)
 */
for (int i = 0; i < SPI_processed; i++)
{
tuptable = SPI_tuptable;
TupleDesc tupdesc = tuptable->tupdesc;
Datum jobtype_datum;
Datum total_runs, total_successes, total_failures, total_crashes;
Datum total_duration, total_duration_failures, max_consec_crashes, max_consec_fails;

bool isnull_jobtype, isnull_runs, isnull_successes, isnull_failures, isnull_crashes;
bool isnull_duration, isnull_duration_failures, isnull_consec_crashes, isnull_consec_fails;

jobtype_datum =
SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 1, &isnull_jobtype);
if (isnull_jobtype)
elog(ERROR, "null job type returned");
total_runs = SPI_getbinval(tuptable->vals[i], tupdesc, 2, &isnull_runs);
total_successes = SPI_getbinval(tuptable->vals[i], tupdesc, 3, &isnull_successes);
total_failures = SPI_getbinval(tuptable->vals[i], tupdesc, 4, &isnull_failures);
total_crashes = SPI_getbinval(tuptable->vals[i], tupdesc, 5, &isnull_crashes);
total_duration = SPI_getbinval(tuptable->vals[i], tupdesc, 6, &isnull_duration);
total_duration_failures =
SPI_getbinval(tuptable->vals[i], tupdesc, 7, &isnull_duration_failures);
max_consec_fails = SPI_getbinval(tuptable->vals[i], tupdesc, 8, &isnull_consec_fails);
max_consec_crashes = SPI_getbinval(tuptable->vals[i], tupdesc, 9, &isnull_consec_crashes);

if (isnull_jobtype || isnull_runs || isnull_successes || isnull_failures ||
isnull_crashes || isnull_duration || isnull_duration_failures ||
isnull_consec_crashes || isnull_consec_fails)
{
elog(ERROR, "null record field returned");
}

spi_context = MemoryContextSwitchTo(old_context);
TelemetryJobStats stats = { .total_runs = DatumGetInt64(total_runs),
.total_successes = DatumGetInt64(total_successes),
.total_failures = DatumGetInt64(total_failures),
.total_crashes = DatumGetInt64(total_crashes),
.max_consecutive_failures = DatumGetInt32(max_consec_fails),
.max_consecutive_crashes = DatumGetInt32(max_consec_crashes),
.total_duration = DatumGetIntervalP(total_duration),
.total_duration_failures =
DatumGetIntervalP(total_duration_failures) };
add_job_stats_internal(parse_state, TextDatumGetCString(jobtype_datum), &stats);
old_context = MemoryContextSwitchTo(spi_context);
}

res = SPI_finish();
Assert(res == SPI_OK_FINISH);
}

static int64
get_database_size()
{
@@ -515,6 +769,26 @@ build_telemetry_report()
ts_jsonb_add_str(parse_state, REQ_BUILD_ARCHITECTURE, BUILD_PROCESSOR);
ts_jsonb_add_int32(parse_state, REQ_BUILD_ARCHITECTURE_BIT_SIZE, get_architecture_bit_size());
ts_jsonb_add_int64(parse_state, REQ_DATA_VOLUME, get_database_size());
/* add job execution stats */
key.type = jbvString;
key.val.string.val = REQ_NUM_ERR_BY_SQLERRCODE;
key.val.string.len = strlen(REQ_NUM_ERR_BY_SQLERRCODE);
pushJsonbValue(&parse_state, WJB_KEY, &key);
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);

add_errors_by_sqlerrcode(parse_state);

pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);

key.type = jbvString;
key.val.string.val = REQ_JOB_STATS_BY_JOB_TYPE;
key.val.string.len = strlen(REQ_JOB_STATS_BY_JOB_TYPE);
pushJsonbValue(&parse_state, WJB_KEY, &key);
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);

add_job_stats_by_job_type(parse_state);

pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);

/* Add relation stats */
ts_telemetry_stats_gather(&relstats);