Refactor INSERT into compressed chunks
This patch changes INSERTs into compressed chunks so that new rows are
no longer compressed immediately but are stored in the uncompressed
part of the chunk and later merged with the compressed chunk by a
separate job.

This greatly simplifies the INSERT code path, as we no longer have to
rewrite the target of INSERTs and compress on the fly, leading to a
roughly 2x improvement in INSERT rate into compressed chunks.
Additionally, this improves trigger support for INSERTs into
compressed chunks.

This is a necessary refactoring to allow UPSERT/UPDATE/DELETE on
compressed chunks in follow-up patches.
svenklemm committed Nov 23, 2022
1 parent 40297f1 commit 5e8b23f
Showing 19 changed files with 338 additions and 542 deletions.
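
A minimal sketch of the new INSERT behavior described in the commit message
(the hypertable name, schema, and policy interval are hypothetical and not
part of this commit):

-- Hypothetical hypertable with compression enabled.
CREATE TABLE metrics(time timestamptz NOT NULL, device_id int, value float);
SELECT create_hypertable('metrics', 'time');
ALTER TABLE metrics SET (timescaledb.compress,
                         timescaledb.compress_segmentby = 'device_id');

INSERT INTO metrics VALUES ('2022-11-01 00:00', 1, 0.5);
SELECT compress_chunk(c) FROM show_chunks('metrics') c;

-- With this patch, the INSERT below is no longer compressed on the fly:
-- the row is stored in the uncompressed part of the compressed chunk and
-- the chunk is flagged as partially compressed. A later compression policy
-- run (the separate job mentioned above) merges it back into compressed form.
INSERT INTO metrics VALUES ('2022-11-01 00:01', 1, 0.7);
SELECT add_compression_policy('metrics', INTERVAL '7 days');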
9 changes: 7 additions & 2 deletions sql/policy_internal.sql
@@ -65,6 +65,11 @@ BEGIN
lag := _timescaledb_internal.subtract_integer_from_now(htoid, lag::BIGINT);
END IF;

-- chunk status bits:
-- 1: compressed
-- 2: compressed unordered
-- 4: frozen
-- 8: compressed partial
FOR chunk_rec IN
SELECT
show.oid, ch.schema_name, ch.table_name, ch.status
@@ -75,7 +80,7 @@ BEGIN
INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname AND ch.schema_name = pgns.nspname AND ch.hypertable_id = htid
WHERE
ch.dropped IS FALSE
AND (ch.status = 0 OR ch.status = 3)
AND ch.status IN (0,3,9,11)
LOOP
IF chunk_rec.status = 0 THEN
BEGIN
@@ -88,7 +93,7 @@ BEGIN
USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
ERRCODE = sqlstate;
END;
ELSIF chunk_rec.status = 3 AND recompress_enabled IS TRUE THEN
ELSIF chunk_rec.status IN (3,9,11) AND recompress_enabled IS TRUE THEN
BEGIN
PERFORM @extschema@.decompress_chunk(chunk_rec.oid, if_compressed => true);
EXCEPTION WHEN OTHERS THEN
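For reference, the status values matched above combine the bit flags
documented in the new comment: 0 = uncompressed, 3 = compressed + unordered,
9 = compressed + partial, 11 = compressed + unordered + partial. A small
query sketch (assuming read access to the catalog) that decodes them:

-- Decode the chunk status bit flags used by the policy above.
SELECT id,
       schema_name,
       table_name,
       status,
       (status & 1) != 0 AS compressed,
       (status & 2) != 0 AS compressed_unordered,
       (status & 4) != 0 AS frozen,
       (status & 8) != 0 AS compressed_partial
FROM _timescaledb_catalog.chunk
WHERE NOT dropped;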
4 changes: 3 additions & 1 deletion src/chunk.c
@@ -4451,9 +4451,11 @@ ts_chunk_get_compression_status(int32 chunk_id)
ts_flags_are_set_32(DatumGetInt32(status), CHUNK_STATUS_COMPRESSED);
bool status_is_unordered =
ts_flags_are_set_32(DatumGetInt32(status), CHUNK_STATUS_COMPRESSED_UNORDERED);
bool status_is_partial =
ts_flags_are_set_32(DatumGetInt32(status), CHUNK_STATUS_COMPRESSED_PARTIAL);
if (status_is_compressed)
{
if (status_is_unordered)
if (status_is_unordered || status_is_partial)
st = CHUNK_COMPRESS_UNORDERED;
else
st = CHUNK_COMPRESS_ORDERED;
146 changes: 45 additions & 101 deletions src/copy.c
@@ -712,13 +712,6 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte
{
ResultRelInfo *resultRelInfo;
ResultRelInfo *saved_resultRelInfo = NULL;
/* if copies are directed to a chunk that is compressed, we redirect
* them to the internal compressed chunk. But we still
* need to check triggers, constrainst etc. against the original
* chunk (not the internal compressed chunk).
* check_resultRelInfo saves that information
*/
ResultRelInfo *check_resultRelInfo = NULL;
EState *estate = ccstate->estate; /* for ExecConstraints() */
ExprContext *econtext;
TupleTableSlot *singleslot;
@@ -965,10 +958,6 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte

currentTupleInsertMethod = insertMethod;

/* Insert tuples into compressed chunks tuple by tuple */
if (cis->compress_info)
currentTupleInsertMethod = CIM_SINGLE;

/* Determine which triggers exist on this chunk */
has_before_insert_row_trig =
(cis->result_relation_info->ri_TrigDesc &&
@@ -1047,21 +1036,14 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte
estate->es_result_relation_info = resultRelInfo;
#endif

if (cis->compress_info != NULL)
check_resultRelInfo = cis->compress_info->orig_result_relation_info;
else
check_resultRelInfo = resultRelInfo;

/* Set the right relation for triggers */
ts_tuptableslot_set_table_oid(myslot,
RelationGetRelid(check_resultRelInfo->ri_RelationDesc));
ts_tuptableslot_set_table_oid(myslot, RelationGetRelid(resultRelInfo->ri_RelationDesc));

skip_tuple = false;

/* BEFORE ROW INSERT Triggers */
if (check_resultRelInfo->ri_TrigDesc &&
check_resultRelInfo->ri_TrigDesc->trig_insert_before_row)
skip_tuple = !ExecBRInsertTriggers(estate, check_resultRelInfo, myslot);
if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_before_row)
skip_tuple = !ExecBRInsertTriggers(estate, resultRelInfo, myslot);

if (!skip_tuple)
{
@@ -1072,110 +1054,72 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte
List *recheckIndexes = NIL;

/* Compute stored generated columns */
if (check_resultRelInfo->ri_RelationDesc->rd_att->constr &&
check_resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
ExecComputeStoredGeneratedCompat(check_resultRelInfo, estate, myslot, CMD_INSERT);
if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
ExecComputeStoredGeneratedCompat(resultRelInfo, estate, myslot, CMD_INSERT);

/*
* If the target is a plain table, check the constraints of
* the tuple.
*/
if (check_resultRelInfo->ri_FdwRoutine == NULL &&
check_resultRelInfo->ri_RelationDesc->rd_att->constr)
if (resultRelInfo->ri_FdwRoutine == NULL &&
resultRelInfo->ri_RelationDesc->rd_att->constr)
{
Assert(check_resultRelInfo->ri_RangeTableIndex > 0 && estate->es_range_table);
ExecConstraints(check_resultRelInfo, myslot, estate);
Assert(resultRelInfo->ri_RangeTableIndex > 0 && estate->es_range_table);
ExecConstraints(resultRelInfo, myslot, estate);
}

if (cis->compress_info)
if (currentTupleInsertMethod == CIM_SINGLE)
{
Assert(currentTupleInsertMethod == CIM_SINGLE);

TupleTableSlot *compress_slot =
ts_cm_functions->compress_row_exec(cis->compress_info->compress_state, myslot);
/* After Row triggers do not work with compressed chunks. So
* explicitly call cagg trigger here
*/
if (cis->compress_info->has_cagg_trigger)
{
HeapTupleTableSlot *hslot = (HeapTupleTableSlot *) myslot;
if (!hslot->tuple)
hslot->tuple = heap_form_tuple(myslot->tts_tupleDescriptor,
myslot->tts_values,
myslot->tts_isnull);
ts_compress_chunk_invoke_cagg_trigger(cis->compress_info,
cis->rel,
hslot->tuple);
}

/* OK, store the tuple and create index entries for it */
table_tuple_insert(resultRelInfo->ri_RelationDesc,
compress_slot,
myslot,
mycid,
ti_options,
bistate);

if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuplesCompat(resultRelInfo,
compress_slot,
myslot,
estate,
false,
false,
NULL,
NIL);
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate,
resultRelInfo,
myslot,
recheckIndexes,
NULL /* transition capture */);
}
else
{
if (currentTupleInsertMethod == CIM_SINGLE)
{
/* OK, store the tuple and create index entries for it */
table_tuple_insert(resultRelInfo->ri_RelationDesc,
myslot,
mycid,
ti_options,
bistate);

if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuplesCompat(resultRelInfo,
myslot,
estate,
false,
false,
NULL,
NIL);
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate,
check_resultRelInfo,
myslot,
recheckIndexes,
NULL /* transition capture */);
}
else
/*
* The slot previously might point into the per-tuple
* context. For batching it needs to be longer lived.
*/
ExecMaterializeSlot(myslot);

/* Add this tuple to the tuple buffer */
TSCopyMultiInsertInfoStore(&multiInsertInfo,
resultRelInfo,
buffer,
myslot,
ccstate->cstate);

/*
* If enough inserts have queued up, then flush all
* buffers out to their tables.
*/
if (TSCopyMultiInsertInfoIsFull(&multiInsertInfo))
{
/*
* The slot previously might point into the per-tuple
* context. For batching it needs to be longer lived.
*/
ExecMaterializeSlot(myslot);

/* Add this tuple to the tuple buffer */
TSCopyMultiInsertInfoStore(&multiInsertInfo,
resultRelInfo,
buffer,
myslot,
ccstate->cstate);

/*
* If enough inserts have queued up, then flush all
* buffers out to their tables.
*/
if (TSCopyMultiInsertInfoIsFull(&multiInsertInfo))
{
ereport(DEBUG2,
(errmsg("Flush called with %d bytes and %d buffered tuples",
multiInsertInfo.bufferedBytes,
multiInsertInfo.bufferedTuples)));

TSCopyMultiInsertInfoFlush(&multiInsertInfo, cis);
}
ereport(DEBUG2,
(errmsg("flush called with %d bytes and %d buffered tuples",
multiInsertInfo.bufferedBytes,
multiInsertInfo.bufferedTuples)));

TSCopyMultiInsertInfoFlush(&multiInsertInfo, cis);
}
}

56 changes: 1 addition & 55 deletions src/nodes/chunk_dispatch_state.c
@@ -115,10 +115,7 @@ chunk_dispatch_exec(CustomScanState *node)
* just when the chunk changes.
*/
#if PG14_LT
if (cis->compress_info != NULL)
estate->es_result_relation_info = cis->compress_info->orig_result_relation_info;
else
estate->es_result_relation_info = cis->result_relation_info;
estate->es_result_relation_info = cis->result_relation_info;
#endif

MemoryContextSwitchTo(old);
@@ -127,57 +124,6 @@ chunk_dispatch_exec(CustomScanState *node)
if (cis->hyper_to_chunk_map != NULL)
slot = execute_attr_map_slot(cis->hyper_to_chunk_map->attrMap, slot, cis->slot);

if (cis->compress_info != NULL)
{
/*
* When the chunk is compressed, we redirect the insert to the internal compressed
* chunk. However, any BEFORE ROW triggers defined on the chunk have to be executed
* before we redirect the insert.
*/
if (cis->compress_info->orig_result_relation_info->ri_TrigDesc &&
cis->compress_info->orig_result_relation_info->ri_TrigDesc->trig_insert_before_row)
{
bool skip_tuple;
skip_tuple =
!ExecBRInsertTriggers(estate, cis->compress_info->orig_result_relation_info, slot);

if (skip_tuple)
return NULL;
}

if (cis->rel->rd_att->constr && cis->rel->rd_att->constr->has_generated_stored)
ExecComputeStoredGeneratedCompat(cis->compress_info->orig_result_relation_info,
estate,
slot,
CMD_INSERT);

if (cis->rel->rd_att->constr)
ExecConstraints(cis->compress_info->orig_result_relation_info, slot, estate);

#if PG14_LT
estate->es_result_relation_info = cis->result_relation_info;
#endif
Assert(ts_cm_functions->compress_row_exec != NULL);
TupleTableSlot *orig_slot = slot;
old = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot = ts_cm_functions->compress_row_exec(cis->compress_info->compress_state, slot);
MemoryContextSwitchTo(old);
/* If we have cagg defined on the hypertable, we have to execute
* the function that records invalidations directly as AFTER ROW
* triggers do not work with compressed chunks.
*/
if (cis->compress_info->has_cagg_trigger)
{
Assert(ts_cm_functions->continuous_agg_call_invalidation_trigger);
HeapTupleTableSlot *hslot = (HeapTupleTableSlot *) orig_slot;
if (!hslot->tuple)
hslot->tuple = heap_form_tuple(orig_slot->tts_tupleDescriptor,
orig_slot->tts_values,
orig_slot->tts_isnull);

ts_compress_chunk_invoke_cagg_trigger(cis->compress_info, cis->rel, hslot->tuple);
}
}
return slot;
}

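Because rows inserted into a compressed chunk now go through the regular
chunk insert path instead of being redirected to the internal compressed
chunk, row triggers defined on the chunk fire through the normal executor
machinery. A hedged sketch (the chunk name
_timescaledb_internal._hyper_1_1_chunk and the trigger function are
hypothetical):

-- Hypothetical: attach a row trigger to a chunk of a compressed hypertable.
CREATE OR REPLACE FUNCTION note_insert() RETURNS trigger AS $$
BEGIN
  RAISE NOTICE 'insert into %: %', TG_TABLE_NAME, NEW;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER chunk_insert_notice
  BEFORE INSERT ON _timescaledb_internal._hyper_1_1_chunk
  FOR EACH ROW EXECUTE FUNCTION note_insert();

-- With this patch, the trigger fires for INSERTs even when the chunk is
-- compressed, because the row is written to the chunk's uncompressed part.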
