Skip to content

Commit

Permalink
Perform jsonb fill-in in correct memory context
Browse files Browse the repository at this point in the history
  • Loading branch information
konskov committed Oct 17, 2022
1 parent 686fa60 commit 24e8c0f
Showing 1 changed file with 60 additions and 75 deletions.
135 changes: 60 additions & 75 deletions src/telemetry/telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@
#include "cross_module_fn.h"

#include <executor/spi.h>
#include <lib/stringinfo.h>
#include <utils/builtins.h>
#include <utils/rel.h>
#include <utils/relcache.h>
#include <utils/date.h>
#include <utils/snapmgr.h>

#define TS_TELEMETRY_VERSION 2
#define TS_VERSION_JSON_FIELD "current_timescaledb_version"
Expand Down Expand Up @@ -244,31 +238,67 @@ add_job_counts(JsonbParseState *state)
}

static JsonbValue *
add_errors_helper(JsonbParseState *parse_state, const char *keyname, int64 nerrors)
add_errors_helper_func(JsonbParseState *parse_state, const char *job_type, Jsonb *sqlerrs_jsonb)
{
JsonbValue name = {
JsonbIterator *it;
JsonbIteratorToken type;
JsonbValue val;
JsonbValue *ret;
JsonbValue key = {
.type = jbvString,
.val.string.val = pstrdup(keyname),
.val.string.len = strlen(keyname),
};
pushJsonbValue(&parse_state, WJB_KEY, &name);
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);

ts_jsonb_add_int64(parse_state, keyname, nerrors);
.val.string.val = pstrdup(job_type),
.val.string.len = strlen(job_type),
};

return pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
ret = pushJsonbValue(&parse_state, WJB_KEY, &key);
ret = pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);

// now need to iterate through the jsonb fields and add them
/* we don't expect nested values here */
it = JsonbIteratorInit(&sqlerrs_jsonb->root);
type = JsonbIteratorNext(&it, &val, true /*skip_nested*/);
if (type != WJB_BEGIN_OBJECT)
elog(ERROR, "invalid JSON format");
while ((type = JsonbIteratorNext(&it, &val, true)))
{
const char *errcode;
// JsonbValue errcode;
int64 errcnt;
if (type == WJB_END_OBJECT)
break;
else if (type == WJB_KEY)
{
errcode = pnstrdup(val.val.string.val, val.val.string.len);
// now get the value for this key
type = JsonbIteratorNext(&it, &val, true);
if (type != WJB_VALUE)
elog(ERROR, "unexpected jsonb type");
errcnt = DatumGetInt64(DirectFunctionCall1(numeric_int8, NumericGetDatum(val.val.numeric)));
// now that we have both key and value, push them
ts_jsonb_add_int64(parse_state, errcode, errcnt);
}
else
// we are not expecting anything else for this
elog(ERROR, "unexpected jsonb type");
}
// close the jsonb that corresponds to this job_type
ret = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
return ret;
}
/* this function queries the database through SPI and gets back a set of records
that look like (job_type TEXT, jsonb_object_agg JSONB). For example, (user_defined_action, {"P0001": 2, "42883": 5})
we are expecting about 6 rows depending
on how we write the query and if we exclude any jobs.
Then for each row adds a new kv pair to the jsonb,
which looks like (key: string, value: jsonb) */
that look like (job_type TEXT, jsonb_object_agg JSONB).
For example, (user_defined_action, {"P0001": 2, "42883": 5})
(we are expecting about 6 rows depending
on how we write the query and if we exclude any jobs)
Then for each returned row adds a new kv pair to the jsonb,
which looks like "job_type": {"errtype1": errcnt1, ...} */
static void
add_errors_by_sqlerrcode(JsonbParseState *parse_state)
{
int res;
StringInfo command;
MemoryContext old_context = CurrentMemoryContext, spi_context;

const char *command_string = "select job_type, jsonb_object_agg(sqlerrcode, count) FROM"
"("
"select ("
Expand All @@ -294,77 +324,32 @@ add_errors_by_sqlerrcode(JsonbParseState *parse_state)
(errcode(ERRCODE_INTERNAL_ERROR),
(errmsg("could not get errors by sqlerrcode and job type"))));

