Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy scheduled_jobs list before sorting it #5543

Merged
merged 2 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ accidentally triggering the load of a previous DB version.**
* #5544 Fix refresh from beginning of Continuous Aggregate with variable time bucket
* #5556 Fix duplicated entries on timescaledb_experimental.policies view
* #5433 Fix join rte in CAggs with joins
* #5543 Copy scheduled_jobs list before sorting it

**Thanks**
* @nikolaps for reporting an issue with the COPY fetcher
Expand Down
4 changes: 1 addition & 3 deletions src/bgw/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ start_scheduled_jobs(register_background_worker_callback_type bgw_register)
ordered_scheduled_jobs = list_qsort(scheduled_jobs, cmp_next_start);
#else
/* PG13 does in-place sort */
ordered_scheduled_jobs = scheduled_jobs;
ordered_scheduled_jobs = list_copy(scheduled_jobs);
list_sort(ordered_scheduled_jobs, cmp_next_start);
#endif

Expand All @@ -588,9 +588,7 @@ start_scheduled_jobs(register_background_worker_callback_type bgw_register)
scheduled_ts_bgw_job_start(sjob, bgw_register);
}

#if PG13_LT
list_free(ordered_scheduled_jobs);
#endif
}

/* Returns the earliest time the scheduler should start a job that is waiting to be started */
Expand Down
152 changes: 137 additions & 15 deletions tsl/test/expected/bgw_custom.out
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,7 @@ SELECT delete_job(1000);
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
-- background workers are disabled, so the job will not run --
SELECT add_job( proc=>'custom_func',
schedule_interval=>'1h', initial_start =>'2018-01-01 10:00:00-05');
add_job
---------
1005
(1 row)

schedule_interval=>'1h', initial_start =>'2018-01-01 10:00:00-05') AS job_id_1 \gset
SELECT job_id, next_start, scheduled, schedule_interval
FROM timescaledb_information.jobs WHERE job_id > 1000;
job_id | next_start | scheduled | schedule_interval
Expand All @@ -272,6 +267,12 @@ total_successes | 0
total_failures | 0

\x
SELECT delete_job(:job_id_1);
delete_job
------------

(1 row)

-- tests for #3545
CREATE FUNCTION wait_for_job_to_run(job_param_id INTEGER, expected_runs INTEGER, spins INTEGER=:TEST_SPINWAIT_ITERS) RETURNS BOOLEAN LANGUAGE PLPGSQL AS
$BODY$
Expand Down Expand Up @@ -495,6 +496,13 @@ order by id;
3 | _hyper_1_3_chunk | 1
(3 rows)

-- Drop the compression job
SELECT delete_job(:job_id_4);
delete_job
------------

(1 row)

-- Decompress chunks before create the cagg
SELECT decompress_chunk(c) FROM show_chunks('conditions') c;
decompress_chunk
Expand Down Expand Up @@ -571,6 +579,9 @@ FROM _timescaledb_config.bgw_job WHERE id = :job_id_5;
----+-----------+---------------
(0 rows)

