Skip to content

Commit

Permalink
Add rest of the stats with SPI query
Browse files Browse the repository at this point in the history
  • Loading branch information
konskov committed Oct 17, 2022
1 parent 94496ae commit fc64761
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 35 deletions.
12 changes: 6 additions & 6 deletions src/bgw/job_stat.c
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,9 @@ bgw_job_stat_tuple_mark_end(TupleInfo *ti, void *const data)
fd->consecutive_failures = 0;
fd->last_successful_finish = fd->last_finish;
fd->total_duration =
*DatumGetIntervalP(DirectFunctionCall2(interval_pl,
IntervalPGetDatum(&fd->total_duration),
IntervalPGetDatum(duration)));
*DatumGetIntervalP(DirectFunctionCall2(interval_pl,
IntervalPGetDatum(&fd->total_duration),
IntervalPGetDatum(duration)));
/* Mark the next start at the end if the job itself hasn't */
if (!bgw_job_stat_next_start_was_set(fd))
fd->next_start = calculate_next_start_on_success(fd->last_finish, result_ctx->job);
Expand All @@ -356,9 +356,9 @@ bgw_job_stat_tuple_mark_end(TupleInfo *ti, void *const data)
fd->total_failures++;
fd->consecutive_failures++;
fd->total_duration_failures =
*DatumGetIntervalP(DirectFunctionCall2(interval_pl,
IntervalPGetDatum(&fd->total_duration_failures),
IntervalPGetDatum(duration)));
*DatumGetIntervalP(DirectFunctionCall2(interval_pl,
IntervalPGetDatum(&fd->total_duration_failures),
IntervalPGetDatum(duration)));

/*
* Mark the next start at the end if the job itself hasn't (this may
Expand Down
11 changes: 11 additions & 0 deletions src/telemetry/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,17 @@ typedef struct TelemetryStats
BaseStats views;
} TelemetryStats;

/*
 * Aggregated background-worker job statistics for a single job type,
 * collected for the telemetry report. One instance corresponds to one
 * row of the GROUP BY job_type aggregation over
 * _timescaledb_internal.bgw_job_stat.
 */
typedef struct TelemetryJobStats
{
	int64 total_runs;				/* sum of total_runs over jobs of this type */
	int64 total_successes;			/* sum of successful runs */
	int64 total_failures;			/* sum of failed runs */
	int64 total_crashes;			/* sum of runs that crashed */
	int32 max_consecutive_failures; /* max consecutive-failure streak across jobs */
	int32 max_consecutive_crashes;	/* max consecutive-crash streak across jobs */
	Interval *total_duration;		/* summed run duration (palloc'd interval) */
} TelemetryJobStats;

extern void ts_telemetry_stats_gather(TelemetryStats *stats);

#endif /* TIMESCALEDB_TELEMETRY_STATS_H */
187 changes: 158 additions & 29 deletions src/telemetry/telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ add_job_counts(JsonbParseState *state)
}

static JsonbValue *
add_errors_helper_func(JsonbParseState *parse_state, const char *job_type, Jsonb *sqlerrs_jsonb)
add_errors_by_sqlerrcode_internal(JsonbParseState *parse_state, const char *job_type,
Jsonb *sqlerrs_jsonb)
{
JsonbIterator *it;
JsonbIteratorToken type;
Expand All @@ -248,11 +249,11 @@ add_errors_helper_func(JsonbParseState *parse_state, const char *job_type, Jsonb
.type = jbvString,
.val.string.val = pstrdup(job_type),
.val.string.len = strlen(job_type),
};
};

ret = pushJsonbValue(&parse_state, WJB_KEY, &key);
ret = pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);

