Fix on-insert decompression after schema changes #5578

Merged 1 commit on Apr 27, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -28,6 +28,7 @@ accidentally triggering the load of a previous DB version.**
 * #5573 Fix unique constraint on compressed tables
 * #5615 Add permission checks to run_job()
 * #5614 Enable run_job() for telemetry job
+* #5578 Fix on-insert decompression after schema changes
 
 **Thanks**
 * @kovetskiy and @DZDomi for reporting performance regression in Realtime Continuous Aggregates
16 changes: 7 additions & 9 deletions tsl/src/compression/compression.c
@@ -1805,8 +1805,9 @@ compression_get_toast_storage(CompressionAlgorithms algorithm)
  * columns of the uncompressed chunk.
  */
 static ScanKeyData *
-build_scankeys(int32 hypertable_id, RowDecompressor decompressor, Bitmapset *key_columns,
-			   Bitmapset **null_columns, TupleTableSlot *slot, int *num_scankeys)
+build_scankeys(int32 hypertable_id, Oid hypertable_relid, RowDecompressor decompressor,
+			   Bitmapset *key_columns, Bitmapset **null_columns, TupleTableSlot *slot,
+			   int *num_scankeys)
 {
 	int key_index = 0;
 	ScanKeyData *scankeys = NULL;
@@ -1821,7 +1822,9 @@ build_scankeys(int32 hypertable_id, RowDecompressor decompressor, Bitmapset *key
 		char *attname = get_attname(decompressor.out_rel->rd_id, attno, false);
 		FormData_hypertable_compression *fd =
 			ts_hypertable_compression_get_by_pkey(hypertable_id, attname);
-
+		bool isnull;
+		AttrNumber ht_attno = get_attnum(hypertable_relid, attname);
+		Datum value = slot_getattr(slot, ht_attno, &isnull);
 		/*
 		 * There are 3 possible scenarios we have to consider
 		 * when dealing with columns which are part of unique
@@ -1838,11 +1841,8 @@ build_scankeys(int32 hypertable_id, RowDecompressor decompressor, Bitmapset *key
 		 * batch filtering as the values are compressed and
 		 * we have no metadata.
 		 */
-
 		if (COMPRESSIONCOL_IS_SEGMENT_BY(fd))
 		{
-			bool isnull;
-			Datum value = slot_getattr(slot, attno, &isnull);
 			key_index = create_segment_filter_scankey(&decompressor,
 													  attname,
 													  BTEqualStrategyNumber,
@@ -1854,9 +1854,6 @@ build_scankeys(int32 hypertable_id, RowDecompressor decompressor, Bitmapset *key
 		}
 		if (COMPRESSIONCOL_IS_ORDER_BY(fd))
 		{
-			bool isnull;
-			Datum value = slot_getattr(slot, attno, &isnull);
-
 			/* Cannot optimize orderby columns with NULL values since those
 			 * are not visible in metadata
 			 */
@@ -1967,6 +1964,7 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo

 	int num_scankeys;
 	ScanKeyData *scankeys = build_scankeys(chunk->fd.hypertable_id,
+										   chunk->hypertable_relid,
 										   decompressor,
 										   key_columns,
 										   &null_columns,
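The crux of the change: decompress_batches_for_insert() hands build_scankeys() a slot whose tuple follows the hypertable's column numbering, while the scankey loop iterates over the chunk's columns. After a DROP COLUMN the two numberings diverge, because the hypertable keeps an attisdropped placeholder while chunks created afterwards do not, so reading the slot with the chunk attno could fetch the wrong datum. The fix resolves each column by name against the hypertable (get_attnum(hypertable_relid, attname)) before calling slot_getattr(). The following diagnostic query is only a sketch for making the skew visible, assuming the readings hypertable from the regression tests below; it is not part of the PR:

-- Compare per-column attribute numbers on the hypertable and on one of
-- its chunks; after DROP COLUMN they can differ for chunks created later.
SELECT ht.attname,
       ht.attnum AS hypertable_attno,
       ch.attnum AS chunk_attno
  FROM pg_attribute ht
  JOIN pg_attribute ch ON ch.attname = ht.attname
 WHERE ht.attrelid = 'readings'::regclass
   AND ch.attrelid = (SELECT show_chunks('readings') LIMIT 1)
   AND ht.attnum > 0
   AND NOT ht.attisdropped
 ORDER BY ht.attnum;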
86 changes: 80 additions & 6 deletions tsl/test/expected/compression_errors.out
@@ -631,7 +631,7 @@ INSERT INTO ts_table SELECT * FROM data_table;
 --cleanup tables
 DROP TABLE data_table cascade;
 DROP TABLE ts_table cascade;
---invalid reads for row expressions after column dropped on compressed tables #5458
+-- #5458 invalid reads for row expressions after column dropped on compressed tables
 CREATE TABLE readings(
     "time" TIMESTAMPTZ NOT NULL,
     battery_status TEXT,
@@ -645,6 +645,7 @@ NOTICE: migrating data to chunks
  (35,public,readings,t)
 (1 row)
 
+create unique index readings_uniq_idx on readings("time",battery_temperature);
 ALTER TABLE readings SET (timescaledb.compress,timescaledb.compress_segmentby = 'battery_temperature');
 SELECT compress_chunk(show_chunks('readings'));
               compress_chunk
@@ -661,7 +662,50 @@ SELECT readings FROM readings;
 ("Fri Nov 11 11:11:11 2022 PST",0.2)
 (2 rows)
 
--- Unique constraints are not always respected on compressed tables #5553
+-- #5577 On-insert decompression after schema changes may not work properly
+SELECT decompress_chunk(show_chunks('readings'),true);
+NOTICE:  chunk "_hyper_35_24_chunk" is not compressed
+             decompress_chunk
+------------------------------------------
+ _timescaledb_internal._hyper_35_22_chunk
+
+(2 rows)
+
+SELECT compress_chunk(show_chunks('readings'),true);
+              compress_chunk
+------------------------------------------
+ _timescaledb_internal._hyper_35_22_chunk
+ _timescaledb_internal._hyper_35_24_chunk
+(2 rows)
+
+\set ON_ERROR_STOP 0
+INSERT INTO readings ("time", battery_temperature) VALUES
+('2022-11-11 11:11:11',0.2) -- same record as inserted
+;
+ERROR:  duplicate key value violates unique constraint "_hyper_35_24_chunk_readings_uniq_idx"
+\set ON_ERROR_STOP 1
+SELECT * from readings;
+             time             | battery_temperature
+------------------------------+---------------------
+ Fri Nov 11 03:11:11 2022 PST |
+ Fri Nov 11 11:11:11 2022 PST | 0.2
+(2 rows)
+
+SELECT assert_equal(count(1), 2::bigint) FROM readings;
+ assert_equal
+--------------
+
+(1 row)
+
+-- no unique check failure during decompression
+SELECT decompress_chunk(show_chunks('readings'),true);
+             decompress_chunk
+------------------------------------------
+ _timescaledb_internal._hyper_35_22_chunk
+ _timescaledb_internal._hyper_35_24_chunk
+(2 rows)
+
+-- #5553 Unique constraints are not always respected on compressed tables
 CREATE TABLE main_table AS
 SELECT '2011-11-11 11:11:11'::timestamptz AS time, 'foo' AS device_id;
 CREATE UNIQUE INDEX xm ON main_table(time, device_id);
@@ -680,19 +724,19 @@ ALTER TABLE main_table SET (
 SELECT compress_chunk(show_chunks('main_table'));
               compress_chunk
 ------------------------------------------
- _timescaledb_internal._hyper_37_25_chunk
+ _timescaledb_internal._hyper_37_27_chunk
 (1 row)
 
 -- insert rejected
 \set ON_ERROR_STOP 0
 INSERT INTO main_table VALUES
 ('2011-11-11 11:11:11', 'foo');
-ERROR:  duplicate key value violates unique constraint "_hyper_37_25_chunk_xm"
+ERROR:  duplicate key value violates unique constraint "_hyper_37_27_chunk_xm"
 -- insert rejected in case 1st row doesn't violate constraint with different segmentby
 INSERT INTO main_table VALUES
 ('2011-11-11 11:12:11', 'bar'),
 ('2011-11-11 11:11:11', 'foo');
-ERROR:  duplicate key value violates unique constraint "_hyper_37_25_chunk_xm"
+ERROR:  duplicate key value violates unique constraint "_hyper_37_27_chunk_xm"
 \set ON_ERROR_STOP 1
 SELECT assert_equal(count(1), 1::bigint) FROM main_table;
  assert_equal
@@ -704,6 +748,36 @@ SELECT assert_equal(count(1), 1::bigint) FROM main_table;
 SELECT decompress_chunk(show_chunks('main_table'), TRUE);
             decompress_chunk
 ------------------------------------------
- _timescaledb_internal._hyper_37_25_chunk
+ _timescaledb_internal._hyper_37_27_chunk
 (1 row)
 
+DROP TABLE IF EXISTS readings;
+CREATE TABLE readings(
+    "time" timestamptz NOT NULL,
+    battery_status text,
+    candy integer,
+    battery_status2 text,
+    battery_temperature text
+);
+SELECT create_hypertable('readings', 'time', chunk_time_interval => interval '12 hour');
+   create_hypertable
+------------------------
+ (39,public,readings,t)
+(1 row)
+
+CREATE UNIQUE INDEX readings_uniq_idx ON readings("time", battery_temperature);
+ALTER TABLE readings SET (timescaledb.compress, timescaledb.compress_segmentby = 'battery_temperature');
+ALTER TABLE readings DROP COLUMN battery_status;
+ALTER TABLE readings DROP COLUMN battery_status2;
+INSERT INTO readings("time", candy, battery_temperature)
+VALUES ('2022-11-11 11:11:11', 88, '0.2');
+SELECT compress_chunk(show_chunks('readings'), TRUE);
+              compress_chunk
+------------------------------------------
+ _timescaledb_internal._hyper_39_29_chunk
+(1 row)
+
+-- no error happens
+INSERT INTO readings("time", candy, battery_temperature)
+VALUES ('2022-11-11 11:11:11', 33, 0.3)
+;
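The two DROP COLUMNs above are the point of this test: PostgreSQL keeps dropped columns as attisdropped placeholders, so battery_temperature remains attribute 5 on the hypertable, while the chunk created by the first INSERT numbers it 3. Before this fix, build_scankeys() read slot position 3, which in the hypertable's numbering holds candy, and built the segmentby scankey from the wrong value; with the name-based lookup, the final INSERT with battery_temperature 0.3 matches no existing batch and succeeds. A sketch (not part of the test) to show the placeholders:

-- Dropped columns linger in pg_attribute with attisdropped = true,
-- keeping later columns at their original attribute numbers.
SELECT attnum, attname, attisdropped
  FROM pg_attribute
 WHERE attrelid = 'readings'::regclass
   AND attnum > 0
 ORDER BY attnum;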
53 changes: 51 additions & 2 deletions tsl/test/sql/compression_errors.sql
@@ -371,7 +371,7 @@ INSERT INTO ts_table SELECT * FROM data_table;
 DROP TABLE data_table cascade;
 DROP TABLE ts_table cascade;
 
---invalid reads for row expressions after column dropped on compressed tables #5458
+-- #5458 invalid reads for row expressions after column dropped on compressed tables
 CREATE TABLE readings(
     "time" TIMESTAMPTZ NOT NULL,
     battery_status TEXT,
@@ -382,14 +382,33 @@ INSERT INTO readings ("time") VALUES ('2022-11-11 11:11:11-00');

 SELECT create_hypertable('readings', 'time', chunk_time_interval => interval '12 hour', migrate_data=>true);
 
+create unique index readings_uniq_idx on readings("time",battery_temperature);
 ALTER TABLE readings SET (timescaledb.compress,timescaledb.compress_segmentby = 'battery_temperature');
 SELECT compress_chunk(show_chunks('readings'));
 
 ALTER TABLE readings DROP COLUMN battery_status;
 INSERT INTO readings ("time", battery_temperature) VALUES ('2022-11-11 11:11:11', 0.2);
 SELECT readings FROM readings;
 
--- Unique constraints are not always respected on compressed tables #5553
+-- #5577 On-insert decompression after schema changes may not work properly
+
+SELECT decompress_chunk(show_chunks('readings'),true);
+SELECT compress_chunk(show_chunks('readings'),true);
+
+\set ON_ERROR_STOP 0
+INSERT INTO readings ("time", battery_temperature) VALUES
+('2022-11-11 11:11:11',0.2) -- same record as inserted
+;
+
+\set ON_ERROR_STOP 1
+
+SELECT * from readings;
+SELECT assert_equal(count(1), 2::bigint) FROM readings;
+
+-- no unique check failure during decompression
+SELECT decompress_chunk(show_chunks('readings'),true);
+
+-- #5553 Unique constraints are not always respected on compressed tables
 CREATE TABLE main_table AS
 SELECT '2011-11-11 11:11:11'::timestamptz AS time, 'foo' AS device_id;
 
@@ -419,3 +438,33 @@ SELECT assert_equal(count(1), 1::bigint) FROM main_table;

 -- no unique check failure during decompression
 SELECT decompress_chunk(show_chunks('main_table'), TRUE);
+
+
+DROP TABLE IF EXISTS readings;
+
+CREATE TABLE readings(
+    "time" timestamptz NOT NULL,
+    battery_status text,
+    candy integer,
+    battery_status2 text,
+    battery_temperature text
+);
+
+SELECT create_hypertable('readings', 'time', chunk_time_interval => interval '12 hour');
+
+CREATE UNIQUE INDEX readings_uniq_idx ON readings("time", battery_temperature);
+
+ALTER TABLE readings SET (timescaledb.compress, timescaledb.compress_segmentby = 'battery_temperature');
+
+ALTER TABLE readings DROP COLUMN battery_status;
+ALTER TABLE readings DROP COLUMN battery_status2;
+
+INSERT INTO readings("time", candy, battery_temperature)
+VALUES ('2022-11-11 11:11:11', 88, '0.2');
+
+SELECT compress_chunk(show_chunks('readings'), TRUE);
+
+-- no error happens
+INSERT INTO readings("time", candy, battery_temperature)
+VALUES ('2022-11-11 11:11:11', 33, 0.3)
+;