Skip to content

Commit

Permalink
Set the search_path instead
Browse files Browse the repository at this point in the history
  • Loading branch information
konskov committed Nov 3, 2022
1 parent 1777de1 commit 8da38a9
Showing 1 changed file with 31 additions and 27 deletions.
58 changes: 31 additions & 27 deletions src/telemetry/telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ add_errors_by_sqlerrcode_internal(JsonbParseState *parse_state, const char *job_
ret = pushJsonbValue(&parse_state, WJB_KEY, &key);
ret = pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);

// now need to iterate through the jsonb fields and add them
/* we don't expect nested values here */
it = JsonbIteratorInit(&sqlerrs_jsonb->root);
type = JsonbIteratorNext(&it, &val, true /*skip_nested*/);
Expand All @@ -302,20 +301,18 @@ add_errors_by_sqlerrcode_internal(JsonbParseState *parse_state, const char *job_
else if (type == WJB_KEY)
{
errcode = pnstrdup(val.val.string.val, val.val.string.len);
// now get the value for this key
/* get the corresponding value for this key */
type = JsonbIteratorNext(&it, &val, true);
if (type != WJB_VALUE)
elog(ERROR, "unexpected jsonb type");
errcnt =
DatumGetInt64(DirectFunctionCall1(numeric_int8, NumericGetDatum(val.val.numeric)));
// now that we have both key and value, push them
ts_jsonb_add_int64(parse_state, errcode, errcnt);
}
else
// we are not expecting anything else for this
elog(ERROR, "unexpected jsonb type");
}
// close the jsonb that corresponds to this job_type

ret = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
return ret;
}
Expand All @@ -335,26 +332,24 @@ add_errors_by_sqlerrcode(JsonbParseState *parse_state)

const char *command_string =
"SELECT "
"job_type, pg_catalog.jsonb_object_agg(sqlerrcode, count) "
"job_type, jsonb_object_agg(sqlerrcode, count) "
"FROM"
"("
" SELECT ("
" CASE "
" WHEN error_data OPERATOR(pg_catalog.->>) \'proc_schema\' "
"OPERATOR(pg_catalog.=) "
"\'_timescaledb_internal\' AND error_data OPERATOR(pg_catalog.->>) \'proc_name\' "
"OPERATOR(pg_catalog.~) "
" WHEN error_data ->> \'proc_schema\' = \'_timescaledb_internal\'"
" AND error_data ->> \'proc_name\' ~ "
"\'^policy_(retention|compression|reorder|refresh_continuous_"
"aggregate|telemetry|job_error_retention)$\' "
" THEN error_data OPERATOR(pg_catalog.->>) \'proc_name\' "
" THEN error_data ->> \'proc_name\' "
" ELSE \'user_defined_action\'"
" END"
" ) as job_type, "
" error_data OPERATOR(pg_catalog.->>) \'sqlerrcode\' as sqlerrcode, "
" error_data ->> \'sqlerrcode\' as sqlerrcode, "
" pg_catalog.COUNT(*) "
" FROM "
" _timescaledb_internal.job_errors "
" WHERE error_data OPERATOR(pg_catalog.->>) \'sqlerrcode\' IS NOT NULL "
" WHERE error_data ->> \'sqlerrcode\' IS NOT NULL "
" GROUP BY job_type, error_data->> \'sqlerrcode\' "
" ORDER BY job_type"
") q "
Expand All @@ -363,6 +358,11 @@ add_errors_by_sqlerrcode(JsonbParseState *parse_state)
if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI");

/* SPI calls must be qualified otherwise they are unsafe */
res = SPI_exec("SET search_path TO pg_catalog, pg_temp", 0);
if (res < 0)
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), (errmsg("could not set search_path"))));

command = makeStringInfo();

appendStringInfoString(command, command_string);
Expand All @@ -375,7 +375,6 @@ add_errors_by_sqlerrcode(JsonbParseState *parse_state)
/* we expect about 6 rows returned, each row is a record (TEXT, JSONB) */
for (uint64 i = 0; i < SPI_processed; i++)
{
// jobtype is text
Datum record_jobtype, record_jsonb;
bool isnull_jobtype, isnull_jsonb;

Expand All @@ -398,6 +397,7 @@ add_errors_by_sqlerrcode(JsonbParseState *parse_state)
old_context = MemoryContextSwitchTo(spi_context);
}

res = SPI_exec("RESET search_path", 0);
res = SPI_finish();

Assert(res == SPI_OK_FINISH);
Expand Down Expand Up @@ -437,31 +437,35 @@ add_job_stats_by_job_type(JsonbParseState *parse_state)
const char *command_string =
"SELECT ("
" CASE "
" WHEN j.proc_schema OPERATOR(pg_catalog.=) \'_timescaledb_internal\' AND "
"j.proc_name OPERATOR(pg_catalog.~) "
" WHEN j.proc_schema = \'_timescaledb_internal\' AND j.proc_name ~ "
"\'^policy_(retention|compression|reorder|refresh_continuous_aggregate|telemetry|job_error_"
"retention)$\' "
" THEN CAST(j.proc_name AS pg_catalog.text) "
" THEN j.proc_name::TEXT "
" ELSE \'user_defined_action\' "
" END"
") AS job_type, "
" CAST(pg_catalog.sum(total_runs) AS pg_catalog.int8) as total_runs, "
" CAST(pg_catalog.sum(total_successes) AS pg_catalog.int8) as total_successes, "
" CAST(pg_catalog.sum(total_failures) AS pg_catalog.int8) as total_failures, "
" CAST(pg_catalog.sum(total_crashes) AS pg_catalog.int8) as total_crashes, "
" pg_catalog.sum(total_duration) as total_duration, "
" pg_catalog.sum(total_duration_failures) as total_duration_failures, "
" pg_catalog.max(consecutive_failures) as max_consecutive_failures, "
" pg_catalog.max(consecutive_crashes) as max_consecutive_crashes "
" SUM(total_runs)::BIGINT AS total_runs, "
" SUM(total_successes)::BIGINT AS total_successes, "
" SUM(total_failures)::BIGINT AS total_failures, "
" SUM(total_crashes)::BIGINT AS total_crashes, "
" SUM(total_duration) AS total_duration, "
" SUM(total_duration_failures) AS total_duration_failures, "
" MAX(consecutive_failures) AS max_consecutive_failures, "
" MAX(consecutive_crashes) AS max_consecutive_crashes "
"FROM "
" _timescaledb_internal.bgw_job_stat s "
" JOIN _timescaledb_config.bgw_job j on j.id OPERATOR(pg_catalog.=) s.job_id "
" JOIN _timescaledb_config.bgw_job j on j.id = s.job_id "
"GROUP BY "
"job_type";

if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI");

/* SPI calls must be qualified otherwise they are unsafe */
res = SPI_exec("SET search_path TO pg_catalog, pg_temp", 0);
if (res < 0)
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), (errmsg("could not set search_path"))));

command = makeStringInfo();

appendStringInfoString(command, command_string);
Expand Down Expand Up @@ -520,7 +524,7 @@ add_job_stats_by_job_type(JsonbParseState *parse_state)
add_job_stats_internal(parse_state, TextDatumGetCString(jobtype_datum), &stats);
old_context = MemoryContextSwitchTo(spi_context);
}

res = SPI_exec("RESET search_path", 0);
res = SPI_finish();
Assert(res == SPI_OK_FINISH);
}
Expand Down

0 comments on commit 8da38a9

Please sign in to comment.