// now need to iterate through the jsonb fields and add them
/* we don't expect nested values here */
it = JsonbIteratorInit(&sqlerrs_jsonb->root);
Expand All @@ -273,24 +274,25 @@ add_errors_helper_func(JsonbParseState *parse_state, const char *job_type, Jsonb
type = JsonbIteratorNext(&it, &val, true);
if (type != WJB_VALUE)
elog(ERROR, "unexpected jsonb type");
errcnt = DatumGetInt64(DirectFunctionCall1(numeric_int8, NumericGetDatum(val.val.numeric)));
errcnt =
DatumGetInt64(DirectFunctionCall1(numeric_int8, NumericGetDatum(val.val.numeric)));
// now that we have both key and value, push them
ts_jsonb_add_int64(parse_state, errcode, errcnt);
}
else
// we are not expecting anything else for this
// we are not expecting anything else for this
elog(ERROR, "unexpected jsonb type");
}
// close the jsonb that corresponds to this job_type
ret = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
return ret;
}
/* this function queries the database through SPI and gets back a set of records
/* this function queries the database through SPI and gets back a set of records
that look like (job_type TEXT, jsonb_object_agg JSONB).
For example, (user_defined_action, {"P0001": 2, "42883": 5})
(we are expecting about 6 rows depending
on how we write the query and if we exclude any jobs)
Then for each returned row adds a new kv pair to the jsonb,
on how we write the query and if we exclude any jobs)
Then for each returned row adds a new kv pair to the jsonb,
which looks like "job_type": {"errtype1": errcnt1, ...} */
static void
add_errors_by_sqlerrcode(JsonbParseState *parse_state)
Expand All @@ -299,18 +301,20 @@ add_errors_by_sqlerrcode(JsonbParseState *parse_state)
StringInfo command;
MemoryContext old_context = CurrentMemoryContext, spi_context;

const char *command_string = "select job_type, jsonb_object_agg(sqlerrcode, count) FROM"
"("
"select ("
"case when error_data->> \'proc_schema\' = \'_timescaledb_internal\' then error_data->> \'proc_name\' "
"ELSE \'user_defined_action\'"
"end"
" ) as job_type, error_data->> \'sqlerrcode\' as sqlerrcode, count(*) "
"FROM _timescaledb_internal.job_errors "
"where error_data->> \'sqlerrcode\' IS NOT NULL "
"GROUP BY job_type, error_data->> \'sqlerrcode\' order by job_type"
") q "
"GROUP BY q.job_type";
const char *command_string =
"select job_type, jsonb_object_agg(sqlerrcode, count) FROM"
"("
"select ("
"case when error_data->> \'proc_schema\' = \'_timescaledb_internal\' then error_data->> "
"\'proc_name\' "
"ELSE \'user_defined_action\'"
"end"
" ) as job_type, error_data->> \'sqlerrcode\' as sqlerrcode, count(*) "
"FROM _timescaledb_internal.job_errors "
"where error_data->> \'sqlerrcode\' IS NOT NULL "
"GROUP BY job_type, error_data->> \'sqlerrcode\' order by job_type"
") q "
"GROUP BY q.job_type";

if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI");
Expand All @@ -321,37 +325,152 @@ add_errors_by_sqlerrcode(JsonbParseState *parse_state)
res = SPI_execute(command->data, true /*read only*/, 0 /* count */);
if (res < 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
(errmsg("could not get errors by sqlerrcode and job type"))));
(errcode(ERRCODE_INTERNAL_ERROR),
(errmsg("could not get errors by sqlerrcode and job type"))));

// we expect about 6 rows returned, each row is a record (TEXT, JSONB)
// iterate over the rows and create a jsonb to be added to the parseState,
// iterate over the rows and create a jsonb to be added to the parseState,
for (int i = 0; i < SPI_processed; i++)
{
// jobtype is text
Datum record_jobtype, record_jsonb;
bool isnull_jobtype, isnull_jsonb;

record_jobtype = SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 1, &isnull_jobtype);
record_jobtype =
SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 1, &isnull_jobtype);
if (isnull_jobtype)
elog(ERROR, "null job type returned");
record_jsonb = SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 2, &isnull_jsonb);
record_jsonb =
SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 2, &isnull_jsonb);
/* this jsonb looks like {"P0001": 32, "42883": 6} */
Jsonb *sqlerrs_jsonb = isnull_jsonb ? NULL : DatumGetJsonbP(record_jsonb);

