Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for ON CONFLICT DO UPDATE for compressed hypertables. #5454

Merged
merged 1 commit into from Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -12,6 +12,7 @@ accidentally triggering the load of a previous DB version.**
* #5361 Add parallel support for partialize_agg()
* #5252 Improve unique constraint support on compressed hypertables
* #5312 Add timeout support to ping_data_node()
* #5454 Add support for ON CONFLICT DO UPDATE for compressed hypertables

**Bugfixes**
* #5396 Fix SEGMENTBY columns predicates to be pushed down
Expand Down
8 changes: 8 additions & 0 deletions src/nodes/chunk_dispatch/chunk_dispatch.c
Expand Up @@ -4,6 +4,7 @@
* LICENSE-APACHE for a copy of the license.
*/
#include <postgres.h>
#include <access/xact.h>
#include <nodes/nodes.h>
#include <nodes/extensible.h>
#include <nodes/makefuncs.h>
Expand Down Expand Up @@ -145,7 +146,14 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
* postgres can do proper constraint checking.
*/
if (ts_cm_functions->decompress_batches_for_insert)
{
ts_cm_functions->decompress_batches_for_insert(cis, chunk, slot);
OnConflictAction onconflict_action =
chunk_dispatch_get_on_conflict_action(dispatch);
/* mark rows visible */
if (onconflict_action == ONCONFLICT_UPDATE)
dispatch->estate->es_output_cid = GetCurrentCommandId(true);
}
else
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
Expand Down
9 changes: 1 addition & 8 deletions src/nodes/chunk_dispatch/chunk_insert_state.c
Expand Up @@ -83,7 +83,7 @@ chunk_dispatch_get_returning_clauses(const ChunkDispatch *dispatch)
#endif
}

static OnConflictAction
OnConflictAction
chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch)
{
if (!dispatch->dispatch_state)
Expand Down Expand Up @@ -583,7 +583,6 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
ALLOCSET_DEFAULT_SIZES);
OnConflictAction onconflict_action = chunk_dispatch_get_on_conflict_action(dispatch);
ResultRelInfo *relinfo;
bool has_compressed_chunk = (chunk->fd.compressed_chunk_id != 0);

/* permissions NOT checked here; were checked at hypertable level */
if (check_enable_rls(chunk->table_id, InvalidOid, false) == RLS_ENABLED)
Expand All @@ -597,12 +596,6 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
CHUNK_INSERT,
true);

if (has_compressed_chunk && onconflict_action == ONCONFLICT_UPDATE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"INSERT with ON CONFLICT DO UPDATE is not supported on compressed chunks")));

rel = table_open(chunk->table_id, RowExclusiveLock);

MemoryContext old_mcxt = MemoryContextSwitchTo(cis_context);
Expand Down
1 change: 1 addition & 0 deletions src/nodes/chunk_dispatch/chunk_insert_state.h
Expand Up @@ -67,4 +67,5 @@ typedef struct ChunkDispatch ChunkDispatch;
extern ChunkInsertState *ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch);
extern void ts_chunk_insert_state_destroy(ChunkInsertState *state);

OnConflictAction chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch);
#endif /* TIMESCALEDB_CHUNK_INSERT_STATE_H */
23 changes: 20 additions & 3 deletions tsl/test/expected/compression.out
Expand Up @@ -208,9 +208,26 @@ ERROR: cannot update/delete rows from chunk "_hyper_1_1_chunk" as it is compres
insert into foo values(10, 12, 12, 12)
on conflict( a, b)
do update set b = excluded.b;
ERROR: INSERT with ON CONFLICT DO UPDATE is not supported on compressed chunks
SELECT * from foo ORDER BY a,b;
a | b | c | d
----+-----+----+----
10 | 12 | 12 | 12
20 | 11 | 20 |
30 | 222 | 40 |
(3 rows)

--TEST2c Do DML directly on the chunk.
insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12);
insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12)
on conflict( a, b)
do update set b = excluded.b + 12;
SELECT * from foo ORDER BY a,b;
a | b | c | d
----+-----+----+----
10 | 24 | 12 | 12
20 | 11 | 20 |
30 | 222 | 40 |
(3 rows)

update _timescaledb_internal._hyper_1_2_chunk
set b = 12;
ERROR: cannot update/delete rows from chunk "_hyper_1_2_chunk" as it is compressed
Expand All @@ -229,7 +246,7 @@ ERROR: duplicate key value violates unique constraint "_hyper_1_2_chunk_foo_uni
select * from _timescaledb_internal._hyper_1_2_chunk order by a;
a | b | c | d
----+----+----+-----
10 | 12 | 12 | 12
10 | 24 | 12 | 12
10 | 10 | 20 |
11 | 10 | 20 | 120
(3 rows)
Expand Down
114 changes: 114 additions & 0 deletions tsl/test/expected/compression_conflicts.out
Expand Up @@ -274,3 +274,117 @@ SELECT count(*) FROM ONLY :CHUNK;
1
(1 row)

