Skip to content

Commit

Permalink
Index support for compress chunk
Browse files Browse the repository at this point in the history
Allows the tuplesort to be replaced by an index scan
when the compression setting keys match the index keys.
  • Loading branch information
shhnwz committed Oct 27, 2022
1 parent efbd8a9 commit d85c7e1
Showing 1 changed file with 293 additions and 20 deletions.
313 changes: 293 additions & 20 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
#include <utils/syscache.h>
#include <utils/tuplesort.h>
#include <utils/typcache.h>

#include <catalog/pg_am.h>
#include <utils.h>

#include "compat/compat.h"
Expand All @@ -43,6 +43,7 @@
#include "deltadelta.h"
#include "dictionary.h"
#include "gorilla.h"
#include "debug_point.h"
#include "ts_catalog/compression_chunk_size.h"
#include "create.h"
#include "custom_type_cache.h"
Expand Down Expand Up @@ -161,6 +162,17 @@ static void row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompr
static void row_compressor_append_sorted_rows(RowCompressor *row_compressor,
Tuplesortstate *sorted_rel, TupleDesc sorted_desc);
static void row_compressor_finish(RowCompressor *row_compressor);
/* Forward declarations of file-local row compressor helpers.
 * compress_chunk() calls these directly from its index-scan loop (update the
 * current group, test whether a row starts a new group, append a row, flush
 * the rows accumulated so far), so the prototypes have to be visible before
 * compress_chunk() is defined. */
static void row_compressor_update_group(RowCompressor *row_compressor, TupleTableSlot *row);
static bool row_compressor_new_row_is_in_new_group(RowCompressor *row_compressor,
TupleTableSlot *row);
static void row_compressor_append_row(RowCompressor *row_compressor, TupleTableSlot *row);
static void row_compressor_flush(RowCompressor *row_compressor, CommandId mycid,
bool changed_groups);

/* SegmentInfo helpers; their definitions are outside this view.
 * NOTE(review): presumably these track the current group value per segment-by
 * column -- confirm against the definitions. */
static SegmentInfo *segment_info_new(Form_pg_attribute column_attr);
static void segment_info_update(SegmentInfo *segment_info, Datum val, bool is_null);
static bool segment_info_datum_is_in_group(SegmentInfo *segment_info, Datum datum, bool is_null);
/* Called by compress_chunk() with the relation id of the uncompressed input chunk. */
static void run_analyze_on_chunk(Oid chunk_relid);

