Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small decompress_chunk refactor #5244

Merged
merged 2 commits into from Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 11 additions & 13 deletions src/nodes/chunk_dispatch/chunk_dispatch.c
Expand Up @@ -73,7 +73,7 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,

cis = ts_subspace_store_get(dispatch->cache, point);

if (NULL == cis)
if (!cis)
{
/*
* The chunk search functions may leak memory, so switch to a temporary
Expand All @@ -91,21 +91,19 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
* where the chunk already exists.
*/
bool found;
Chunk *new_chunk = ts_hypertable_find_chunk_for_point(dispatch->hypertable, point);
Chunk *chunk = ts_hypertable_find_chunk_for_point(dispatch->hypertable, point);

#if PG14_GE
/*
* Frozen chunks require at least PG14.
*/
if (new_chunk && ts_chunk_is_frozen(new_chunk))
elog(ERROR,
"cannot INSERT into frozen chunk \"%s\"",
get_rel_name(new_chunk->table_id));
if (chunk && ts_chunk_is_frozen(chunk))
elog(ERROR, "cannot INSERT into frozen chunk \"%s\"", get_rel_name(chunk->table_id));
#endif

if (new_chunk == NULL)
if (!chunk)
{
new_chunk = ts_hypertable_create_chunk_for_point(dispatch->hypertable, point, &found);
chunk = ts_hypertable_create_chunk_for_point(dispatch->hypertable, point, &found);
}
else
found = true;
Expand All @@ -114,7 +112,7 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
if (found && dispatch->hypertable->fd.replication_factor > 1)
{
List *chunk_data_nodes =
ts_chunk_data_node_scan_by_chunk_id_filter(new_chunk->fd.id, CurrentMemoryContext);
ts_chunk_data_node_scan_by_chunk_id_filter(chunk->fd.id, CurrentMemoryContext);

/*
* If the chunk was not created as part of this insert, we need to check whether any
Expand All @@ -123,16 +121,16 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
* mapping for the unavailable data nodes.
*/
if (dispatch->hypertable->fd.replication_factor > list_length(chunk_data_nodes))
ts_cm_functions->dist_update_stale_chunk_metadata(new_chunk, chunk_data_nodes);
ts_cm_functions->dist_update_stale_chunk_metadata(chunk, chunk_data_nodes);

list_free(chunk_data_nodes);
}

if (NULL == new_chunk)
if (!chunk)
elog(ERROR, "no chunk found or created");

cis = ts_chunk_insert_state_create(new_chunk, dispatch);
ts_subspace_store_add(dispatch->cache, new_chunk->cube, cis, destroy_chunk_insert_state);
cis = ts_chunk_insert_state_create(chunk, dispatch);
ts_subspace_store_add(dispatch->cache, chunk->cube, cis, destroy_chunk_insert_state);

MemoryContextSwitchTo(old_context);
}
Expand Down
137 changes: 73 additions & 64 deletions tsl/src/compression/compression.c
Expand Up @@ -10,11 +10,12 @@
#include <access/htup_details.h>
#include <access/multixact.h>
#include <access/xact.h>
#include <catalog/heap.h>
#include <catalog/index.h>
#include <catalog/namespace.h>
#include <catalog/pg_am.h>
#include <catalog/pg_attribute.h>
#include <catalog/pg_type.h>
#include <catalog/index.h>
#include <catalog/heap.h>
#include <common/base64.h>
#include <executor/tuptable.h>
#include <funcapi.h>
Expand All @@ -32,8 +33,6 @@
#include <utils/syscache.h>
#include <utils/tuplesort.h>
#include <utils/typcache.h>
#include <catalog/pg_am.h>
#include <utils.h>

#include "compat/compat.h"

Expand All @@ -50,7 +49,6 @@
#include "ts_catalog/hypertable_compression.h"
#include "ts_catalog/catalog.h"
#include "guc.h"
#include <nodes/print.h>

#define MAX_ROWS_PER_COMPRESSION 1000
/* gap in sequence id between rows, potential for adding rows in gap later */
Expand Down Expand Up @@ -1501,6 +1499,8 @@ typedef struct RowDecompressor
PerCompressedColumn *per_compressed_cols;
int16 num_compressed_columns;

TupleDesc in_desc;

TupleDesc out_desc;
Relation out_rel;

Expand All @@ -1522,6 +1522,45 @@ static void row_decompressor_decompress_row(RowDecompressor *row_decompressor);
static bool per_compressed_col_get_data(PerCompressedColumn *per_compressed_col,
Datum *decompressed_datums, bool *decompressed_is_nulls);

/*
 * Build a RowDecompressor for decompressing rows of in_rel (the compressed
 * chunk) into out_rel (the uncompressed chunk).
 *
 * The returned struct caches both tuple descriptors, the per-column
 * decompression mapping, the command id and bulk-insert state used for
 * writing the decompressed tuples, and scratch arrays for the deformed
 * datums/nulls. Allocations (palloc, GetBulkInsertState) come from the
 * current memory context; the caller owns them (note: the bistate is
 * released via FreeBulkInsertState by the caller when done).
 */
