Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix segfault when deleting from compressed chunk #5643

Merged
merged 1 commit into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ typedef struct CrossModuleFunctions
PGFunction decompress_chunk;
void (*decompress_batches_for_insert)(ChunkInsertState *state, Chunk *chunk,
TupleTableSlot *slot);
void (*decompress_batches_for_update_delete)(List *chunks, List *predicates);
bool (*decompress_target_segments)(PlanState *ps);
/* The compression functions below are not installed in SQL as part of create extension;
* They are installed and tested during testing scripts. They are exposed in cross-module
* functions because they may be very useful for debugging customer problems if the sql
Expand Down
54 changes: 2 additions & 52 deletions src/nodes/hypertable_modify.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,53 +93,6 @@ get_chunk_dispatch_states(PlanState *substate)
return NIL;
}

#if PG14_GE
typedef struct ChunkScanNodes
{
	/* list of compressed chunks */
	List *chunks;
	/* list of conditions specified in WHERE */
	List *predicates;
} ChunkScanNodes;

/*
 * Traverse the plan tree to look for Scan nodes on uncompressed chunks.
 * Once a Scan node is found, check if the chunk is compressed; if so, save
 * the chunk in ChunkScanNodes to be used during plan execution.
 * We also save the WHERE quals to get information about predicates.
 *
 * Always returns false so that planstate_tree_walker keeps walking the
 * whole tree; results are accumulated in *sn.
 */
static bool
collect_chunks_from_scan(PlanState *ps, ChunkScanNodes *sn)
{
	RangeTblEntry *rte = NULL;
	Chunk *current_chunk;

	/* Nothing to collect without a node, or when transparent decompression is disabled. */
	if (ps == NULL || ts_guc_enable_transparent_decompression == false)
		return false;

	switch (nodeTag(ps))
	{
		/* All plain scan node types that may target an uncompressed chunk. */
		case T_SeqScanState:
		case T_SampleScanState:
		case T_IndexScanState:
		case T_IndexOnlyScanState:
		case T_BitmapHeapScanState:
		case T_TidScanState:
		case T_TidRangeScanState:
			rte = rt_fetch(((Scan *) ps->plan)->scanrelid, ps->state->es_range_table);
			current_chunk = ts_chunk_get_by_relid(rte->relid, false);
			if (current_chunk && ts_chunk_is_compressed(current_chunk))
			{
				sn->chunks = lappend(sn->chunks, current_chunk);
				/* Keep only the first set of quals encountered as the predicates. */
				if (ps->plan->qual && !sn->predicates)
					sn->predicates = ps->plan->qual;
			}
			break;
		default:
			break;
	}
	/* Recurse into child plan states. */
	return planstate_tree_walker(ps, collect_chunks_from_scan, sn);
}
#endif

