Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Add optional logging for maintenance jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
cevian committed Jul 2, 2021
1 parent 5dbb803 commit 4d26951
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 38 deletions.
12 changes: 10 additions & 2 deletions pkg/migrations/migration_files_generated.go

Large diffs are not rendered by default.

121 changes: 97 additions & 24 deletions pkg/migrations/sql/idempotent/base.sql
Expand Up @@ -1610,7 +1610,7 @@ GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.drop_metric_chunk_data(text, timestampt

--drop chunks from metrics tables and delete the appropriate series.
CREATE OR REPLACE PROCEDURE SCHEMA_CATALOG.drop_metric_chunks(
metric_name TEXT, older_than TIMESTAMPTZ, ran_at TIMESTAMPTZ DEFAULT now()
metric_name TEXT, older_than TIMESTAMPTZ, ran_at TIMESTAMPTZ = now(), log_verbose BOOLEAN = FALSE
) AS $func$
DECLARE
metric_id int;
Expand All @@ -1619,6 +1619,8 @@ DECLARE
time_dimension_id INT;
last_updated TIMESTAMPTZ;
present_epoch BIGINT;
lastT TIMESTAMPTZ;
startT TIMESTAMPTZ;
BEGIN
SELECT id, table_name
INTO STRICT metric_id, metric_table
Expand All @@ -1627,6 +1629,12 @@ BEGIN
SELECT older_than + INTERVAL '1 hour'
INTO check_time;

startT := clock_timestamp();

IF log_verbose THEN
RAISE LOG 'promscale maintenance: data retention: metric %: starting', metric_name;
END IF;

-- transaction 1
IF SCHEMA_CATALOG.is_timescaledb_installed() THEN
--Get the time dimension id for the time dimension
Expand Down Expand Up @@ -1659,27 +1667,45 @@ BEGIN
IF older_than IS NULL THEN
-- even though there are no new Ids in need of deletion,
-- we may still have old ones to delete
lastT := clock_timestamp();
PERFORM SCHEMA_CATALOG.delete_expired_series(metric_table, ran_at, present_epoch, last_updated);
IF log_verbose THEN
RAISE LOG 'promscale maintenance: data retention: metric %: done deleting expired series as only action in %', metric_name, clock_timestamp-lastT;
RAISE LOG 'promscale maintenance: data retention: metric %: finished in %', metric_name, clock_timestamp()-startT;
END IF;
RETURN;
END IF;

-- transaction 2
lastT := clock_timestamp();
PERFORM SCHEMA_CATALOG.mark_unused_series(metric_table, older_than, check_time);
IF log_verbose THEN
RAISE LOG 'promscale maintenance: data retention: metric %: done marking unused series in %', metric_name, clock_timestamp()-lastT;
END IF;
COMMIT;

-- transaction 3
lastT := clock_timestamp();
PERFORM SCHEMA_CATALOG.drop_metric_chunk_data(metric_name, older_than);
IF log_verbose THEN
RAISE LOG 'promscale maintenance: data retention: metric %: done dropping chunks in %', metric_name, clock_timestamp()-lastT;
END IF;
SELECT current_epoch, last_update_time INTO present_epoch, last_updated FROM
SCHEMA_CATALOG.ids_epoch LIMIT 1;
COMMIT;

-- transaction 4
lastT := clock_timestamp();
PERFORM SCHEMA_CATALOG.delete_expired_series(metric_table, ran_at, present_epoch, last_updated);
IF log_verbose THEN
RAISE LOG 'promscale maintenance: data retention: metric %: done deleting expired series in %', metric_name, clock_timestamp()-lastT;
RAISE LOG 'promscale maintenance: data retention: metric %: finished in %', metric_name, clock_timestamp()-startT;
END IF;
RETURN;
END
$func$
LANGUAGE PLPGSQL;
GRANT EXECUTE ON PROCEDURE SCHEMA_CATALOG.drop_metric_chunks(text, timestamptz, timestamptz) TO prom_maintenance;
GRANT EXECUTE ON PROCEDURE SCHEMA_CATALOG.drop_metric_chunks(text, timestamptz, timestamptz, boolean) TO prom_maintenance;

--Order by random with stable marking gives us same order in a statement and different
-- orderings in different statements
Expand Down Expand Up @@ -1710,10 +1736,11 @@ $$
LANGUAGE PLPGSQL STABLE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_metrics_that_need_drop_chunk() TO prom_reader;

CREATE OR REPLACE PROCEDURE SCHEMA_CATALOG.execute_data_retention_policy()
CREATE OR REPLACE PROCEDURE SCHEMA_CATALOG.execute_data_retention_policy(log_verbose boolean)
AS $$
DECLARE
r RECORD;
remaining_metrics SCHEMA_CATALOG.metric[] DEFAULT '{}';
BEGIN
--Do one loop with metric that could be locked without waiting.
--This allows you to do everything you can while avoiding lock contention.
Expand All @@ -1724,71 +1751,97 @@ BEGIN
SELECT *
FROM SCHEMA_CATALOG.get_metrics_that_need_drop_chunk()
LOOP
CONTINUE WHEN NOT SCHEMA_CATALOG.lock_metric_for_maintenance(r.id, wait=>false);
CALL SCHEMA_CATALOG.drop_metric_chunks(r.metric_name, NOW() - SCHEMA_CATALOG.get_metric_retention_period(r.metric_name));
IF NOT SCHEMA_CATALOG.lock_metric_for_maintenance(r.id, wait=>false) THEN
remaining_metrics := remaining_metrics || r;
CONTINUE;
END IF;
CALL SCHEMA_CATALOG.drop_metric_chunks(r.metric_name, NOW() - SCHEMA_CATALOG.get_metric_retention_period(r.metric_name), log_verbose=>log_verbose);
PERFORM SCHEMA_CATALOG.unlock_metric_for_maintenance(r.id);

COMMIT;
END LOOP;

IF log_verbose AND array_length(remaining_metrics, 1) > 0 THEN
RAISE LOG 'promscale maintenance: data retention: need to wait to grab locks on % metrics', array_length(remaining_metrics, 1);
END IF;

FOR r IN
SELECT *
FROM SCHEMA_CATALOG.get_metrics_that_need_drop_chunk()
FROM unnest(remaining_metrics)
LOOP
PERFORM SCHEMA_CATALOG.lock_metric_for_maintenance(r.id);
CALL SCHEMA_CATALOG.drop_metric_chunks(r.metric_name, NOW() - SCHEMA_CATALOG.get_metric_retention_period(r.metric_name));
CALL SCHEMA_CATALOG.drop_metric_chunks(r.metric_name, NOW() - SCHEMA_CATALOG.get_metric_retention_period(r.metric_name), log_verbose=>log_verbose);
PERFORM SCHEMA_CATALOG.unlock_metric_for_maintenance(r.id);

COMMIT;
END LOOP;
END;
$$ LANGUAGE PLPGSQL;
COMMENT ON PROCEDURE SCHEMA_CATALOG.execute_data_retention_policy()
COMMENT ON PROCEDURE SCHEMA_CATALOG.execute_data_retention_policy(boolean)
IS 'drops old data according to the data retention policy. This procedure should be run regularly in a cron job';
GRANT EXECUTE ON PROCEDURE SCHEMA_CATALOG.execute_data_retention_policy() TO prom_maintenance;
GRANT EXECUTE ON PROCEDURE SCHEMA_CATALOG.execute_data_retention_policy(boolean) TO prom_maintenance;

--public procedure to be called by cron
--right now just does data retention but name is generic so that
--we can add stuff later without needing people to change their cron scripts
--should be the last thing run in a session so that all session locks
--are guaranteed released on error.
CREATE OR REPLACE PROCEDURE SCHEMA_PROM.execute_maintenance()
CREATE OR REPLACE PROCEDURE SCHEMA_PROM.execute_maintenance(log_verbose boolean = false)
AS $$
DECLARE
startT TIMESTAMPTZ;
BEGIN
CALL SCHEMA_CATALOG.execute_data_retention_policy();
startT := clock_timestamp();
IF log_verbose THEN
RAISE LOG 'promscale maintenance: data retention: starting';
END IF;
CALL SCHEMA_CATALOG.execute_data_retention_policy(log_verbose=>log_verbose);
IF SCHEMA_CATALOG.get_timescale_major_version() >= 2 THEN
CALL SCHEMA_CATALOG.execute_compression_policy();
IF log_verbose THEN
RAISE LOG 'promscale maintenance: compression: starting';
END IF;
CALL SCHEMA_CATALOG.execute_compression_policy(log_verbose=>log_verbose);
END IF;
IF log_verbose THEN
RAISE LOG 'promscale maintenance: finished in %', clock_timestamp()-startT;
END IF;
END;
$$ LANGUAGE PLPGSQL;
COMMENT ON PROCEDURE SCHEMA_PROM.execute_maintenance()
COMMENT ON PROCEDURE SCHEMA_PROM.execute_maintenance(boolean)
IS 'Execute maintenance tasks like dropping data according to retention policy. This procedure should be run regularly in a cron job';
GRANT EXECUTE ON PROCEDURE SCHEMA_PROM.execute_maintenance() TO prom_maintenance;
GRANT EXECUTE ON PROCEDURE SCHEMA_PROM.execute_maintenance(boolean) TO prom_maintenance;

CREATE OR REPLACE PROCEDURE SCHEMA_CATALOG.execute_maintenance_job(job_id int, config jsonb)
AS $$
DECLARE
log_verbose boolean;
BEGIN
CALL SCHEMA_PROM.execute_maintenance();
log_verbose := coalesce(config->>'log_verbose', 'false')::boolean;
CALL SCHEMA_PROM.execute_maintenance(log_verbose=>log_verbose);
END
$$ LANGUAGE PLPGSQL;

CREATE OR REPLACE FUNCTION SCHEMA_PROM.config_maintenance_jobs(number_jobs int, new_schedule_interval interval)
CREATE OR REPLACE FUNCTION SCHEMA_PROM.config_maintenance_jobs(number_jobs int, new_schedule_interval interval, new_config jsonb = NULL)
RETURNS BOOLEAN
AS $func$
DECLARE
cnt int;
log_verbose boolean;
BEGIN
--check format of config
log_verbose := coalesce(new_config->>'log_verbose', 'false')::boolean;

PERFORM SCHEMA_TIMESCALE.delete_job(job_id)
FROM timescaledb_information.jobs
WHERE proc_schema = 'SCHEMA_CATALOG' AND proc_name = 'execute_maintenance_job' AND schedule_interval != new_schedule_interval;
WHERE proc_schema = 'SCHEMA_CATALOG' AND proc_name = 'execute_maintenance_job' AND (schedule_interval != new_schedule_interval OR new_config IS DISTINCT FROM config) ;


SELECT count(*) INTO cnt
FROM timescaledb_information.jobs
WHERE proc_schema = 'SCHEMA_CATALOG' AND proc_name = 'execute_maintenance_job';

IF cnt < number_jobs THEN
PERFORM SCHEMA_TIMESCALE.add_job('SCHEMA_CATALOG.execute_maintenance_job', new_schedule_interval)
PERFORM SCHEMA_TIMESCALE.add_job('SCHEMA_CATALOG.execute_maintenance_job', new_schedule_interval, config=>new_config)
FROM generate_series(1, number_jobs-cnt);
END IF;

Expand All @@ -1808,9 +1861,9 @@ SECURITY DEFINER
--search path must be set for security definer
SET search_path = pg_temp;
--redundant given schema settings but extra caution for security definers
REVOKE ALL ON FUNCTION SCHEMA_PROM.config_maintenance_jobs(int, interval) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION SCHEMA_PROM.config_maintenance_jobs(int, interval) TO prom_admin;
COMMENT ON FUNCTION SCHEMA_PROM.config_maintenance_jobs(int, interval)
REVOKE ALL ON FUNCTION SCHEMA_PROM.config_maintenance_jobs(int, interval, jsonb) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION SCHEMA_PROM.config_maintenance_jobs(int, interval, jsonb) TO prom_admin;
COMMENT ON FUNCTION SCHEMA_PROM.config_maintenance_jobs(int, interval, jsonb)
IS 'Configure the number of maintence jobs run by the job scheduler, as well as their scheduled interval';


Expand Down Expand Up @@ -2478,11 +2531,13 @@ LANGUAGE PLPGSQL STABLE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_metrics_that_need_compression() TO prom_maintenance;

--only for timescaledb 2.0 in 1.x we use compression policies
CREATE OR REPLACE PROCEDURE SCHEMA_CATALOG.execute_compression_policy()
CREATE OR REPLACE PROCEDURE SCHEMA_CATALOG.execute_compression_policy(log_verbose boolean = false)
AS $$
DECLARE
r SCHEMA_CATALOG.metric;
remaining_metrics SCHEMA_CATALOG.metric[] DEFAULT '{}';
startT TIMESTAMPTZ;
lockStartT TIMESTAMPTZ;
BEGIN
--Do one loop with metric that could be locked without waiting.
--This allows you to do everything you can while avoiding lock contention.
Expand All @@ -2497,7 +2552,14 @@ BEGIN
remaining_metrics := remaining_metrics || r;
CONTINUE;
END IF;
IF log_verbose THEN
startT := clock_timestamp();
RAISE LOG 'promscale maintenance: compression: metric %: starting, without lock wait', r.metric_name;
END IF;
CALL SCHEMA_CATALOG.compress_metric_chunks(r.metric_name);
IF log_verbose THEN
RAISE LOG 'promscale maintenance: compression: metric %: finished in %', r.metric_name, clock_timestamp()-startT;
END IF;
PERFORM SCHEMA_CATALOG.unlock_metric_for_maintenance(r.id);

COMMIT;
Expand All @@ -2507,17 +2569,28 @@ BEGIN
SELECT *
FROM unnest(remaining_metrics)
LOOP
IF log_verbose THEN
lockStartT := clock_timestamp();
RAISE LOG 'promscale maintenance: compression: metric %: waiting for lock', r.metric_name;
END IF;
PERFORM SCHEMA_CATALOG.lock_metric_for_maintenance(r.id);
IF log_verbose THEN
startT := clock_timestamp();
RAISE LOG 'promscale maintenance: compression: metric %: starting', r.metric_name;
END IF;
CALL SCHEMA_CATALOG.compress_metric_chunks(r.metric_name);
IF log_verbose THEN
RAISE LOG 'promscale maintenance: compression: metric %: finished in % (lock took %; compression took %)', r.metric_name, clock_timestamp()-lockStartT, startT-lockStartT, clock_timestamp()-startT;
END IF;
PERFORM SCHEMA_CATALOG.unlock_metric_for_maintenance(r.id);

COMMIT;
END LOOP;
END;
$$ LANGUAGE PLPGSQL;
COMMENT ON PROCEDURE SCHEMA_CATALOG.execute_compression_policy()
COMMENT ON PROCEDURE SCHEMA_CATALOG.execute_compression_policy(boolean)
IS 'compress data according to the policy. This procedure should be run regularly in a cron job';
GRANT EXECUTE ON PROCEDURE SCHEMA_CATALOG.execute_compression_policy() TO prom_maintenance;
GRANT EXECUTE ON PROCEDURE SCHEMA_CATALOG.execute_compression_policy(boolean) TO prom_maintenance;

CREATE OR REPLACE PROCEDURE SCHEMA_PROM.add_prom_node(node_name TEXT, attach_to_existing_metrics BOOLEAN = true)
AS $func$
Expand Down
@@ -0,0 +1,5 @@
DROP PROCEDURE IF EXISTS SCHEMA_CATALOG.drop_metric_chunks(TEXT, TIMESTAMPTZ, TIMESTAMPTZ);
DROP PROCEDURE IF EXISTS SCHEMA_CATALOG.execute_data_retention_policy();
DROP PROCEDURE IF EXISTS SCHEMA_PROM.execute_maintenance();
DROP FUNCTION IF EXISTS SCHEMA_PROM.config_maintenance_jobs(int, interval);
DROP PROCEDURE IF EXISTS SCHEMA_CATALOG.execute_compression_policy();
91 changes: 81 additions & 10 deletions pkg/tests/end_to_end_tests/create_test.go
Expand Up @@ -1292,7 +1292,7 @@ func TestExecuteMaintenanceCompressionJob(t *testing.T) {

runMaintenanceJob := func() {
//execute_maintenance and not execute_compression_policy since we want to test end-to-end
_, err = dbJob.Exec(context.Background(), `CALL prom_api.execute_maintenance()`)
_, err = dbJob.Exec(context.Background(), `CALL prom_api.execute_maintenance(log_verbose=>true)`)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1539,10 +1539,16 @@ func TestConfigMaintenanceJobs(t *testing.T) {
t.Fatal("Incorrect number of jobs at startup")
}

changeJobs := func(numJobs int, scheduleInterval time.Duration) {
_, err = db.Exec(context.Background(), "SELECT config_maintenance_jobs($1, $2)", numJobs, scheduleInterval)
changeJobs := func(numJobs int, scheduleInterval time.Duration, config *string, configErr bool) {
_, err = db.Exec(context.Background(), "SELECT config_maintenance_jobs($1, $2, $3)", numJobs, scheduleInterval, config)
if err != nil {
t.Fatal(err)
if !configErr {
t.Fatal(err)
}
return
}
if configErr {
t.Fatal("Expect config error")
}
err = db.QueryRow(context.Background(),
"SELECT count(*) FROM timescaledb_information.jobs WHERE proc_schema = '_prom_catalog' AND proc_name = 'execute_maintenance_job' AND schedule_interval = $1", scheduleInterval).
Expand All @@ -1562,14 +1568,79 @@ func TestConfigMaintenanceJobs(t *testing.T) {
if cnt != 0 {
t.Fatalf("found %v jobs with wrong schedule interval", cnt)
}
if config == nil {
err = db.QueryRow(context.Background(),
"SELECT count(*) FROM timescaledb_information.jobs WHERE proc_schema = '_prom_catalog' AND proc_name = 'execute_maintenance_job' AND config IS NOT NULL").
Scan(&cnt)
if err != nil {
t.Fatal(err)
}
if cnt != 0 {
t.Fatalf("found %v jobs with wrong NULL config", cnt)
}
} else {
err = db.QueryRow(context.Background(),
"SELECT count(*) FROM timescaledb_information.jobs WHERE proc_schema = '_prom_catalog' AND proc_name = 'execute_maintenance_job' AND config != $1::jsonb", config).
Scan(&cnt)
if err != nil {
t.Fatal(err)
}
if cnt != 0 {
t.Fatalf("found %v jobs with wrong config", cnt)
}
}
}

changeJobs(4, time.Minute*30, nil, false)
changeJobs(4, time.Minute*45, nil, false)
changeJobs(5, time.Minute*45, nil, false)
changeJobs(2, time.Minute*45, nil, false)
changeJobs(1, time.Minute*30, nil, false)
changeJobs(0, time.Minute*30, nil, false)
config := `{"log_verbose": true}`
changeJobs(2, time.Minute*45, &config, false)
changeJobs(3, time.Minute*45, &config, false)
config = `{"log_verbose": false}`
changeJobs(1, time.Minute*45, &config, false)
config = `{"log_verbose": "rand"}`
changeJobs(1, time.Minute*45, &config, true)
})
}

func TestExecuteMaintJob(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
if !*useTimescaleDB {
t.Skip("jobs meaningless without TimescaleDB")
}
if !*useTimescale2 {
t.Skip("test meaningless without Timescale 2")
}
withDB(t, *testDatabase, func(dbOwner *pgxpool.Pool, t testing.TB) {
db := dbOwner

execJob := func(config *string, configErr bool) {
_, err := db.Exec(context.Background(), "CALL _prom_catalog.execute_maintenance_job(2, $1)", config)
if err != nil {
if !configErr {
t.Fatal(err)
}
return
}
if configErr {
t.Fatal("Expect config error")
}

}

changeJobs(4, time.Minute*30)
changeJobs(4, time.Minute*45)
changeJobs(5, time.Minute*45)
changeJobs(2, time.Minute*45)
changeJobs(1, time.Minute*30)
changeJobs(0, time.Minute*30)
execJob(nil, false)
config := `{"log_verbose": true}`
execJob(&config, false)
config = `{"log_verbose": false}`
execJob(&config, false)
config = `{"log_verbose": "rr"}`
execJob(&config, true)
})
}

Expand Down

0 comments on commit 4d26951

Please sign in to comment.