Skip to content

Commit

Permalink
Enable run_job() for telemetry job
Browse files Browse the repository at this point in the history
Since the telemetry job has a special code path that allows it to be used
from both Apache code and TSL code, trying to execute the telemetry job
with run_job() fails.

This code will allow run_job() to be used with the telemetry job to
trigger a send of telemetry data. You have to belong to the group that
owns the telemetry job (or be the owner of the telemetry job) to be
able to use it.

Closes timescale#5605
  • Loading branch information
mkindahl committed Apr 26, 2023
1 parent d3730a4 commit 149ce69
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -28,6 +28,7 @@ accidentally triggering the load of a previous DB version.**
* #5583 Fix parameterization in DecompressChunk path generation
* #5602 Fix broken CAgg with JOIN repair function
* #5615 Add permission checks to run_job()
* #5614 Enable run_job() for telemetry job

**Thanks**
* @kovetskiy and @DZDomi for reporting performance regression in Realtime Continuous Aggregates
Expand Down
62 changes: 44 additions & 18 deletions src/bgw/job.c
Expand Up @@ -48,13 +48,8 @@
#include "jsonb_utils.h"
#include "debug_assert.h"

#define TELEMETRY_INITIAL_NUM_RUNS 12

static scheduler_test_hook_type scheduler_test_hook = NULL;
static char *job_entrypoint_function_name = "ts_bgw_job_entrypoint";
#ifdef USE_TELEMETRY
static bool is_telemetry_job(BgwJob *job);
#endif

typedef enum JobLockLifetime
{
Expand Down Expand Up @@ -377,7 +372,7 @@ ts_bgw_job_get_scheduled(size_t alloc_size, MemoryContext mctx)

#ifdef USE_TELEMETRY
/* ignore telemetry jobs if telemetry is disabled */
if (!ts_telemetry_on() && is_telemetry_job(job))
if (!ts_telemetry_on() && ts_is_telemetry_job(job))
{
pfree(job);
continue;
Expand Down Expand Up @@ -1009,20 +1004,27 @@ ts_bgw_job_validate_job_owner(Oid owner)
ReleaseSysCache(role_tup);
}

#ifdef USE_TELEMETRY
static bool
is_telemetry_job(BgwJob *job)
/*
* Is the job the telemetry job?
*/
bool
ts_is_telemetry_job(BgwJob *job)
{
#ifdef USE_TELEMETRY
return namestrcmp(&job->fd.proc_schema, INTERNAL_SCHEMA_NAME) == 0 &&
namestrcmp(&job->fd.proc_name, "policy_telemetry") == 0;
}
#else
return false;
#endif
}

bool
ts_bgw_job_execute(BgwJob *job)
{
#ifdef USE_TELEMETRY
if (is_telemetry_job(job))
/* The telemetry job has a separate code path since we want to be able to
* use telemetry even if the TSL code is not installed. */
if (ts_is_telemetry_job(job))
{
/*
* In the first 12 hours, we want telemetry to ping every
Expand All @@ -1033,7 +1035,9 @@ ts_bgw_job_execute(BgwJob *job)
return ts_bgw_job_run_and_set_next_start(job,
ts_telemetry_main_wrapper,
TELEMETRY_INITIAL_NUM_RUNS,
&one_hour);
&one_hour,
/* atomic */ true,
/* mark */ false);
}
#endif

Expand Down Expand Up @@ -1300,22 +1304,42 @@ ts_bgw_job_set_job_entrypoint_function_name(char *func_name)
job_entrypoint_function_name = func_name;
}

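/*
* Run job and set next start.
*
* job: Job to run and update
* func: Function to execute for the job
* initial_runs: Number of initial runs; next start is set explicitly only
*               while the job's total_runs is below this value
* next_interval: Interval to use when computing next start
* atomic: If true, start a transaction before running the function and
*         commit it after next start has been updated
* mark: Mark the start and end of the function execution in job_stats
*
* Returns the value returned by func.
*/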
bool
ts_bgw_job_run_and_set_next_start(BgwJob *job, job_main_func func, int64 initial_runs,
Interval *next_interval)
Interval *next_interval, bool atomic, bool mark)
{
BgwJobStat *job_stat;
bool ret = func();
bool had_error;

/* Now update next_start. */
StartTransactionCommand();
if (atomic)
StartTransactionCommand();

if (mark)
ts_bgw_job_stat_mark_start(job->fd.id);

had_error = func();

if (mark)
ts_bgw_job_stat_mark_end(job, had_error ? JOB_FAILURE : JOB_SUCCESS);

/* Now update next_start. */
job_stat = ts_bgw_job_stat_find(job->fd.id);

/*
* Note that setting next_start explicitly from this function will
* override any backoff calculation due to failure.
*/
Ensure(job_stat != NULL, "job status for job %d not found", job->fd.id);
if (job_stat->fd.total_runs < initial_runs)
{
TimestampTz next_start =
Expand All @@ -1325,9 +1349,11 @@ ts_bgw_job_run_and_set_next_start(BgwJob *job, job_main_func func, int64 initial

ts_bgw_job_stat_set_next_start(job->fd.id, next_start);
}
CommitTransactionCommand();

return ret;
if (atomic)
CommitTransactionCommand();

return had_error;
}

/* Insert a new job in the bgw_job relation */
Expand Down
11 changes: 9 additions & 2 deletions src/bgw/job.h
Expand Up @@ -13,6 +13,8 @@
#include "export.h"
#include "ts_catalog/catalog.h"

#define TELEMETRY_INITIAL_NUM_RUNS 12

typedef struct BgwJob
{
FormData_bgw_job fd;
Expand Down Expand Up @@ -57,9 +59,14 @@ extern TSDLLEXPORT void ts_bgw_job_run_config_check(Oid check, int32 job_id, Jso
extern TSDLLEXPORT Datum ts_bgw_job_entrypoint(PG_FUNCTION_ARGS);
extern void ts_bgw_job_set_scheduler_test_hook(scheduler_test_hook_type hook);
extern void ts_bgw_job_set_job_entrypoint_function_name(char *func_name);
extern bool ts_bgw_job_run_and_set_next_start(BgwJob *job, job_main_func func, int64 initial_runs,
Interval *next_interval);
extern TSDLLEXPORT bool ts_bgw_job_run_and_set_next_start(BgwJob *job, job_main_func func,
int64 initial_runs,
Interval *next_interval, bool atomic,
bool mark);
extern TSDLLEXPORT bool ts_job_errors_insert_tuple(const FormData_job_error *job_err);
extern TSDLLEXPORT void ts_bgw_job_validate_schedule_interval(Interval *schedule_interval);
extern TSDLLEXPORT char *ts_bgw_job_validate_timezone(Datum timezone);

extern TSDLLEXPORT bool ts_is_telemetry_job(BgwJob *job);

#endif /* BGW_JOB_H */
2 changes: 1 addition & 1 deletion src/telemetry/telemetry.h
Expand Up @@ -56,7 +56,7 @@ extern void ts_check_version_response(const char *json);
* Timescale via HTTPS.
*/
extern bool ts_telemetry_main(const char *host, const char *path, const char *service);
extern bool ts_telemetry_main_wrapper(void);
extern TSDLLEXPORT bool ts_telemetry_main_wrapper(void);
extern TSDLLEXPORT Datum ts_telemetry_get_report_jsonb(PG_FUNCTION_ARGS);

#endif /* TIMESCALEDB_TELEMETRY_TELEMETRY_H */
7 changes: 6 additions & 1 deletion test/src/bgw/scheduler_mock.c
Expand Up @@ -407,7 +407,12 @@ test_job_dispatcher(BgwJob *job)
{
/* Set next_start to 200ms */
Interval new_interval = { .time = .2 * USECS_PER_SEC };
return ts_bgw_job_run_and_set_next_start(job, test_job_4, 3, &new_interval);
return ts_bgw_job_run_and_set_next_start(job,
test_job_4,
3,
&new_interval,
/* atomic */ true,
/* mark */ false);
}
default:
return ts_cm_functions->job_execute(job);
Expand Down
30 changes: 30 additions & 0 deletions tsl/src/bgw_policy/job.c
Expand Up @@ -38,6 +38,9 @@
#include "continuous_aggs/materialize.h"
#include "continuous_aggs/refresh.h"
#include "ts_catalog/continuous_agg.h"
#ifdef USE_TELEMETRY
#include "telemetry/telemetry.h"
#endif

#include "tsl/src/chunk.h"

Expand Down Expand Up @@ -569,6 +572,12 @@ job_execute_procedure(FuncExpr *funcexpr)
ExecuteCallStmt(call, params, false, dest);
}

/*
* Execute the job.
*
* This function can be called both from a portal and from a background
* worker.
*/
bool
job_execute(BgwJob *job)
{
Expand Down Expand Up @@ -607,6 +616,27 @@ job_execute(BgwJob *job)
#endif
}

#ifdef USE_TELEMETRY
/* The telemetry job has a separate code path and since we can reach this
* code also when using run_job(), we have a special case here. This will
* not be triggered when executed from ts_bgw_job_execute(). */
if (ts_is_telemetry_job(job))
{
/*
* In the first 12 hours, we want telemetry to ping every
* hour. After that initial period, we default to the
* schedule_interval listed in the job table.
*/
Interval one_hour = { .time = 1 * USECS_PER_HOUR };
return ts_bgw_job_run_and_set_next_start(job,
ts_telemetry_main_wrapper,
TELEMETRY_INITIAL_NUM_RUNS,
&one_hour,
/* atomic */ false,
/* mark */ true);
}
#endif

object = makeNode(ObjectWithArgs);
object->objname = list_make2(makeString(NameStr(job->fd.proc_schema)),
makeString(NameStr(job->fd.proc_name)));
Expand Down
41 changes: 41 additions & 0 deletions tsl/test/expected/bgw_telemetry.out
@@ -0,0 +1,41 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\c :TEST_DBNAME :ROLE_SUPERUSER
-- Check that we can use run_job() with the telemetry job, so first
-- locate the job id for it (should be 1, but who knows, and it is not
-- important for this test).
SELECT id AS job_id FROM _timescaledb_config.bgw_job
WHERE proc_schema = '_timescaledb_internal'
AND proc_name = 'policy_telemetry' \gset
-- It should be possible to run it twice and running it should change
-- the last_finish time. Since job_stats can be empty to start with,
-- we run it once first to populate job_stats.
CALL run_job(:job_id);
SELECT last_finish AS last_finish
FROM _timescaledb_internal.bgw_job_stat
WHERE job_id = :job_id \gset
SELECT pg_sleep(1);
pg_sleep
----------

(1 row)

CALL run_job(:job_id);
SELECT last_finish > :'last_finish' AS job_executed,
last_run_success
FROM _timescaledb_internal.bgw_job_stat
WHERE job_id = :job_id;
job_executed | last_run_success
--------------+------------------
t | t
(1 row)

-- Running it as the default user should fail since they do not own
-- the job. This should be the case also for the telemetry job, which
-- is a little special.
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
\set ON_ERROR_STOP 0
CALL run_job(:job_id);
ERROR: insufficient permissions to run job 1
\set ON_ERROR_STOP 1
4 changes: 4 additions & 0 deletions tsl/test/sql/CMakeLists.txt
Expand Up @@ -28,6 +28,10 @@ set(TEST_FILES
skip_scan.sql
size_utils_tsl.sql)

if(USE_TELEMETRY)
list(APPEND TEST_FILES bgw_telemetry.sql)
endif()

if(CMAKE_BUILD_TYPE MATCHES Debug)
list(
APPEND
Expand Down
38 changes: 38 additions & 0 deletions tsl/test/sql/bgw_telemetry.sql
@@ -0,0 +1,38 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.

\c :TEST_DBNAME :ROLE_SUPERUSER

-- Check that we can use run_job() with the telemetry job, so first
-- locate the job id for it (should be 1, but who knows, and it is not
-- important for this test).
SELECT id AS job_id FROM _timescaledb_config.bgw_job
WHERE proc_schema = '_timescaledb_internal'
AND proc_name = 'policy_telemetry' \gset

-- It should be possible to run it twice and running it should change
-- the last_finish time. Since job_stats can be empty to start with,
-- we run it once first to populate job_stats.
CALL run_job(:job_id);

SELECT last_finish AS last_finish
FROM _timescaledb_internal.bgw_job_stat
WHERE job_id = :job_id \gset
SELECT pg_sleep(1);

CALL run_job(:job_id);

SELECT last_finish > :'last_finish' AS job_executed,
last_run_success
FROM _timescaledb_internal.bgw_job_stat
WHERE job_id = :job_id;

-- Running it as the default user should fail since they do not own
-- the job. This should be the case also for the telemetry job, which
-- is a little special.
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
\set ON_ERROR_STOP 0
CALL run_job(:job_id);
\set ON_ERROR_STOP 1

0 comments on commit 149ce69

Please sign in to comment.