/*
* HypertableInsert (with corresponding executor node) is a plan node that
* implements INSERTs for hypertables. It is mostly a wrapper around the
Expand Down Expand Up @@ -799,11 +752,9 @@ ExecModifyTable(CustomScanState *cs_node, PlanState *pstate)
*/
if ((operation == CMD_DELETE || operation == CMD_UPDATE) && !ht_state->comp_chunks_processed)
{
ChunkScanNodes *sn = palloc0(sizeof(ChunkScanNodes));
collect_chunks_from_scan(pstate, sn);
if (sn->chunks && ts_cm_functions->decompress_batches_for_update_delete)
if (ts_cm_functions->decompress_target_segments)
{
ts_cm_functions->decompress_batches_for_update_delete(sn->chunks, sn->predicates);
ts_cm_functions->decompress_target_segments(pstate);
ht_state->comp_chunks_processed = true;
/*
* save snapshot set during ExecutorStart(), since this is the same
Expand All @@ -816,7 +767,6 @@ ExecModifyTable(CustomScanState *cs_node, PlanState *pstate)
/* mark rows visible */
estate->es_output_cid = GetCurrentCommandId(true);
}
pfree(sn);
}
/*
* Fetch rows from subplan, and execute the required table modification
Expand Down
119 changes: 76 additions & 43 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
#include <funcapi.h>
#include <libpq/pqformat.h>
#include <miscadmin.h>
#include <nodes/nodeFuncs.h>
#include <nodes/pg_list.h>
#include <nodes/print.h>
#include <parser/parsetree.h>
#include <storage/lmgr.h>
#include <storage/predicate.h>
#include <utils/builtins.h>
Expand Down Expand Up @@ -2340,66 +2342,97 @@
* 3. Move scanned rows to staging area.
* 4. Update catalog table to change status of moved chunk.
*/
void
decompress_batches_for_update_delete(List *chunks, List *predicates)
static void
decompress_batches_for_update_delete(Chunk *chunk, List *predicates)
{
/* process each chunk with its corresponding predicates */

List *filters = NIL;
List *is_null = NIL;
ListCell *ch = NULL;
ListCell *lc = NULL;

Relation chunk_rel;
Relation comp_chunk_rel;
Chunk *chunk, *comp_chunk;
Chunk *comp_chunk;
RowDecompressor decompressor;
SegmentFilter *filter;

if (predicates)
fill_predicate_context(linitial(chunks), predicates, &filters, &is_null);
sb230132 marked this conversation as resolved.
Show resolved Hide resolved
foreach (ch, chunks)
{
chunk = (Chunk *) lfirst(ch);
bool chunk_status_changed = false;
ScanKeyData *scankeys = NULL;
Bitmapset *null_columns = NULL;
int num_scankeys = 0;

chunk_rel = table_open(chunk->table_id, RowExclusiveLock);
comp_chunk = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true);
comp_chunk_rel = table_open(comp_chunk->table_id, RowExclusiveLock);
decompressor = build_decompressor(comp_chunk_rel, chunk_rel);

if (filters)
{
scankeys =
build_update_delete_scankeys(&decompressor, filters, &num_scankeys, &null_columns);
}
if (decompress_batches(&decompressor,
scankeys,
num_scankeys,
null_columns,
is_null,
&chunk_status_changed))
{
/*
* tuples from compressed chunk has been decompressed and moved
* to staging area, thus mark this chunk as partially compressed
*/
if (chunk_status_changed == true)
ts_chunk_set_partial(lfirst(ch));
}
bool chunk_status_changed = false;
ScanKeyData *scankeys = NULL;
Bitmapset *null_columns = NULL;
int num_scankeys = 0;

ts_catalog_close_indexes(decompressor.indexstate);
FreeBulkInsertState(decompressor.bistate);
fill_predicate_context(chunk, predicates, &filters, &is_null);

table_close(chunk_rel, NoLock);
table_close(comp_chunk_rel, NoLock);
chunk_rel = table_open(chunk->table_id, RowExclusiveLock);
comp_chunk = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true);
comp_chunk_rel = table_open(comp_chunk->table_id, RowExclusiveLock);
decompressor = build_decompressor(comp_chunk_rel, chunk_rel);

if (filters)
{
scankeys =
build_update_delete_scankeys(&decompressor, filters, &num_scankeys, &null_columns);
}
if (decompress_batches(&decompressor,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the return value contract of this method? (its not documented...)

...but anyway - I think it would be better to instead of passing a writeable bool ptr; have that as return value?

note that: currently I see 3 return-s in this method; from which:

  • 2 doesn't free scankeys
    • not necessarily an issue - I think you can just remove the if+pfree
  • 1 may not end with an ERROR ; and can exit the method without closing the heapScan - what could be the consequences of that?

...and I now wonder why that method tries to mask concurrent deletes/updates on the compressed table - instead of erroring out straight away? as it is now it may leave the table in an inconsistent state...

                        if (IsolationUsesXactSnapshot())
                                ereport(ERROR,
                                                (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                                                 errmsg("could not serialize access due to concurrent update")));

scankeys,
num_scankeys,
null_columns,
is_null,
&chunk_status_changed))
{
/*
* tuples from compressed chunk has been decompressed and moved
* to staging area, thus mark this chunk as partially compressed
*/
if (chunk_status_changed == true)
ts_chunk_set_partial(chunk);
}

ts_catalog_close_indexes(decompressor.indexstate);
FreeBulkInsertState(decompressor.bistate);

table_close(chunk_rel, NoLock);
table_close(comp_chunk_rel, NoLock);

foreach (lc, filters)
{
filter = lfirst(lc);
pfree(filter);
}
}

/*
 * Traverse the plan tree to look for Scan nodes on uncompressed chunks.
 * Once a Scan node is found, check if the chunk is compressed; if so,
 * decompress those segments which match the filter conditions, if present.
 *
 * Always returns false so that planstate_tree_walker keeps walking the
 * entire plan tree; decompression happens as a side effect per chunk.
 */