-- Cleanup
DROP TABLE conditions;
DROP TABLE custom_log;
-- Stop Background Workers
SELECT _timescaledb_internal.stop_background_workers();
stop_background_workers
Expand Down Expand Up @@ -861,8 +872,6 @@ BEGIN
RETURN j1 || j2;
END
$$ LANGUAGE PLPGSQL;
create table jsonb_values (j jsonb, i int);
insert into jsonb_values values ('{"refresh_after":"2 weeks"}', 1), ('{"compress_after":"2 weeks"}', 2), ('{"drop_after":"2 weeks"}', 3);
CREATE AGGREGATE sum_jsb (jsonb)
(
sfunc = jsonb_add,
Expand All @@ -872,6 +881,10 @@ CREATE AGGREGATE sum_jsb (jsonb)
-- for test coverage, check unsupported aggregate type
select add_job('test_proc_with_check', '5 secs', config => '{}', check_config => 'sum_jsb'::regproc);
ERROR: unsupported function type
-- Cleanup jobs
TRUNCATE _timescaledb_config.bgw_job CASCADE;
NOTICE: truncate cascades to table "bgw_job_stat"
NOTICE: truncate cascades to table "bgw_policy_chunk_stats"
-- github issue 4610
CREATE TABLE sensor_data
(
Expand Down Expand Up @@ -919,18 +932,127 @@ INSERT INTO sensor_data
generate_series(1, 30, 1 ) AS g2(sensor_id)
ORDER BY
time;
-- get the name of a new uncompressed chunk
SELECT chunk_name AS new_uncompressed_chunk_name
FROM timescaledb_information.chunks
WHERE hypertable_name = 'sensor_data' AND NOT is_compressed LIMIT 1 \gset
-- change compression status so that this chunk is skipped when policy is run
update _timescaledb_catalog.chunk set status=3 where table_name = '_hyper_4_17_chunk';
update _timescaledb_catalog.chunk set status=3 where table_name = :'new_uncompressed_chunk_name';
-- verify that there are other uncompressed new chunks that need to be compressed
SELECT count(*) > 1
FROM timescaledb_information.chunks
WHERE hypertable_name = 'sensor_data' AND NOT is_compressed;
?column?
----------
t
(1 row)

-- disable notice/warning as the new_uncompressed_chunk_name
-- is dynamic and it will be printed in those messages.
SET client_min_messages TO ERROR;
CALL run_job(:compressjob_id);
NOTICE: chunk "_hyper_4_17_chunk" is not compressed
WARNING: compressing chunk "_timescaledb_internal._hyper_4_17_chunk" failed when compression policy is executed
-- check compression status is not changed
SELECT status FROM _timescaledb_catalog.chunk where table_name = '_hyper_4_17_chunk';
SET client_min_messages TO NOTICE;
-- check compression status is not changed for the chunk whose status was manually updated
SELECT status FROM _timescaledb_catalog.chunk where table_name = :'new_uncompressed_chunk_name';
status
--------
3
(1 row)

-- confirm all the other new chunks are now compressed despite
-- facing an error when trying to compress :'new_uncompressed_chunk_name'
SELECT count(*) = 0
FROM timescaledb_information.chunks
WHERE hypertable_name = 'sensor_data' AND NOT is_compressed;
?column?
----------
t
(1 row)

-- cleanup
DROP TABLE sensor_data;
-- Github issue #5537
-- Proc that waits until the given job enters the expected state
CREATE OR REPLACE PROCEDURE wait_for_job_status(job_param_id INTEGER, expected_status TEXT, spins INTEGER=:TEST_SPINWAIT_ITERS)
LANGUAGE PLPGSQL AS $$
DECLARE
jobstatus TEXT;
BEGIN
FOR i in 1..spins
LOOP
SELECT job_status FROM timescaledb_information.job_stats WHERE job_id = job_param_id INTO jobstatus;
IF jobstatus = expected_status THEN
RETURN;
END IF;
PERFORM pg_sleep(0.1);
ROLLBACK;
END LOOP;
RAISE EXCEPTION 'wait_for_job_status(%): timeout after % tries', job_param_id, spins;
END;
$$;
-- Proc that sleeps for 1m - to keep the test jobs in running state
CREATE OR REPLACE PROCEDURE proc_that_sleeps(job_id INT, config JSONB)
LANGUAGE PLPGSQL AS
$$
BEGIN
PERFORM pg_sleep(60);
END
$$;
-- create new jobs and ensure that the second one gets scheduled
-- before the first one by adjusting the initial_start values
SELECT add_job('proc_that_sleeps', '1h', initial_start => now()::timestamptz + interval '2s') AS job_id_1 \gset
SELECT add_job('proc_that_sleeps', '1h', initial_start => now()::timestamptz - interval '2s') AS job_id_2 \gset
-- wait for the jobs to start running job_2 will start running first
CALL wait_for_job_status(:job_id_2, 'Running');
CALL wait_for_job_status(:job_id_1, 'Running');
-- add a new job and wait for it to start
SELECT add_job('proc_that_sleeps', '1h') AS job_id_3 \gset
CALL wait_for_job_status(:job_id_3, 'Running');
-- verify that none of the jobs crashed
SELECT job_id, job_status, next_start,
total_runs, total_successes, total_failures
FROM timescaledb_information.job_stats
WHERE job_id IN (:job_id_1, :job_id_2, :job_id_3)
ORDER BY job_id;
job_id | job_status | next_start | total_runs | total_successes | total_failures
--------+------------+------------+------------+-----------------+----------------
1015 | Running | -infinity | 1 | 0 | 0
1016 | Running | -infinity | 1 | 0 | 0
1017 | Running | -infinity | 1 | 0 | 0
(3 rows)

SELECT job_id, err_message
FROM timescaledb_information.job_errors
WHERE job_id IN (:job_id_1, :job_id_2, :job_id_3);
job_id | err_message
--------+-------------
(0 rows)

-- cleanup
DROP TABLE sensor_data CASCADE;
NOTICE: drop cascades to 8 other objects
SELECT _timescaledb_internal.stop_background_workers();
stop_background_workers
-------------------------
t
(1 row)

CALL wait_for_job_status(:job_id_1, 'Scheduled');
CALL wait_for_job_status(:job_id_2, 'Scheduled');
CALL wait_for_job_status(:job_id_3, 'Scheduled');
SELECT delete_job(:job_id_1);
delete_job
------------

(1 row)

SELECT delete_job(:job_id_2);
delete_job
------------

(1 row)

SELECT delete_job(:job_id_3);
delete_job
------------

(1 row)

110 changes: 101 additions & 9 deletions tsl/test/sql/bgw_custom.sql
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,16 @@ SELECT delete_job(1000);
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
-- background workers are disabled, so the job will not run --
SELECT add_job( proc=>'custom_func',
schedule_interval=>'1h', initial_start =>'2018-01-01 10:00:00-05');
schedule_interval=>'1h', initial_start =>'2018-01-01 10:00:00-05') AS job_id_1 \gset

SELECT job_id, next_start, scheduled, schedule_interval
FROM timescaledb_information.jobs WHERE job_id > 1000;
\x
SELECT * FROM timescaledb_information.job_stats WHERE job_id > 1000;
\x

SELECT delete_job(:job_id_1);

-- tests for #3545
CREATE FUNCTION wait_for_job_to_run(job_param_id INTEGER, expected_runs INTEGER, spins INTEGER=:TEST_SPINWAIT_ITERS) RETURNS BOOLEAN LANGUAGE PLPGSQL AS
$BODY$
Expand Down Expand Up @@ -275,6 +277,8 @@ where hypertable_id = (select id from _timescaledb_catalog.hypertable
where table_name = 'conditions')
order by id;

-- Drop the compression job
SELECT delete_job(:job_id_4);

-- Decompress chunks before creating the cagg
SELECT decompress_chunk(c) FROM show_chunks('conditions') c;
Expand Down Expand Up @@ -319,6 +323,10 @@ DROP MATERIALIZED VIEW conditions_summary_daily;
SELECT id, proc_name, hypertable_id
FROM _timescaledb_config.bgw_job WHERE id = :job_id_5;

-- Cleanup
DROP TABLE conditions;
DROP TABLE custom_log;

-- Stop Background Workers
SELECT _timescaledb_internal.stop_background_workers();

Expand Down Expand Up @@ -532,9 +540,6 @@ BEGIN
END
$$ LANGUAGE PLPGSQL;

create table jsonb_values (j jsonb, i int);
insert into jsonb_values values ('{"refresh_after":"2 weeks"}', 1), ('{"compress_after":"2 weeks"}', 2), ('{"drop_after":"2 weeks"}', 3);

CREATE AGGREGATE sum_jsb (jsonb)
(
sfunc = jsonb_add,
Expand All @@ -545,6 +550,9 @@ CREATE AGGREGATE sum_jsb (jsonb)
-- for test coverage, check unsupported aggregate type
select add_job('test_proc_with_check', '5 secs', config => '{}', check_config => 'sum_jsb'::regproc);

-- Cleanup jobs
TRUNCATE _timescaledb_config.bgw_job CASCADE;

-- github issue 4610
CREATE TABLE sensor_data
(
Expand Down Expand Up @@ -576,7 +584,6 @@ SELECT add_compression_policy('sensor_data', INTERVAL '1' minute) AS compressjob
SELECT alter_job(id,config:=jsonb_set(config,'{recompress}', 'true')) FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;

-- create new chunks

INSERT INTO sensor_data
SELECT
time + (INTERVAL '1 minute' * random()) AS time,
Expand All @@ -588,11 +595,96 @@ INSERT INTO sensor_data
generate_series(1, 30, 1 ) AS g2(sensor_id)
ORDER BY
time;

-- get the name of a new uncompressed chunk
SELECT chunk_name AS new_uncompressed_chunk_name
FROM timescaledb_information.chunks
WHERE hypertable_name = 'sensor_data' AND NOT is_compressed LIMIT 1 \gset

-- change compression status so that this chunk is skipped when policy is run
update _timescaledb_catalog.chunk set status=3 where table_name = '_hyper_4_17_chunk';
update _timescaledb_catalog.chunk set status=3 where table_name = :'new_uncompressed_chunk_name';

-- verify that there are other uncompressed new chunks that need to be compressed
SELECT count(*) > 1
FROM timescaledb_information.chunks
WHERE hypertable_name = 'sensor_data' AND NOT is_compressed;

-- disable notice/warning as the new_uncompressed_chunk_name
-- is dynamic and it will be printed in those messages.
SET client_min_messages TO ERROR;
CALL run_job(:compressjob_id);
-- check compression status is not changed
SELECT status FROM _timescaledb_catalog.chunk where table_name = '_hyper_4_17_chunk';
SET client_min_messages TO NOTICE;

-- check compression status is not changed for the chunk whose status was manually updated
SELECT status FROM _timescaledb_catalog.chunk where table_name = :'new_uncompressed_chunk_name';

-- confirm all the other new chunks are now compressed despite
-- facing an error when trying to compress :'new_uncompressed_chunk_name'
SELECT count(*) = 0
FROM timescaledb_information.chunks
WHERE hypertable_name = 'sensor_data' AND NOT is_compressed;

-- cleanup
DROP TABLE sensor_data CASCADE;
DROP TABLE sensor_data;

-- Github issue #5537
-- Proc that waits until the given job enters the expected state.
-- Polls timescaledb_information.job_stats for the job_status of
-- job_param_id up to `spins` times (default :TEST_SPINWAIT_ITERS),
-- sleeping 0.1s after each check that does not match. Returns as soon as
-- the status equals expected_status; raises an exception once all tries
-- are used up without a match.
-- NOTE(review): the ROLLBACK after each sleep presumably ends the current
-- transaction so that the next poll reads fresh job stats -- confirm.
CREATE OR REPLACE PROCEDURE wait_for_job_status(job_param_id INTEGER, expected_status TEXT, spins INTEGER=:TEST_SPINWAIT_ITERS)
LANGUAGE PLPGSQL AS $$
DECLARE
jobstatus TEXT;
BEGIN
FOR i in 1..spins
LOOP
SELECT job_status FROM timescaledb_information.job_stats WHERE job_id = job_param_id INTO jobstatus;
IF jobstatus = expected_status THEN
RETURN;
END IF;
PERFORM pg_sleep(0.1);
ROLLBACK;
END LOOP;
RAISE EXCEPTION 'wait_for_job_status(%): timeout after % tries', job_param_id, spins;
END;
$$;

-- Proc that sleeps for 1m - to keep the test jobs in running state.
-- Accepts the (job_id, config) argument pair but reads neither of them;
-- the body only calls pg_sleep(60).
CREATE OR REPLACE PROCEDURE proc_that_sleeps(job_id INT, config JSONB)
LANGUAGE PLPGSQL AS
$$
BEGIN
PERFORM pg_sleep(60);
END
$$;

-- create new jobs and ensure that the second one gets scheduled
-- before the first one by adjusting the initial_start values
SELECT add_job('proc_that_sleeps', '1h', initial_start => now()::timestamptz + interval '2s') AS job_id_1 \gset
SELECT add_job('proc_that_sleeps', '1h', initial_start => now()::timestamptz - interval '2s') AS job_id_2 \gset

-- wait for the jobs to start running; job_2 will start running first
CALL wait_for_job_status(:job_id_2, 'Running');
CALL wait_for_job_status(:job_id_1, 'Running');

-- add a new job and wait for it to start
SELECT add_job('proc_that_sleeps', '1h') AS job_id_3 \gset
CALL wait_for_job_status(:job_id_3, 'Running');

-- verify that none of the jobs crashed
SELECT job_id, job_status, next_start,
total_runs, total_successes, total_failures
FROM timescaledb_information.job_stats
WHERE job_id IN (:job_id_1, :job_id_2, :job_id_3)
ORDER BY job_id;
SELECT job_id, err_message
FROM timescaledb_information.job_errors
WHERE job_id IN (:job_id_1, :job_id_2, :job_id_3);

-- cleanup
SELECT _timescaledb_internal.stop_background_workers();
CALL wait_for_job_status(:job_id_1, 'Scheduled');
CALL wait_for_job_status(:job_id_2, 'Scheduled');
CALL wait_for_job_status(:job_id_3, 'Scheduled');
SELECT delete_job(:job_id_1);
SELECT delete_job(:job_id_2);
SELECT delete_job(:job_id_3);