Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support UPDATE/DELETE on compressed hypertables. #5339

Merged
merged 1 commit into from Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -16,6 +16,7 @@ accidentally triggering the load of a previous DB version.**
* #5454 Add support for ON CONFLICT DO UPDATE for compressed hypertables
* #5344 Enable JOINS for Hierarchical Continuous Aggregates
* #5417 Refactor and optimize distributed COPY
* #5339 Support UPDATE/DELETE on compressed hypertables

**Bugfixes**
* #5396 Fix SEGMENTBY columns predicates to be pushed down
Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Expand Up @@ -139,6 +139,7 @@ typedef struct CrossModuleFunctions
PGFunction decompress_chunk;
void (*decompress_batches_for_insert)(ChunkInsertState *state, Chunk *chunk,
TupleTableSlot *slot);
void (*decompress_batches_for_update_delete)(List *chunks, List *predicates);
/* The compression functions below are not installed in SQL as part of create extension;
* They are installed and tested during testing scripts. They are exposed in cross-module
* functions because they may be very useful for debugging customer problems if the sql
Expand Down
86 changes: 82 additions & 4 deletions src/nodes/hypertable_modify.c
Expand Up @@ -33,7 +33,7 @@
#if PG14_GE
static void fireASTriggers(ModifyTableState *node);
static void fireBSTriggers(ModifyTableState *node);
static TupleTableSlot *ExecModifyTable(PlanState *pstate);
static TupleTableSlot *ExecModifyTable(CustomScanState *cs_node, PlanState *pstate);
static TupleTableSlot *ExecProcessReturning(ResultRelInfo *resultRelInfo, TupleTableSlot *tupleSlot,
TupleTableSlot *planSlot);
static void ExecInitInsertProjection(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo);
Expand Down Expand Up @@ -93,6 +93,53 @@ get_chunk_dispatch_states(PlanState *substate)
return NIL;
}

#if PG14_GE
typedef struct ChunkScanNodes
{
/* list of compressed chunks */
List *chunks;
/* list of conditions specified in WHERE */
List *predicates;
} ChunkScanNodes;
/*
* Traverse the plan tree to look for Scan nodes on uncompressed chunks.
* Once Scan node is found check if chunk is compressed, if so then save
* the chunk in HypertableModifyState to be used during plan execution.
* We also save the WHERE quals to get information about predicates.
*/
static bool
collect_chunks_from_scan(PlanState *ps, ChunkScanNodes *sn)
{
RangeTblEntry *rte = NULL;
Chunk *current_chunk;
if (ps == NULL || ts_guc_enable_transparent_decompression == false)
return false;

switch (nodeTag(ps))
{
case T_SeqScanState:
case T_SampleScanState:
case T_IndexScanState:
case T_IndexOnlyScanState:
case T_BitmapHeapScanState:
case T_TidScanState:
case T_TidRangeScanState:
rte = rt_fetch(((Scan *) ps->plan)->scanrelid, ps->state->es_range_table);
current_chunk = ts_chunk_get_by_relid(rte->relid, false);
if (current_chunk && ts_chunk_is_compressed(current_chunk))
{
sn->chunks = lappend(sn->chunks, current_chunk);
if (ps->plan->qual && !sn->predicates)
sn->predicates = ps->plan->qual;
}
break;
default:
break;
}
return planstate_tree_walker(ps, collect_chunks_from_scan, sn);
}
#endif

/*
* HypertableInsert (with corresponding executor node) is a plan node that
* implements INSERTs for hypertables. It is mostly a wrapper around the
Expand Down Expand Up @@ -170,7 +217,7 @@ hypertable_modify_exec(CustomScanState *node)
return ExecProcNode(linitial(node->custom_ps));
#else
ModifyTableState *mtstate = linitial_node(ModifyTableState, node->custom_ps);
return ExecModifyTable(&mtstate->ps);
return ExecModifyTable(node, &mtstate->ps);
#endif
}

Expand Down Expand Up @@ -675,8 +722,9 @@ internalGetUpdateNewTuple(ResultRelInfo *relinfo, TupleTableSlot *planSlot, Tupl
* modified version of ExecModifyTable from executor/nodeModifyTable.c
*/
static TupleTableSlot *
ExecModifyTable(PlanState *pstate)
ExecModifyTable(CustomScanState *cs_node, PlanState *pstate)
{
HypertableModifyState *ht_state = (HypertableModifyState *) cs_node;
ModifyTableState *node = castNode(ModifyTableState, pstate);
ModifyTableContext context;
EState *estate = node->ps.state;
Expand Down Expand Up @@ -745,7 +793,31 @@ ExecModifyTable(PlanState *pstate)
context.mtstate = node;
context.epqstate = &node->mt_epqstate;
context.estate = estate;

/*
* For UPDATE/DELETE on compressed hypertable, decompress chunks and
* move rows to uncompressed chunks.
*/
if ((operation == CMD_DELETE || operation == CMD_UPDATE) && !ht_state->comp_chunks_processed)
{
ChunkScanNodes *sn = palloc0(sizeof(ChunkScanNodes));
collect_chunks_from_scan(pstate, sn);
if (sn->chunks && ts_cm_functions->decompress_batches_for_update_delete)
{
ts_cm_functions->decompress_batches_for_update_delete(sn->chunks, sn->predicates);
ht_state->comp_chunks_processed = true;
/*
* save snapshot set during ExecutorStart(), since this is the same
* snapshot used to SeqScan of uncompressed chunks
*/
ht_state->snapshot = estate->es_snapshot;
/* use current transaction snapshot */
estate->es_snapshot = GetTransactionSnapshot();
sb230132 marked this conversation as resolved.
Show resolved Hide resolved
CommandCounterIncrement();
/* mark rows visible */
estate->es_output_cid = GetCurrentCommandId(true);
}
pfree(sn);
}
/*
* Fetch rows from subplan, and execute the required table modification
* for each row.
Expand Down Expand Up @@ -970,6 +1042,12 @@ ExecModifyTable(PlanState *pstate)
*/
relinfos = estate->es_opened_result_relations;

if (ht_state->comp_chunks_processed)
{
estate->es_snapshot = ht_state->snapshot;
ht_state->comp_chunks_processed = false;
}

foreach (lc, relinfos)
{
resultRelInfo = lfirst(lc);
Expand Down
2 changes: 2 additions & 0 deletions src/nodes/hypertable_modify.h
Expand Up @@ -27,6 +27,8 @@ typedef struct HypertableModifyState
CustomScanState cscan_state;
ModifyTable *mt;
List *serveroids;
bool comp_chunks_processed;
Snapshot snapshot;
FdwRoutine *fdwroutine;
} HypertableModifyState;

Expand Down