Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TimescaleDB 2.x Continuous Aggrecation long recalculation #2867

Closed
Antiarchitect opened this issue Jan 27, 2021 · 33 comments · Fixed by #2926
Closed

TimescaleDB 2.x Continuous Aggrecation long recalculation #2867

Antiarchitect opened this issue Jan 27, 2021 · 33 comments · Fixed by #2926

Comments

@Antiarchitect
Copy link

Antiarchitect commented Jan 27, 2021

We have an issue with the Continuous Aggregation feature with TimescaleDB 2.x upgrade.
We storing data with 1 seconds granularity in hypertable and doing rollups to 10m and 1h intervals with continuous aggregation views.
Before 2.0 we have no issues with refreshes, but with 2.0 rollups get stuck after inserting a bunch of historical data.
We've looked through a code and was able to tell that the issue raises in invalidation logic.

Intervals

Our system doing separate inserts of data points rounded to 1-second grid (shown in red on the diagram) and this action leads to creating invalidation records in the _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log table. Because all of these records are 0 length intervals - the aggregation process cannot merge them into bigger intervals.
This leads to adding dozens of records into _timescaledb_catalog.continuous_aggs_materialization_invalidation_log and the continuous aggregation job invalidates the same 10 minutes interval again and again for each changed record.
Maybe the invalidation process can be more clever and join intervals that fall into the same bucket to avoid recalculating the same intervals over and over again? Another thought is to recalculate the bucket once taking into account all intervals currently invalidating it and not once per invalidation log record.

I hope this problem is not ours only. Please help us to understand and investigate the problem properly.

postgres=# SELECT version();
                                                 version
---------------------------------------------------------------------------------------------------------
 PostgreSQL 12.2 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-39), 64-bit
(1 row)
postgres=# \dx
                                      List of installed extensions
    Name     | Version |   Schema   |                            Description
-------------+---------+------------+-------------------------------------------------------------------
 plpgsql     | 1.0     | pg_catalog | PL/pgSQL procedural language
 timescaledb | 2.0.0   | public     | Enables scalable inserts and complex queries for time-series data
@dimonzozo
Copy link

It would be great to solve this problem somehow. I've tried workaround with insert triggers on continuous_aggs_hypertable_invalidation_log to extend zero length interval to 1-second interval and hoped merge function to merge them. But the trigger seems to not being executed when data inserted via low-level functions (which used by TimescaleDB extension).

@erimatnor
Copy link
Contributor

Thanks for reporting this issue. Looks indeed to be an issue with the invalidation logic.

@aelg
Copy link

aelg commented Feb 3, 2021

I am seeing similar issues with refresh_continuous_aggregate ie. it’s taking forever. It seems to not affect newly inserted data, the CAGG policies are keeping up. I am not backfilling data, but the initial data dump was inserting historic data. And to get the CAGGs up to date I did run refresh_continuous_aggregate but it is extremely slow. I am seeing millions of rows in _timescaledb_catalog.continuous_aggs_materialization_invalidation_log so my guess is that it is connected to this issue.

The aggregations are making per minute, per hour and per day buckets from 2 months of data (100s of millions of rows). Some lighter (in number of rows per bucket) CAGGs have been able to complete (very slowly), but the heavier ones are not completing (have been running for 24h) even when restricting the refresh_continuous_aggregate interval to a single day.

@slasktrat
Copy link

I also have some issues I think may be related to this. We upgraded to 2.0 a couple of days ago, but after that continuous aggregate refresh jobs have "stopped working". What happens now is that they just run forever. I've tried all I could think of and even removed all jobs and just created one single continuous_aggregate_policy. That job has now been running for more than 24 hours constantly using 100% of one cpu core. The job is materializing 1 minute data into hourly with start_offset => INTERVAL '30 days', end_offset => INTERVAL '1 hour'. Like @aelg, running refresh_continuous_aggregate manually on smaller intervals also takes a very long time.

Some background info: we are running 400 nodes caching data and each node writing to the timescaledb every 30 minutes. These nodes may have unstable connection, so the write into timescaledb may also be hours and even days late. In other words, data are being backfilled constantly.

Another thing maybe worth mentioned (not sure if this is relevant or not?); the hypertable was created using default chunk size, so I see that chunks are apparently created weekly with a total size of up to 74GB per chunk. I lowered to 1 day chunk size now, maybe I should even go lower? (server has 90 GB RAM).

@dimonzozo
Copy link

dimonzozo commented Feb 3, 2021

We are using a workaround to mitigate this issue in production now.
Materialization process (automatically and manual) doing its job by processing all records in _timescaledb_catalog.continuous_aggs_materialization_invalidation_log which falls into invalidation interval. And because this table being polluted with dozens of small intervals it takes forever. So the workaround is following:

  1. Check what's going on by running:
select materialization_id,
       _timescaledb_internal.to_timestamp(lowest_modified_value) as lowest_modified_value,
       _timescaledb_internal.to_timestamp(greatest_modified_value) as greatest_modified_value
from _timescaledb_catalog.continuous_aggs_materialization_invalidation_log;
  1. There are "special" records in this table that guarantee the invalidation of recent data. lowest_modified_value value for this records is somewhere in the near past and greatest_modified_value is +infinity.
select materialization_id,
       _timescaledb_internal.to_timestamp(lowest_modified_value),
       _timescaledb_internal.to_timestamp(greatest_modified_value)
from _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
where greatest_modified_value = _timescaledb_internal.to_unix_microseconds('infinity'::timestamp);

This query will return a record for each cont. agg. view.

  1. Next thing we are cleaning all other records:
delete from _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
where greatest_modified_value <> _timescaledb_internal.to_unix_microseconds('infinity'::timestamp);

This query will probably hang on lock because background jobs are running so in other console execute:

select _timescaledb_internal.restart_background_workers();
  1. After this operation all recent data will be processed as normal.

  2. If you need to re-process some data in past (let's say last hour, day, or week) - just update lowest_modified_value value with desired start data and a chunk of data will be processed.

@slasktrat
Copy link

Thanks, I will try to see if we can do something similar!
However, the first query resulted in error: "ERROR: timestamp out of range" so I looked closer to this table and found the result a bit strange.

Is this making any sense or can this be part of the reason to the issues we experience?
image

@dimonzozo
Copy link

dimonzozo commented Feb 3, 2021

This 2 values are +inf and -inf.

# select _timescaledb_internal.to_unix_microseconds('infinity'::timestamp);
 to_unix_microseconds
----------------------
  9223372036854775807
(1 row)

# select _timescaledb_internal.to_unix_microseconds('-infinity'::timestamp);
 to_unix_microseconds
----------------------
 -9223372036854775808
(1 row)

Seems its a bit messed up (i'm not sure about all values, but some of them are makes sense).

In TSDB source code I saw that they are using -inf to +inf records in this table to indicate that data not being processed at all and all range should be reprocessed.

I would suggest to just clear all records in this table and re-add "special" record for each view you have.

Save all uniq cont. agg. view ids:

select distinct(materialization_id) from _timescaledb_catalog.continuous_aggs_materialization_invalidation_log;

For us it returns 505 and 506.

Then delete all this mess with:

delete from _timescaledb_catalog.continuous_aggs_materialization_invalidation_log;

-- maybe restart workers in separate console with
-- select _timescaledb_internal.restart_background_workers();

And re-add just few records (change with your ids!):

INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
SELECT 505, _timescaledb_internal.to_unix_microseconds(now()), _timescaledb_internal.to_unix_microseconds('infinity'::timestamp);

INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
SELECT 506, _timescaledb_internal.to_unix_microseconds(now()), _timescaledb_internal.to_unix_microseconds('infinity'::timestamp);

@slasktrat
Copy link

Thanks! So far this seems to solve several issues. Will continue testing tomorrow.

@erimatnor erimatnor self-assigned this Feb 4, 2021
@erimatnor
Copy link
Contributor

Currently looking into this issue, but would need some help to understand the underlying cause. Do people generally experience these issues after upgrading their continuous aggregates from a previous version? Specifically, I am wondering if the invalidation logs already had a lot of entries prior to updating or whether lots of entries appeared after the update?

If anyone has a script to reproduce these issues on a fresh installation, that would be tremendously helpful. I am trying to reproduce it myself in the meantime.

@dimonzozo
Copy link

dimonzozo commented Feb 4, 2021

Sure.

CREATE TABLE datapoints
(
    timestamp TIMESTAMPTZ NOT NULL,
    device_id bigint,
    data      float       not null
);

SELECT create_hypertable('datapoints', 'timestamp');

CREATE MATERIALIZED VIEW datapoints_10m
            WITH (timescaledb.continuous, timescaledb.materialized_only = true)
AS
SELECT time_bucket('10m', timestamp) as timestamp,
       avg(data)                     as data
FROM datapoints
GROUP BY time_bucket('10m', timestamp)
WITH NO DATA;

-- inserting data for device_id == 1
INSERT INTO datapoints (timestamp, device_id, data)
SELECT g.id, 1, random()
FROM generate_series(now() - '1mon'::interval, now(), '1 second') AS g (id);

CALL refresh_continuous_aggregate('datapoints_10m', now() - '1mon'::interval, now());

SELECT hypertable_id, _timescaledb_internal.to_timestamp(watermark)
  FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold;
hypertable_id |      to_timestamp
---------------+------------------------
             1 | 2021-02-04 09:10:00+00

Everything fine till this moment and all data in datapoints table are materialized.
Now backfilling data to period below invalidation threshold (10_000 individual insert statements)

In bash shell:

for i in {3600..13600}; do
    psql -p 5433 -h localhost -U postgres tdb_test -c "INSERT INTO datapoints (timestamp, device_id, data) VALUES(now() - '${i}s'::interval, 2, random());"
done;

Now there are 10_000 record in hypertable invalidation log
For production cases this number can be huge (hundreds of thousands)

# select count(*)
  from _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log;
count
-------
   10000

And when calling refresh_continuous_aggregate all records from continuous_aggs_hypertable_invalidation_log will be copied to continuous_aggs_materialization_invalidation_log, and materialization process will run again and again.

Expected behavior will be to invalidate and materialize all this little changes in one pass.

@erimatnor
Copy link
Contributor

@dimonzozo Thank you. Can you clarify the last bit: "And when calling refresh_continuous_aggregate all records from continuous_aggs_hypertable_invalidation_log will be copied to continuous_aggs_materialization_invalidation_log, and materialization process will run again and again."

How do you run refresh_continuous_aggregate at this point (can you give exact statement?). When you say the materialization process will run "again and again", do you mean that refresh_continuous_aggregate does not complete or are you referring to a background job/policy? If the latter, how is the policy configured?

@dimonzozo
Copy link

For tests i run the same command CALL refresh_continuous_aggregate('datapoints_10m', now() - '1mon'::interval, now());, but the same issue happens with background refresh.

With test data, this call takes lots more time than first call. On production, it never completes because in our data invalidation of each 10m interval takes up to 40 seconds.

And when calling refresh_continuous_aggregate all records from continuous_aggs_hypertable_invalidation_log will be copied to continuous_aggs_materialization_invalidation_log

My thought process was the following. I did look through the source code and found out that the first step of executing refresh_continuous_aggregate is copying data from continuous_aggs_hypertable_invalidation_log to continuous_aggs_materialization_invalidation_log for each materialized view exists. Then I checked processing logic and it runs materialization for each record in materialization_invalidation_log. Next, I assume that because the table contains lots of records for the same 10m interval - they all being processed separately which causes slow processing.

@erimatnor
Copy link
Contributor

erimatnor commented Feb 4, 2021

@dimonzozo Thanks for the additional information. I tested your reproduction case and, while the second refresh was indeed slower, it did complete without too much delay. Obviously, the refresh time is somewhat proportional to the amount of invalidations it needs to process and range to materialize and maybe there is something we can do to handle a huge amount of invalidations better.

A couple of observations, though. If you have many, many invalidation records due to single row (non batched) inserts, then the processing of those invalidations will also take a longer time (as evident by the example). We do merge invalidations, but only if ranges are adjacent or overlap, and only for the cagg being processed by the current command.

One workaround, until we can optimize for single row inserts, might be to manually materialize smaller ranges of backfill in a single refresh, and then do several of them instead.

Another option might be to provide a "hard refresh" option where we clear all the invalidations in the refreshed range without further processing, and then proceed with refreshing the whole range instead of smaller bits within the refresh window.

@dimonzozo
Copy link

We do merge invalidations, but only if ranges are adjacent or overlap, and only for the cagg being processed by the current command.

Yeah. I saw this logic in source code. Great work, BTW! Source code is very clear, has lots of comments and easy to read.

"hard refresh" is what we doing in our case and that what I suggested to @slasktrat in comments before. As workaround this is totally fine.

Also I thought I can do a script which will replace lots of small invalidation ranges with single record, but running it by TSDB action will fail due locks on the table. Other workaround ideas would be very helpful.

@aelg
Copy link

aelg commented Feb 4, 2021

Can confirm that the workaround suggested by @dimonzozo seems to fix things for me as well. That is clearing out the continuous_aggs_materialization_invalidation_log-table and replacing it with a single row per CAGG that covers the whole interval that needs to be refreshed.

@erimatnor FYI I created the database in 2.0.0 and then filled it with data, the instance was upgraded from 1.7.4 though, but the databases where dropped and recreated, then filled with 2 months of data. This still seems to have created a lot of rows in the invalidation log.

@aelg
Copy link

aelg commented Feb 4, 2021

Also I'm using BIGINT with nanoseconds since epoch as the time column if that should matter.

@slasktrat
Copy link

slasktrat commented Feb 4, 2021

I've been testing some more and when using the built-in add_continuous_aggregate_policy ,continuous_aggs_materialization_invalidation_log is filled with an extreme amount of entries and the result is that the initial job runs forever (I killed the job after 28 hours). As long as the schedule interval is not too big it so far seems sufficient for me to do some cleanup in the continuous_aggs_materialization_invalidation_log immediately after creating the aggregate policy. The same job that was running for 28 hours without completing did now have an initial run duration of 12 minutes, and subsequent runs are completed in seconds, when creating the aggregate policy using my custom function below.

CREATE OR REPLACE FUNCTION add_continuous_aggregate_policy_custom(
		continuous_aggregate regclass,
		start_offset interval,
		end_offset interval,
		schedule_interval interval,
		if_not_exists boolean DEFAULT false)
		RETURNS integer
	AS $$

	DECLARE
		job_id integer;
		mat_id integer;
		lowest_modified_v bigint;

	BEGIN
		mat_id := (SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg WHERE user_view_name = continuous_aggregate::name);
		lowest_modified_v := (SELECT MIN(lowest_modified_value) 
								FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log 
								WHERE greatest_modified_value <> _timescaledb_internal.to_unix_microseconds('infinity'::timestamp) AND materialization_id = mat_id
								GROUP BY materialization_id);

		IF lowest_modified_v IS NULL THEN
			lowest_modified_v := _timescaledb_internal.to_unix_microseconds(now() - INTERVAL '1 hour');
		END IF;

		SELECT add_continuous_aggregate_policy(continuous_aggregate, start_offset => start_offset, end_offset => end_offset, schedule_interval => schedule_interval, if_not_exists => if_not_exists) INTO job_id;

		DELETE FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log where materialization_id = mat_id;
		INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log SELECT mat_id, lowest_modified_v, _timescaledb_internal.to_unix_microseconds('infinity'::timestamp);

		RETURN job_id;
	END;
$$ LANGUAGE plpgsql;

@slasktrat
Copy link

The joy did only last a few hours, already the continuous_aggs_materialization_invalidation_log is polluted with more entries than the refresh job are able to manage.. Seems we have to go all manual like @dimonzozo after all. :/

@dimonzozo
Copy link

dimonzozo commented Feb 4, 2021

I combined some ideas and have another possible workaround (not properly tested!).

This procedure will properly handle invalidations intervals and can be used instead of add_continuous_aggregate_policy, but unfortunately, it won't work because of #2876:

CREATE OR REPLACE PROCEDURE refresh_continuous_aggregates(job_id int, config jsonb)
    LANGUAGE PLPGSQL
AS
$$
DECLARE
    hyp               name;
    cont_aggs         jsonb;
    start_offset      interval;
    end_offset        interval;
    mat_ids           integer[];
    hyp_id            integer;
    lowest_modified_v bigint;
    cont_agg          name;
BEGIN
    SELECT jsonb_object_field_text(config, 'hypertable') INTO STRICT hyp;
    SELECT jsonb_extract_path(config, 'cont_aggs')::jsonb INTO STRICT cont_aggs;
    SELECT jsonb_object_field_text(config, 'start_offset')::interval INTO STRICT start_offset;
    SELECT jsonb_object_field_text(config, 'end_offset')::interval INTO STRICT end_offset;

    mat_ids := (SELECT array_agg(mat_hypertable_id) FROM _timescaledb_catalog.continuous_agg WHERE cont_aggs ? user_view_name::text);
    RAISE NOTICE 'Materialized tables ids: "%"', mat_ids;

    hyp_id := (SELECT id FROM _timescaledb_catalog.hypertable WHERE table_name = hyp);
    RAISE NOTICE 'Hypertable id: "%"', hyp_id;

    lowest_modified_v := (SELECT MIN(lowest_modified_value)
                          FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log
                          WHERE hypertable_id = hyp_id
                          GROUP BY hypertable_id);

    RAISE NOTICE 'Lowest modified value: "%"', lowest_modified_v;

    IF lowest_modified_v IS NOT NULL THEN
        DELETE
        FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log
        WHERE hypertable_id = hyp_id;

        INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
            (
                SELECT id,
                       lowest_modified_v,
                       COALESCE(SELECT cagg_watermark
                                FROM _timescaledb_internal.cagg_watermark(id)),
                       _timescaledb_internal.to_unix_microseconds(now())
                FROM unnest(mat_ids) as id
            );
    END IF;

    FOR cont_agg IN
        SELECT jsonb_array_elements(cont_aggs)
        LOOP
            RAISE NOTICE 'Refreshing %', cont_agg::text;
            CALL refresh_continuous_aggregate(cont_agg::regclass, now() - start_offset,
                                              now() - end_offset);
        END LOOP;
END
$$;

Possible usage is:

SELECT add_job('refresh_continuous_aggregates', '1h',
               config => '{"hypertable":"datapoints","cont_aggs":["datapoints_10m", "datapoints_1h"],"start_offset":"12h","end_offset":"2h"}');

@slasktrat
Copy link

Funny! I'm testing almost exact the same thing

CREATE OR REPLACE PROCEDURE run_all_continuous_aggregates(job_id int, config jsonb) 

LANGUAGE PLPGSQL 
AS $$

DECLARE
	JOB RECORD;
	mat_id integer;
	lowest_modified_v bigint;

BEGIN
	FOR JOB IN SELECT * FROM timescaledb_information.jobs j inner join timescaledb_information.job_stats s on s.job_id = j.job_id WHERE application_name LIKE 'Refresh Continuous%'

	LOOP
		mat_id := (SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg a1 inner join timescaledb_information.continuous_aggregates a2 on a2.view_name = a1.user_view_name WHERE materialization_hypertable_name = JOB.hypertable_name);		

		lowest_modified_v := (SELECT MIN(lowest_modified_value) 
								FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log 
								WHERE greatest_modified_value <> _timescaledb_internal.to_unix_microseconds('infinity'::timestamp) AND materialization_id = mat_id
								GROUP BY materialization_id);

		IF lowest_modified_v IS NULL THEN 
			 lowest_modified_v := _timescaledb_internal.to_unix_microseconds(now() - INTERVAL '1 hour');
		END IF;

		RAISE NOTICE 'Refreshing % (%) from % ', JOB.hypertable_name, JOB.job_id, _timescaledb_internal.to_timestamp(lowest_modified_v);		
		DELETE FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log where materialization_id = mat_id;
		INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log SELECT mat_id, lowest_modified_v, _timescaledb_internal.to_unix_microseconds('infinity'::timestamp);	
		PERFORM alter_job(JOB.job_id, next_start => now());
	  END LOOP;
END
$$;

select add_job('run_all_continuous_aggregates', '1h');

@dimonzozo
Copy link

Wow! Great way to avoid calling refresh_continuous_aggregate.

@dimonzozo
Copy link

dimonzozo commented Feb 4, 2021

I see the problem here. This procedure join all records which already in continuous_aggs_materialization_invalidation_log, but new records will be moved from continuous_aggs_hypertable_invalidation_log to continuous_aggs_materialization_invalidation_log on start of materialization job and we'll have the same issue.

And another difficulty is that all records in hypertable_invalidation_log should be processed (squashed) and then copied to materialization_invalidation_log in multiple copies (for each cont. agg on target hypertable).

I tried to solve it my procedure, but stuck with transactions issue.

@slasktrat
Copy link

slasktrat commented Feb 4, 2021

Yeah, I also tried to chunk the job up in smaller pieces and running the refresh synchronous, but also got stuck with transaction issue.. I'll let this method run over night and see how it performs. So far I got much higher success rate than with the built-in logic, but not 100%. This workaround can be improved in many ways, but hopefully the timescaledb guys will fix the root issue. Should not be necessary to use these workarounds..

@slasktrat
Copy link

slasktrat commented Feb 5, 2021

It's far from perfect and some values are set to fit our case, but I modified my own custom job to the following and now we have an automated refresh with all jobs having at least some success rate - unlike with the built-in functionality. But I hope this issue will be prioritized as upgrading from 1.7 to 2.0 in practice broke our service and caused several days of "downtime" until this workaround was up and running. 😢

CREATE OR REPLACE PROCEDURE run_all_continuous_aggregates(my_job_id int, config jsonb) LANGUAGE PLPGSQL AS
$$

DECLARE
	JOB RECORD;
	CAGG RECORD;
	lowest_modified_v bigint;
	refresh_to timestamp;
	running_job_count integer;
	max_concurrent_jobs integer := 3;
	max_job_runtime interval := INTERVAL '20 minutes';
	
BEGIN
	SELECT count(*) FROM timescaledb_information.job_stats WHERE job_id <> my_job_id AND next_start = '-infinity'::timestamp AND now() - last_run_started_at < max_job_runtime INTO running_job_count;
	
	FOR JOB IN SELECT * FROM timescaledb_information.jobs j
				LEFT OUTER JOIN timescaledb_information.job_stats s ON s.job_id = j.job_id
				WHERE application_name LIKE 'Refresh Continuous%' AND (s.next_start <> '-infinity'::timestamp OR now() - s.last_run_started_at > max_job_runtime)
				ORDER BY s.last_successful_finish ASC NULLS FIRST

	LOOP
		IF (running_job_count >= max_concurrent_jobs) THEN
			EXIT;
		END IF;
				
		SELECT * FROM _timescaledb_catalog.continuous_agg a1 
			INNER JOIN timescaledb_information.continuous_aggregates a2 on a2.view_name = a1.user_view_name 
			WHERE materialization_hypertable_name = JOB.hypertable_name
			INTO CAGG;

		lowest_modified_v := (SELECT MIN(lowest_modified_value) 
								FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log 
								WHERE greatest_modified_value <> _timescaledb_internal.to_unix_microseconds('infinity'::timestamp) AND materialization_id = CAGG.mat_hypertable_id
								GROUP BY materialization_id);
			
		RAISE NOTICE 'lowest_modified_v %', _timescaledb_internal.to_timestamp(lowest_modified_v);
		IF lowest_modified_v IS NOT NULL THEN
			running_job_count := running_job_count + 1;
			lowest_modified_v := LEAST(lowest_modified_v, (SELECT last_refresh_to FROM custom_invalidation_log c WHERE c.job_id = JOB.job_id));
			
			IF JOB.config->'start_offset' IS NOT NULL THEN
				lowest_modified_v := GREATEST(lowest_modified_v, _timescaledb_internal.to_unix_microseconds(now() + INTERVAL '1 hour' - CAST(JOB.config->>'start_offset' AS INTERVAL)));
			END IF;
			
			IF (now() - _timescaledb_internal.to_timestamp(lowest_modified_v) > INTERVAL '2 days') THEN
				refresh_to := _timescaledb_internal.to_timestamp(lowest_modified_v) + INTERVAL '1 day'; --* (random() * 10);
			ELSE
				refresh_to := now();
			END IF;

			INSERT INTO custom_invalidation_log (job_id, last_refresh_to) VALUES (JOB.job_id, _timescaledb_internal.to_unix_microseconds(refresh_to))
			ON CONFLICT (job_id) DO UPDATE SET last_refresh_to = _timescaledb_internal.to_unix_microseconds(refresh_to);

			RAISE LOG 'Refreshing % (job % - mat_hypertable_id %) from % to % ', CAGG.view_name, JOB.job_id, CAGG.mat_hypertable_id, _timescaledb_internal.to_timestamp(lowest_modified_v), refresh_to;
			DELETE FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log WHERE greatest_modified_value <> _timescaledb_internal.to_unix_microseconds('infinity'::timestamp) AND materialization_id = CAGG.mat_hypertable_id;
			INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log SELECT CAGG.mat_hypertable_id, lowest_modified_v, _timescaledb_internal.to_unix_microseconds(refresh_to);	
			PERFORM alter_job(JOB.job_id, next_start => now(), max_runtime => max_job_runtime);	 
		END IF;
	END LOOP;
	PERFORM alter_job(my_job_id, next_start => now() + INTERVAL '10 seconds');
END
$$;

--SELECT add_job('run_all_continuous_aggregates', '1m', initial_start => now() + INTERVAL '1 minute');

@slasktrat
Copy link

Update: The latest workaround has been running for a few days now and it actually works very well. An additional benefit is that all caggs are now continuously updated with increased control of concurrent jobs without the log being spammed with out of background workers.

@erimatnor
Copy link
Contributor

erimatnor commented Feb 9, 2021

@slasktrat Great to hear that you were able to work around the issue.

A quick update on our end. I believe we have a solution to optimize our invalidation handling for lots of small invalidations. Essentially, what we are testing is a way to expand each invalidation to the closest bucket boundaries. This should be safe since we always materialize full buckets (except for some corner-cases when you drop chunks, but in that case we might at worst invalidate more data than necessary, which shouldn't be an issue either). Thus, if you insert a value every minute and you have a 10 minute continuous aggregate bucket, you will expand each minute invalidation to the full 10 minute bucket, which in turn will merge with the next bucket if that one was invalidated too, and so on.

Still, I think there are some corner cases where this might still not be optimal. For instance, let's say you have 1 minute buckets and you insert a value every 2 minutes. Then you will only invalidate every other bucket, which still leads to lots of invalidations if you e.g., refresh 1 week's worth of data. Internally, we will actually materialize each invalidated range separately, which is why materialization is slow for these corner cases where we cannot merge ranges into bigger ones.

Obviously, the situation we want to avoid on the other end of the spectrum is that you have to re-materialize too much when you've, e.g., only invalidated a couple of buckets across a refresh window o, e.g., a year. Then it is better to do a number of smaller materializations instead of re-materializing the whole year's worth of data.

There might be some additional heuristics we can implement to optimize further for these worst-case scenarios where you have backfill across, e.g., every other bucket. For instance, we could try to set a limit on how many materializations we do in a refresh window and try to expand invalidations across the N adjacent buckets or simply fall back to a brute-force refresh of the whole refresh window.

I think we might take an incremental approach here to see what works best and tweak this further if necessary across multiple releases. Sometimes, the approach that works well for one use case does not work well for other use case, so we want to be cautious about making too many assumptions.

@hardikm10
Copy link

hardikm10 commented Feb 12, 2021

Just for the benefit of anyone else who runs into this, here’s an example of how to implement the solution @dimonzozo and @slasktrat discussed above. Thank you both for your work on this.

Prerequisites :

  1. Continous Aggregates Job(s) must already be defined (generally done by add_continous_aggregate_policy )

  2. Following this process requires a login from the user who has write permission to _timescaledb_catalog schema (generally ‘postgres’ )

Steps :

Step 1: Create supporting tables for the User Defined Action to be created

CREATE TABLE custom_invalidation_log(
job_id integer not null unique,
last_refresh_to bigint
);

Step 2: Create Procedure for User Defined Action using TimescaleDB Automation Framework

CREATE OR REPLACE PROCEDURE run_all_continuous_aggregates(my_job_id int, config jsonb) LANGUAGE PLPGSQL AS $$ 
DECLARE
	JOB RECORD;
	CAGG RECORD;
	lowest_modified_v bigint;
	refresh_to timestamp;
	running_job_count integer;
	max_concurrent_jobs integer := 3;
	max_job_runtime interval := INTERVAL '20 minutes';
	
BEGIN
	SELECT count(*) FROM timescaledb_information.job_stats WHERE job_id <> my_job_id AND next_start = '-infinity'::timestamp AND now() - last_run_started_at < max_job_runtime INTO running_job_count;
	
	FOR JOB IN SELECT * FROM timescaledb_information.jobs j
				LEFT OUTER JOIN timescaledb_information.job_stats s ON s.job_id = j.job_id
				WHERE application_name LIKE 'Refresh Continuous%' AND (s.next_start <> '-infinity'::timestamp OR now() - s.last_run_started_at > max_job_runtime)
				ORDER BY s.last_successful_finish ASC NULLS FIRST

	LOOP
		IF (running_job_count >= max_concurrent_jobs) THEN
			EXIT;
		END IF;
				
		SELECT * FROM _timescaledb_catalog.continuous_agg a1 
			INNER JOIN timescaledb_information.continuous_aggregates a2 on a2.view_name = a1.user_view_name 
			WHERE materialization_hypertable_name = JOB.hypertable_name
			INTO CAGG;

		lowest_modified_v := (SELECT MIN(lowest_modified_value) 
								FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log 
								WHERE greatest_modified_value <> _timescaledb_internal.to_unix_microseconds('infinity'::timestamp) AND materialization_id = CAGG.mat_hypertable_id
								GROUP BY materialization_id);
			
		RAISE NOTICE 'lowest_modified_v %', _timescaledb_internal.to_timestamp(lowest_modified_v);
		IF lowest_modified_v IS NOT NULL THEN
			running_job_count := running_job_count + 1;
			lowest_modified_v := LEAST(lowest_modified_v, (SELECT last_refresh_to FROM custom_invalidation_log c WHERE c.job_id = JOB.job_id));
			
			IF JOB.config->'start_offset' IS NOT NULL THEN
				lowest_modified_v := GREATEST(lowest_modified_v, _timescaledb_internal.to_unix_microseconds(now() + INTERVAL '1 hour' - CAST(JOB.config->>'start_offset' AS INTERVAL)));
			END IF;
			
			IF (now() - _timescaledb_internal.to_timestamp(lowest_modified_v) > INTERVAL '2 days') THEN
				refresh_to := _timescaledb_internal.to_timestamp(lowest_modified_v) + INTERVAL '1 day'; --* (random() * 10);
			ELSE
				refresh_to := now();
			END IF;

			INSERT INTO custom_invalidation_log (job_id, last_refresh_to) VALUES (JOB.job_id, _timescaledb_internal.to_unix_microseconds(refresh_to))
			ON CONFLICT (job_id) DO UPDATE SET last_refresh_to = _timescaledb_internal.to_unix_microseconds(refresh_to);

			RAISE LOG 'Refreshing % (job % - mat_hypertable_id %) from % to % ', CAGG.view_name, JOB.job_id, CAGG.mat_hypertable_id, _timescaledb_internal.to_timestamp(lowest_modified_v), refresh_to;
			DELETE FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log WHERE greatest_modified_value <> _timescaledb_internal.to_unix_microseconds('infinity'::timestamp) AND materialization_id = CAGG.mat_hypertable_id;
			INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log SELECT CAGG.mat_hypertable_id, lowest_modified_v, _timescaledb_internal.to_unix_microseconds(refresh_to);	
			PERFORM alter_job(JOB.job_id, next_start => now(), max_runtime => max_job_runtime);	 
		END IF;
	END LOOP;
	PERFORM alter_job(my_job_id, next_start => now() + INTERVAL '10 seconds');
END $$;

Note that there are two default variables :

max_concurrent_jobs (integer)

max_job_runtime (interval)

You might want to change these variables as per your need.

Also, note that this script/action will run for all continuous aggregates which might or might not be required. You might want to run this script for specific jobs at specific schedules, for which you will need to alter the script.

Step 3: Register the procedure run_all_continuous_aggregates to be run every hour (or whenever as per the need).

SELECT add_job('run_all_continuous_aggregates','1h');

erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 17, 2021
The refreshing of a continuous aggregate is slow when many small
invalidations are generated by frequent single row insert
backfills. This change adds an optimization that merges small
invalidations by first expanding invalidations to full bucket
boundaries. There is really no reason to maintain invalidations that
aren't covering full buckets since refresh windows are already aligned
to buckets anyway.

Fixes timescale#2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 17, 2021
When there are many small (e.g., single timestamp) invalidations that
cannot be merged despite expanding invalidations to full buckets
(e.g., invalidations are spread across every second bucket in the
worst case), it might no longer be beneficial to materialize every
invalidation separately.

Instead, this change adds a threshold for the number of invalidations
used by the refresh (currently 10 by default) above which
invalidations are merged into one range based on the lowest and
greatest invalidated time value.

The limit can be controlled by an anonymous session variable for
debugging and tweaking purposes. It might be considered for promotion
to an official GUC in the future.

Fixes timescale#2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 18, 2021
The refreshing of a continuous aggregate is slow when many small
invalidations are generated by frequent single row insert
backfills. This change adds an optimization that merges small
invalidations by first expanding invalidations to full bucket
boundaries. There is really no reason to maintain invalidations that
aren't covering full buckets since refresh windows are already aligned
to buckets anyway.

Fixes timescale#2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 18, 2021
When there are many small (e.g., single timestamp) invalidations that
cannot be merged despite expanding invalidations to full buckets
(e.g., invalidations are spread across every second bucket in the
worst case), it might no longer be beneficial to materialize every
invalidation separately.

Instead, this change adds a threshold for the number of invalidations
used by the refresh (currently 10 by default) above which
invalidations are merged into one range based on the lowest and
greatest invalidated time value.

The limit can be controlled by an anonymous session variable for
debugging and tweaking purposes. It might be considered for promotion
to an official GUC in the future.

Fixes timescale#2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 18, 2021
The refreshing of a continuous aggregate is slow when many small
invalidations are generated by frequent single row insert
backfills. This change adds an optimization that merges small
invalidations by first expanding invalidations to full bucket
boundaries. There is really no reason to maintain invalidations that
aren't covering full buckets since refresh windows are already aligned
to buckets anyway.

Fixes timescale#2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 18, 2021
When there are many small (e.g., single timestamp) invalidations that
cannot be merged despite expanding invalidations to full buckets
(e.g., invalidations are spread across every second bucket in the
worst case), it might no longer be beneficial to materialize every
invalidation separately.

Instead, this change adds a threshold for the number of invalidations
used by the refresh (currently 10 by default) above which
invalidations are merged into one range based on the lowest and
greatest invalidated time value.

The limit can be controlled by an anonymous session variable for
debugging and tweaking purposes. It might be considered for promotion
to an official GUC in the future.

Fixes timescale#2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 18, 2021
When there are many small (e.g., single timestamp) invalidations that
cannot be merged despite expanding invalidations to full buckets
(e.g., invalidations are spread across every second bucket in the
worst case), it might no longer be beneficial to materialize every
invalidation separately.

Instead, this change adds a threshold for the number of invalidations
used by the refresh (currently 10 by default) above which
invalidations are merged into one range based on the lowest and
greatest invalidated time value.

The limit can be controlled by an anonymous session variable for
debugging and tweaking purposes. It might be considered for promotion
to an official GUC in the future.

Fixes timescale#2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 19, 2021
The refreshing of a continuous aggregate is slow when many small
invalidations are generated by frequent single row insert
backfills. This change adds an optimization that merges small
invalidations by first expanding invalidations to full bucket
boundaries. There is really no reason to maintain invalidations that
aren't covering full buckets since refresh windows are already aligned
to buckets anyway.

Fixes timescale#2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 19, 2021
When there are many small (e.g., single timestamp) invalidations that
cannot be merged despite expanding invalidations to full buckets
(e.g., invalidations are spread across every second bucket in the
worst case), it might no longer be beneficial to materialize every
invalidation separately.

Instead, this change adds a threshold for the number of invalidations
used by the refresh (currently 10 by default) above which
invalidations are merged into one range based on the lowest and
greatest invalidated time value.

The limit can be controlled by an anonymous session variable for
debugging and tweaking purposes. It might be considered for promotion
to an official GUC in the future.

Fixes timescale#2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 19, 2021
When there are many small (e.g., single timestamp) invalidations that
cannot be merged despite expanding invalidations to full buckets
(e.g., invalidations are spread across every second bucket in the
worst case), it might no longer be beneficial to materialize every
invalidation separately.

Instead, this change adds a threshold for the number of invalidations
used by the refresh (currently 10 by default) above which
invalidations are merged into one range based on the lowest and
greatest invalidated time value.

The limit can be controlled by an anonymous session variable for
debugging and tweaking purposes. It might be considered for promotion
to an official GUC in the future.

Fixes timescale#2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 19, 2021
When there are many small (e.g., single timestamp) invalidations that
cannot be merged despite expanding invalidations to full buckets
(e.g., invalidations are spread across every second bucket in the
worst case), it might no longer be beneficial to materialize every
invalidation separately.

Instead, this change adds a threshold for the number of invalidations
used by the refresh (currently 10 by default) above which
invalidations are merged into one range based on the lowest and
greatest invalidated time value.

The limit can be controlled by an anonymous session variable for
debugging and tweaking purposes. It might be considered for promotion
to an official GUC in the future.

Fixes timescale#2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 19, 2021
When there are many small (e.g., single timestamp) invalidations that
cannot be merged despite expanding invalidations to full buckets
(e.g., invalidations are spread across every second bucket in the
worst case), it might no longer be beneficial to materialize every
invalidation separately.

Instead, this change adds a threshold for the number of invalidations
used by the refresh (currently 10 by default) above which
invalidations are merged into one range based on the lowest and
greatest invalidated time value.

The limit can be controlled by an anonymous session variable for
debugging and tweaking purposes. It might be considered for promotion
to an official GUC in the future.

Fixes timescale#2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 19, 2021
When there are many small (e.g., single timestamp) invalidations that
cannot be merged despite expanding invalidations to full buckets
(e.g., invalidations are spread across every second bucket in the
worst case), it might no longer be beneficial to materialize every
invalidation separately.

Instead, this change adds a threshold for the number of invalidations
used by the refresh (currently 10 by default) above which
invalidations are merged into one range based on the lowest and
greatest invalidated time value.

The limit can be controlled by an anonymous session variable for
debugging and tweaking purposes. It might be considered for promotion
to an official GUC in the future.

Fixes timescale#2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 19, 2021
The refreshing of a continuous aggregate is slow when many small
invalidations are generated by frequent single row insert
backfills. This change adds an optimization that merges small
invalidations by first expanding invalidations to full bucket
boundaries. There is really no reason to maintain invalidations that
aren't covering full buckets since refresh windows are already aligned
to buckets anyway.

Fixes timescale#2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 19, 2021
When there are many small (e.g., single timestamp) invalidations that
cannot be merged despite expanding invalidations to full buckets
(e.g., invalidations are spread across every second bucket in the
worst case), it might no longer be beneficial to materialize every
invalidation separately.

Instead, this change adds a threshold for the number of invalidations
used by the refresh (currently 10 by default) above which
invalidations are merged into one range based on the lowest and
greatest invalidated time value.

The limit can be controlled by an anonymous session variable for
debugging and tweaking purposes. It might be considered for promotion
to an official GUC in the future.

Fixes timescale#2867
erimatnor added a commit that referenced this issue Feb 19, 2021
The refreshing of a continuous aggregate is slow when many small
invalidations are generated by frequent single row insert
backfills. This change adds an optimization that merges small
invalidations by first expanding invalidations to full bucket
boundaries. There is really no reason to maintain invalidations that
aren't covering full buckets since refresh windows are already aligned
to buckets anyway.

Fixes #2867
erimatnor added a commit that referenced this issue Feb 19, 2021
When there are many small (e.g., single timestamp) invalidations that
cannot be merged despite expanding invalidations to full buckets
(e.g., invalidations are spread across every second bucket in the
worst case), it might no longer be beneficial to materialize every
invalidation separately.

Instead, this change adds a threshold for the number of invalidations
used by the refresh (currently 10 by default) above which
invalidations are merged into one range based on the lowest and
greatest invalidated time value.

The limit can be controlled by an anonymous session variable for
debugging and tweaking purposes. It might be considered for promotion
to an official GUC in the future.

Fixes #2867
erimatnor added a commit that referenced this issue Feb 19, 2021
The refreshing of a continuous aggregate is slow when many small
invalidations are generated by frequent single row insert
backfills. This change adds an optimization that merges small
invalidations by first expanding invalidations to full bucket
boundaries. There is really no reason to maintain invalidations that
aren't covering full buckets since refresh windows are already aligned
to buckets anyway.

Fixes #2867
erimatnor added a commit that referenced this issue Feb 19, 2021
When there are many small (e.g., single timestamp) invalidations that
cannot be merged despite expanding invalidations to full buckets
(e.g., invalidations are spread across every second bucket in the
worst case), it might no longer be beneficial to materialize every
invalidation separately.

Instead, this change adds a threshold for the number of invalidations
used by the refresh (currently 10 by default) above which
invalidations are merged into one range based on the lowest and
greatest invalidated time value.

The limit can be controlled by an anonymous session variable for
debugging and tweaking purposes. It might be considered for promotion
to an official GUC in the future.

Fixes #2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 19, 2021
The refreshing of a continuous aggregate is slow when many small
invalidations are generated by frequent single row insert
backfills. This change adds an optimization that merges small
invalidations by first expanding invalidations to full bucket
boundaries. There is really no reason to maintain invalidations that
aren't covering full buckets since refresh windows are already aligned
to buckets anyway.

Fixes timescale#2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 19, 2021
When there are many small (e.g., single timestamp) invalidations that
cannot be merged despite expanding invalidations to full buckets
(e.g., invalidations are spread across every second bucket in the
worst case), it might no longer be beneficial to materialize every
invalidation separately.

Instead, this change adds a threshold for the number of invalidations
used by the refresh (currently 10 by default) above which
invalidations are merged into one range based on the lowest and
greatest invalidated time value.

The limit can be controlled by an anonymous session variable for
debugging and tweaking purposes. It might be considered for promotion
to an official GUC in the future.

Fixes timescale#2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 19, 2021
The refreshing of a continuous aggregate is slow when many small
invalidations are generated by frequent single row insert
backfills. This change adds an optimization that merges small
invalidations by first expanding invalidations to full bucket
boundaries. There is really no reason to maintain invalidations that
aren't covering full buckets since refresh windows are already aligned
to buckets anyway.

Fixes timescale#2867
erimatnor added a commit to erimatnor/timescaledb that referenced this issue Feb 19, 2021
When there are many small (e.g., single timestamp) invalidations that
cannot be merged despite expanding invalidations to full buckets
(e.g., invalidations are spread across every second bucket in the
worst case), it might no longer be beneficial to materialize every
invalidation separately.

Instead, this change adds a threshold for the number of invalidations
used by the refresh (currently 10 by default) above which
invalidations are merged into one range based on the lowest and
greatest invalidated time value.

The limit can be controlled by an anonymous session variable for
debugging and tweaking purposes. It might be considered for promotion
to an official GUC in the future.

Fixes timescale#2867
erimatnor added a commit that referenced this issue Feb 19, 2021
The refreshing of a continuous aggregate is slow when many small
invalidations are generated by frequent single row insert
backfills. This change adds an optimization that merges small
invalidations by first expanding invalidations to full bucket
boundaries. There is really no reason to maintain invalidations that
aren't covering full buckets since refresh windows are already aligned
to buckets anyway.

Fixes #2867
erimatnor added a commit that referenced this issue Feb 19, 2021
When there are many small (e.g., single timestamp) invalidations that
cannot be merged despite expanding invalidations to full buckets
(e.g., invalidations are spread across every second bucket in the
worst case), it might no longer be beneficial to materialize every
invalidation separately.

Instead, this change adds a threshold for the number of invalidations
used by the refresh (currently 10 by default) above which
invalidations are merged into one range based on the lowest and
greatest invalidated time value.

The limit can be controlled by an anonymous session variable for
debugging and tweaking purposes. It might be considered for promotion
to an official GUC in the future.

Fixes #2867
@mfreed
Copy link
Member

mfreed commented Feb 19, 2021

Heads up: The fix for this will appear in TimescaleDB 2.0.2, which has just been tagged and will be released shortly.

@mfreed
Copy link
Member

mfreed commented Feb 23, 2021

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

7 participants