/********************
** compress_chunk **
Expand Down Expand Up @@ -275,6 +287,16 @@ compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column
int num_compression_infos)
{
int n_keys;
ListCell *lc;
int indexscan_direction = NoMovementScanDirection;
List *in_rel_index_oids;
Relation matched_index_rel = NULL;
TupleTableSlot *slot;
IndexScanDesc index_scan;
bool first_iteration = true;
bool changed_groups, compressed_row_is_full;
MemoryContext old_ctx;
CommandId mycid = GetCurrentCommandId(true);
const ColumnCompressionInfo **keys;
CompressionStats cstat;

Expand All @@ -298,14 +320,232 @@ compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column

TupleDesc in_desc = RelationGetDescr(in_rel);
TupleDesc out_desc = RelationGetDescr(out_rel);
in_rel_index_oids = RelationGetIndexList(in_rel);
int i = 0;
/* We create an index on the compressed chunk based on ColumnCompressionInfo,
* so before calling the row compressor we need to sort the rows as per ColumnCompressionInfo.
* The cost of sorting can be avoided if we find an existing BTREE index defined on the
* uncompressed chunk; otherwise the expensive tuplesort comes into play.
*
* The loop below looks for an existing index that matches the
* ColumnCompressionInfo keys so that the tuplesort can be replaced by an index scan.
*
* Matching Criteria for Each IndexAtt[i] and ColumnCompressionInfo Keys[i]
* ========================================================================
* a) The index attnum must match the ColumnCompressionInfo key {keys[i]}.
* b) The index attOption (ASC/DESC and NULLS FIRST) must be mappable to the
* ColumnCompressionInfo orderby_asc and orderby_nullsfirst settings.
*
* BTREE Indexes Ordering
* =====================
* a) ASC[Null_Last] ==> [1]->[2]->NULL
* b) [Null_First]ASC ==> NULL->[1]->[2]
* c) DSC[Null_Last] ==> [2]->[1]->NULL
* d) [Null_First]DSC ==> NULL->[2]->[1]
*/
foreach (lc, in_rel_index_oids)
{
Oid index_oid = lfirst_oid(lc);
Relation index_rel = index_open(index_oid, AccessShareLock);
IndexInfo *index_info = BuildIndexInfo(index_rel);
int previous_direction = NoMovementScanDirection;
int current_direction = NoMovementScanDirection;

Tuplesortstate *sorted_rel = compress_chunk_sort_relation(in_rel, n_keys, keys);

RowCompressor row_compressor;
if (n_keys <= index_info->ii_NumIndexKeyAttrs && index_info->ii_Am == BTREE_AM_OID)
{
for (i = 0; i < n_keys; i++)
{
int16 att_num = get_attnum(in_table, NameStr(keys[i]->attname));
int16 option = index_rel->rd_indoption[i];
bool is_orderby_asc = keys[i]->orderby_asc;
bool is_null_first = keys[i]->orderby_nullsfirst;
if (index_info->ii_IndexAttrNumbers[i] == att_num)
{
if (COMPRESSIONCOL_IS_SEGMENT_BY(keys[i]))
{
if (option)
{
if ((option & INDOPTION_DESC) != 0)
{
current_direction = BackwardScanDirection;
}
else
{
current_direction = ForwardScanDirection;
}
}
else
{
current_direction = ForwardScanDirection;
}
}
else if (COMPRESSIONCOL_IS_ORDER_BY(keys[i]))
{
if (option)
{
/* Case NullFirst(DESC) e.g. [NULL->2->1]*/
if (((option & INDOPTION_DESC) != 0) &&
((option & INDOPTION_NULLS_FIRST) != 0) &&
(is_orderby_asc && is_null_first))
{
current_direction = BackwardScanDirection;
}
else if ((option & INDOPTION_DESC) != 0 &&
((option & INDOPTION_NULLS_FIRST) != 0) &&
(!is_orderby_asc && is_null_first))
{
current_direction = ForwardScanDirection;
}
else if ((option & INDOPTION_DESC) != 0 &&
((option & INDOPTION_NULLS_FIRST) != 0) &&
(is_orderby_asc && !is_null_first))
{
current_direction = BackwardScanDirection;
}
else if ((option & INDOPTION_DESC) != 0 &&
((option & INDOPTION_NULLS_FIRST) != 0) &&
(!is_orderby_asc && !is_null_first))
{
/* Index can't be picked*/
current_direction = NoMovementScanDirection;
break;
}
/* Case NullFirst(ASC) e.g. [NULL->1->2]*/
else if ((option & INDOPTION_DESC) == 0 &&
((option & INDOPTION_NULLS_FIRST) != 0) &&
(is_orderby_asc && is_null_first))
{
current_direction = ForwardScanDirection;
}
else if ((option & INDOPTION_DESC) == 0 &&
((option & INDOPTION_NULLS_FIRST) != 0) &&
(!is_orderby_asc && is_null_first))
{
/* Index can't be picked*/
current_direction = NoMovementScanDirection;
break;
}
else if ((option & INDOPTION_DESC) == 0 &&
((option & INDOPTION_NULLS_FIRST) != 0) &&
(is_orderby_asc && !is_null_first))
{
/* Index can't be picked*/
current_direction = NoMovementScanDirection;
break;
}
else if ((option & INDOPTION_DESC) == 0 &&
((option & INDOPTION_NULLS_FIRST) != 0) &&
(!is_orderby_asc && !is_null_first))
{
current_direction = BackwardScanDirection;
}
/* Case NullLast(DESC) e.g. [2->1->NULL]*/
else if ((option & INDOPTION_DESC) != 0 &&
((option & INDOPTION_NULLS_FIRST) == 0) &&
(is_orderby_asc && is_null_first))
{
current_direction = BackwardScanDirection;
}
else if ((option & INDOPTION_DESC) != 0 &&
((option & INDOPTION_NULLS_FIRST) == 0) &&
(!is_orderby_asc && is_null_first))
{
/* Index can't be picked*/
current_direction = NoMovementScanDirection;
break;
}
else if ((option & INDOPTION_DESC) != 0 &&
((option & INDOPTION_NULLS_FIRST) == 0) &&
(is_orderby_asc && !is_null_first))
{
/* Index can't be picked*/
current_direction = NoMovementScanDirection;
break;
}
else if ((option & INDOPTION_DESC) != 0 &&
((option & INDOPTION_NULLS_FIRST) == 0) &&
(!is_orderby_asc && !is_null_first))
{
current_direction = BackwardScanDirection;
}
/* Case NullLast(ASC) e.g. [1->2->NULL]*/
else if ((option & INDOPTION_DESC) == 0 &&
((option & INDOPTION_NULLS_FIRST) == 0) &&
(is_orderby_asc && is_null_first))
{
/* Index can't be picked*/
current_direction = NoMovementScanDirection;
break;
}
else if ((option & INDOPTION_DESC) == 0 &&
((option & INDOPTION_NULLS_FIRST) == 0) &&
(!is_orderby_asc && is_null_first))
{
current_direction = BackwardScanDirection;
}
else if ((option & INDOPTION_DESC) == 0 &&
((option & INDOPTION_NULLS_FIRST) == 0) &&
(is_orderby_asc && !is_null_first))
{
current_direction = ForwardScanDirection;
}
else if ((option & INDOPTION_DESC) == 0 &&
((option & INDOPTION_NULLS_FIRST) == 0) &&
(!is_orderby_asc && !is_null_first))
{
/* Index can't be picked*/
current_direction = NoMovementScanDirection;
break;
}
}
else
{
if (is_orderby_asc)
{
current_direction = ForwardScanDirection;
}
else
{
current_direction = BackwardScanDirection;
}
}
}

if (previous_direction == NoMovementScanDirection)
{
previous_direction = current_direction;
}
else if (previous_direction != current_direction)
{
break;
}
}
else
{
break;
}
}
if (n_keys == i && (previous_direction == current_direction &&
current_direction != NoMovementScanDirection))
{
matched_index_rel = index_rel;
indexscan_direction = current_direction;
break;
}
else
{
index_close(index_rel, AccessShareLock);
}
}
else
{
index_close(index_rel, AccessShareLock);
}
}

