Skip to content

Commit

Permalink
Index support for compress chunk
Browse files Browse the repository at this point in the history
It allows to override tuplesort with indexscan
if compression setting keys matches with Index keys.
Moreover this feature has Enable/Disable Toggle.
To Disable from the client use the following command,
SET timescaledb.enable_compression_indexscan = 'OFF'
  • Loading branch information
shhnwz committed Dec 5, 2022
1 parent 1a806e2 commit 33b8b38
Show file tree
Hide file tree
Showing 6 changed files with 1,406 additions and 24 deletions.
12 changes: 12 additions & 0 deletions src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ bool ts_guc_enable_now_constify = true;
TSDLLEXPORT bool ts_guc_enable_transparent_decompression = true;
bool ts_guc_enable_per_data_node_queries = true;
bool ts_guc_enable_async_append = true;
TSDLLEXPORT bool ts_guc_enable_compression_indexscan = true;
TSDLLEXPORT bool ts_guc_enable_skip_scan = true;
int ts_guc_max_open_chunks_per_insert = 10;
int ts_guc_max_cached_chunks_per_hypertable = 10;
Expand Down Expand Up @@ -373,6 +374,17 @@ _guc_init(void)
NULL,
NULL);

DefineCustomBoolVariable("timescaledb.enable_compression_indexscan",
"Enable compression to take indexscan path",
"Enable indexscan during compression, if matching index is found",
&ts_guc_enable_compression_indexscan,
true,
PGC_USERSET,
0,
NULL,
NULL,
NULL);