bool
decompress_target_segments(PlanState *ps)
{
	RangeTblEntry *rte = NULL;
	Chunk *current_chunk;

	if (ps == NULL)
		return false;

	switch (nodeTag(ps))
	{
		/* All plain scan node types that may target an uncompressed chunk. */
		case T_SeqScanState:
		case T_SampleScanState:
		case T_IndexScanState:
		case T_IndexOnlyScanState:
		case T_BitmapHeapScanState:
		case T_TidScanState:
		case T_TidRangeScanState:
			rte = rt_fetch(((Scan *) ps->plan)->scanrelid, ps->state->es_range_table);
			current_chunk = ts_chunk_get_by_relid(rte->relid, false);
			if (current_chunk && ts_chunk_is_compressed(current_chunk))
			{
				/* Use this node's quals to limit which batches get decompressed. */
				decompress_batches_for_update_delete(current_chunk, ps->plan->qual);
			}
			break;
		default:
			break;
	}
	/* Recurse into child plan states. */
	return planstate_tree_walker(ps, decompress_target_segments, NULL);
}
#endif
2 changes: 1 addition & 1 deletion tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ typedef struct ChunkInsertState ChunkInsertState;
extern void decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk,
TupleTableSlot *slot);
#if PG14_GE
extern void decompress_batches_for_update_delete(List *chunks, List *predicates);
extern bool decompress_target_segments(PlanState *ps);
#endif
/* CompressSingleRowState methods */
struct CompressSingleRowState;
Expand Down
4 changes: 2 additions & 2 deletions tsl/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ CrossModuleFunctions tsl_cm_functions = {
.decompress_chunk = tsl_decompress_chunk,
.decompress_batches_for_insert = decompress_batches_for_insert,
#if PG14_GE
.decompress_batches_for_update_delete = decompress_batches_for_update_delete,
.decompress_target_segments = decompress_target_segments,
#else
.decompress_batches_for_update_delete = NULL,
.decompress_target_segments = NULL,
#endif

.data_node_add = data_node_add,
Expand Down
18 changes: 5 additions & 13 deletions tsl/test/expected/compression.out
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,11 @@ do update set b = excluded.b;
SELECT * from foo ORDER BY a,b;
a | b | c | d
----+-----+----+----
3 | 111 | 20 |
sb230132 marked this conversation as resolved.
Show resolved Hide resolved
10 | 12 | 12 | 12
20 | 111 | 20 |
30 | 111 | 40 |
(3 rows)
(4 rows)

--TEST2c Do DML directly on the chunk.
insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12)
Expand All @@ -218,10 +219,11 @@ do update set b = excluded.b + 12;
SELECT * from foo ORDER BY a,b;
a | b | c | d
----+-----+----+----
3 | 111 | 20 |
10 | 24 | 12 | 12
20 | 111 | 20 |
30 | 111 | 40 |
(3 rows)
(4 rows)

update _timescaledb_internal._hyper_1_2_chunk
set b = 12;
Expand All @@ -238,9 +240,8 @@ update foo set b =20 where a = 10;
select * from _timescaledb_internal._hyper_1_2_chunk order by a,b;
a | b | c | d
----+----+----+-----
10 | 20 | 20 |
11 | 10 | 20 | 120
(2 rows)
(1 row)

delete from foo where a = 10;
select * from _timescaledb_internal._hyper_1_2_chunk order by a,b;
Expand Down Expand Up @@ -444,15 +445,6 @@ vacuum full conditions;
-- After vacuum, table_bytes is 0, but any associated index/toast storage is not
-- completely reclaimed. Sets it at 8K (page size). So a chunk which has
-- been compressed still incurs an overhead of n * 8KB (for every index + toast table) storage on the original uncompressed chunk.
select pg_size_pretty(table_bytes), pg_size_pretty(index_bytes),
pg_size_pretty(toast_bytes), pg_size_pretty(total_bytes)
from hypertable_detailed_size('foo');
-[ RECORD 1 ]--+-----------
pg_size_pretty | 32 kB
pg_size_pretty | 144 kB
pg_size_pretty | 8192 bytes
pg_size_pretty | 184 kB

select pg_size_pretty(table_bytes), pg_size_pretty(index_bytes),
pg_size_pretty(toast_bytes), pg_size_pretty(total_bytes)
from hypertable_detailed_size('conditions');
Expand Down