Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Index support for compress chunk #4821

Merged
merged 1 commit into from Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/guc.c
Expand Up @@ -77,6 +77,7 @@ bool ts_guc_enable_osm_reads = true;
TSDLLEXPORT bool ts_guc_enable_transparent_decompression = true;
bool ts_guc_enable_per_data_node_queries = true;
bool ts_guc_enable_async_append = true;
TSDLLEXPORT bool ts_guc_enable_compression_indexscan = true;
TSDLLEXPORT bool ts_guc_enable_skip_scan = true;
int ts_guc_max_open_chunks_per_insert = 10;
int ts_guc_max_cached_chunks_per_hypertable = 10;
Expand Down Expand Up @@ -386,6 +387,17 @@ _guc_init(void)
NULL,
NULL);

DefineCustomBoolVariable("timescaledb.enable_compression_indexscan",
"Enable compression to take indexscan path",
"Enable indexscan during compression, if matching index is found",
&ts_guc_enable_compression_indexscan,
true,
PGC_USERSET,
0,
NULL,
NULL,
NULL);

DefineCustomEnumVariable("timescaledb.remote_data_fetcher",
"Set remote data fetcher type",
"Pick data fetcher type based on type of queries you plan to run "
Expand Down
1 change: 1 addition & 0 deletions src/guc.h
Expand Up @@ -56,6 +56,7 @@ extern TSDLLEXPORT bool ts_guc_enable_client_ddl_on_data_nodes;
extern TSDLLEXPORT char *ts_guc_ssl_dir;
extern TSDLLEXPORT char *ts_guc_passfile;
extern TSDLLEXPORT bool ts_guc_enable_remote_explain;
extern TSDLLEXPORT bool ts_guc_enable_compression_indexscan;

typedef enum DataFetcherType
{
Expand Down
238 changes: 214 additions & 24 deletions tsl/src/compression/compression.c
Expand Up @@ -32,7 +32,7 @@
#include <utils/syscache.h>
#include <utils/tuplesort.h>
#include <utils/typcache.h>

#include <catalog/pg_am.h>
#include <utils.h>

#include "compat/compat.h"
Expand All @@ -49,7 +49,7 @@
#include "segment_meta.h"
#include "ts_catalog/hypertable_compression.h"
#include "ts_catalog/catalog.h"

#include "guc.h"
#include <nodes/print.h>

#define MAX_ROWS_PER_COMPRESSION 1000
Expand Down Expand Up @@ -165,6 +165,17 @@ static void row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompr
static void row_compressor_append_sorted_rows(RowCompressor *row_compressor,
Tuplesortstate *sorted_rel, TupleDesc sorted_desc);
static void row_compressor_finish(RowCompressor *row_compressor);
static void row_compressor_update_group(RowCompressor *row_compressor, TupleTableSlot *row);
static bool row_compressor_new_row_is_in_new_group(RowCompressor *row_compressor,
TupleTableSlot *row);
static void row_compressor_append_row(RowCompressor *row_compressor, TupleTableSlot *row);
static void row_compressor_flush(RowCompressor *row_compressor, CommandId mycid,
bool changed_groups);

static SegmentInfo *segment_info_new(Form_pg_attribute column_attr);
static void segment_info_update(SegmentInfo *segment_info, Datum val, bool is_null);
static bool segment_info_datum_is_in_group(SegmentInfo *segment_info, Datum datum, bool is_null);
static void run_analyze_on_chunk(Oid chunk_relid);

/********************
** compress_chunk **
Expand Down Expand Up @@ -300,6 +311,18 @@ compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column
int num_compression_infos)
{
int n_keys;
ListCell *lc;
int indexscan_direction = NoMovementScanDirection;
List *in_rel_index_oids;
Relation matched_index_rel = NULL;
TupleTableSlot *slot;
IndexScanDesc index_scan;
bool first_iteration = true;
bool changed_groups, compressed_row_is_full;
MemoryContext old_ctx;
CommandId mycid = GetCurrentCommandId(true);
HeapTuple in_table_tp = NULL, index_tp = NULL;
Form_pg_attribute in_table_attr_tp, index_attr_tp;
const ColumnCompressionInfo **keys;
CompressionStats cstat;

Expand All @@ -323,14 +346,139 @@ compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column

TupleDesc in_desc = RelationGetDescr(in_rel);
TupleDesc out_desc = RelationGetDescr(out_rel);
in_rel_index_oids = RelationGetIndexList(in_rel);
int i = 0;
/* Before calling row compressor relation should be segmented and sorted as per
* compress_segmentby and compress_orderby column/s configured in ColumnCompressionInfo.
* Cost of sorting can be mitigated if we find an existing BTREE index defined for
* uncompressed chunk otherwise expensive tuplesort will come into play.
*
* The following code is trying to find an existing index that
* matches the ColumnCompressionInfo so that we can skip sequential scan and
* tuplesort.
*
* Matching Criteria for Each IndexAtt[i] and ColumnCompressionInfo Keys[i]
* ========================================================================
* a) Index attnum must match with ColumnCompressionInfo Key {keys[i]}.
* b) Index attOption(ASC/DESC and NULL_FIRST) can be mapped with ColumnCompressionInfo
* orderby_asc and null_first.
*
* BTREE Indexes Ordering
* =====================
* a) ASC[Null_Last] ==> [1]->[2]->NULL
* b) [Null_First]ASC ==> NULL->[1]->[2]
* c) DSC[Null_Last] ==> [2]->[1]->NULL
* d) [Null_First]DSC ==> NULL->[2]->[1]
*/
if (ts_guc_enable_compression_indexscan)
{
foreach (lc, in_rel_index_oids)
{
Oid index_oid = lfirst_oid(lc);
Relation index_rel = index_open(index_oid, AccessShareLock);
IndexInfo *index_info = BuildIndexInfo(index_rel);
int previous_direction = NoMovementScanDirection;
int current_direction = NoMovementScanDirection;

Tuplesortstate *sorted_rel = compress_chunk_sort_relation(in_rel, n_keys, keys);
if (n_keys <= index_info->ii_NumIndexKeyAttrs && index_info->ii_Am == BTREE_AM_OID)
{
for (i = 0; i < n_keys; i++)
{
int16 att_num = get_attnum(in_table, NameStr(keys[i]->attname));

int16 option = index_rel->rd_indoption[i];
bool index_orderby_asc = ((option & INDOPTION_DESC) == 0);
bool index_null_first = ((option & INDOPTION_NULLS_FIRST) != 0);
bool is_orderby_asc =
COMPRESSIONCOL_IS_SEGMENT_BY(keys[i]) ? true : keys[i]->orderby_asc;
bool is_null_first =
COMPRESSIONCOL_IS_SEGMENT_BY(keys[i]) ? false : keys[i]->orderby_nullsfirst;

if (att_num == 0 || index_info->ii_IndexAttrNumbers[i] != att_num)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can att_num == 0 happen in practice? This means that the orderby/segmentby column is not found in the uncompressed chunk. That would be either a program error or a data corruption. Let's use Ensure macro to check this, not if.

{
break;
}

in_table_tp = SearchSysCacheAttNum(in_table, att_num);
if (!HeapTupleIsValid(in_table_tp))
elog(ERROR,
"table \"%s\" does not have column \"%s\"",
get_rel_name(in_table),
NameStr(keys[i]->attname));

index_tp = SearchSysCacheAttNum(index_oid, i + 1);
if (!HeapTupleIsValid(index_tp))
elog(ERROR,
"index \"%s\" does not have column \"%s\"",
get_rel_name(index_oid),
NameStr(keys[i]->attname));

in_table_attr_tp = (Form_pg_attribute) GETSTRUCT(in_table_tp);
index_attr_tp = (Form_pg_attribute) GETSTRUCT(index_tp);

if (index_orderby_asc == is_orderby_asc && index_null_first == is_null_first &&
in_table_attr_tp->attcollation == index_attr_tp->attcollation)
{
current_direction = ForwardScanDirection;
}
else if (index_orderby_asc != is_orderby_asc &&
index_null_first != is_null_first &&
in_table_attr_tp->attcollation == index_attr_tp->attcollation)
{
current_direction = BackwardScanDirection;
}
else
{
current_direction = NoMovementScanDirection;
break;
}

ReleaseSysCache(in_table_tp);
in_table_tp = NULL;
ReleaseSysCache(index_tp);
index_tp = NULL;
if (previous_direction == NoMovementScanDirection)
{
previous_direction = current_direction;
}
else if (previous_direction != current_direction)
{
break;
}
}

RowCompressor row_compressor;
if (n_keys == i && (previous_direction == current_direction &&
current_direction != NoMovementScanDirection))
{
matched_index_rel = index_rel;
indexscan_direction = current_direction;
break;
}
else
{
if (HeapTupleIsValid(in_table_tp))
{
ReleaseSysCache(in_table_tp);
in_table_tp = NULL;
}
if (HeapTupleIsValid(index_tp))
{
ReleaseSysCache(index_tp);
index_tp = NULL;
}
index_close(index_rel, AccessShareLock);
}
}
else
{
index_close(index_rel, AccessShareLock);
}
}
}

Assert(num_compression_infos <= in_desc->natts);
Assert(num_compression_infos <= out_desc->natts);

RowCompressor row_compressor;
row_compressor_init(&row_compressor,
in_desc,
out_rel,
Expand All @@ -340,12 +488,67 @@ compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column
out_desc->natts,
true /*need_bistate*/);

row_compressor_append_sorted_rows(&row_compressor, sorted_rel, in_desc);
if (matched_index_rel != NULL)
{
#ifdef TS_DEBUG
const char *compression_path =
GetConfigOption("timescaledb.show_compression_path_info", true, false);
if (compression_path != NULL && strcmp(compression_path, "on") == 0)
elog(INFO,
"compress_chunk_indexscan_start matched index \"%s\"",
get_rel_name(matched_index_rel->rd_id));
#endif
index_scan = index_beginscan(in_rel, matched_index_rel, GetTransactionSnapshot(), 0, 0);
slot = table_slot_create(in_rel, NULL);
index_rescan(index_scan, NULL, 0, NULL, 0);
while (index_getnext_slot(index_scan, indexscan_direction, slot))
{
slot_getallattrs(slot);
old_ctx = MemoryContextSwitchTo(row_compressor.per_row_ctx);
/* first time through */
if (first_iteration)
{
row_compressor_update_group(&row_compressor, slot);
first_iteration = false;
}
changed_groups = row_compressor_new_row_is_in_new_group(&row_compressor, slot);
compressed_row_is_full =
row_compressor.rows_compressed_into_current_value >= MAX_ROWS_PER_COMPRESSION;
if (compressed_row_is_full || changed_groups)
{
if (row_compressor.rows_compressed_into_current_value > 0)
row_compressor_flush(&row_compressor, mycid, changed_groups);
if (changed_groups)
row_compressor_update_group(&row_compressor, slot);
}

row_compressor_finish(&row_compressor);
row_compressor_append_row(&row_compressor, slot);
MemoryContextSwitchTo(old_ctx);
ExecClearTuple(slot);
}

tuplesort_end(sorted_rel);
run_analyze_on_chunk(in_rel->rd_id);
shhnwz marked this conversation as resolved.
Show resolved Hide resolved
if (row_compressor.rows_compressed_into_current_value > 0)
row_compressor_flush(&row_compressor, mycid, true);

ExecDropSingleTupleTableSlot(slot);
index_endscan(index_scan);
index_close(matched_index_rel, AccessShareLock);
}
else
{
#ifdef TS_DEBUG
const char *compression_path =
GetConfigOption("timescaledb.show_compression_path_info", true, false);
if (compression_path != NULL && strcmp(compression_path, "on") == 0)
elog(INFO, "compress_chunk_tuplesort_start");
#endif
Tuplesortstate *sorted_rel = compress_chunk_sort_relation(in_rel, n_keys, keys);
row_compressor_append_sorted_rows(&row_compressor, sorted_rel, in_desc);
tuplesort_end(sorted_rel);
}

row_compressor_finish(&row_compressor);
truncate_relation(in_table);

/* Recreate all indexes on out rel, we already have an exclusive lock on it,
Expand Down Expand Up @@ -416,7 +619,6 @@ static void compress_chunk_populate_sort_info_for_column(Oid table,
const ColumnCompressionInfo *column,
AttrNumber *att_nums, Oid *sort_operator,
Oid *collation, bool *nulls_first);
static void run_analyze_on_chunk(Oid chunk_relid);

static Tuplesortstate *
compress_chunk_sort_relation(Relation in_rel, int n_keys, const ColumnCompressionInfo **keys)
Expand Down Expand Up @@ -540,21 +742,6 @@ run_analyze_on_chunk(Oid chunk_relid)
ExecVacuum(NULL, &vs, true);
}

/********************
** row_compressor **
********************/

static void row_compressor_update_group(RowCompressor *row_compressor, TupleTableSlot *row);
static bool row_compressor_new_row_is_in_new_group(RowCompressor *row_compressor,
TupleTableSlot *row);
static void row_compressor_append_row(RowCompressor *row_compressor, TupleTableSlot *row);
static void row_compressor_flush(RowCompressor *row_compressor, CommandId mycid,
bool changed_groups);

static SegmentInfo *segment_info_new(Form_pg_attribute column_attr);
static void segment_info_update(SegmentInfo *segment_info, Datum val, bool is_null);
static bool segment_info_datum_is_in_group(SegmentInfo *segment_info, Datum datum, bool is_null);

/* Find segment by index for setting the correct sequence number if
* we are trying to roll up chunks while compressing
*/
Expand Down Expand Up @@ -775,6 +962,9 @@ get_sequence_number_for_current_group(Relation table_rel, Oid index_oid,
return result + SEQUENCE_NUM_GAP;
}

/********************
** row_compressor **
********************/
/* num_compression_infos is the number of columns we will write to in the compressed table */
static void
row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_desc,
Expand Down