From a63d7c09a49e8d83690bb848dea8112d388dc70e Mon Sep 17 00:00:00 2001 From: Lakshmi Narayanan Sreethar Date: Thu, 13 Apr 2023 20:20:34 +0530 Subject: [PATCH 1/2] Update bgw_custom testcase Added a few cleanup steps and updated the test logic to make the testcase runs more stable. --- tsl/test/expected/bgw_custom.out | 67 +++++++++++++++++++++++++------- tsl/test/sql/bgw_custom.sql | 48 ++++++++++++++++++----- 2 files changed, 91 insertions(+), 24 deletions(-) diff --git a/tsl/test/expected/bgw_custom.out b/tsl/test/expected/bgw_custom.out index cb0258490e9..75327b72266 100644 --- a/tsl/test/expected/bgw_custom.out +++ b/tsl/test/expected/bgw_custom.out @@ -242,12 +242,7 @@ SELECT delete_job(1000); \c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER -- background workers are disabled, so the job will not run -- SELECT add_job( proc=>'custom_func', - schedule_interval=>'1h', initial_start =>'2018-01-01 10:00:00-05'); - add_job ---------- - 1005 -(1 row) - + schedule_interval=>'1h', initial_start =>'2018-01-01 10:00:00-05') AS job_id_1 \gset SELECT job_id, next_start, scheduled, schedule_interval FROM timescaledb_information.jobs WHERE job_id > 1000; job_id | next_start | scheduled | schedule_interval @@ -272,6 +267,12 @@ total_successes | 0 total_failures | 0 \x +SELECT delete_job(:job_id_1); + delete_job +------------ + +(1 row) + -- tests for #3545 CREATE FUNCTION wait_for_job_to_run(job_param_id INTEGER, expected_runs INTEGER, spins INTEGER=:TEST_SPINWAIT_ITERS) RETURNS BOOLEAN LANGUAGE PLPGSQL AS $BODY$ @@ -495,6 +496,13 @@ order by id; 3 | _hyper_1_3_chunk | 1 (3 rows) +-- Drop the compression job +SELECT delete_job(:job_id_4); + delete_job +------------ + +(1 row) + -- Decompress chunks before create the cagg SELECT decompress_chunk(c) FROM show_chunks('conditions') c; decompress_chunk @@ -571,6 +579,9 @@ FROM _timescaledb_config.bgw_job WHERE id = :job_id_5; ----+-----------+--------------- (0 rows) +-- Cleanup +DROP TABLE conditions; +DROP TABLE custom_log; -- Stop
Background Workers SELECT _timescaledb_internal.stop_background_workers(); stop_background_workers @@ -861,8 +872,6 @@ BEGIN RETURN j1 || j2; END $$ LANGUAGE PLPGSQL; -create table jsonb_values (j jsonb, i int); -insert into jsonb_values values ('{"refresh_after":"2 weeks"}', 1), ('{"compress_after":"2 weeks"}', 2), ('{"drop_after":"2 weeks"}', 3); CREATE AGGREGATE sum_jsb (jsonb) ( sfunc = jsonb_add, @@ -872,6 +881,10 @@ CREATE AGGREGATE sum_jsb (jsonb) -- for test coverage, check unsupported aggregate type select add_job('test_proc_with_check', '5 secs', config => '{}', check_config => 'sum_jsb'::regproc); ERROR: unsupported function type +-- Cleanup jobs +TRUNCATE _timescaledb_config.bgw_job CASCADE; +NOTICE: truncate cascades to table "bgw_job_stat" +NOTICE: truncate cascades to table "bgw_policy_chunk_stats" -- github issue 4610 CREATE TABLE sensor_data ( @@ -919,18 +932,42 @@ INSERT INTO sensor_data generate_series(1, 30, 1 ) AS g2(sensor_id) ORDER BY time; +-- get the name of a new uncompressed chunk +SELECT chunk_name AS new_uncompressed_chunk_name + FROM timescaledb_information.chunks + WHERE hypertable_name = 'sensor_data' AND NOT is_compressed LIMIT 1 \gset -- change compression status so that this chunk is skipped when policy is run -update _timescaledb_catalog.chunk set status=3 where table_name = '_hyper_4_17_chunk'; +update _timescaledb_catalog.chunk set status=3 where table_name = :'new_uncompressed_chunk_name'; +-- verify that there are other uncompressed new chunks that need to be compressed +SELECT count(*) > 1 + FROM timescaledb_information.chunks + WHERE hypertable_name = 'sensor_data' AND NOT is_compressed; + ?column? +---------- + t +(1 row) + +-- disable notice/warning as the new_uncompressed_chunk_name +-- is dynamic and it will be printed in those messages. 
+SET client_min_messages TO ERROR; CALL run_job(:compressjob_id); -NOTICE: chunk "_hyper_4_17_chunk" is not compressed -WARNING: compressing chunk "_timescaledb_internal._hyper_4_17_chunk" failed when compression policy is executed --- check compression status is not changed -SELECT status FROM _timescaledb_catalog.chunk where table_name = '_hyper_4_17_chunk'; +SET client_min_messages TO NOTICE; +-- check compression status is not changed for the chunk whose status was manually updated +SELECT status FROM _timescaledb_catalog.chunk where table_name = :'new_uncompressed_chunk_name'; status -------- 3 (1 row) +-- confirm all the other new chunks are now compressed despite +-- facing an error when trying to compress :'new_uncompressed_chunk_name' +SELECT count(*) = 0 + FROM timescaledb_information.chunks + WHERE hypertable_name = 'sensor_data' AND NOT is_compressed; + ?column? +---------- + t +(1 row) + -- cleanup -DROP TABLE sensor_data CASCADE; -NOTICE: drop cascades to 8 other objects +DROP TABLE sensor_data; diff --git a/tsl/test/sql/bgw_custom.sql b/tsl/test/sql/bgw_custom.sql index 078be67e09d..f5f22864e96 100644 --- a/tsl/test/sql/bgw_custom.sql +++ b/tsl/test/sql/bgw_custom.sql @@ -118,7 +118,7 @@ SELECT delete_job(1000); \c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER -- background workers are disabled, so the job will not run -- SELECT add_job( proc=>'custom_func', - schedule_interval=>'1h', initial_start =>'2018-01-01 10:00:00-05'); + schedule_interval=>'1h', initial_start =>'2018-01-01 10:00:00-05') AS job_id_1 \gset SELECT job_id, next_start, scheduled, schedule_interval FROM timescaledb_information.jobs WHERE job_id > 1000; @@ -126,6 +126,8 @@ FROM timescaledb_information.jobs WHERE job_id > 1000; SELECT * FROM timescaledb_information.job_stats WHERE job_id > 1000; \x +SELECT delete_job(:job_id_1); + -- tests for #3545 CREATE FUNCTION wait_for_job_to_run(job_param_id INTEGER, expected_runs INTEGER, spins INTEGER=:TEST_SPINWAIT_ITERS) RETURNS BOOLEAN LANGUAGE 
PLPGSQL AS $BODY$ @@ -275,6 +277,8 @@ where hypertable_id = (select id from _timescaledb_catalog.hypertable where table_name = 'conditions') order by id; +-- Drop the compression job +SELECT delete_job(:job_id_4); -- Decompress chunks before create the cagg SELECT decompress_chunk(c) FROM show_chunks('conditions') c; @@ -319,6 +323,10 @@ DROP MATERIALIZED VIEW conditions_summary_daily; SELECT id, proc_name, hypertable_id FROM _timescaledb_config.bgw_job WHERE id = :job_id_5; +-- Cleanup +DROP TABLE conditions; +DROP TABLE custom_log; + -- Stop Background Workers SELECT _timescaledb_internal.stop_background_workers(); @@ -532,9 +540,6 @@ BEGIN END $$ LANGUAGE PLPGSQL; -create table jsonb_values (j jsonb, i int); -insert into jsonb_values values ('{"refresh_after":"2 weeks"}', 1), ('{"compress_after":"2 weeks"}', 2), ('{"drop_after":"2 weeks"}', 3); - CREATE AGGREGATE sum_jsb (jsonb) ( sfunc = jsonb_add, @@ -545,6 +550,9 @@ CREATE AGGREGATE sum_jsb (jsonb) -- for test coverage, check unsupported aggregate type select add_job('test_proc_with_check', '5 secs', config => '{}', check_config => 'sum_jsb'::regproc); +-- Cleanup jobs +TRUNCATE _timescaledb_config.bgw_job CASCADE; + -- github issue 4610 CREATE TABLE sensor_data ( @@ -576,7 +584,6 @@ SELECT add_compression_policy('sensor_data', INTERVAL '1' minute) AS compressjob SELECT alter_job(id,config:=jsonb_set(config,'{recompress}', 'true')) FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; -- create new chunks - INSERT INTO sensor_data SELECT time + (INTERVAL '1 minute' * random()) AS time, @@ -588,11 +595,34 @@ INSERT INTO sensor_data generate_series(1, 30, 1 ) AS g2(sensor_id) ORDER BY time; + +-- get the name of a new uncompressed chunk +SELECT chunk_name AS new_uncompressed_chunk_name + FROM timescaledb_information.chunks + WHERE hypertable_name = 'sensor_data' AND NOT is_compressed LIMIT 1 \gset + -- change compression status so that this chunk is skipped when policy is run -update 
_timescaledb_catalog.chunk set status=3 where table_name = '_hyper_4_17_chunk'; +update _timescaledb_catalog.chunk set status=3 where table_name = :'new_uncompressed_chunk_name'; + +-- verify that there are other uncompressed new chunks that need to be compressed +SELECT count(*) > 1 + FROM timescaledb_information.chunks + WHERE hypertable_name = 'sensor_data' AND NOT is_compressed; +-- disable notice/warning as the new_uncompressed_chunk_name +-- is dynamic and it will be printed in those messages. +SET client_min_messages TO ERROR; CALL run_job(:compressjob_id); --- check compression status is not changed -SELECT status FROM _timescaledb_catalog.chunk where table_name = '_hyper_4_17_chunk'; +SET client_min_messages TO NOTICE; + +-- check compression status is not changed for the chunk whose status was manually updated +SELECT status FROM _timescaledb_catalog.chunk where table_name = :'new_uncompressed_chunk_name'; + +-- confirm all the other new chunks are now compressed despite +-- facing an error when trying to compress :'new_uncompressed_chunk_name' +SELECT count(*) = 0 + FROM timescaledb_information.chunks + WHERE hypertable_name = 'sensor_data' AND NOT is_compressed; + -- cleanup -DROP TABLE sensor_data CASCADE; +DROP TABLE sensor_data; From aa1d7bfc80dd73a9d6f7234b0d81770c57cd016e Mon Sep 17 00:00:00 2001 From: Lakshmi Narayanan Sreethar Date: Tue, 11 Apr 2023 16:44:13 +0530 Subject: [PATCH 2/2] Copy scheduled_jobs list before sorting it The start_scheduled_jobs function mistakenly sorts the scheduled_jobs list in-place. As a result, when the ts_update_scheduled_jobs_list function compares the updated list of scheduled jobs with the existing scheduled jobs list, it is comparing a list that is sorted by job_id to one that is sorted by next_start time. Fix that by properly copying the scheduled_jobs list into a new list and using that for sorting.
Fixes #5537 --- CHANGELOG.md | 1 + src/bgw/scheduler.c | 4 +- tsl/test/expected/bgw_custom.out | 85 ++++++++++++++++++++++++++++++++ tsl/test/sql/bgw_custom.sql | 62 +++++++++++++++++++++++ 4 files changed, 149 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d4d1293eb2d..1c4da0b5f47 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ accidentally triggering the load of a previous DB version.** * #5544 Fix refresh from beginning of Continuous Aggregate with variable time bucket * #5556 Fix duplicated entries on timescaledb_experimental.policies view * #5433 Fix join rte in CAggs with joins +* #5543 Copy scheduled_jobs list before sorting it **Thanks** * @nikolaps for reporting an issue with the COPY fetcher diff --git a/src/bgw/scheduler.c b/src/bgw/scheduler.c index e2958498c14..1f855701f97 100644 --- a/src/bgw/scheduler.c +++ b/src/bgw/scheduler.c @@ -575,7 +575,7 @@ start_scheduled_jobs(register_background_worker_callback_type bgw_register) ordered_scheduled_jobs = list_qsort(scheduled_jobs, cmp_next_start); #else /* PG13 does in-place sort */ - ordered_scheduled_jobs = scheduled_jobs; + ordered_scheduled_jobs = list_copy(scheduled_jobs); list_sort(ordered_scheduled_jobs, cmp_next_start); #endif @@ -588,9 +588,7 @@ start_scheduled_jobs(register_background_worker_callback_type bgw_register) scheduled_ts_bgw_job_start(sjob, bgw_register); } -#if PG13_LT list_free(ordered_scheduled_jobs); -#endif } /* Returns the earliest time the scheduler should start a job that is waiting to be started */ diff --git a/tsl/test/expected/bgw_custom.out b/tsl/test/expected/bgw_custom.out index 75327b72266..0f2de308cc3 100644 --- a/tsl/test/expected/bgw_custom.out +++ b/tsl/test/expected/bgw_custom.out @@ -971,3 +971,88 @@ SELECT count(*) = 0 -- cleanup DROP TABLE sensor_data; +-- Github issue #5537 +-- Proc that waits until the given job enters the expected state +CREATE OR REPLACE PROCEDURE wait_for_job_status(job_param_id INTEGER, 
expected_status TEXT, spins INTEGER=:TEST_SPINWAIT_ITERS) +LANGUAGE PLPGSQL AS $$ +DECLARE + jobstatus TEXT; +BEGIN + FOR i in 1..spins + LOOP + SELECT job_status FROM timescaledb_information.job_stats WHERE job_id = job_param_id INTO jobstatus; + IF jobstatus = expected_status THEN + RETURN; + END IF; + PERFORM pg_sleep(0.1); + ROLLBACK; + END LOOP; + RAISE EXCEPTION 'wait_for_job_status(%): timeout after % tries', job_param_id, spins; +END; +$$; +-- Proc that sleeps for 1m - to keep the test jobs in running state +CREATE OR REPLACE PROCEDURE proc_that_sleeps(job_id INT, config JSONB) +LANGUAGE PLPGSQL AS +$$ +BEGIN + PERFORM pg_sleep(60); +END +$$; +-- create new jobs and ensure that the second one gets scheduled +-- before the first one by adjusting the initial_start values +SELECT add_job('proc_that_sleeps', '1h', initial_start => now()::timestamptz + interval '2s') AS job_id_1 \gset +SELECT add_job('proc_that_sleeps', '1h', initial_start => now()::timestamptz - interval '2s') AS job_id_2 \gset +-- wait for the jobs to start running job_2 will start running first +CALL wait_for_job_status(:job_id_2, 'Running'); +CALL wait_for_job_status(:job_id_1, 'Running'); +-- add a new job and wait for it to start +SELECT add_job('proc_that_sleeps', '1h') AS job_id_3 \gset +CALL wait_for_job_status(:job_id_3, 'Running'); +-- verify that none of the jobs crashed +SELECT job_id, job_status, next_start, + total_runs, total_successes, total_failures + FROM timescaledb_information.job_stats + WHERE job_id IN (:job_id_1, :job_id_2, :job_id_3) + ORDER BY job_id; + job_id | job_status | next_start | total_runs | total_successes | total_failures +--------+------------+------------+------------+-----------------+---------------- + 1015 | Running | -infinity | 1 | 0 | 0 + 1016 | Running | -infinity | 1 | 0 | 0 + 1017 | Running | -infinity | 1 | 0 | 0 +(3 rows) + +SELECT job_id, err_message + FROM timescaledb_information.job_errors + WHERE job_id IN (:job_id_1, :job_id_2, :job_id_3); + 
job_id | err_message +--------+------------- +(0 rows) + +-- cleanup +SELECT _timescaledb_internal.stop_background_workers(); + stop_background_workers +------------------------- + t +(1 row) + +CALL wait_for_job_status(:job_id_1, 'Scheduled'); +CALL wait_for_job_status(:job_id_2, 'Scheduled'); +CALL wait_for_job_status(:job_id_3, 'Scheduled'); +SELECT delete_job(:job_id_1); + delete_job +------------ + +(1 row) + +SELECT delete_job(:job_id_2); + delete_job +------------ + +(1 row) + +SELECT delete_job(:job_id_3); + delete_job +------------ + +(1 row) + diff --git a/tsl/test/sql/bgw_custom.sql b/tsl/test/sql/bgw_custom.sql index f5f22864e96..4e8b3f3838a 100644 --- a/tsl/test/sql/bgw_custom.sql +++ b/tsl/test/sql/bgw_custom.sql @@ -626,3 +626,65 @@ SELECT count(*) = 0 -- cleanup DROP TABLE sensor_data; + +-- Github issue #5537 +-- Proc that waits until the given job enters the expected state +CREATE OR REPLACE PROCEDURE wait_for_job_status(job_param_id INTEGER, expected_status TEXT, spins INTEGER=:TEST_SPINWAIT_ITERS) +LANGUAGE PLPGSQL AS $$ +DECLARE + jobstatus TEXT; +BEGIN + FOR i in 1..spins + LOOP + SELECT job_status FROM timescaledb_information.job_stats WHERE job_id = job_param_id INTO jobstatus; + IF jobstatus = expected_status THEN + RETURN; + END IF; + PERFORM pg_sleep(0.1); + ROLLBACK; + END LOOP; + RAISE EXCEPTION 'wait_for_job_status(%): timeout after % tries', job_param_id, spins; +END; +$$; + +-- Proc that sleeps for 1m - to keep the test jobs in running state +CREATE OR REPLACE PROCEDURE proc_that_sleeps(job_id INT, config JSONB) +LANGUAGE PLPGSQL AS +$$ +BEGIN + PERFORM pg_sleep(60); +END +$$; + +-- create new jobs and ensure that the second one gets scheduled +-- before the first one by adjusting the initial_start values +SELECT add_job('proc_that_sleeps', '1h', initial_start => now()::timestamptz + interval '2s') AS job_id_1 \gset +SELECT add_job('proc_that_sleeps', '1h', initial_start => now()::timestamptz - interval '2s') AS job_id_2 \gset + +-- 
wait for the jobs to start running job_2 will start running first +CALL wait_for_job_status(:job_id_2, 'Running'); +CALL wait_for_job_status(:job_id_1, 'Running'); + +-- add a new job and wait for it to start +SELECT add_job('proc_that_sleeps', '1h') AS job_id_3 \gset +CALL wait_for_job_status(:job_id_3, 'Running'); + +-- verify that none of the jobs crashed +SELECT job_id, job_status, next_start, + total_runs, total_successes, total_failures + FROM timescaledb_information.job_stats + WHERE job_id IN (:job_id_1, :job_id_2, :job_id_3) + ORDER BY job_id; +SELECT job_id, err_message + FROM timescaledb_information.job_errors + WHERE job_id IN (:job_id_1, :job_id_2, :job_id_3); + +-- cleanup +SELECT _timescaledb_internal.stop_background_workers(); +CALL wait_for_job_status(:job_id_1, 'Scheduled'); +CALL wait_for_job_status(:job_id_2, 'Scheduled'); +CALL wait_for_job_status(:job_id_3, 'Scheduled'); +SELECT delete_job(:job_id_1); +SELECT delete_job(:job_id_2); +SELECT delete_job(:job_id_3); +