Assert(num_compression_infos <= in_desc->natts);
Assert(num_compression_infos <= out_desc->natts);

RowCompressor row_compressor;
row_compressor_init(&row_compressor,
in_desc,
out_rel,
Expand All @@ -315,12 +555,57 @@ compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column
out_desc->natts,
true /*need_bistate*/);

row_compressor_append_sorted_rows(&row_compressor, sorted_rel, in_desc);
if (matched_index_rel != NULL)
{
ereport(DEBUG5, errmsg("compress_chunk_indexscan_start"));
DEBUG_WAITPOINT("compress_chunk_indexscan_start");
index_scan = index_beginscan(in_rel, matched_index_rel, GetTransactionSnapshot(), 0, 0);
slot = table_slot_create(in_rel, NULL);
index_rescan(index_scan, NULL, 0, NULL, 0);
while (index_getnext_slot(index_scan, indexscan_direction, slot))
{
slot_getallattrs(slot);
old_ctx = MemoryContextSwitchTo(row_compressor.per_row_ctx);
/* first time through */
if (first_iteration)
{
row_compressor_update_group(&row_compressor, slot);
first_iteration = false;
}
changed_groups = row_compressor_new_row_is_in_new_group(&row_compressor, slot);
compressed_row_is_full =
row_compressor.rows_compressed_into_current_value >= MAX_ROWS_PER_COMPRESSION;
if (compressed_row_is_full || changed_groups)
{
if (row_compressor.rows_compressed_into_current_value > 0)
row_compressor_flush(&row_compressor, mycid, changed_groups);
if (changed_groups)
row_compressor_update_group(&row_compressor, slot);
}

row_compressor_finish(&row_compressor);
row_compressor_append_row(&row_compressor, slot);
MemoryContextSwitchTo(old_ctx);
ExecClearTuple(slot);
}

tuplesort_end(sorted_rel);
run_analyze_on_chunk(in_rel->rd_id);
if (row_compressor.rows_compressed_into_current_value > 0)
row_compressor_flush(&row_compressor, mycid, true);

ExecDropSingleTupleTableSlot(slot);
index_endscan(index_scan);
index_close(matched_index_rel, AccessShareLock);
}
else
{
ereport(DEBUG5, errmsg("compress_chunk_tuplesort_start"));
DEBUG_WAITPOINT("compress_chunk_tuplesort_start");
Tuplesortstate *sorted_rel = compress_chunk_sort_relation(in_rel, n_keys, keys);
row_compressor_append_sorted_rows(&row_compressor, sorted_rel, in_desc);
tuplesort_end(sorted_rel);
}

row_compressor_finish(&row_compressor);
truncate_relation(in_table);

/* Recreate all indexes on out rel, we already have an exclusive lock on it,
Expand Down Expand Up @@ -391,7 +676,6 @@ static void compress_chunk_populate_sort_info_for_column(Oid table,
const ColumnCompressionInfo *column,
AttrNumber *att_nums, Oid *sort_operator,
Oid *collation, bool *nulls_first);
static void run_analyze_on_chunk(Oid chunk_relid);

static Tuplesortstate *
compress_chunk_sort_relation(Relation in_rel, int n_keys, const ColumnCompressionInfo **keys)
Expand Down Expand Up @@ -519,17 +803,6 @@ run_analyze_on_chunk(Oid chunk_relid)
** row_compressor **
********************/

static void row_compressor_update_group(RowCompressor *row_compressor, TupleTableSlot *row);
static bool row_compressor_new_row_is_in_new_group(RowCompressor *row_compressor,
TupleTableSlot *row);
static void row_compressor_append_row(RowCompressor *row_compressor, TupleTableSlot *row);
static void row_compressor_flush(RowCompressor *row_compressor, CommandId mycid,
bool changed_groups);

static SegmentInfo *segment_info_new(Form_pg_attribute column_attr);
static void segment_info_update(SegmentInfo *segment_info, Datum val, bool is_null);
static bool segment_info_datum_is_in_group(SegmentInfo *segment_info, Datum datum, bool is_null);

/* num_compression_infos is the number of columns we will write to in the compressed table */
static void
row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_desc,
Expand Down

0 comments on commit d85c7e1

Please sign in to comment.