if (sqlerrs_jsonb == NULL)
continue;
/* the jsonb object cannot be created in the SPI context or it will be lost */
/* the jsonb object cannot be created in the SPI context or it will be lost */
spi_context = MemoryContextSwitchTo(old_context);
add_errors_helper_func(parse_state, TextDatumGetCString(record_jobtype), sqlerrs_jsonb);
add_errors_by_sqlerrcode_internal(parse_state,
TextDatumGetCString(record_jobtype),
sqlerrs_jsonb);
old_context = MemoryContextSwitchTo(spi_context);
}

res = SPI_finish();

Assert(res == SPI_OK_FINISH);
}

/*
 * Append one member of the form  "<job_type>": { <stat counters> }  to the
 * jsonb object currently under construction in `state`.
 *
 * Returns the JsonbValue produced by closing the nested object.
 */
static JsonbValue *
add_job_stats_internal(JsonbParseState *state, const char *job_type, TelemetryJobStats *stats)
{
	JsonbValue jobtype_key;

	/* the key is the job type; copy it so the jsonb owns its own string */
	jobtype_key.type = jbvString;
	jobtype_key.val.string.val = pstrdup(job_type);
	jobtype_key.val.string.len = strlen(job_type);

	pushJsonbValue(&state, WJB_KEY, &jobtype_key);
	pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);

	/* one field per aggregated statistic */
	ts_jsonb_add_int64(state, "total_runs", stats->total_runs);
	ts_jsonb_add_int64(state, "total_successes", stats->total_successes);
	ts_jsonb_add_int64(state, "total_failures", stats->total_failures);
	ts_jsonb_add_int64(state, "total_crashes", stats->total_crashes);
	ts_jsonb_add_int32(state, "max_consecutive_failures", stats->max_consecutive_failures);
	ts_jsonb_add_int32(state, "max_consecutive_crashes", stats->max_consecutive_crashes);
	ts_jsonb_add_interval(state, "total_duration", stats->total_duration);

	return pushJsonbValue(&state, WJB_END_OBJECT, NULL);
}

/*
 * Gather job statistics grouped by job type via SPI and add one
 * "<job_type>": {stats...} entry per returned row to `parse_state`.
 *
 * A returned row looks like:
 *   ("policy_telemetry", 12, 10, 1, 1, 00:00:11, 1, 1)
 * i.e. (job_type, total_runs, total_successes, total_failures,
 *       total_crashes, total_duration, max_consecutive_failures,
 *       max_consecutive_crashes).
 */