DefineCustomEnumVariable("timescaledb.remote_data_fetcher",
"Set remote data fetcher type",
"Pick data fetcher type based on type of queries you plan to run "
Expand Down
1 change: 1 addition & 0 deletions src/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ extern TSDLLEXPORT bool ts_guc_enable_client_ddl_on_data_nodes;
extern TSDLLEXPORT char *ts_guc_ssl_dir;
extern TSDLLEXPORT char *ts_guc_passfile;
extern TSDLLEXPORT bool ts_guc_enable_remote_explain;
extern TSDLLEXPORT bool ts_guc_enable_compression_indexscan;

typedef enum DataFetcherType
{
Expand Down
239 changes: 215 additions & 24 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
#include <utils/syscache.h>
#include <utils/tuplesort.h>
#include <utils/typcache.h>

#include <catalog/pg_am.h>
#include <utils.h>

#include "compat/compat.h"
Expand All @@ -49,7 +49,7 @@
#include "segment_meta.h"
#include "ts_catalog/hypertable_compression.h"
#include "ts_catalog/catalog.h"

#include "guc.h"
#include <nodes/print.h>

#define MAX_ROWS_PER_COMPRESSION 1000
Expand Down Expand Up @@ -165,6 +165,17 @@ static void row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompr
static void row_compressor_append_sorted_rows(RowCompressor *row_compressor,
Tuplesortstate *sorted_rel, TupleDesc sorted_desc);
static void row_compressor_finish(RowCompressor *row_compressor);
static void row_compressor_update_group(RowCompressor *row_compressor, TupleTableSlot *row);
static bool row_compressor_new_row_is_in_new_group(RowCompressor *row_compressor,
TupleTableSlot *row);
static void row_compressor_append_row(RowCompressor *row_compressor, TupleTableSlot *row);
static void row_compressor_flush(RowCompressor *row_compressor, CommandId mycid,
bool changed_groups);

static SegmentInfo *segment_info_new(Form_pg_attribute column_attr);
static void segment_info_update(SegmentInfo *segment_info, Datum val, bool is_null);
static bool segment_info_datum_is_in_group(SegmentInfo *segment_info, Datum datum, bool is_null);
static void run_analyze_on_chunk(Oid chunk_relid);

/********************
** compress_chunk **
Expand Down Expand Up @@ -300,6 +311,18 @@ compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column
int num_compression_infos)
{
int n_keys;
ListCell *lc;
int indexscan_direction = NoMovementScanDirection;
List *in_rel_index_oids;
Relation matched_index_rel = NULL;
TupleTableSlot *slot;
IndexScanDesc index_scan;
bool first_iteration = true;
bool changed_groups, compressed_row_is_full;
MemoryContext old_ctx;
CommandId mycid = GetCurrentCommandId(true);
HeapTuple in_table_tp = NULL, index_tp = NULL;
Form_pg_attribute in_table_attr_tp, index_attr_tp;
const ColumnCompressionInfo **keys;
CompressionStats cstat;

Expand All @@ -323,14 +346,139 @@ compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column

TupleDesc in_desc = RelationGetDescr(in_rel);
TupleDesc out_desc = RelationGetDescr(out_rel);
in_rel_index_oids = RelationGetIndexList(in_rel);
int i = 0;
/* Before calling row compressor relation should be segmented and sorted as per
* compress_segmentby and compress_orderby column/s configured in ColumnCompressionInfo.
* Cost of sorting can be mitigated if we find an existing BTREE index defined for
* uncompressed chunk otherwise expensive tuplesort will come into play.
*
* The following code is trying to find an existing index that
* matches the ColumnCompressionInfo so that we can skip sequential scan and
* tuplesort.
*
* Matching Criteria for Each IndexAtt[i] and ColumnCompressionInfo Keys[i]
* ========================================================================
* a) Index attnum must match with ColumnCompressionInfo Key {keys[i]}.
* b) Index attOption(ASC/DESC and NULL_FIRST) can be mapped with ColumnCompressionInfo
* orderby_asc and null_first.
*
* BTREE Indexes Ordering
* =====================
* a) ASC[Null_Last] ==> [1]->[2]->NULL
* b) [Null_First]ASC ==> NULL->[1]->[2]
* c) DSC[Null_Last] ==> [2]->[1]->NULL
* d) [Null_First]DSC ==> NULL->[2]->[1]
*/
if (ts_guc_enable_compression_indexscan)
{
foreach (lc, in_rel_index_oids)
{
Oid index_oid = lfirst_oid(lc);
Relation index_rel = index_open(index_oid, AccessShareLock);
IndexInfo *index_info = BuildIndexInfo(index_rel);
int previous_direction = NoMovementScanDirection;
int current_direction = NoMovementScanDirection;

Tuplesortstate *sorted_rel = compress_chunk_sort_relation(in_rel, n_keys, keys);
if (n_keys <= index_info->ii_NumIndexKeyAttrs && index_info->ii_Am == BTREE_AM_OID)
{
for (i = 0; i < n_keys; i++)
{
int16 att_num = get_attnum(in_table, NameStr(keys[i]->attname));

int16 option = index_rel->rd_indoption[i];
bool index_orderby_asc = ((option & INDOPTION_DESC) == 0);
bool index_null_first = ((option & INDOPTION_NULLS_FIRST) != 0);
bool is_orderby_asc =
COMPRESSIONCOL_IS_SEGMENT_BY(keys[i]) ? true : keys[i]->orderby_asc;
bool is_null_first =
COMPRESSIONCOL_IS_SEGMENT_BY(keys[i]) ? false : keys[i]->orderby_nullsfirst;

if (att_num == 0 || index_info->ii_IndexAttrNumbers[i] != att_num)
{
break;
}

in_table_tp = SearchSysCacheAttNum(in_table, att_num);
if (!HeapTupleIsValid(in_table_tp))
elog(ERROR,
"table \"%s\" does not have column \"%s\"",
get_rel_name(in_table),
NameStr(keys[i]->attname));

index_tp = SearchSysCacheAttNum(index_oid, i + 1);
if (!HeapTupleIsValid(index_tp))
elog(ERROR,
"index \"%s\" does not have column \"%s\"",
get_rel_name(index_oid),
NameStr(keys[i]->attname));

in_table_attr_tp = (Form_pg_attribute) GETSTRUCT(in_table_tp);
index_attr_tp = (Form_pg_attribute) GETSTRUCT(index_tp);

if (index_orderby_asc == is_orderby_asc && index_null_first == is_null_first &&
in_table_attr_tp->attcollation == index_attr_tp->attcollation)
{
current_direction = ForwardScanDirection;
}
else if (index_orderby_asc != is_orderby_asc &&
index_null_first != is_null_first &&
in_table_attr_tp->attcollation == index_attr_tp->attcollation)
{
current_direction = BackwardScanDirection;
}
else
{
current_direction = NoMovementScanDirection;
break;
}

ReleaseSysCache(in_table_tp);
in_table_tp = NULL;
ReleaseSysCache(index_tp);
index_tp = NULL;
if (previous_direction == NoMovementScanDirection)
{
previous_direction = current_direction;
}
else if (previous_direction != current_direction)
{
break;
}
}

RowCompressor row_compressor;
if (n_keys == i && (previous_direction == current_direction &&
current_direction != NoMovementScanDirection))
{
matched_index_rel = index_rel;
indexscan_direction = current_direction;
break;
}
else
{
if (HeapTupleIsValid(in_table_tp))
{
ReleaseSysCache(in_table_tp);
in_table_tp = NULL;
}
if (HeapTupleIsValid(index_tp))
{
ReleaseSysCache(index_tp);
index_tp = NULL;
}
index_close(index_rel, AccessShareLock);
}
}
else
{
index_close(index_rel, AccessShareLock);
}
}
}

Assert(num_compression_infos <= in_desc->natts);
Assert(num_compression_infos <= out_desc->natts);

RowCompressor row_compressor;
row_compressor_init(&row_compressor,
in_desc,
out_rel,
Expand All @@ -340,12 +488,67 @@ compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column
out_desc->natts,
true /*need_bistate*/);

row_compressor_append_sorted_rows(&row_compressor, sorted_rel, in_desc);
if (matched_index_rel != NULL)
{
#ifdef TS_DEBUG
const char *compression_path =
GetConfigOption("timescaledb.show_compression_path_info", true, false);
if (compression_path != NULL && strcmp(compression_path, "on") == 0)
elog(INFO,
"compress_chunk_indexscan_start matched index \"%s\"",
get_rel_name(matched_index_rel->rd_id));
#endif
index_scan = index_beginscan(in_rel, matched_index_rel, GetTransactionSnapshot(), 0, 0);
slot = table_slot_create(in_rel, NULL);
index_rescan(index_scan, NULL, 0, NULL, 0);
while (index_getnext_slot(index_scan, indexscan_direction, slot))
{
slot_getallattrs(slot);
old_ctx = MemoryContextSwitchTo(row_compressor.per_row_ctx);
/* first time through */
if (first_iteration)
{
row_compressor_update_group(&row_compressor, slot);
first_iteration = false;
}
changed_groups = row_compressor_new_row_is_in_new_group(&row_compressor, slot);
compressed_row_is_full =
row_compressor.rows_compressed_into_current_value >= MAX_ROWS_PER_COMPRESSION;
if (compressed_row_is_full || changed_groups)
{
if (row_compressor.rows_compressed_into_current_value > 0)
row_compressor_flush(&row_compressor, mycid, changed_groups);
if (changed_groups)
row_compressor_update_group(&row_compressor, slot);
}

row_compressor_finish(&row_compressor);
row_compressor_append_row(&row_compressor, slot);
MemoryContextSwitchTo(old_ctx);
ExecClearTuple(slot);
}

tuplesort_end(sorted_rel);
run_analyze_on_chunk(in_rel->rd_id);
if (row_compressor.rows_compressed_into_current_value > 0)
row_compressor_flush(&row_compressor, mycid, true);

ExecDropSingleTupleTableSlot(slot);
index_endscan(index_scan);
index_close(matched_index_rel, AccessShareLock);
}
else
{
#ifdef TS_DEBUG
const char *compression_path =
GetConfigOption("timescaledb.show_compression_path_info", true, false);
if (compression_path != NULL && strcmp(compression_path, "on") == 0)
elog(INFO, "compress_chunk_tuplesort_start");
#endif
Tuplesortstate *sorted_rel = compress_chunk_sort_relation(in_rel, n_keys, keys);
row_compressor_append_sorted_rows(&row_compressor, sorted_rel, in_desc);
tuplesort_end(sorted_rel);
}

row_compressor_finish(&row_compressor);
truncate_relation(in_table);

/* Recreate all indexes on out rel, we already have an exclusive lock on it,
Expand Down Expand Up @@ -416,7 +619,6 @@ static void compress_chunk_populate_sort_info_for_column(Oid table,
const ColumnCompressionInfo *column,
AttrNumber *att_nums, Oid *sort_operator,
Oid *collation, bool *nulls_first);
static void run_analyze_on_chunk(Oid chunk_relid);

static Tuplesortstate *
compress_chunk_sort_relation(Relation in_rel, int n_keys, const ColumnCompressionInfo **keys)
Expand Down Expand Up @@ -540,21 +742,6 @@ run_analyze_on_chunk(Oid chunk_relid)
ExecVacuum(NULL, &vs, true);
}

/********************
** row_compressor **
********************/

static void row_compressor_update_group(RowCompressor *row_compressor, TupleTableSlot *row);
static bool row_compressor_new_row_is_in_new_group(RowCompressor *row_compressor,
TupleTableSlot *row);
static void row_compressor_append_row(RowCompressor *row_compressor, TupleTableSlot *row);
static void row_compressor_flush(RowCompressor *row_compressor, CommandId mycid,
bool changed_groups);

static SegmentInfo *segment_info_new(Form_pg_attribute column_attr);
static void segment_info_update(SegmentInfo *segment_info, Datum val, bool is_null);
static bool segment_info_datum_is_in_group(SegmentInfo *segment_info, Datum datum, bool is_null);

/* Find segment by index for setting the correct sequence number if
* we are trying to roll up chunks while compressing
*/
Expand Down Expand Up @@ -776,6 +963,10 @@ get_sequence_number_for_current_group(Relation table_rel, Oid index_oid,
return result + SEQUENCE_NUM_GAP;
}


/********************
** row_compressor **
********************/
/* num_compression_infos is the number of columns we will write to in the compressed table */
static void
row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_desc,
Expand Down

0 comments on commit 33b8b38

Please sign in to comment.