Refactor INSERT into compressed chunks #4926

Merged
merged 1 commit on Dec 21, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@ accidentally triggering the load of a previous DB version.**
## Unreleased

**Bugfixes**
* #4926 Fix corruption when inserting into compressed chunks
* #5101 Fix enabling compression on caggs with renamed columns

## 2.9.0 (2022-12-15)
23 changes: 21 additions & 2 deletions sql/policy_internal.sql
@@ -49,6 +49,11 @@ DECLARE
numchunks INTEGER := 1;
_message text;
_detail text;
-- chunk status bits:
bit_compressed int := 1;
bit_compressed_unordered int := 2;
bit_frozen int := 4;
bit_compressed_partial int := 8;
BEGIN

-- procedures with SET clause cannot execute transaction
@@ -75,7 +80,15 @@ BEGIN
INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname AND ch.schema_name = pgns.nspname AND ch.hypertable_id = htid
WHERE
ch.dropped IS FALSE
AND (ch.status = 0 OR ch.status = 3)
AND (
ch.status = 0 OR
(
ch.status & bit_compressed > 0 AND (
ch.status & bit_compressed_unordered > 0 OR
ch.status & bit_compressed_partial > 0
)
)
)
LOOP
IF chunk_rec.status = 0 THEN
BEGIN
@@ -88,7 +101,13 @@ BEGIN
USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
ERRCODE = sqlstate;
END;
ELSIF chunk_rec.status = 3 AND recompress_enabled IS TRUE THEN
ELSIF
(
chunk_rec.status & bit_compressed > 0 AND (
chunk_rec.status & bit_compressed_unordered > 0 OR
chunk_rec.status & bit_compressed_partial > 0
)
) AND recompress_enabled IS TRUE THEN
BEGIN
PERFORM @extschema@.decompress_chunk(chunk_rec.oid, if_compressed => true);
EXCEPTION WHEN OTHERS THEN
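The policy now selects chunks by testing individual status bits instead of comparing against the magic value 3, so partially compressed chunks (bit 8) are also picked up for recompression. As a rough standalone illustration of the same condition — a sketch only, querying the `_timescaledb_catalog.chunk` table directly, just as the policy does:

```sql
-- Sketch: classify chunks the way the updated policy condition does.
-- Status bits: 1 = compressed, 2 = compressed unordered, 4 = frozen, 8 = compressed partial.
SELECT ch.schema_name,
       ch.table_name,
       ch.status,
       ch.status = 0 AS needs_compression,
       (ch.status & 1 > 0) AND (ch.status & 2 > 0 OR ch.status & 8 > 0) AS needs_recompression
FROM _timescaledb_catalog.chunk ch
WHERE ch.dropped IS FALSE;
```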
4 changes: 3 additions & 1 deletion src/chunk.c
@@ -4351,9 +4351,11 @@ ts_chunk_get_compression_status(int32 chunk_id)
ts_flags_are_set_32(DatumGetInt32(status), CHUNK_STATUS_COMPRESSED);
bool status_is_unordered =
ts_flags_are_set_32(DatumGetInt32(status), CHUNK_STATUS_COMPRESSED_UNORDERED);
bool status_is_partial =
ts_flags_are_set_32(DatumGetInt32(status), CHUNK_STATUS_COMPRESSED_PARTIAL);
if (status_is_compressed)
{
if (status_is_unordered)
if (status_is_unordered || status_is_partial)
st = CHUNK_COMPRESS_UNORDERED;
else
st = CHUNK_COMPRESS_ORDERED;
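With this change a chunk that carries the partial bit is reported as `CHUNK_COMPRESS_UNORDERED`, so the existing recompression logic treats it like an unordered chunk. The same bits can be inspected from SQL; a hedged sketch (the hypertable name `conditions` is hypothetical, the catalog layout matches what the patch queries):

```sql
-- Decode the status bits of every chunk of a hypertable (illustrative only).
SELECT format('%I.%I', ch.schema_name, ch.table_name) AS chunk,
       ch.status,
       ch.status & 1 > 0 AS compressed,
       ch.status & 2 > 0 AS compressed_unordered,
       ch.status & 4 > 0 AS frozen,
       ch.status & 8 > 0 AS compressed_partial
FROM _timescaledb_catalog.chunk ch
JOIN _timescaledb_catalog.hypertable ht ON ht.id = ch.hypertable_id
WHERE ht.table_name = 'conditions'
  AND ch.dropped IS FALSE;
```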
146 changes: 45 additions & 101 deletions src/copy.c
@@ -712,13 +712,6 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte
{
ResultRelInfo *resultRelInfo;
ResultRelInfo *saved_resultRelInfo = NULL;
/* if copies are directed to a chunk that is compressed, we redirect
* them to the internal compressed chunk. But we still
* need to check triggers, constraints etc. against the original
* chunk (not the internal compressed chunk).
* check_resultRelInfo saves that information
*/
ResultRelInfo *check_resultRelInfo = NULL;
EState *estate = ccstate->estate; /* for ExecConstraints() */
ExprContext *econtext;
TupleTableSlot *singleslot;
@@ -965,10 +958,6 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte

currentTupleInsertMethod = insertMethod;

/* Insert tuples into compressed chunks tuple by tuple */
if (cis->compress_info)
currentTupleInsertMethod = CIM_SINGLE;

/* Determine which triggers exist on this chunk */
has_before_insert_row_trig =
(cis->result_relation_info->ri_TrigDesc &&
@@ -1047,21 +1036,14 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte
estate->es_result_relation_info = resultRelInfo;
#endif

if (cis->compress_info != NULL)
check_resultRelInfo = cis->compress_info->orig_result_relation_info;
else
check_resultRelInfo = resultRelInfo;

/* Set the right relation for triggers */
ts_tuptableslot_set_table_oid(myslot,
RelationGetRelid(check_resultRelInfo->ri_RelationDesc));
ts_tuptableslot_set_table_oid(myslot, RelationGetRelid(resultRelInfo->ri_RelationDesc));

skip_tuple = false;

/* BEFORE ROW INSERT Triggers */
if (check_resultRelInfo->ri_TrigDesc &&
check_resultRelInfo->ri_TrigDesc->trig_insert_before_row)
skip_tuple = !ExecBRInsertTriggers(estate, check_resultRelInfo, myslot);
if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_before_row)
skip_tuple = !ExecBRInsertTriggers(estate, resultRelInfo, myslot);

if (!skip_tuple)
{
@@ -1072,110 +1054,72 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte
List *recheckIndexes = NIL;

/* Compute stored generated columns */
if (check_resultRelInfo->ri_RelationDesc->rd_att->constr &&
check_resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
ExecComputeStoredGeneratedCompat(check_resultRelInfo, estate, myslot, CMD_INSERT);
if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
ExecComputeStoredGeneratedCompat(resultRelInfo, estate, myslot, CMD_INSERT);

/*
* If the target is a plain table, check the constraints of
* the tuple.
*/
if (check_resultRelInfo->ri_FdwRoutine == NULL &&
check_resultRelInfo->ri_RelationDesc->rd_att->constr)
if (resultRelInfo->ri_FdwRoutine == NULL &&
resultRelInfo->ri_RelationDesc->rd_att->constr)
{
Assert(check_resultRelInfo->ri_RangeTableIndex > 0 && estate->es_range_table);
ExecConstraints(check_resultRelInfo, myslot, estate);
Assert(resultRelInfo->ri_RangeTableIndex > 0 && estate->es_range_table);
ExecConstraints(resultRelInfo, myslot, estate);
}

if (cis->compress_info)
if (currentTupleInsertMethod == CIM_SINGLE)
{
Assert(currentTupleInsertMethod == CIM_SINGLE);

TupleTableSlot *compress_slot =
ts_cm_functions->compress_row_exec(cis->compress_info->compress_state, myslot);
/* After Row triggers do not work with compressed chunks. So
* explicitly call cagg trigger here
*/
if (cis->compress_info->has_cagg_trigger)
{
HeapTupleTableSlot *hslot = (HeapTupleTableSlot *) myslot;
if (!hslot->tuple)
hslot->tuple = heap_form_tuple(myslot->tts_tupleDescriptor,
myslot->tts_values,
myslot->tts_isnull);
ts_compress_chunk_invoke_cagg_trigger(cis->compress_info,
cis->rel,
hslot->tuple);
}

/* OK, store the tuple and create index entries for it */
table_tuple_insert(resultRelInfo->ri_RelationDesc,
compress_slot,
myslot,
mycid,
ti_options,
bistate);

if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuplesCompat(resultRelInfo,
compress_slot,
myslot,
estate,
false,
false,
NULL,
NIL);
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate,
resultRelInfo,
myslot,
recheckIndexes,
NULL /* transition capture */);
}
else
{
if (currentTupleInsertMethod == CIM_SINGLE)
{
/* OK, store the tuple and create index entries for it */
table_tuple_insert(resultRelInfo->ri_RelationDesc,
myslot,
mycid,
ti_options,
bistate);

if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuplesCompat(resultRelInfo,
myslot,
estate,
false,
false,
NULL,
NIL);
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate,
check_resultRelInfo,
myslot,
recheckIndexes,
NULL /* transition capture */);
}
else
/*
* The slot previously might point into the per-tuple
* context. For batching it needs to be longer lived.
*/
ExecMaterializeSlot(myslot);

/* Add this tuple to the tuple buffer */
TSCopyMultiInsertInfoStore(&multiInsertInfo,
resultRelInfo,
buffer,
myslot,
ccstate->cstate);

/*
* If enough inserts have queued up, then flush all
* buffers out to their tables.
*/
if (TSCopyMultiInsertInfoIsFull(&multiInsertInfo))
{
/*
* The slot previously might point into the per-tuple
* context. For batching it needs to be longer lived.
*/
ExecMaterializeSlot(myslot);

/* Add this tuple to the tuple buffer */
TSCopyMultiInsertInfoStore(&multiInsertInfo,
resultRelInfo,
buffer,
myslot,
ccstate->cstate);

/*
* If enough inserts have queued up, then flush all
* buffers out to their tables.
*/
if (TSCopyMultiInsertInfoIsFull(&multiInsertInfo))
{
ereport(DEBUG2,
(errmsg("Flush called with %d bytes and %d buffered tuples",
multiInsertInfo.bufferedBytes,
multiInsertInfo.bufferedTuples)));

TSCopyMultiInsertInfoFlush(&multiInsertInfo, cis);
}
ereport(DEBUG2,
(errmsg("flush called with %d bytes and %d buffered tuples",
multiInsertInfo.bufferedBytes,
multiInsertInfo.bufferedTuples)));

TSCopyMultiInsertInfoFlush(&multiInsertInfo, cis);
}
}

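The net effect of the copy.c changes is that COPY no longer compresses rows on the fly and redirects them to the internal compressed chunk; rows are written into the chunk itself and the chunk is flagged as partially compressed. A hedged end-to-end sketch of the behaviour this PR describes (table, column and value names are made up; the final decompress/compress pass mirrors what policy_internal.sql does):

```sql
-- Hypothetical setup: a small compressed hypertable.
CREATE TABLE conditions (time timestamptz NOT NULL, device int, temp float);
SELECT create_hypertable('conditions', 'time');
ALTER TABLE conditions SET (timescaledb.compress, timescaledb.compress_segmentby = 'device');

INSERT INTO conditions VALUES ('2022-12-01 00:00', 1, 20.0);
SELECT compress_chunk(c) FROM show_chunks('conditions') c;

-- With this patch, new rows land uncompressed in the chunk itself and the
-- "compressed partial" status bit (8) is set on the chunk.
INSERT INTO conditions VALUES ('2022-12-01 00:05', 1, 21.0);
COPY conditions FROM STDIN;  -- psql syntax, tab-separated
2022-12-01 00:10	1	22.0
\.

-- A later policy run (or a manual pass) rewrites the chunk fully compressed.
SELECT decompress_chunk(c, if_compressed => true) FROM show_chunks('conditions') c;
SELECT compress_chunk(c, if_not_compressed => true) FROM show_chunks('conditions') c;
```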
56 changes: 1 addition & 55 deletions src/nodes/chunk_dispatch_state.c
@@ -115,10 +115,7 @@ chunk_dispatch_exec(CustomScanState *node)
* just when the chunk changes.
*/
#if PG14_LT
if (cis->compress_info != NULL)
estate->es_result_relation_info = cis->compress_info->orig_result_relation_info;
else
estate->es_result_relation_info = cis->result_relation_info;
estate->es_result_relation_info = cis->result_relation_info;
#endif

MemoryContextSwitchTo(old);
@@ -127,57 +124,6 @@ chunk_dispatch_exec(CustomScanState *node)
if (cis->hyper_to_chunk_map != NULL)
slot = execute_attr_map_slot(cis->hyper_to_chunk_map->attrMap, slot, cis->slot);

if (cis->compress_info != NULL)
{
/*
* When the chunk is compressed, we redirect the insert to the internal compressed
* chunk. However, any BEFORE ROW triggers defined on the chunk have to be executed
* before we redirect the insert.
*/
if (cis->compress_info->orig_result_relation_info->ri_TrigDesc &&
cis->compress_info->orig_result_relation_info->ri_TrigDesc->trig_insert_before_row)
{
bool skip_tuple;
skip_tuple =
!ExecBRInsertTriggers(estate, cis->compress_info->orig_result_relation_info, slot);

if (skip_tuple)
return NULL;
}

if (cis->rel->rd_att->constr && cis->rel->rd_att->constr->has_generated_stored)
ExecComputeStoredGeneratedCompat(cis->compress_info->orig_result_relation_info,
estate,
slot,
CMD_INSERT);

if (cis->rel->rd_att->constr)
ExecConstraints(cis->compress_info->orig_result_relation_info, slot, estate);

#if PG14_LT
estate->es_result_relation_info = cis->result_relation_info;
#endif
Assert(ts_cm_functions->compress_row_exec != NULL);
TupleTableSlot *orig_slot = slot;
old = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot = ts_cm_functions->compress_row_exec(cis->compress_info->compress_state, slot);
MemoryContextSwitchTo(old);
/* If we have cagg defined on the hypertable, we have to execute
* the function that records invalidations directly as AFTER ROW
* triggers do not work with compressed chunks.
*/
if (cis->compress_info->has_cagg_trigger)
{
Assert(ts_cm_functions->continuous_agg_call_invalidation_trigger);
HeapTupleTableSlot *hslot = (HeapTupleTableSlot *) orig_slot;
if (!hslot->tuple)
hslot->tuple = heap_form_tuple(orig_slot->tts_tupleDescriptor,
orig_slot->tts_values,
orig_slot->tts_isnull);

ts_compress_chunk_invoke_cagg_trigger(cis->compress_info, cis->rel, hslot->tuple);
}
}
return slot;
}

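With the redirect to the internal compressed chunk gone, BEFORE ROW triggers, constraints, stored generated columns and continuous aggregate invalidation are all handled by the regular insert path, which is why the special-case block above could be removed. A small hedged check, continuing the hypothetical `conditions` setup from the earlier sketch (trigger and function names are invented):

```sql
-- A row-level trigger on the hypertable should still fire for rows that land
-- in an already compressed (now partially compressed) chunk, via the normal
-- executor path rather than the removed special case.
CREATE FUNCTION note_insert() RETURNS trigger AS $$
BEGIN
    RAISE NOTICE 'inserting into %', TG_TABLE_NAME;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER conditions_before_insert
    BEFORE INSERT ON conditions
    FOR EACH ROW EXECUTE FUNCTION note_insert();

INSERT INTO conditions VALUES ('2022-12-01 00:15', 1, 23.0);  -- NOTICE is raised once
```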