Do not decompress batches for COPY with potentially invalid slot (#6931)
We don't have to decompress anything when we re-look up the chunk
insert state on COPY buffer flush. Moreover, `ChunkInsertState.slots[0]`
is the wrong slot type for `decompress_batches_for_insert()`, because it
is a chunk slot, not a hypertable slot. This can lead to rare errors
when the chunk insert states go out of cache.

Just don't do this unnecessary decompression on re-lookup, and move
decompression into a separate function for clarity. Add an assertion
and a test that detect the slot type mismatch on main.
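
To make the layout divergence concrete, below is a minimal standalone
sketch: a toy model with plain C structs, not TimescaleDB or PostgreSQL
code. The table and column names are borrowed from the new
compress_dml_copy test in this commit; toy_get_attnum() is a
hypothetical stand-in for the catalog lookup that get_attnum() performs.
PostgreSQL keeps a dropped column as a placeholder in the parent's tuple
descriptor, while a chunk created after the DROP COLUMN never had that
column, so the same column name resolves to different attribute numbers,
and a hypertable attno applied to a chunk slot reads the wrong column.

#include <stdio.h>
#include <string.h>

typedef struct
{
	const char *attname;
	const char *atttype;
} Attr;

/* Hypertable descriptor after ALTER TABLE cdmlcopy DROP COLUMN filler:
 * attno 1 survives as a dropped-column placeholder. */
static const Attr hypertable_desc[] = {
	{ "........pg.dropped.1........", "-" }, /* attno 1 (dropped) */
	{ "ts", "int4" },                        /* attno 2 */
	{ "value", "float8" },                   /* attno 3 */
	{ "metric", "text" },                    /* attno 4 */
};

/* Descriptor of a chunk created after the DROP COLUMN: there is no
 * placeholder, so every attno is shifted down by one. */
static const Attr chunk_desc[] = {
	{ "ts", "int4" },      /* attno 1 */
	{ "value", "float8" }, /* attno 2 */
	{ "metric", "text" },  /* attno 3 */
};

/* Hypothetical stand-in for a catalog lookup: column name -> 1-based attno. */
static int
toy_get_attnum(const Attr *desc, int natts, const char *name)
{
	for (int i = 0; i < natts; i++)
		if (strcmp(desc[i].attname, name) == 0)
			return i + 1;
	return -1;
}

int
main(void)
{
	/* Resolve "metric" against the hypertable, as the scankey-building
	 * code does, then apply that attno to the chunk descriptor -- the
	 * buggy pattern when the slot we were handed is a chunk slot. */
	int ht_attno = toy_get_attnum(hypertable_desc, 4, "metric"); /* = 4 */
	int chunk_natts = (int) (sizeof(chunk_desc) / sizeof(chunk_desc[0]));

	printf("hypertable: attno %d is \"%s\"\n",
		   ht_attno, hypertable_desc[ht_attno - 1].attname);

	if (ht_attno > chunk_natts)
		printf("chunk: attno %d is out of range (chunk has %d attributes)\n",
			   ht_attno, chunk_natts);
	else if (strcmp(hypertable_desc[ht_attno - 1].atttype,
					chunk_desc[ht_attno - 1].atttype) != 0)
		printf("chunk: attno %d has type %s, but the hypertable expects %s\n",
			   ht_attno, chunk_desc[ht_attno - 1].atttype,
			   hypertable_desc[ht_attno - 1].atttype);

	return 0;
}

With this layout the attno falls off the end of the chunk descriptor;
with other layouts it can silently resolve to a column of a different
type, which is exactly the condition the new assertion in
build_scankeys() checks.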
akuzm committed May 23, 2024
1 parent b4a1710 commit 044322b
Showing 9 changed files with 214 additions and 76 deletions.
src/copy.c (2 additions, 20 deletions)
@@ -315,28 +315,9 @@ TSCopyMultiInsertBufferFlush(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuf
ChunkInsertState *cis =
ts_chunk_dispatch_get_chunk_insert_state(miinfo->ccstate->dispatch,
buffer->point,
buffer->slots[0],
NULL /* on chunk changed function */,
NULL /* payload for on chunk changed function */);

if (ts_guc_max_tuples_decompressed_per_dml > 0)
{
if (miinfo->ccstate->dispatch->dispatch_state->tuples_decompressed >
ts_guc_max_tuples_decompressed_per_dml)
{
ereport(ERROR,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("tuple decompression limit exceeded by operation"),
errdetail("current limit: %d, tuples decompressed: %lld",
ts_guc_max_tuples_decompressed_per_dml,
(long long int)
miinfo->ccstate->dispatch->dispatch_state->tuples_decompressed),
errhint("Consider increasing "
"timescaledb.max_tuples_decompressed_per_dml_transaction or "
"set to 0 (unlimited).")));
}
}

ResultRelInfo *resultRelInfo = cis->result_relation_info;

/*
@@ -914,12 +895,13 @@ copyfrom(CopyChunkState *ccstate, ParseState *pstate, Hypertable *ht, MemoryCont
/* Find or create the insert state matching the point */
cis = ts_chunk_dispatch_get_chunk_insert_state(dispatch,
point,
myslot,
on_chunk_insert_state_changed,
bistate);

Assert(cis != NULL);

ts_chunk_dispatch_decompress_batches_for_insert(dispatch, cis, myslot);

/* Triggers and stuff need to be invoked in query context. */
MemoryContextSwitchTo(oldcontext);

src/nodes/chunk_dispatch/chunk_dispatch.c (49 additions, 48 deletions)
@@ -63,7 +63,6 @@ destroy_chunk_insert_state(void *cis)
*/
extern ChunkInsertState *
ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
TupleTableSlot *slot,
const on_chunk_changed_func on_chunk_changed, void *data)
{
ChunkInsertState *cis;
@@ -97,7 +96,6 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
* locking the hypertable. This serves as a fast path for the usual case
* where the chunk already exists.
*/
Assert(slot);
chunk = ts_hypertable_find_chunk_for_point(dispatch->hypertable, point);

/*
@@ -149,36 +147,6 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
cis_changed = false;
}

if (found)
{
if (cis->chunk_compressed)
{
/*
* If this is an INSERT into a compressed chunk with UNIQUE or
* PRIMARY KEY constraints we need to make sure any batches that could
* potentially lead to a conflict are in the decompressed chunk so
* postgres can do proper constraint checking.
*/
if (ts_cm_functions->decompress_batches_for_insert)
{
ts_cm_functions->decompress_batches_for_insert(cis, slot);
OnConflictAction onconflict_action =
chunk_dispatch_get_on_conflict_action(dispatch);
/* mark rows visible */
if (onconflict_action == ONCONFLICT_UPDATE)
dispatch->estate->es_output_cid = GetCurrentCommandId(true);
}
else
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("functionality not supported under the current \"%s\" license. "
"Learn more at https://timescale.com/.",
ts_guc_license),
errhint("To access all features and the best time-series "
"experience, try out Timescale Cloud")));
}
}

MemoryContextSwitchTo(old_context);

if (cis_changed && on_chunk_changed)
@@ -190,6 +158,54 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
return cis;
}

extern void
ts_chunk_dispatch_decompress_batches_for_insert(ChunkDispatch *dispatch, ChunkInsertState *cis,
TupleTableSlot *slot)
{
if (cis->chunk_compressed)
{
/*
* If this is an INSERT into a compressed chunk with UNIQUE or
* PRIMARY KEY constraints we need to make sure any batches that could
* potentially lead to a conflict are in the decompressed chunk so
* postgres can do proper constraint checking.
*/
if (ts_cm_functions->decompress_batches_for_insert)
{
ts_cm_functions->decompress_batches_for_insert(cis, slot);
OnConflictAction onconflict_action = chunk_dispatch_get_on_conflict_action(dispatch);
/* mark rows visible */
if (onconflict_action == ONCONFLICT_UPDATE)
dispatch->estate->es_output_cid = GetCurrentCommandId(true);

if (ts_guc_max_tuples_decompressed_per_dml > 0)
{
if (cis->cds->tuples_decompressed > ts_guc_max_tuples_decompressed_per_dml)
{
ereport(ERROR,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("tuple decompression limit exceeded by operation"),
errdetail("current limit: %d, tuples decompressed: %lld",
ts_guc_max_tuples_decompressed_per_dml,
(long long int) cis->cds->tuples_decompressed),
errhint(
"Consider increasing "
"timescaledb.max_tuples_decompressed_per_dml_transaction or set "
"to 0 (unlimited).")));
}
}
}
else
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("functionality not supported under the current \"%s\" license. "
"Learn more at https://timescale.com/.",
ts_guc_license),
errhint("To access all features and the best time-series "
"experience, try out Timescale Cloud")));
}
}

static CustomScanMethods chunk_dispatch_plan_methods = {
.CustomName = "ChunkDispatch",
.CreateCustomScanState = chunk_dispatch_state_create,
@@ -394,25 +410,10 @@ chunk_dispatch_exec(CustomScanState *node)
/* Find or create the insert state matching the point */
cis = ts_chunk_dispatch_get_chunk_insert_state(dispatch,
point,
slot,
on_chunk_insert_state_changed,
state);

if (ts_guc_max_tuples_decompressed_per_dml > 0)
{
if (cis->cds->tuples_decompressed > ts_guc_max_tuples_decompressed_per_dml)
{
ereport(ERROR,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("tuple decompression limit exceeded by operation"),
errdetail("current limit: %d, tuples decompressed: %lld",
ts_guc_max_tuples_decompressed_per_dml,
(long long int) cis->cds->tuples_decompressed),
errhint("Consider increasing "
"timescaledb.max_tuples_decompressed_per_dml_transaction or set "
"to 0 (unlimited).")));
}
}
ts_chunk_dispatch_decompress_batches_for_insert(dispatch, cis, slot);

MemoryContextSwitchTo(old);

src/nodes/chunk_dispatch/chunk_dispatch.h (4 additions, 1 deletion)
@@ -88,8 +88,11 @@ typedef void (*on_chunk_changed_func)(ChunkInsertState *state, void *data);
extern ChunkDispatch *ts_chunk_dispatch_create(Hypertable *ht, EState *estate, int eflags);
extern void ts_chunk_dispatch_destroy(ChunkDispatch *chunk_dispatch);
extern ChunkInsertState *
ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *p, TupleTableSlot *slot,
ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *p,
const on_chunk_changed_func on_chunk_changed, void *data);
extern void ts_chunk_dispatch_decompress_batches_for_insert(ChunkDispatch *dispatch,
ChunkInsertState *cis,
TupleTableSlot *slot);

extern TSDLLEXPORT Path *ts_chunk_dispatch_path_create(PlannerInfo *root, ModifyTablePath *mtpath,
Index hypertable_rti, int subpath_index);
tsl/src/compression/compression.c (17 additions, 7 deletions)
@@ -2051,7 +2051,20 @@ build_scankeys(Oid hypertable_relid, Oid out_rel, RowDecompressor *decompressor,
char *attname = get_attname(decompressor->out_rel->rd_id, attno, false);
bool isnull;
AttrNumber ht_attno = get_attnum(hypertable_relid, attname);

/*
* This is a not very precise but easy assertion to detect attno
* mismatch at least in some cases. The mismatch might happen if the
* hypertable and chunk layout are different because of dropped
* columns, and we're using a wrong slot type here.
*/
PG_USED_FOR_ASSERTS_ONLY Oid ht_atttype = get_atttype(hypertable_relid, ht_attno);
PG_USED_FOR_ASSERTS_ONLY Oid slot_atttype =
slot->tts_tupleDescriptor->attrs[AttrNumberGetAttrOffset(ht_attno)].atttypid;
Assert(ht_atttype == slot_atttype);

Datum value = slot_getattr(slot, ht_attno, &isnull);

/*
* There are 3 possible scenarios we have to consider
* when dealing with columns which are part of unique
@@ -2256,14 +2269,11 @@ compressed_insert_key_columns(Relation relation)
void
decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
{
/* COPY operation can end up flushing an empty buffer which
* could in turn send an empty slot our way. No need to decompress
* anything if that happens.
/*
* This is supposed to be called with the actual tuple that is being
* inserted, so it cannot be empty.
*/
if (TTS_EMPTY(slot))
{
return;
}
Assert(!TTS_EMPTY(slot));

Relation out_rel = cis->rel;

tsl/test/expected/compress_dml_copy.out (56 additions, 0 deletions)
@@ -0,0 +1,56 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
-- Test various cases of COPY with decompression and different chunk layouts.
create table cdmlcopy(filler bigint, ts int, value float, metric text);
select create_hypertable('cdmlcopy', 'ts', chunk_time_interval => 1000);
NOTICE: adding not-null constraint to column "ts"
create_hypertable
-----------------------
(1,public,cdmlcopy,t)
(1 row)

alter table cdmlcopy add unique (metric, ts);
alter table cdmlcopy set (timescaledb.compress, timescaledb.compress_segmentby = 'metric');
NOTICE: default order by for hypertable "cdmlcopy" is set to "ts DESC"
\copy cdmlcopy from stdin
select count(compress_chunk(x)) from show_chunks('cdmlcopy') x;
count
-------
1
(1 row)

alter table cdmlcopy drop column filler;
\copy cdmlcopy from stdin
select count(compress_chunk(x)) from show_chunks('cdmlcopy') x;
NOTICE: chunk "_hyper_1_1_chunk" is already compressed
count
-------
2
(1 row)

\set ON_ERROR_STOP 0
\copy cdmlcopy from stdin
ERROR: duplicate key value violates unique constraint "1_1_cdmlcopy_metric_ts_key"
\copy cdmlcopy from stdin
ERROR: duplicate key value violates unique constraint "3_2_cdmlcopy_metric_ts_key"
\set ON_ERROR_STOP 1
-- Also test the code path where the chunk insert state goes out of cache.
set timescaledb.max_open_chunks_per_insert = 1;
select count(compress_chunk(x)) from show_chunks('cdmlcopy') x;
NOTICE: chunk "_hyper_1_1_chunk" is already compressed
NOTICE: chunk "_hyper_1_3_chunk" is already compressed
count
-------
2
(1 row)

\copy cdmlcopy from stdin
reset timescaledb.max_open_chunks_per_insert;
select count(compress_chunk(x)) from show_chunks('cdmlcopy') x;
count
-------
2
(1 row)

drop table cdmlcopy;
tsl/test/expected/compression_insert.out (12 additions, 0 deletions)
@@ -1046,6 +1046,18 @@ SELECT count(compress_chunk(ch)) FROM show_chunks('test_copy') ch;
(1 row)

\copy test_copy FROM data/copy_data.csv WITH CSV HEADER;
-- Also test the code path where the chunk insert state goes out of cache.
set timescaledb.max_open_chunks_per_insert = 1;
truncate table test_copy;
INSERT INTO test_copy SELECT generate_series(1,25,1), -1;
SELECT count(compress_chunk(ch)) FROM show_chunks('test_copy') ch;
count
-------
3
(1 row)

\copy test_copy FROM data/copy_data.csv WITH CSV HEADER;
reset timescaledb.max_open_chunks_per_insert;
DROP TABLE test_copy;
-- Test limiting decompressed tuples during an insert
CREATE TABLE test_limit (
tsl/test/sql/CMakeLists.txt (1 addition, 0 deletions)
@@ -16,6 +16,7 @@ set(TEST_FILES
cagg_utils.sql
compress_auto_sparse_index.sql
compress_default.sql
compress_dml_copy.sql
compress_float8_corrupt.sql
compressed_detoaster.sql
compressed_collation.sql
tsl/test/sql/compress_dml_copy.sql (60 additions, 0 deletions)
@@ -0,0 +1,60 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.

-- Test various cases of COPY with decompression and different chunk layouts.

create table cdmlcopy(filler bigint, ts int, value float, metric text);

select create_hypertable('cdmlcopy', 'ts', chunk_time_interval => 1000);

alter table cdmlcopy add unique (metric, ts);

alter table cdmlcopy set (timescaledb.compress, timescaledb.compress_segmentby = 'metric');

\copy cdmlcopy from stdin
0 1 1.1 'metric1'
0 1 1.2 'metric2'
\.

select count(compress_chunk(x)) from show_chunks('cdmlcopy') x;

alter table cdmlcopy drop column filler;

\copy cdmlcopy from stdin
1001 1.1 'metric1'
1001 1.2 'metric2'
\.

select count(compress_chunk(x)) from show_chunks('cdmlcopy') x;

\set ON_ERROR_STOP 0
\copy cdmlcopy from stdin
1 1.1 'metric1'
1 1.2 'metric2'
\.

\copy cdmlcopy from stdin
1001 1.1 'metric1'
1001 1.2 'metric2'
\.
\set ON_ERROR_STOP 1


-- Also test the code path where the chunk insert state goes out of cache.
set timescaledb.max_open_chunks_per_insert = 1;

select count(compress_chunk(x)) from show_chunks('cdmlcopy') x;

\copy cdmlcopy from stdin
2 2.1 'metric1'
1002 2.2 'metric2'
2 2.2 'metric2'
1002 2.1 'metric1'
\.

reset timescaledb.max_open_chunks_per_insert;

select count(compress_chunk(x)) from show_chunks('cdmlcopy') x;

drop table cdmlcopy;
tsl/test/sql/compression_insert.sql (13 additions, 0 deletions)
@@ -687,6 +687,19 @@ SELECT count(compress_chunk(ch)) FROM show_chunks('test_copy') ch;

\copy test_copy FROM data/copy_data.csv WITH CSV HEADER;

-- Also test the code path where the chunk insert state goes out of cache.
set timescaledb.max_open_chunks_per_insert = 1;

truncate table test_copy;

INSERT INTO test_copy SELECT generate_series(1,25,1), -1;

SELECT count(compress_chunk(ch)) FROM show_chunks('test_copy') ch;

\copy test_copy FROM data/copy_data.csv WITH CSV HEADER;

reset timescaledb.max_open_chunks_per_insert;

DROP TABLE test_copy;

-- Test limiting decompressed tuples during an insert