From c32dba351972497273737ab4f1c90d9cfb6ed11a Mon Sep 17 00:00:00 2001 From: Mats Kindahl Date: Tue, 25 Apr 2023 14:28:10 +0200 Subject: [PATCH] Enable run_job() for telemetry job Since telemetry job has a special code path to be able to be used both from Apache code and from TSL code, trying to execute the telemetry job with run_job() will fail. This code will allow run_job() to be used with the telemetry job to trigger a send of telemetry data. You have to belong to the group that owns the telemetry job (or be the owner of the telemetry job) to be able to use it. Closes #5605 --- CHANGELOG.md | 1 + src/bgw/job.c | 56 ++++++++++++++++++++--------- src/bgw/job.h | 11 ++++-- src/telemetry/telemetry.h | 2 +- test/src/bgw/scheduler_mock.c | 7 +++- tsl/src/bgw_policy/job.c | 30 ++++++++++++++++ tsl/test/expected/bgw_telemetry.out | 41 +++++++++++++++++++++ tsl/test/sql/CMakeLists.txt | 4 +++ tsl/test/sql/bgw_telemetry.sql | 38 ++++++++++++++++++++ 9 files changed, 170 insertions(+), 20 deletions(-) create mode 100644 tsl/test/expected/bgw_telemetry.out create mode 100644 tsl/test/sql/bgw_telemetry.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index c7bdc3a82a0..b8d7cb0c383 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ accidentally triggering the load of a previous DB version.** * #5583 Fix parameterization in DecompressChunk path generation * #5602 Fix broken CAgg with JOIN repair function * #5615 Add permission checks to run_job() +* #5614 Enable run_job() for telemetry job **Thanks** * @kovetskiy and @DZDomi for reporting peformance regression in Realtime Continuous Aggregates diff --git a/src/bgw/job.c b/src/bgw/job.c index 3de45de0981..9de395b898c 100644 --- a/src/bgw/job.c +++ b/src/bgw/job.c @@ -48,13 +48,8 @@ #include "jsonb_utils.h" #include "debug_assert.h" -#define TELEMETRY_INITIAL_NUM_RUNS 12 - static scheduler_test_hook_type scheduler_test_hook = NULL; static char *job_entrypoint_function_name = "ts_bgw_job_entrypoint"; -#ifdef USE_TELEMETRY -static bool is_telemetry_job(BgwJob *job); -#endif typedef enum JobLockLifetime { @@ -377,7 +372,7 @@ ts_bgw_job_get_scheduled(size_t alloc_size, MemoryContext mctx) #ifdef USE_TELEMETRY /* ignore telemetry jobs if telemetry is disabled */ - if (!ts_telemetry_on() && is_telemetry_job(job)) + if (!ts_telemetry_on() && ts_is_telemetry_job(job)) { pfree(job); continue; @@ -1009,9 +1004,12 @@ ts_bgw_job_validate_job_owner(Oid owner) ReleaseSysCache(role_tup); } +/* + * Is the job the telemetry job? + */ #ifdef USE_TELEMETRY -static bool -is_telemetry_job(BgwJob *job) +bool +ts_is_telemetry_job(BgwJob *job) { return namestrcmp(&job->fd.proc_schema, INTERNAL_SCHEMA_NAME) == 0 && namestrcmp(&job->fd.proc_name, "policy_telemetry") == 0; @@ -1022,7 +1020,9 @@ bool ts_bgw_job_execute(BgwJob *job) { #ifdef USE_TELEMETRY - if (is_telemetry_job(job)) + /* The telemetry job has a separate code path since we want to be able to + * use telemetry even if the TSL code is not installed. */ + if (ts_is_telemetry_job(job)) { /* * In the first 12 hours, we want telemetry to ping every @@ -1033,7 +1033,9 @@ ts_bgw_job_execute(BgwJob *job) return ts_bgw_job_run_and_set_next_start(job, ts_telemetry_main_wrapper, TELEMETRY_INITIAL_NUM_RUNS, - &one_hour); + &one_hour, + /* atomic */ true, + /* mark */ false); } #endif @@ -1300,22 +1302,42 @@ ts_bgw_job_set_job_entrypoint_function_name(char *func_name) job_entrypoint_function_name = func_name; } +/* + * Run job and set next start. + * + * job: Job to run and update + * func: Function to execute for the job + * initial_runs: Limit on the number of runs to do + * next_interval: Interval to use when computing next start + * atomic: Should be executed as a single transaction. + * mark: Mark the start and end of the function execution in job_stats + */ bool ts_bgw_job_run_and_set_next_start(BgwJob *job, job_main_func func, int64 initial_runs, - Interval *next_interval) + Interval *next_interval, bool atomic, bool mark) { BgwJobStat *job_stat; - bool ret = func(); + bool had_error; - /* Now update next_start. */ - StartTransactionCommand(); + if (atomic) + StartTransactionCommand(); + + if (mark) + ts_bgw_job_stat_mark_start(job->fd.id); + + had_error = func(); + if (mark) + ts_bgw_job_stat_mark_end(job, had_error ? JOB_FAILURE : JOB_SUCCESS); + + /* Now update next_start. */ job_stat = ts_bgw_job_stat_find(job->fd.id); /* * Note that setting next_start explicitly from this function will * override any backoff calculation due to failure. */ + Ensure(job_stat != NULL, "job status for job %d not found", job->fd.id); if (job_stat->fd.total_runs < initial_runs) { TimestampTz next_start = @@ -1325,9 +1347,11 @@ ts_bgw_job_run_and_set_next_start(BgwJob *job, job_main_func func, int64 initial ts_bgw_job_stat_set_next_start(job->fd.id, next_start); } - CommitTransactionCommand(); - return ret; + if (atomic) + CommitTransactionCommand(); + + return had_error; } /* Insert a new job in the bgw_job relation */ diff --git a/src/bgw/job.h b/src/bgw/job.h index cf7721a512a..50bbd0d8e83 100644 --- a/src/bgw/job.h +++ b/src/bgw/job.h @@ -13,6 +13,8 @@ #include "export.h" #include "ts_catalog/catalog.h" +#define TELEMETRY_INITIAL_NUM_RUNS 12 + typedef struct BgwJob { FormData_bgw_job fd; @@ -57,9 +59,14 @@ extern TSDLLEXPORT void ts_bgw_job_run_config_check(Oid check, int32 job_id, Jso extern TSDLLEXPORT Datum ts_bgw_job_entrypoint(PG_FUNCTION_ARGS); extern void ts_bgw_job_set_scheduler_test_hook(scheduler_test_hook_type hook); extern void ts_bgw_job_set_job_entrypoint_function_name(char *func_name); -extern bool ts_bgw_job_run_and_set_next_start(BgwJob *job, job_main_func func, int64 initial_runs, - Interval *next_interval); +extern TSDLLEXPORT bool ts_bgw_job_run_and_set_next_start(BgwJob *job, job_main_func func, + int64 initial_runs, + Interval *next_interval, bool atomic, + bool mark); extern TSDLLEXPORT bool ts_job_errors_insert_tuple(const FormData_job_error *job_err); extern TSDLLEXPORT void ts_bgw_job_validate_schedule_interval(Interval *schedule_interval); extern TSDLLEXPORT char *ts_bgw_job_validate_timezone(Datum timezone); + +extern TSDLLEXPORT bool ts_is_telemetry_job(BgwJob *job); + #endif /* BGW_JOB_H */ diff --git a/src/telemetry/telemetry.h b/src/telemetry/telemetry.h index 3a960b20f1c..e2475751045 100644 --- a/src/telemetry/telemetry.h +++ b/src/telemetry/telemetry.h @@ -56,7 +56,7 @@ extern void ts_check_version_response(const char *json); * Timescale via HTTPS. */ extern bool ts_telemetry_main(const char *host, const char *path, const char *service); -extern bool ts_telemetry_main_wrapper(void); +extern TSDLLEXPORT bool ts_telemetry_main_wrapper(void); extern TSDLLEXPORT Datum ts_telemetry_get_report_jsonb(PG_FUNCTION_ARGS); #endif /* TIMESCALEDB_TELEMETRY_TELEMETRY_H */ diff --git a/test/src/bgw/scheduler_mock.c b/test/src/bgw/scheduler_mock.c index 620f6fe5e4a..5263e6b9d7e 100644 --- a/test/src/bgw/scheduler_mock.c +++ b/test/src/bgw/scheduler_mock.c @@ -407,7 +407,12 @@ test_job_dispatcher(BgwJob *job) { /* Set next_start to 200ms */ Interval new_interval = { .time = .2 * USECS_PER_SEC }; - return ts_bgw_job_run_and_set_next_start(job, test_job_4, 3, &new_interval); + return ts_bgw_job_run_and_set_next_start(job, + test_job_4, + 3, + &new_interval, + /* atomic */ true, + /* mark */ false); } default: return ts_cm_functions->job_execute(job); diff --git a/tsl/src/bgw_policy/job.c b/tsl/src/bgw_policy/job.c index 77d481c4979..87300d202b7 100644 --- a/tsl/src/bgw_policy/job.c +++ b/tsl/src/bgw_policy/job.c @@ -38,6 +38,9 @@ #include "continuous_aggs/materialize.h" #include "continuous_aggs/refresh.h" #include "ts_catalog/continuous_agg.h" +#ifdef USE_TELEMETRY +#include "telemetry/telemetry.h" +#endif #include "tsl/src/chunk.h" @@ -569,6 +572,12 @@ job_execute_procedure(FuncExpr *funcexpr) ExecuteCallStmt(call, params, false, dest); } +/* + * Execute the job. + * + * This function can be called both from a portal and from a background + * worker. + */ bool job_execute(BgwJob *job) { @@ -607,6 +616,27 @@ job_execute(BgwJob *job) #endif } +#ifdef USE_TELEMETRY + /* The telemetry job has a separate code path and since we can reach this + * code also when using run_job(), we have a special case here. This will + * not be triggered when executed from ts_bgw_job_execute(). */ + if (ts_is_telemetry_job(job)) + { + /* + * In the first 12 hours, we want telemetry to ping every + * hour. After that initial period, we default to the + * schedule_interval listed in the job table. + */ + Interval one_hour = { .time = 1 * USECS_PER_HOUR }; + return ts_bgw_job_run_and_set_next_start(job, + ts_telemetry_main_wrapper, + TELEMETRY_INITIAL_NUM_RUNS, + &one_hour, + /* atomic */ false, + /* mark */ true); + } +#endif + object = makeNode(ObjectWithArgs); object->objname = list_make2(makeString(NameStr(job->fd.proc_schema)), makeString(NameStr(job->fd.proc_name))); diff --git a/tsl/test/expected/bgw_telemetry.out b/tsl/test/expected/bgw_telemetry.out new file mode 100644 index 00000000000..06f3b15d861 --- /dev/null +++ b/tsl/test/expected/bgw_telemetry.out @@ -0,0 +1,41 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\c :TEST_DBNAME :ROLE_SUPERUSER +-- Check that we can use run_job() with the telemetry job, so first +-- locate the job id for it (should be 1, but who knows, and it is not +-- important for this test). +SELECT id AS job_id FROM _timescaledb_config.bgw_job + WHERE proc_schema = '_timescaledb_internal' + AND proc_name = 'policy_telemetry' \gset +-- It should be possible to run it twice and running it should change +-- the last_finish time. Since job_stats can be empty to start with, +-- we run it once first to populate job_stats. +CALL run_job(:job_id); +SELECT last_finish AS last_finish + FROM _timescaledb_internal.bgw_job_stat + WHERE job_id = :job_id \gset +SELECT pg_sleep(1); + pg_sleep +---------- + +(1 row) + +CALL run_job(:job_id); +SELECT last_finish > :'last_finish' AS job_executed, + last_run_success + FROM _timescaledb_internal.bgw_job_stat + WHERE job_id = :job_id; + job_executed | last_run_success +--------------+------------------ + t | t +(1 row) + +-- Running it as the default user should fail since they do not own +-- the job. This should be the case also for the telemetry job, which +-- is a little special. +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +\set ON_ERROR_STOP 0 +CALL run_job(:job_id); +ERROR: insufficient permissions to run job 1 +\set ON_ERROR_STOP 1 diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt index 89a4416538a..5a524fca0ce 100644 --- a/tsl/test/sql/CMakeLists.txt +++ b/tsl/test/sql/CMakeLists.txt @@ -28,6 +28,10 @@ set(TEST_FILES skip_scan.sql size_utils_tsl.sql) +if(USE_TELEMETRY) + list(APPEND TEST_FILES bgw_telemetry.sql) +endif() + if(CMAKE_BUILD_TYPE MATCHES Debug) list( APPEND diff --git a/tsl/test/sql/bgw_telemetry.sql b/tsl/test/sql/bgw_telemetry.sql new file mode 100644 index 00000000000..e6e36efdb07 --- /dev/null +++ b/tsl/test/sql/bgw_telemetry.sql @@ -0,0 +1,38 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. + +\c :TEST_DBNAME :ROLE_SUPERUSER + +-- Check that we can use run_job() with the telemetry job, so first +-- locate the job id for it (should be 1, but who knows, and it is not +-- important for this test). +SELECT id AS job_id FROM _timescaledb_config.bgw_job + WHERE proc_schema = '_timescaledb_internal' + AND proc_name = 'policy_telemetry' \gset + +-- It should be possible to run it twice and running it should change +-- the last_finish time. Since job_stats can be empty to start with, +-- we run it once first to populate job_stats. +CALL run_job(:job_id); + +SELECT last_finish AS last_finish + FROM _timescaledb_internal.bgw_job_stat + WHERE job_id = :job_id \gset +SELECT pg_sleep(1); + +CALL run_job(:job_id); + +SELECT last_finish > :'last_finish' AS job_executed, + last_run_success + FROM _timescaledb_internal.bgw_job_stat + WHERE job_id = :job_id; + +-- Running it as the default user should fail since they do not own +-- the job. This should be the case also for the telemetry job, which +-- is a little special. +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +\set ON_ERROR_STOP 0 +CALL run_job(:job_id); +\set ON_ERROR_STOP 1 +