static RowDecompressor
build_decompressor(Relation in_rel, Relation out_rel)
{
TupleDesc in_desc = RelationGetDescr(in_rel);
TupleDesc out_desc = RelationGetDescr(out_rel);

/* OID of the custom "compressed data" type used to detect compressed columns */
Oid compressed_typeid = ts_custom_type_cache_get(CUSTOM_TYPE_COMPRESSED_DATA)->type_oid;

Assert(OidIsValid(compressed_typeid));

RowDecompressor decompressor = {
.per_compressed_cols =
create_per_compressed_column(in_desc, out_desc, out_rel->rd_id, compressed_typeid),
.num_compressed_columns = in_desc->natts,

.in_desc = in_desc,

.out_desc = out_desc,
.out_rel = out_rel,

.mycid = GetCurrentCommandId(true),
.bistate = GetBulkInsertState(),

/* cache memory used to store the decompressed datums/is_null for form_tuple */
.decompressed_datums = palloc(sizeof(Datum) * out_desc->natts),
.decompressed_is_nulls = palloc(sizeof(bool) * out_desc->natts),
};

/*
 * We need to make sure decompressed_is_nulls is in a defined state. While this
 * will get written for normal columns it will not get written for dropped columns
 * since dropped columns don't exist in the compressed chunk, so we initialize
 * with true here.
 */
memset(decompressor.decompressed_is_nulls, true, out_desc->natts);

return decompressor;
}

void
decompress_chunk(Oid in_table, Oid out_table)
{
Expand All @@ -1541,74 +1580,44 @@ decompress_chunk(Oid in_table, Oid out_table)
*/
Relation in_rel = relation_open(in_table, ExclusiveLock);

TupleDesc in_desc = RelationGetDescr(in_rel);
TupleDesc out_desc = RelationGetDescr(out_rel);
RowDecompressor decompressor = build_decompressor(in_rel, out_rel);

Oid compressed_data_type_oid = ts_custom_type_cache_get(CUSTOM_TYPE_COMPRESSED_DATA)->type_oid;

Assert(OidIsValid(compressed_data_type_oid));
Datum *compressed_datums = palloc(sizeof(*compressed_datums) * decompressor.in_desc->natts);
bool *compressed_is_nulls = palloc(sizeof(*compressed_is_nulls) * decompressor.in_desc->natts);

HeapTuple compressed_tuple;
TableScanDesc heapScan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL);
MemoryContext per_compressed_row_ctx =
AllocSetContextCreate(CurrentMemoryContext,
"decompress chunk per-compressed row",
ALLOCSET_DEFAULT_SIZES);

for (compressed_tuple = heap_getnext(heapScan, ForwardScanDirection); compressed_tuple != NULL;
compressed_tuple = heap_getnext(heapScan, ForwardScanDirection))
{
RowDecompressor decompressor = {
.per_compressed_cols = create_per_compressed_column(in_desc,
out_desc,
out_table,
compressed_data_type_oid),
.num_compressed_columns = in_desc->natts,

.out_desc = out_desc,
.out_rel = out_rel,

.mycid = GetCurrentCommandId(true),
.bistate = GetBulkInsertState(),

/* cache memory used to store the decompressed datums/is_null for form_tuple */
.decompressed_datums = palloc(sizeof(Datum) * out_desc->natts),
.decompressed_is_nulls = palloc(sizeof(bool) * out_desc->natts),
};
/*
* We need to make sure decompressed_is_nulls is in a defined state. While this
* will get written for normal columns it will not get written for dropped columns
* since dropped columns don't exist in the compressed chunk so we initiallize
* with true here.
*/
memset(decompressor.decompressed_is_nulls, true, out_desc->natts);

Datum *compressed_datums = palloc(sizeof(*compressed_datums) * in_desc->natts);
bool *compressed_is_nulls = palloc(sizeof(*compressed_is_nulls) * in_desc->natts);

HeapTuple compressed_tuple;
TableScanDesc heapScan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL);
MemoryContext per_compressed_row_ctx =
AllocSetContextCreate(CurrentMemoryContext,
"decompress chunk per-compressed row",
ALLOCSET_DEFAULT_SIZES);

for (compressed_tuple = heap_getnext(heapScan, ForwardScanDirection);
compressed_tuple != NULL;
compressed_tuple = heap_getnext(heapScan, ForwardScanDirection))
{
MemoryContext old_ctx;
MemoryContext old_ctx;

Assert(HeapTupleIsValid(compressed_tuple));
Assert(HeapTupleIsValid(compressed_tuple));

old_ctx = MemoryContextSwitchTo(per_compressed_row_ctx);
old_ctx = MemoryContextSwitchTo(per_compressed_row_ctx);

heap_deform_tuple(compressed_tuple, in_desc, compressed_datums, compressed_is_nulls);
populate_per_compressed_columns_from_data(decompressor.per_compressed_cols,
in_desc->natts,
compressed_datums,
compressed_is_nulls);
heap_deform_tuple(compressed_tuple,
decompressor.in_desc,
compressed_datums,
compressed_is_nulls);
populate_per_compressed_columns_from_data(decompressor.per_compressed_cols,
decompressor.in_desc->natts,
compressed_datums,
compressed_is_nulls);

row_decompressor_decompress_row(&decompressor);
MemoryContextSwitchTo(old_ctx);
MemoryContextReset(per_compressed_row_ctx);
}

heap_endscan(heapScan);
FreeBulkInsertState(decompressor.bistate);
row_decompressor_decompress_row(&decompressor);
MemoryContextSwitchTo(old_ctx);
MemoryContextReset(per_compressed_row_ctx);
}

heap_endscan(heapScan);
FreeBulkInsertState(decompressor.bistate);

/* Recreate all indexes on out rel, we already have an exclusive lock on it,
* so the strong locks taken by reindex_relation shouldn't matter. */
#if PG14_LT
Expand Down