diff --git a/src/nodes/chunk_dispatch/chunk_dispatch.c b/src/nodes/chunk_dispatch/chunk_dispatch.c
index b84bf92d7b2..7cce830b055 100644
--- a/src/nodes/chunk_dispatch/chunk_dispatch.c
+++ b/src/nodes/chunk_dispatch/chunk_dispatch.c
@@ -73,7 +73,7 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
 
 	cis = ts_subspace_store_get(dispatch->cache, point);
 
-	if (NULL == cis)
+	if (!cis)
 	{
 		/*
 		 * The chunk search functions may leak memory, so switch to a temporary
@@ -91,21 +91,19 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
 		 * where the chunk already exists.
 		 */
 		bool found;
-		Chunk *new_chunk = ts_hypertable_find_chunk_for_point(dispatch->hypertable, point);
+		Chunk *chunk = ts_hypertable_find_chunk_for_point(dispatch->hypertable, point);
 
 #if PG14_GE
 		/*
 		 * Frozen chunks require at least PG14.
 		 */
-		if (new_chunk && ts_chunk_is_frozen(new_chunk))
-			elog(ERROR,
-				 "cannot INSERT into frozen chunk \"%s\"",
-				 get_rel_name(new_chunk->table_id));
+		if (chunk && ts_chunk_is_frozen(chunk))
+			elog(ERROR, "cannot INSERT into frozen chunk \"%s\"", get_rel_name(chunk->table_id));
 #endif
 
-		if (new_chunk == NULL)
+		if (!chunk)
 		{
-			new_chunk = ts_hypertable_create_chunk_for_point(dispatch->hypertable, point, &found);
+			chunk = ts_hypertable_create_chunk_for_point(dispatch->hypertable, point, &found);
 		}
 		else
 			found = true;
@@ -114,7 +112,7 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
 		if (found && dispatch->hypertable->fd.replication_factor > 1)
 		{
 			List *chunk_data_nodes =
-				ts_chunk_data_node_scan_by_chunk_id_filter(new_chunk->fd.id, CurrentMemoryContext);
+				ts_chunk_data_node_scan_by_chunk_id_filter(chunk->fd.id, CurrentMemoryContext);
 
 			/*
 			 * If the chunk was not created as part of this insert, we need to check whether any
@@ -123,16 +121,16 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
 			 * mapping for the unavailable data nodes.
 			 */
 			if (dispatch->hypertable->fd.replication_factor > list_length(chunk_data_nodes))
-				ts_cm_functions->dist_update_stale_chunk_metadata(new_chunk, chunk_data_nodes);
+				ts_cm_functions->dist_update_stale_chunk_metadata(chunk, chunk_data_nodes);
 
 			list_free(chunk_data_nodes);
 		}
 
-		if (NULL == new_chunk)
+		if (!chunk)
 			elog(ERROR, "no chunk found or created");
 
-		cis = ts_chunk_insert_state_create(new_chunk, dispatch);
-		ts_subspace_store_add(dispatch->cache, new_chunk->cube, cis, destroy_chunk_insert_state);
+		cis = ts_chunk_insert_state_create(chunk, dispatch);
+		ts_subspace_store_add(dispatch->cache, chunk->cube, cis, destroy_chunk_insert_state);
 
 		MemoryContextSwitchTo(old_context);
 	}
diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c
index 41983c19474..18c3db273e2 100644
--- a/tsl/src/compression/compression.c
+++ b/tsl/src/compression/compression.c
@@ -10,11 +10,12 @@
 #include
 #include
 #include
+#include
+#include
 #include
+#include
 #include
 #include
-#include
-#include
 #include
 #include
 #include
@@ -32,8 +33,6 @@
 #include
 #include
 #include
-#include
-#include
 
 #include "compat/compat.h"
 
@@ -50,7 +49,6 @@
 #include "ts_catalog/hypertable_compression.h"
 #include "ts_catalog/catalog.h"
 #include "guc.h"
-#include
 
 #define MAX_ROWS_PER_COMPRESSION 1000
 /* gap in sequence id between rows, potential for adding rows in gap later */
@@ -1501,6 +1499,8 @@ typedef struct RowDecompressor
 	PerCompressedColumn *per_compressed_cols;
 	int16 num_compressed_columns;
 
+	TupleDesc in_desc;
+
 	TupleDesc out_desc;
 	Relation out_rel;
 
@@ -1522,6 +1522,45 @@ static void row_decompressor_decompress_row(RowDecompressor *row_decompressor);
 static bool per_compressed_col_get_data(PerCompressedColumn *per_compressed_col,
 										Datum *decompressed_datums, bool *decompressed_is_nulls);
 
+static RowDecompressor
+build_decompressor(Relation in_rel, Relation out_rel)
+{
+	TupleDesc in_desc = RelationGetDescr(in_rel);
+	TupleDesc out_desc = RelationGetDescr(out_rel);
+
+	Oid compressed_typeid = ts_custom_type_cache_get(CUSTOM_TYPE_COMPRESSED_DATA)->type_oid;
+
+	Assert(OidIsValid(compressed_typeid));
+
+	RowDecompressor decompressor = {
+		.per_compressed_cols =
+			create_per_compressed_column(in_desc, out_desc, out_rel->rd_id, compressed_typeid),
+		.num_compressed_columns = in_desc->natts,
+
+		.in_desc = in_desc,
+
+		.out_desc = out_desc,
+		.out_rel = out_rel,
+
+		.mycid = GetCurrentCommandId(true),
+		.bistate = GetBulkInsertState(),
+
+		/* cache memory used to store the decompressed datums/is_null for form_tuple */
+		.decompressed_datums = palloc(sizeof(Datum) * out_desc->natts),
+		.decompressed_is_nulls = palloc(sizeof(bool) * out_desc->natts),
+	};
+
+	/*
+	 * We need to make sure decompressed_is_nulls is in a defined state. While this
+	 * will get written for normal columns, it will not get written for dropped
+	 * columns, since dropped columns don't exist in the compressed chunk, so we
+	 * initialize to true here.
+	 */
+	memset(decompressor.decompressed_is_nulls, true, out_desc->natts);
+
+	return decompressor;
+}
+
 void
 decompress_chunk(Oid in_table, Oid out_table)
 {
@@ -1541,74 +1580,44 @@ decompress_chunk(Oid in_table, Oid out_table)
 	 */
 	Relation in_rel = relation_open(in_table, ExclusiveLock);
 
-	TupleDesc in_desc = RelationGetDescr(in_rel);
-	TupleDesc out_desc = RelationGetDescr(out_rel);
+	RowDecompressor decompressor = build_decompressor(in_rel, out_rel);
 
-	Oid compressed_data_type_oid = ts_custom_type_cache_get(CUSTOM_TYPE_COMPRESSED_DATA)->type_oid;
-
-	Assert(OidIsValid(compressed_data_type_oid));
+	Datum *compressed_datums = palloc(sizeof(*compressed_datums) * decompressor.in_desc->natts);
+	bool *compressed_is_nulls = palloc(sizeof(*compressed_is_nulls) * decompressor.in_desc->natts);
 
+	HeapTuple compressed_tuple;
+	TableScanDesc heapScan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL);
+	MemoryContext per_compressed_row_ctx =
+		AllocSetContextCreate(CurrentMemoryContext,
+							  "decompress chunk per-compressed row",
+							  ALLOCSET_DEFAULT_SIZES);
+
+	for (compressed_tuple = heap_getnext(heapScan, ForwardScanDirection); compressed_tuple != NULL;
+		 compressed_tuple = heap_getnext(heapScan, ForwardScanDirection))
 	{
-		RowDecompressor decompressor = {
-			.per_compressed_cols = create_per_compressed_column(in_desc,
-																out_desc,
-																out_table,
-																compressed_data_type_oid),
-			.num_compressed_columns = in_desc->natts,
-
-			.out_desc = out_desc,
-			.out_rel = out_rel,
-
-			.mycid = GetCurrentCommandId(true),
-			.bistate = GetBulkInsertState(),
-
-			/* cache memory used to store the decompressed datums/is_null for form_tuple */
-			.decompressed_datums = palloc(sizeof(Datum) * out_desc->natts),
-			.decompressed_is_nulls = palloc(sizeof(bool) * out_desc->natts),
-		};
-		/*
-		 * We need to make sure decompressed_is_nulls is in a defined state. While this
-		 * will get written for normal columns it will not get written for dropped columns
-		 * since dropped columns don't exist in the compressed chunk so we initiallize
-		 * with true here.
-		 */
-		memset(decompressor.decompressed_is_nulls, true, out_desc->natts);
-
-		Datum *compressed_datums = palloc(sizeof(*compressed_datums) * in_desc->natts);
-		bool *compressed_is_nulls = palloc(sizeof(*compressed_is_nulls) * in_desc->natts);
-
-		HeapTuple compressed_tuple;
-		TableScanDesc heapScan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL);
-		MemoryContext per_compressed_row_ctx =
-			AllocSetContextCreate(CurrentMemoryContext,
-								  "decompress chunk per-compressed row",
-								  ALLOCSET_DEFAULT_SIZES);
-
-		for (compressed_tuple = heap_getnext(heapScan, ForwardScanDirection);
-			 compressed_tuple != NULL;
-			 compressed_tuple = heap_getnext(heapScan, ForwardScanDirection))
-		{
-			MemoryContext old_ctx;
+		MemoryContext old_ctx;
 
-			Assert(HeapTupleIsValid(compressed_tuple));
+		Assert(HeapTupleIsValid(compressed_tuple));
 
-			old_ctx = MemoryContextSwitchTo(per_compressed_row_ctx);
+		old_ctx = MemoryContextSwitchTo(per_compressed_row_ctx);
 
-			heap_deform_tuple(compressed_tuple, in_desc, compressed_datums, compressed_is_nulls);
-			populate_per_compressed_columns_from_data(decompressor.per_compressed_cols,
-													  in_desc->natts,
-													  compressed_datums,
-													  compressed_is_nulls);
+		heap_deform_tuple(compressed_tuple,
+						  decompressor.in_desc,
+						  compressed_datums,
+						  compressed_is_nulls);
+		populate_per_compressed_columns_from_data(decompressor.per_compressed_cols,
+												  decompressor.in_desc->natts,
+												  compressed_datums,
+												  compressed_is_nulls);
 
-			row_decompressor_decompress_row(&decompressor);
-			MemoryContextSwitchTo(old_ctx);
-			MemoryContextReset(per_compressed_row_ctx);
-		}
-
-		heap_endscan(heapScan);
-		FreeBulkInsertState(decompressor.bistate);
+		row_decompressor_decompress_row(&decompressor);
+		MemoryContextSwitchTo(old_ctx);
+		MemoryContextReset(per_compressed_row_ctx);
 	}
 
+	heap_endscan(heapScan);
+	FreeBulkInsertState(decompressor.bistate);
+
 	/* Recreate all indexes on out rel, we already have an exclusive lock on it,
 	 * so the strong locks taken by reindex_relation shouldn't matter. */
 #if PG14_LT
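
Notes on the patch (commentary only, not part of the diff):

The chunk_dispatch.c hunks are a style cleanup: NULL == x comparisons become !x, and new_chunk is renamed to chunk, which also lets the frozen-chunk elog collapse onto one line. The surrounding logic they touch is a lookup-or-create cache: ts_subspace_store_get() returns a cached ChunkInsertState, and only on a miss does the code find or create the chunk and cache a new state. A minimal standalone sketch of that shape, with invented names (State, cache_get, get_state are placeholders, not the TimescaleDB API):

    #include <stdio.h>
    #include <stdlib.h>

    /* Toy insert state keyed by an integer "point"; stands in for a
     * ChunkInsertState keyed by a Point in the real code. */
    typedef struct State
    {
    	int key;
    	struct State *next;
    } State;

    static State *cache_head = NULL;

    /* cache_get: return the cached state for key, or NULL on a miss. */
    static State *
    cache_get(int key)
    {
    	for (State *s = cache_head; s != NULL; s = s->next)
    		if (s->key == key)
    			return s;
    	return NULL;
    }

    /* get_state: lookup-or-create; the expensive construction path
     * runs only on a cache miss. */
    static State *
    get_state(int key)
    {
    	State *s = cache_get(key);

    	if (!s)
    	{
    		s = malloc(sizeof(State));
    		s->key = key;
    		s->next = cache_head;
    		cache_head = s;
    		printf("created state for %d\n", key);
    	}
    	return s;
    }

    int
    main(void)
    {
    	get_state(1);
    	get_state(1); /* second call hits the cache, creates nothing */
    	return 0;
    }

The point of the shape is that the expensive path (chunk search and creation) runs once per chunk, not once per inserted row.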
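build_decompressor() hoists the RowDecompressor setup out of decompress_chunk() so it can be reused, and the struct now carries in_desc alongside out_desc. The memset to true matters: dropped columns exist in the uncompressed chunk's tuple descriptor but have no counterpart in the compressed chunk, so the decompression loop never writes their is_null slots; defaulting them to true makes tuple forming emit NULLs for them. A standalone sketch of that default-to-null idea, assuming an invented fill_live_columns() helper in place of the real per-column decompression:

    #include <stdbool.h>
    #include <stdio.h>
    #include <string.h>

    #define NATTS 4

    /* Pretend column 2 was dropped: it exists in the output tuple
     * layout but has no source data, so nothing ever writes it. */
    static const bool is_dropped[NATTS] = { false, false, true, false };

    /* fill_live_columns: invented helper; writes only live columns,
     * the way the per-column decompression does in the patch. */
    static void
    fill_live_columns(long *datums, bool *is_nulls)
    {
    	for (int i = 0; i < NATTS; i++)
    	{
    		if (is_dropped[i])
    			continue; /* dropped columns are skipped entirely */
    		datums[i] = i * 10;
    		is_nulls[i] = false;
    	}
    }

    int
    main(void)
    {
    	long datums[NATTS];
    	bool is_nulls[NATTS];

    	/* Default every slot to "null" so the skipped (dropped) columns
    	 * are in a defined state, mirroring build_decompressor(). */
    	memset(is_nulls, true, sizeof(is_nulls));

    	fill_live_columns(datums, is_nulls);

    	for (int i = 0; i < NATTS; i++)
    		printf("col %d: %s\n", i, is_nulls[i] ? "NULL" : "value");
    	return 0;
    }

Without the upfront memset, the skipped slots would hold uninitialized memory and could leak garbage values into the formed tuples.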
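In the rewritten decompress_chunk() loop, each compressed row is processed under per_compressed_row_ctx, and the context is reset after every iteration, so per-row allocations cannot accumulate across a scan of the whole chunk. A rough standalone analogue using a toy bump arena in place of a PostgreSQL AllocSet context (Arena, arena_alloc, arena_reset are invented for illustration):

    #include <stddef.h>
    #include <stdio.h>
    #include <stdlib.h>

    /* A toy bump arena standing in for a memory context: alloc is a
     * pointer bump, reset reclaims everything at once. */
    typedef struct Arena
    {
    	char *base;
    	size_t used;
    	size_t cap;
    } Arena;

    static void *
    arena_alloc(Arena *a, size_t n)
    {
    	if (a->used + n > a->cap)
    		return NULL; /* toy version: no growth */
    	void *p = a->base + a->used;
    	a->used += n;
    	return p;
    }

    /* arena_reset: the analogue of MemoryContextReset; O(1), frees all
     * per-iteration allocations in one shot. */
    static void
    arena_reset(Arena *a)
    {
    	a->used = 0;
    }

    int
    main(void)
    {
    	Arena row_ctx = { .base = malloc(1 << 20), .used = 0, .cap = 1 << 20 };

    	for (int row = 0; row < 3; row++)
    	{
    		/* Per-row work allocates freely from the scratch arena... */
    		char *buf = arena_alloc(&row_ctx, 256);
    		snprintf(buf, 256, "decompressed row %d", row);
    		printf("%s (arena used: %zu)\n", buf, row_ctx.used);

    		/* ...and the whole arena is reclaimed before the next row,
    		 * so memory use stays flat regardless of row count. */
    		arena_reset(&row_ctx);
    	}
    	free(row_ctx.base);
    	return 0;
    }

Resetting keeps peak memory proportional to one row's working set rather than to the whole chunk being decompressed.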