static void
add_job_stats_by_job_type(JsonbParseState *parse_state)
{
	StringInfo command;
	int res;
	MemoryContext old_context = CurrentMemoryContext, spi_context;
	SPITupleTable *tuptable;
	TupleDesc tupdesc;

	const char *command_string =
		"SELECT ("
		" case WHEN q.proc_schema = \'_timescaledb_internal\' then q.proc_name::text "
		" ELSE \'user_defined_action\'::text "
		" end"
		") as job_type, sum(total_runs)::bigint as total_runs, sum(total_successes)::bigint as "
		"total_successes,"
		"sum(total_failures)::bigint as total_failures, sum(total_crashes)::bigint as "
		"total_crashes,"
		"sum(total_duration) as total_duration, max(consecutive_failures) as "
		"max_consecutive_failures, max(consecutive_crashes) as max_consecutive_crashes "
		"FROM "
		"(select j.proc_schema, j.proc_name, s.total_runs, s.total_successes, s.total_failures,"
		"s.total_crashes, s.total_duration, s.consecutive_crashes, s.consecutive_failures "
		"FROM "
		"_timescaledb_internal.bgw_job_stat s join _timescaledb_config.bgw_job j on j.id = "
		"s.job_id) q "
		"GROUP BY job_type";

	if (SPI_connect() != SPI_OK_CONNECT)
		elog(ERROR, "could not connect to SPI");

	command = makeStringInfo();

	appendStringInfoString(command, command_string);
	res = SPI_execute(command->data, true /* read_only */, 0 /*count*/);
	if (res < 0)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 (errmsg("could not get job statistics by job type"))));

	/* the result set does not change inside the loop; fetch it once */
	tuptable = SPI_tuptable;
	tupdesc = tuptable->tupdesc;

	/* SPI_processed is uint64; use a matching counter type */
	for (uint64 i = 0; i < SPI_processed; i++)
	{
		Datum jobtype_datum;
		Datum total_runs_datum, total_successes_datum, total_failures_datum, total_crashes_datum;
		Datum total_duration_datum, max_consec_crashes_datum, max_consec_fails_datum;

		bool isnull_jobtype, isnull_runs, isnull_successes, isnull_failures, isnull_crashes;
		bool isnull_duration, isnull_consec_crashes, isnull_consec_fails;

		jobtype_datum = SPI_getbinval(tuptable->vals[i], tupdesc, 1, &isnull_jobtype);
		if (isnull_jobtype)
			elog(ERROR, "null job type returned");
		total_runs_datum = SPI_getbinval(tuptable->vals[i], tupdesc, 2, &isnull_runs);
		total_successes_datum = SPI_getbinval(tuptable->vals[i], tupdesc, 3, &isnull_successes);
		total_failures_datum = SPI_getbinval(tuptable->vals[i], tupdesc, 4, &isnull_failures);
		total_crashes_datum = SPI_getbinval(tuptable->vals[i], tupdesc, 5, &isnull_crashes);
		total_duration_datum = SPI_getbinval(tuptable->vals[i], tupdesc, 6, &isnull_duration);
		max_consec_fails_datum = SPI_getbinval(tuptable->vals[i], tupdesc, 7, &isnull_consec_fails);
		max_consec_crashes_datum =
			SPI_getbinval(tuptable->vals[i], tupdesc, 8, &isnull_consec_crashes);

		/* jobtype nullness was already rejected above */
		if (isnull_runs || isnull_successes || isnull_failures || isnull_crashes ||
			isnull_duration || isnull_consec_crashes || isnull_consec_fails)
		{
			elog(ERROR, "null record field returned");
		}

		TelemetryJobStats stats = {
			.total_runs = DatumGetInt64(total_runs_datum),
			.total_successes = DatumGetInt64(total_successes_datum),
			.total_failures = DatumGetInt64(total_failures_datum),
			.total_crashes = DatumGetInt64(total_crashes_datum),
			.total_duration = DatumGetIntervalP(total_duration_datum),
			.max_consecutive_failures = DatumGetInt32(max_consec_fails_datum),
			.max_consecutive_crashes = DatumGetInt32(max_consec_crashes_datum),
		};
		/*
		 * The jsonb values must not be created in the SPI memory context,
		 * or they would be freed by SPI_finish before being serialized.
		 */
		spi_context = MemoryContextSwitchTo(old_context);
		add_job_stats_internal(parse_state, TextDatumGetCString(jobtype_datum), &stats);
		old_context = MemoryContextSwitchTo(spi_context);
	}

	res = SPI_finish();
	Assert(res == SPI_OK_FINISH);
}

static int64
get_database_size()
{
Expand Down Expand Up @@ -643,7 +762,17 @@ build_telemetry_report()
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);

add_errors_by_sqlerrcode(parse_state);


pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);

key.type = jbvString;
key.val.string.val = REQ_JOB_STATS_BY_JOB_TYPE;
key.val.string.len = strlen(REQ_JOB_STATS_BY_JOB_TYPE);
pushJsonbValue(&parse_state, WJB_KEY, &key);
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);

add_job_stats_by_job_type(parse_state);

pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);

/* Add relation stats */
Expand Down

0 comments on commit fc64761

Please sign in to comment.