Reduce decompression during constraint checking #5584

Merged · 1 commit · Apr 20, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@ accidentally triggering the load of a previous DB version.**
* #5454 Add support for ON CONFLICT DO UPDATE for compressed hypertables
* #5547 Skip Ordered Append when only 1 child node is present
* #5510 Propagate vacuum/analyze to compressed chunks
* #5584 Reduce decompression during constraint checking

**Bugfixes**
* #5396 Fix SEGMENTBY columns predicates to be pushed down
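For context, the change targets unique-constraint checking on INSERT into compressed chunks: rather than decompressing every batch in a matching segment, the min/max metadata of the orderby columns is now used to skip batches that cannot contain a conflicting row. A minimal sketch of the kind of schema this applies to, modeled on the tests below (table and column names are illustrative, not taken from the patch):

```sql
-- Illustrative schema: a unique constraint covering segmentby and orderby
-- columns, modeled on the comp_conflicts tests in this PR.
CREATE TABLE readings(time timestamptz NOT NULL, device text, value float,
                      UNIQUE (time, device));
SELECT create_hypertable('readings', 'time');
ALTER TABLE readings SET (timescaledb.compress,
                          timescaledb.compress_segmentby = 'device',
                          timescaledb.compress_orderby = 'time');
SELECT compress_chunk(show_chunks('readings'));
-- This INSERT must check the unique constraint. With this patch, only
-- compressed batches whose [min, max] metadata range for "time" covers the
-- inserted timestamp are decompressed for the conflict check.
INSERT INTO readings VALUES ('2020-01-01 00:00:01', 'd1', 0.1);
```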
81 changes: 81 additions & 0 deletions tsl/src/compression/compression.c
@@ -90,6 +90,10 @@
static void row_compressor_flush(RowCompressor *row_compressor, CommandId mycid,
bool changed_groups);

static int create_segment_metadata_scankey(RowDecompressor *decompressor,
char *segment_meta_col_name, AttrNumber in_attno,
StrategyNumber strategy, ScanKeyData *scankeys,
int num_scankeys, Datum value);
static void run_analyze_on_chunk(Oid chunk_relid);

/********************
@@ -1898,13 +1902,90 @@
key_index++;
}
}
if (COMPRESSIONCOL_IS_ORDER_BY(fd))
{
bool isnull;
Datum value = slot_getattr(slot, attno, &isnull);

/* Cannot optimize orderby columns with NULL values since those
* are not visible in metadata
*/
if (isnull)
continue;

key_index = create_segment_metadata_scankey(&decompressor,
compression_column_segment_min_name(fd),
attno,
BTLessEqualStrategyNumber,
scankeys,
key_index,
value);
key_index = create_segment_metadata_scankey(&decompressor,
compression_column_segment_max_name(fd),
attno,
BTGreaterEqualStrategyNumber,
scankeys,
key_index,
value);
}
}
}

*num_scankeys = key_index;
return scankeys;
}

static int
create_segment_metadata_scankey(RowDecompressor *decompressor, char *segment_meta_col_name,
AttrNumber in_attno, StrategyNumber strategy, ScanKeyData *scankeys,
int num_scankeys, Datum value)
{
AttrNumber segment_meta_attr_number =
get_attnum(decompressor->in_rel->rd_id, segment_meta_col_name);
Assert(segment_meta_attr_number != InvalidAttrNumber);

/* This should never happen, but if it does, we can't generate a scan key for
* the orderby column, so just skip it */
if (segment_meta_attr_number == InvalidAttrNumber)
return num_scankeys;

Oid atttypid = decompressor->out_desc->attrs[AttrNumberGetAttrOffset(in_attno)].atttypid;

/* Orderby column type should match in compressed metadata columns and uncompressed
* chunk attribute */
Assert(
atttypid ==
decompressor->in_desc->attrs[AttrNumberGetAttrOffset(segment_meta_attr_number)].atttypid);

TypeCacheEntry *tce = lookup_type_cache(atttypid, TYPECACHE_BTREE_OPFAMILY);
if (!OidIsValid(tce->btree_opf))
elog(ERROR, "no btree opfamily for type \"%s\"", format_type_be(atttypid));

Oid opr = get_opfamily_member(tce->btree_opf, atttypid, atttypid, strategy);
Assert(OidIsValid(opr));
/* We should never end up here but: no operator, no optimization */
if (!OidIsValid(opr))
return num_scankeys;

opr = get_opcode(opr);
Assert(OidIsValid(opr));
/* We should never end up here but: no opcode, no optimization */
if (!OidIsValid(opr))
return num_scankeys;

ScanKeyEntryInitialize(&scankeys[num_scankeys++],
0, /* flags */
segment_meta_attr_number,
strategy,
InvalidOid, /* No strategy subtype. */
decompressor->out_desc->attrs[AttrNumberGetAttrOffset(in_attno)]
.attcollation,
opr,
value);

return num_scankeys;
}

void
decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlot *slot)
{
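Conceptually, each pair of scan keys created above brackets the inserted value between a batch's min and max metadata for the orderby column. Expressed as SQL over the compressed chunk — a sketch only, not literal code from the patch; the `_ts_meta_min_1`/`_ts_meta_max_1` names follow TimescaleDB's per-orderby-column naming convention and the chunk name is hypothetical — the resulting filter is roughly:

```sql
-- Rough SQL equivalent of the scan keys built above, for an inserted row
-- (:t, :d) with segmentby = 'device' and orderby = 'time':
SELECT * FROM _timescaledb_internal.compress_hyper_1_1_chunk -- hypothetical name
WHERE device = :d            -- segmentby equality key (pre-existing behaviour)
  AND _ts_meta_min_1 <= :t   -- BTLessEqualStrategyNumber on the min column
  AND _ts_meta_max_1 >= :t;  -- BTGreaterEqualStrategyNumber on the max column
-- Only batches passing all keys are decompressed into the uncompressed chunk
-- for the actual constraint check; NULL orderby values are skipped because
-- they are not represented in the min/max metadata.
```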
163 changes: 134 additions & 29 deletions tsl/test/expected/compression_conflicts.out
@@ -64,11 +64,12 @@ BEGIN;
('2020-01-01 0:00:01','d1',0.1),
('2020-01-01 0:00:02','d2',0.2),
('2020-01-01 0:00:03','d3',0.3);
-- data should have moved into uncompressed chunk for conflict check
-- no data should have moved into uncompressed chunk for conflict check
-- since we used metadata optimization to guarantee uniqueness
SELECT count(*) FROM ONLY :CHUNK;
count
-------
4
3
(1 row)

ROLLBACK;
@@ -139,11 +140,12 @@
('2020-01-01 0:00:01','d1',0.1),
('2020-01-01 0:00:01','d2',0.2),
('2020-01-01 0:00:01','d3',0.3);
-- data should have moved into uncompressed chunk for conflict check
-- no data should have moved into uncompressed chunk for conflict check
-- since we used metadata optimization to guarantee uniqueness
SELECT count(*) FROM ONLY :CHUNK;
count
-------
5
3
(1 row)

ROLLBACK;
@@ -213,6 +215,7 @@ SELECT count(*) FROM ONLY :CHUNK;
BEGIN;
INSERT INTO comp_conflicts_3 VALUES ('2020-01-01',NULL,0.3);
-- data for 1 segment (count = 1 value + 1 inserted) should be present in uncompressed chunk
-- we treat NULLs as NOT DISTINCT and let the constraint configuration handle the check
SELECT count(*) FROM ONLY :CHUNK;
count
-------
@@ -223,11 +226,12 @@ ROLLBACK;
-- should succeed since there are no conflicts in the values
BEGIN;
INSERT INTO comp_conflicts_3 VALUES ('2020-01-01 0:00:01','d1',0.1);
-- data for 1 segment (count = 1 value + 1 inserted) should have moved into uncompressed chunk for conflict check
-- no data should have moved into uncompressed chunk for conflict check
-- since we used metadata optimization to guarantee uniqueness
SELECT count(*) FROM ONLY :CHUNK;
count
-------
2
1
(1 row)

ROLLBACK;
@@ -236,11 +240,12 @@ BEGIN;
('2020-01-01 0:00:01','d1',0.1),
('2020-01-01 0:00:01','d2',0.2),
('2020-01-01 0:00:01','d3',0.3);
-- data for 2 segments (count = 2 values + 2 inserted) should have moved into uncompressed chunk for conflict check
-- no data should have moved into uncompressed chunk for conflict check
-- since we used metadata optimization to guarantee uniqueness
SELECT count(*) FROM ONLY :CHUNK;
count
-------
4
3
(1 row)

ROLLBACK;
@@ -274,6 +279,106 @@ SELECT count(*) FROM ONLY :CHUNK;
1
(1 row)

-- test 4: multi-column primary key with multi-column orderby compression
CREATE TABLE comp_conflicts_4(time timestamptz NOT NULL, device text, value float, UNIQUE(time, device));
SELECT table_name FROM create_hypertable('comp_conflicts_4','time');
table_name
------------------
comp_conflicts_4
(1 row)

ALTER TABLE comp_conflicts_4 SET (timescaledb.compress,timescaledb.compress_orderby='time,device');
-- implicitly create chunk
INSERT INTO comp_conflicts_4 SELECT generate_series('2020-01-01'::timestamp, '2020-01-01 2:00:00', '1s'), 'd1',0.1;
INSERT INTO comp_conflicts_4 VALUES ('2020-01-01','d2',0.2);
INSERT INTO comp_conflicts_4 VALUES ('2020-01-01',NULL,0.3);
SELECT compress_chunk(c) AS "CHUNK" FROM show_chunks('comp_conflicts_4') c
\gset
-- after compression no data should be in uncompressed chunk
SELECT count(*) FROM ONLY :CHUNK;
count
-------
0
(1 row)

-- NULL is considered distinct from another NULL, so even though the next INSERT looks
-- like a conflict it is not a constraint violation (PG15 makes NULL behaviour configurable)
BEGIN;
INSERT INTO comp_conflicts_4 VALUES ('2020-01-01',NULL,0.3);
-- data for 1 segment (count = 1000 values + 1 inserted) should be present in uncompressed chunk
-- we treat NULLs as NOT DISTINCT and let the constraint configuration handle the check
SELECT count(*) FROM ONLY :CHUNK;
count
-------
1001
(1 row)

ROLLBACK;
-- should succeed since there are no conflicts in the values
BEGIN;
INSERT INTO comp_conflicts_4 VALUES ('2020-01-01 2:00:01','d1',0.1);
-- no data should have moved into uncompressed chunk for conflict check
-- since we used metadata optimization to guarantee uniqueness
SELECT count(*) FROM ONLY :CHUNK;
count
-------
1
(1 row)

ROLLBACK;
BEGIN;
INSERT INTO comp_conflicts_4 VALUES
('2020-01-01 2:00:01','d1',0.1),
('2020-01-01 2:00:01','d2',0.2),
('2020-01-01 2:00:01','d3',0.3);
-- no data should have moved into uncompressed chunk for conflict check
-- since we used metadata optimization to guarantee uniqueness
SELECT count(*) FROM ONLY :CHUNK;
count
-------
3
(1 row)

ROLLBACK;
BEGIN;
INSERT INTO comp_conflicts_4 VALUES ('2020-01-01 0:00:01','d3',0.2);
-- count = 1 since no data should have moved into uncompressed chunk for conflict check, since d3 is a new segment
SELECT count(*) FROM ONLY :CHUNK;
count
-------
1
(1 row)

ROLLBACK;
-- no data should be in uncompressed chunk since we did rollback
SELECT count(*) FROM ONLY :CHUNK;
count
-------
0
(1 row)

-- should fail since it conflicts with existing row
\set ON_ERROR_STOP 0
INSERT INTO comp_conflicts_4 VALUES ('2020-01-01','d1',0.1);
ERROR: duplicate key value violates unique constraint "7_4_comp_conflicts_4_time_device_key"
\set ON_ERROR_STOP 1
-- no data should have moved into uncompressed chunk for conflict check
SELECT count(*) FROM ONLY :CHUNK;
count
-------
0
(1 row)

INSERT INTO comp_conflicts_4 VALUES ('2020-01-01 0:00:01','d1',0.1) ON CONFLICT DO NOTHING;
INSERT INTO comp_conflicts_4 VALUES ('2020-01-01 0:30:00','d1',0.1) ON CONFLICT DO NOTHING;
-- data should have moved into uncompressed chunk for conflict check
-- 2 segments (count = 2000)
SELECT count(*) FROM ONLY :CHUNK;
count
-------
2000
(1 row)

CREATE OR REPLACE VIEW compressed_chunk_info_view AS
SELECT
h.schema_name AS hypertable_schema,
@@ -301,7 +406,7 @@ SELECT * FROM create_hypertable('compressed_ht', 'time',
WARNING: column type "character varying" used for "name" does not follow best practices
hypertable_id | schema_name | table_name | created
---------------+-------------+---------------+---------
7 | public | compressed_ht | t
9 | public | compressed_ht | t
(1 row)

-- create chunk 1
@@ -320,23 +425,23 @@ ALTER TABLE compressed_ht SET (
timescaledb.compress_segmentby = 'sensor_id'
);
SELECT COMPRESS_CHUNK(SHOW_CHUNKS('compressed_ht'));
compress_chunk
----------------------------------------
_timescaledb_internal._hyper_7_7_chunk
_timescaledb_internal._hyper_7_8_chunk
_timescaledb_internal._hyper_7_9_chunk
compress_chunk
-----------------------------------------
_timescaledb_internal._hyper_9_9_chunk
_timescaledb_internal._hyper_9_10_chunk
_timescaledb_internal._hyper_9_11_chunk
(3 rows)

-- check compression status
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name;
chunk_status | CHUNK_NAME
--------------+------------------
1 | _hyper_7_7_chunk
1 | _hyper_7_8_chunk
1 | _hyper_7_9_chunk
chunk_status | CHUNK_NAME
--------------+-------------------
1 | _hyper_9_10_chunk
1 | _hyper_9_11_chunk
1 | _hyper_9_9_chunk
(3 rows)

-- should report 0 row
@@ -361,11 +466,11 @@ SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name;
chunk_status | CHUNK_NAME
--------------+------------------
9 | _hyper_7_7_chunk
1 | _hyper_7_8_chunk
1 | _hyper_7_9_chunk
chunk_status | CHUNK_NAME
--------------+-------------------
1 | _hyper_9_10_chunk
1 | _hyper_9_11_chunk
9 | _hyper_9_9_chunk
(3 rows)

INSERT INTO compressed_ht VALUES ('2022-01-24 01:10:28.192199+05:30', '6', 0.876, 4.123, 'new insert row')
@@ -381,10 +486,10 @@ SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name;
chunk_status | CHUNK_NAME
--------------+------------------
9 | _hyper_7_7_chunk
1 | _hyper_7_8_chunk
9 | _hyper_7_9_chunk
chunk_status | CHUNK_NAME
--------------+-------------------
1 | _hyper_9_10_chunk
9 | _hyper_9_11_chunk
9 | _hyper_9_9_chunk
(3 rows)