CREATE OR REPLACE VIEW compressed_chunk_info_view AS
SELECT
h.schema_name AS hypertable_schema,
h.table_name AS hypertable_name,
c.schema_name as chunk_schema,
c.table_name as chunk_name,
c.status as chunk_status,
comp.schema_name as compressed_chunk_schema,
comp.table_name as compressed_chunk_name
FROM
_timescaledb_catalog.hypertable h JOIN
_timescaledb_catalog.chunk c ON h.id = c.hypertable_id
LEFT JOIN _timescaledb_catalog.chunk comp
ON comp.id = c.compressed_chunk_id;
CREATE TABLE compressed_ht (
time TIMESTAMP WITH TIME ZONE NOT NULL,
sensor_id INTEGER NOT NULL,
cpu double precision null,
temperature double precision null,
name varchar(100) default 'this is a default string value'
);
CREATE UNIQUE INDEX sensor_id_time_idx on compressed_ht(time, sensor_id);
SELECT * FROM create_hypertable('compressed_ht', 'time',
chunk_time_interval => INTERVAL '2 months');
WARNING: column type "character varying" used for "name" does not follow best practices
hypertable_id | schema_name | table_name | created
---------------+-------------+---------------+---------
7 | public | compressed_ht | t
(1 row)

-- create chunk 1
INSERT INTO compressed_ht VALUES ('2017-12-28 01:10:28.192199+05:30', '1', 0.876, 4.123, 'chunk 1');
INSERT INTO compressed_ht VALUES ('2017-12-24 01:10:28.192199+05:30', '1', 0.876, 4.123, 'chunk 1');
-- create chunk 2
INSERT INTO compressed_ht VALUES ('2017-03-28 01:10:28.192199+05:30', '2', 0.876, 4.123, 'chunk 2');
INSERT INTO compressed_ht VALUES ('2017-03-12 01:10:28.192199+05:30', '3', 0.876, 4.123, 'chunk 2');
-- create chunk 3
INSERT INTO compressed_ht VALUES ('2022-01-18 01:10:28.192199+05:30', '4', 0.876, 4.123, 'chunk 3');
INSERT INTO compressed_ht VALUES ('2022-01-08 01:10:28.192199+05:30', '4', 0.876, 4.123, 'chunk 3');
INSERT INTO compressed_ht VALUES ('2022-01-11 01:10:28.192199+05:30', '5', 0.876, 4.123, 'chunk 3');
INSERT INTO compressed_ht VALUES ('2022-01-24 01:10:28.192199+05:30', '6', 0.876, 4.123, 'chunk 3');
ALTER TABLE compressed_ht SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'sensor_id'
);
SELECT COMPRESS_CHUNK(SHOW_CHUNKS('compressed_ht'));
compress_chunk
----------------------------------------
_timescaledb_internal._hyper_7_7_chunk
_timescaledb_internal._hyper_7_8_chunk
_timescaledb_internal._hyper_7_9_chunk
(3 rows)

-- check compression status
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name;
chunk_status | CHUNK_NAME
--------------+------------------
1 | _hyper_7_7_chunk
1 | _hyper_7_8_chunk
1 | _hyper_7_9_chunk
(3 rows)

-- should report 0 row
SELECT COUNT(*) FROM compressed_ht WHERE name = 'ON CONFLICT DO UPDATE';
count
-------
0
(1 row)

INSERT INTO compressed_ht VALUES ('2017-12-28 01:10:28.192199+05:30', '1', 0.876, 4.123, 'new insert row')
ON conflict(sensor_id, time)
DO UPDATE SET sensor_id = excluded.sensor_id , name = 'ON CONFLICT DO UPDATE';
-- should report 1 row
SELECT COUNT(*) FROM compressed_ht WHERE name = 'ON CONFLICT DO UPDATE';
count
-------
1
(1 row)

-- check that chunk 1 compression status is set to partial
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name;
chunk_status | CHUNK_NAME
--------------+------------------
9 | _hyper_7_7_chunk
1 | _hyper_7_8_chunk
1 | _hyper_7_9_chunk
(3 rows)

INSERT INTO compressed_ht VALUES ('2022-01-24 01:10:28.192199+05:30', '6', 0.876, 4.123, 'new insert row')
ON conflict(sensor_id, time)
DO UPDATE SET sensor_id = excluded.sensor_id , name = 'ON CONFLICT DO UPDATE' RETURNING *;
time | sensor_id | cpu | temperature | name
-------------------------------------+-----------+-------+-------------+-----------------------
Sun Jan 23 11:40:28.192199 2022 PST | 6 | 0.876 | 4.123 | ON CONFLICT DO UPDATE
(1 row)

