Commit

squash it
akuzm committed Feb 27, 2024
1 parent 671633c commit e3095ae
Showing 22 changed files with 1,410 additions and 1,372 deletions.
104 changes: 61 additions & 43 deletions tsl/src/compression/create.c
@@ -114,7 +114,8 @@ build_columndefs(CompressionSettings *settings, Oid src_relid)
{
Oid compresseddata_oid = ts_custom_type_cache_get(CUSTOM_TYPE_COMPRESSED_DATA)->type_oid;
ArrayType *segmentby = settings->fd.segmentby;
List *column_defs = NIL;
List *compressed_column_defs = NIL;
List *segmentby_column_defs = NIL;

Relation rel = table_open(src_relid, AccessShareLock);
TupleDesc tupdesc = rel->rd_att;
@@ -137,71 +138,88 @@ build_columndefs(CompressionSettings *settings, Oid src_relid)
COMPRESSION_COLUMN_METADATA_PREFIX);

bool is_segmentby = ts_array_is_member(segmentby, NameStr(attr->attname));
bool is_orderby = ts_array_is_member(settings->fd.orderby, NameStr(attr->attname));

if (is_segmentby)
{
attroid = attr->atttypid; /*segment by columns have original type */
typmod = attr->atttypmod;
collid = attr->attcollation;
}

if (!OidIsValid(attroid))
{
attroid = compresseddata_oid; /* default type for column */
}
coldef = makeColumnDef(NameStr(attr->attname), attroid, typmod, collid);
column_defs = lappend(column_defs, coldef);
}

table_close(rel, AccessShareLock);
coldef = makeColumnDef(NameStr(attr->attname), attroid, typmod, collid);

/* additional metadata columns. */

/* count of the number of uncompressed rows */
column_defs = lappend(column_defs,
makeColumnDef(COMPRESSION_COLUMN_METADATA_COUNT_NAME,
INT4OID,
-1 /* typemod */,
0 /*collation*/));
/* sequence_num column */
column_defs = lappend(column_defs,
makeColumnDef(COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME,
INT4OID,
-1 /* typemod */,
0 /*collation*/));

if (settings->fd.orderby)
{
Datum datum;
bool isnull;
int16 index = 1;
ArrayIterator it = array_create_iterator(settings->fd.orderby, 0, NULL);
while (array_iterate(it, &datum, &isnull))
/*
* Put the metadata columns before the compressed columns, because they
* are accessed before decompression.
*/
if (is_orderby)
{
AttrNumber col_attno = get_attnum(settings->fd.relid, TextDatumGetCString(datum));
Oid col_type = get_atttype(settings->fd.relid, col_attno);
TypeCacheEntry *type = lookup_type_cache(col_type, TYPECACHE_LT_OPR);
int index = ts_array_position(settings->fd.orderby, NameStr(attr->attname));
TypeCacheEntry *type = lookup_type_cache(attr->atttypid, TYPECACHE_LT_OPR);

if (!OidIsValid(type->lt_opr))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_FUNCTION),
errmsg("invalid ordering column type %s", format_type_be(col_type)),
errmsg("invalid ordering column type %s", format_type_be(attr->atttypid)),
errdetail("Could not identify a less-than operator for the type.")));

/* segment_meta min and max columns */
column_defs = lappend(column_defs,
makeColumnDef(column_segment_min_name(index),
col_type,
-1 /* typemod */,
0 /*collation*/));
column_defs = lappend(column_defs,
makeColumnDef(column_segment_max_name(index),
col_type,
-1 /* typemod */,
0 /*collation*/));
index++;
compressed_column_defs = lappend(compressed_column_defs,
makeColumnDef(column_segment_min_name(index),
attr->atttypid,
attr->atttypmod,
attr->attcollation));
compressed_column_defs = lappend(compressed_column_defs,
makeColumnDef(column_segment_max_name(index),
attr->atttypid,
attr->atttypmod,
attr->attcollation));
}

if (is_segmentby)
{
segmentby_column_defs = lappend(segmentby_column_defs, coldef);
}
else
{
compressed_column_defs = lappend(compressed_column_defs, coldef);
}
}
return column_defs;

/*
* Add the metadata columns. Count is always accessed, so put it first.
* Sequence number should probably go after all orderby columns, but we
* put it here for simplicity.
*/
List *all_column_defs = list_make2(makeColumnDef(COMPRESSION_COLUMN_METADATA_COUNT_NAME,
INT4OID,
-1 /* typemod */,
0 /*collation*/),
makeColumnDef(COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME,
INT4OID,
-1 /* typemod */,
0 /*collation*/));

/*
* Then, put all segmentby columns. They are likely to be used in filters
* before decompression.
*/
all_column_defs = list_concat(all_column_defs, segmentby_column_defs);

/*
* Then, put all the compressed columns.
*/
all_column_defs = list_concat(all_column_defs, compressed_column_defs);

table_close(rel, AccessShareLock);

return all_column_defs;
}

/* use this api for the case when you add a single column to a table that already has
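Note on the build_columndefs() change above: the compressed chunk's columns are now emitted in a fixed order, with the per-batch metadata first, followed by the segmentby columns, and finally the compressed data columns, with the min/max metadata of each orderby column placed immediately before it. The min/max columns also inherit the orderby column's typmod and collation instead of the defaults. As a rough illustration only (the hypertable, its columns, and the expanded metadata names are assumptions; the names follow the usual _ts_meta_* convention), a hypertable metrics(time timestamptz, device_id int, value float8) compressed with segmentby = device_id and orderby = time would get a compressed chunk laid out roughly as:

    _ts_meta_count          int4            -- rows in the batch, always read
    _ts_meta_sequence_num   int4            -- batch ordering within a segment
    device_id               int             -- segmentby column, stored as-is
    _ts_meta_min_1          timestamptz     -- min of the orderby column time
    _ts_meta_max_1          timestamptz     -- max of the orderby column time
    time                    compressed_data
    value                   compressed_data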
10 changes: 9 additions & 1 deletion tsl/src/nodes/decompress_chunk/decompress_chunk.c
@@ -381,10 +381,18 @@ cost_batch_sorted_merge(PlannerInfo *root, CompressionInfo *compression_info,
segmentby_attno =
bms_next_member(compression_info->chunk_segmentby_attnos, segmentby_attno))
{
char *colname = get_attname(compression_info->chunk_rte->relid,
segmentby_attno,
/* missing_ok = */ false);
AttrNumber compressed_attno = get_attnum(compression_info->compressed_rte->relid, colname);
Ensure(compressed_attno != InvalidAttrNumber,
"segmentby column %s not found in compressed chunk %d",
colname,
compression_info->compressed_rte->relid);
Var *var = palloc(sizeof(Var));
*var = (Var){ .xpr.type = T_Var,
.varno = compression_info->compressed_rel->relid,
.varattno = segmentby_attno };
.varattno = compressed_attno };
segmentby_groupexprs = lappend(segmentby_groupexprs, var);
}
const double open_batches_estimated = estimate_num_groups_compat(root,
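Note on the cost_batch_sorted_merge() change above: the grouping Var for a segmentby column was previously built with the column's attribute number from the uncompressed chunk, which only worked while the chunk and its compressed chunk happened to share attribute numbering. With the compressed chunk's columns now laid out in a different order, the code resolves the column by name in the chunk and looks up its attribute number in the compressed chunk. Continuing the hypothetical table from above (all numbers are illustrative assumptions):

    chunk attnos:              time = 1, device_id = 2, value = 3
    compressed chunk attnos:   _ts_meta_count = 1, _ts_meta_sequence_num = 2,
                               device_id = 3, _ts_meta_min_1 = 4, ...

Reusing the chunk's attno 2 against the compressed relation would point the Var at _ts_meta_sequence_num instead of device_id, so the estimate of the number of open batches would be computed over the wrong column.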
38 changes: 25 additions & 13 deletions tsl/src/nodes/decompress_chunk/exec.c
@@ -457,8 +457,21 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
/* Two columns are decompressed, the column that needs to be aggregated and the count column */
Assert(dcontext->num_total_columns == 2);

CompressionColumnDescription *column_description = &dcontext->template_columns[0];
Assert(dcontext->template_columns[1].type == COUNT_COLUMN);
CompressionColumnDescription *value_column_description = &dcontext->template_columns[0];
CompressionColumnDescription *count_column_description = &dcontext->template_columns[1];
if (count_column_description->type != COUNT_COLUMN)
{
/*
* The count and value columns can go in different order based on their
* order in compressed chunk, so check which one we are seeing.
*/
CompressionColumnDescription *tmp = value_column_description;
value_column_description = count_column_description;
count_column_description = tmp;
}
Assert(value_column_description->type == COMPRESSED_COLUMN ||
value_column_description->type == SEGMENTBY_COLUMN);
Assert(count_column_description->type == COUNT_COLUMN);

/* Get a free batch slot */
const int new_batch_index = batch_array_get_unused_slot(&batch_queue->batch_array);
@@ -492,14 +505,12 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)

int64 result_sum = 0;

if (column_description->type == SEGMENTBY_COLUMN)
if (value_column_description->type == SEGMENTBY_COLUMN)
{
/*
* To calculate the sum for a segment by value, we need to multiply the value of the segment
* by column with the number of compressed tuples in this batch.
*/
CompressionColumnDescription *column_description_count = &dcontext->template_columns[1];

while (true)
{
TupleTableSlot *compressed_slot =
@@ -516,13 +527,13 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)

bool isnull_value, isnull_elements;
Datum value = slot_getattr(compressed_slot,
column_description->compressed_scan_attno,
value_column_description->compressed_scan_attno,
&isnull_value);

/* We have multiple compressed tuples for this segment by value. Get number of
* compressed tuples */
Datum elements = slot_getattr(compressed_slot,
column_description_count->compressed_scan_attno,
count_column_description->compressed_scan_attno,
&isnull_elements);

if (!isnull_value && !isnull_elements)
@@ -553,10 +564,10 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
MemoryContextSwitchTo(old_mctx);
}
}
else if (column_description->type == COMPRESSED_COLUMN)
else if (value_column_description->type == COMPRESSED_COLUMN)
{
Assert(dcontext->enable_bulk_decompression);
Assert(column_description->bulk_decompression_supported);
Assert(value_column_description->bulk_decompression_supported);
Assert(list_length(aggref->args) == 1);

while (true)
Expand All @@ -574,8 +585,9 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)

/* Decompress data */
bool isnull;
Datum value =
slot_getattr(compressed_slot, column_description->compressed_scan_attno, &isnull);
Datum value = slot_getattr(compressed_slot,
value_column_description->compressed_scan_attno,
&isnull);

Ensure(isnull == false, "got unexpected NULL attribute value from compressed batch");

Expand All @@ -591,13 +603,13 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)

DecompressAllFunction decompress_all =
tsl_get_decompress_all_function(header->compression_algorithm,
column_description->typid);
value_column_description->typid);
Assert(decompress_all != NULL);

MemoryContextSwitchTo(dcontext->bulk_decompression_context);

arrow = decompress_all(PointerGetDatum(header),
column_description->typid,
value_column_description->typid,
batch_state->per_batch_context);

Assert(arrow != NULL);
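Note on the perform_vectorized_sum_int4() changes above: this path decompresses exactly two columns, the aggregated value column and the count metadata column, and it used to assume the value column sits in template_columns[0] and the count column in template_columns[1]. With the metadata columns now placed first in the compressed chunk, the two can arrive in either order, so the code detects which description is the COUNT_COLUMN and swaps the pointers before using them. A minimal sketch, not part of the commit, of the same detection written as a scan over the column descriptions (the helper name is made up and the DecompressContext type name is assumed; the other identifiers mirror the diff above):

    /* Pick out the value and count columns regardless of their order in the
     * compressed chunk. Hypothetical helper for illustration only. */
    static void
    find_value_and_count_columns(DecompressContext *dcontext,
                                 CompressionColumnDescription **value_column,
                                 CompressionColumnDescription **count_column)
    {
        *value_column = NULL;
        *count_column = NULL;
        for (int i = 0; i < dcontext->num_total_columns; i++)
        {
            CompressionColumnDescription *column = &dcontext->template_columns[i];
            if (column->type == COUNT_COLUMN)
                *count_column = column;
            else
                *value_column = column;
        }
        Assert(*value_column != NULL && *count_column != NULL);
    }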
(Diffs for the remaining 19 changed files are not shown here.)