elog(LOG, "spi call in %s returned %lu rows", __func__, SPI_processed); // numvals only in pg13++
/* start a new jsonb object, now what I add will be added to this object */


// prepare a jsonb in which to append all the returned rows
// we expect about 6 rows returned, each row is a record (TEXT, JSONB)
// iterate over the rows and create a jsonb to be added to the parseState,
for (int i = 0; i < SPI_processed; i++)
{
// jobtype is text
Datum record_jobtype, record_jsonb;
bool isnull_jobtype, isnull_jsonb;
JsonbIterator *it;
JsonbIteratorToken type;
JsonbValue val;

record_jobtype = SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 1, &isnull_jobtype);
if (isnull_jobtype)
elog(ERROR, "null job type returned");
record_jsonb = SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 2, &isnull_jsonb);
// this jsonb looks like {"P0001": 32, "42883": 6}
/* this jsonb looks like {"P0001": 32, "42883": 6} */
Jsonb *sqlerrs_jsonb = isnull_jsonb ? NULL : DatumGetJsonbP(record_jsonb);

if (sqlerrs_jsonb == NULL)
continue;
// again start a new jsonb object
JsonbValue key = {
.type = jbvString,
.val.string.val = pstrdup(TextDatumGetCString(record_jobtype)),
.val.string.len = strlen(TextDatumGetCString(record_jobtype)),
};
elog(LOG, "job_type for iteration %d is %s", i, TextDatumGetCString(record_jobtype));
pushJsonbValue(&parse_state, WJB_KEY, &key);
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL); // second one

// now need to iterate through the jsonb fields and add them
/* we don't expect nested values here */
it = JsonbIteratorInit(&sqlerrs_jsonb->root);
type = JsonbIteratorNext(&it, &val, true /*skip_nested*/);
if (type != WJB_BEGIN_OBJECT)
elog(ERROR, "invalid JSON format");
while ((type = JsonbIteratorNext(&it, &val, true)))
{
const char *errcode;
// JsonbValue errcode;
int64 errcnt;
if (type == WJB_END_OBJECT)
break;
else if (type == WJB_KEY)
{
errcode = pnstrdup(val.val.string.val, val.val.string.len);
// now get the value for this key
type = JsonbIteratorNext(&it, &val, true);
if (type != WJB_VALUE)
elog(ERROR, "unexpected jsonb type");
errcnt = DatumGetInt64(DirectFunctionCall1(numeric_int8, NumericGetDatum(val.val.numeric)));
// now that we have both key and value, push them
add_errors_helper(parse_state, errcode, errcnt);
}
else
// we are not expecting anything else for this
elog(ERROR, "unexpected jsonb type");
}
// close the jsonb that corresponds to this job_type
pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
continue;
/* the jsonb object cannot be created in the SPI context or it will be lost */
spi_context = MemoryContextSwitchTo(old_context);
add_errors_helper_func(parse_state, TextDatumGetCString(record_jobtype), sqlerrs_jsonb);
old_context = MemoryContextSwitchTo(spi_context);
}

res = SPI_finish();

Assert(res == SPI_OK_FINISH);

}

static int64
Expand Down Expand Up @@ -652,13 +637,13 @@ build_telemetry_report()
ts_jsonb_add_int64(parse_state, REQ_DATA_VOLUME, get_database_size());
/* add job execution stats */
key.type = jbvString;
key.val.string.val = pstrdup("errors_by_sqlerrcode");
key.val.string.len = strlen("errors_by_sqlerrcode");
key.val.string.val = REQ_NUM_ERR_BY_SQLERRCODE;
key.val.string.len = strlen(REQ_NUM_ERR_BY_SQLERRCODE);
pushJsonbValue(&parse_state, WJB_KEY, &key);
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);

add_errors_by_sqlerrcode(parse_state);

pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);

/* Add relation stats */
Expand Down

0 comments on commit 24e8c0f

Please sign in to comment.