-- check that chunks 1 and 3 compression status is set to partial
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name;
chunk_status | CHUNK_NAME
--------------+------------------
9 | _hyper_7_7_chunk
1 | _hyper_7_8_chunk
9 | _hyper_7_9_chunk
(3 rows)

7 changes: 6 additions & 1 deletion tsl/test/sql/compression.sql
Expand Up @@ -85,9 +85,14 @@ set b = (select f1.newval from foo_join f1 left join lateral (select newval as n
insert into foo values(10, 12, 12, 12)
on conflict( a, b)
do update set b = excluded.b;
SELECT * from foo ORDER BY a,b;

--TEST2c Do DML directly on the chunk.
insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12);
insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12)
on conflict( a, b)
do update set b = excluded.b + 12;
SELECT * from foo ORDER BY a,b;

update _timescaledb_internal._hyper_1_2_chunk
set b = 12;
delete from _timescaledb_internal._hyper_1_2_chunk;
Expand Down
79 changes: 79 additions & 0 deletions tsl/test/sql/compression_conflicts.sql
Expand Up @@ -210,3 +210,82 @@ INSERT INTO comp_conflicts_3 VALUES ('2020-01-01','d1',0.1) ON CONFLICT DO NOTHI
-- data should have move into uncompressed chunk for conflict check
SELECT count(*) FROM ONLY :CHUNK;

CREATE OR REPLACE VIEW compressed_chunk_info_view AS
SELECT
h.schema_name AS hypertable_schema,
h.table_name AS hypertable_name,
c.schema_name as chunk_schema,
c.table_name as chunk_name,
c.status as chunk_status,
comp.schema_name as compressed_chunk_schema,
comp.table_name as compressed_chunk_name
FROM
_timescaledb_catalog.hypertable h JOIN
_timescaledb_catalog.chunk c ON h.id = c.hypertable_id
LEFT JOIN _timescaledb_catalog.chunk comp
ON comp.id = c.compressed_chunk_id;

CREATE TABLE compressed_ht (
time TIMESTAMP WITH TIME ZONE NOT NULL,
sensor_id INTEGER NOT NULL,
cpu double precision null,
temperature double precision null,
name varchar(100) default 'this is a default string value'
);
CREATE UNIQUE INDEX sensor_id_time_idx on compressed_ht(time, sensor_id);

SELECT * FROM create_hypertable('compressed_ht', 'time',
chunk_time_interval => INTERVAL '2 months');

-- create chunk 1
INSERT INTO compressed_ht VALUES ('2017-12-28 01:10:28.192199+05:30', '1', 0.876, 4.123, 'chunk 1');
INSERT INTO compressed_ht VALUES ('2017-12-24 01:10:28.192199+05:30', '1', 0.876, 4.123, 'chunk 1');

-- create chunk 2
INSERT INTO compressed_ht VALUES ('2017-03-28 01:10:28.192199+05:30', '2', 0.876, 4.123, 'chunk 2');
INSERT INTO compressed_ht VALUES ('2017-03-12 01:10:28.192199+05:30', '3', 0.876, 4.123, 'chunk 2');

-- create chunk 3
INSERT INTO compressed_ht VALUES ('2022-01-18 01:10:28.192199+05:30', '4', 0.876, 4.123, 'chunk 3');
INSERT INTO compressed_ht VALUES ('2022-01-08 01:10:28.192199+05:30', '4', 0.876, 4.123, 'chunk 3');
INSERT INTO compressed_ht VALUES ('2022-01-11 01:10:28.192199+05:30', '5', 0.876, 4.123, 'chunk 3');
INSERT INTO compressed_ht VALUES ('2022-01-24 01:10:28.192199+05:30', '6', 0.876, 4.123, 'chunk 3');

ALTER TABLE compressed_ht SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'sensor_id'
);

SELECT COMPRESS_CHUNK(SHOW_CHUNKS('compressed_ht'));

-- check compression status
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name;

-- should report 0 row
SELECT COUNT(*) FROM compressed_ht WHERE name = 'ON CONFLICT DO UPDATE';

INSERT INTO compressed_ht VALUES ('2017-12-28 01:10:28.192199+05:30', '1', 0.876, 4.123, 'new insert row')
ON conflict(sensor_id, time)
DO UPDATE SET sensor_id = excluded.sensor_id , name = 'ON CONFLICT DO UPDATE';

-- should report 1 row
SELECT COUNT(*) FROM compressed_ht WHERE name = 'ON CONFLICT DO UPDATE';

-- check that chunk 1 compression status is set to partial
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name;

INSERT INTO compressed_ht VALUES ('2022-01-24 01:10:28.192199+05:30', '6', 0.876, 4.123, 'new insert row')
ON conflict(sensor_id, time)
DO UPDATE SET sensor_id = excluded.sensor_id , name = 'ON CONFLICT DO UPDATE' RETURNING *;

-- check that chunks 1 and 3 compression status is set to partial
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name;