Skip to content

Commit

Permalink
Add support for ScalarArrayOpExpr to compressed DML
Browse files Browse the repository at this point in the history
This will enable usage of Array operations like IN() and ANY()
in batch filtering of compressed DML.
  • Loading branch information
svenklemm committed May 10, 2024
1 parent f41cf0c commit 5f35538
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 30 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_6880
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #6880 Add support for array operators to compressed DML batch filtering
130 changes: 100 additions & 30 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include <access/tableam.h>
#include <access/htup_details.h>
#include <access/multixact.h>
#include <access/tableam.h>
#include <access/valid.h>
#include <access/xact.h>
#include <catalog/heap.h>
Expand All @@ -20,13 +20,14 @@
#include <executor/tuptable.h>
#include <libpq/pqformat.h>
#include <miscadmin.h>
#include <nodes/nodeFuncs.h>
#include <nodes/execnodes.h>
#include <nodes/nodeFuncs.h>
#include <nodes/pg_list.h>
#include <nodes/print.h>
#include <optimizer/optimizer.h>
#include <parser/parsetree.h>
#include <parser/parse_coerce.h>
#include <parser/parsetree.h>
#include <replication/message.h>
#include <storage/lmgr.h>
#include <storage/predicate.h>
#include <utils/builtins.h>
Expand All @@ -35,13 +36,12 @@
#include <utils/lsyscache.h>
#include <utils/memutils.h>
#include <utils/portal.h>
#include <utils/rel.h>
#include <utils/relcache.h>
#include <utils/rel.h>
#include <utils/snapmgr.h>
#include <utils/syscache.h>
#include <utils/tuplesort.h>
#include <utils/typcache.h>
#include <replication/message.h>

#include "compat/compat.h"

Expand Down Expand Up @@ -162,7 +162,8 @@ static void row_compressor_flush(RowCompressor *row_compressor, CommandId mycid,
static int create_segment_filter_scankey(RowDecompressor *decompressor,
char *segment_filter_col_name, StrategyNumber strategy,
ScanKeyData *scankeys, int num_scankeys,
Bitmapset **null_columns, Datum value, bool is_null_check);
Bitmapset **null_columns, Datum value, bool is_null_check,
bool is_array_op);
static void create_per_compressed_column(RowDecompressor *decompressor);

/********************
Expand Down Expand Up @@ -2089,7 +2090,8 @@ build_scankeys(Oid hypertable_relid, Oid out_rel, RowDecompressor *decompressor,
key_index,
null_columns,
value,
isnull);
isnull,
false);
}
if (ts_array_is_member(settings->fd.orderby, attname))
{
Expand All @@ -2108,6 +2110,7 @@ build_scankeys(Oid hypertable_relid, Oid out_rel, RowDecompressor *decompressor,
key_index,
null_columns,
value,
false,	/* is_null_check */
false); /* is_array_op */
key_index = create_segment_filter_scankey(decompressor,
column_segment_max_name(index),
Expand All @@ -2116,6 +2119,7 @@ build_scankeys(Oid hypertable_relid, Oid out_rel, RowDecompressor *decompressor,
key_index,
null_columns,
value,
false,	/* is_null_check */
false); /* is_array_op */
}
}
Expand All @@ -2128,7 +2132,8 @@ build_scankeys(Oid hypertable_relid, Oid out_rel, RowDecompressor *decompressor,
static int
create_segment_filter_scankey(RowDecompressor *decompressor, char *segment_filter_col_name,
StrategyNumber strategy, ScanKeyData *scankeys, int num_scankeys,
Bitmapset **null_columns, Datum value, bool is_null_check)
Bitmapset **null_columns, Datum value, bool is_null_check,
bool is_array_op)
{
AttrNumber cmp_attno = get_attnum(decompressor->in_rel->rd_id, segment_filter_col_name);
Assert(cmp_attno != InvalidAttrNumber);
Expand All @@ -2137,6 +2142,8 @@ create_segment_filter_scankey(RowDecompressor *decompressor, char *segment_filte
if (cmp_attno == InvalidAttrNumber)
return num_scankeys;

int flags = is_array_op ? SK_SEARCHARRAY : 0;

/*
* In PG versions <= 14 NULL values are always considered distinct
* from other NULL values and therefore NULLABLE multi-column
Expand Down Expand Up @@ -2181,7 +2188,7 @@ create_segment_filter_scankey(RowDecompressor *decompressor, char *segment_filte
return num_scankeys;

ScanKeyEntryInitialize(&scankeys[num_scankeys++],
0, /* flags */
flags,
cmp_attno,
strategy,
InvalidOid, /* No strategy subtype. */
Expand Down Expand Up @@ -2387,7 +2394,7 @@ algorithm_definition(CompressionAlgorithm algo)
#if PG14_GE
static BatchFilter *
make_batchfilter(char *column_name, StrategyNumber strategy, Oid collation, RegProcedure opcode,
Const *value, bool is_null_check, bool is_null)
Const *value, bool is_null_check, bool is_null, bool is_array_op)
{
BatchFilter *segment_filter = palloc0(sizeof(*segment_filter));

Expand All @@ -2398,6 +2405,7 @@ make_batchfilter(char *column_name, StrategyNumber strategy, Oid collation, RegP
.value = value,
.is_null_check = is_null_check,
.is_null = is_null,
.is_array_op = is_array_op,
};
namestrcpy(&segment_filter->column_name, column_name);

Expand Down Expand Up @@ -2556,19 +2564,18 @@ process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates, L
foreach (lc, predicates)
{
Node *node = copyObject(lfirst(lc));

Var *var;
Expr *expr;
Oid collation, opno;
RegProcedure opcode;
char *column_name;

switch (nodeTag(node))
{
case T_OpExpr:
{
OpExpr *opexpr = castNode(OpExpr, node);
Oid opno;
RegProcedure opcode;
Oid collation = opexpr->inputcollid;
Expr *expr;
collation = opexpr->inputcollid;
Const *arg_value;

if (!ts_extract_expr_args(&opexpr->xpr, &var, &expr, &opno, &opcode))
Expand All @@ -2577,15 +2584,13 @@ process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates, L
if (!IsA(expr, Const))
{
expr = (Expr *) estimate_expression_value(&root, (Node *) expr);

if (!IsA(expr, Const))
continue;
}

arg_value = castNode(Const, expr);

/* ignore system-defined attributes */
if (var->varattno <= 0)
continue;
column_name = get_attname(ch->table_id, var->varattno, false);
TypeCacheEntry *tce = lookup_type_cache(var->vartype, TYPECACHE_BTREE_OPFAMILY);
int op_strategy = get_op_opfamily_strategy(opno, tce->btree_opf);
Expand All @@ -2607,8 +2612,10 @@ process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates, L
collation,
opcode,
arg_value,
false, /* is_null_check */
false)); /* is_null */
false, /* is_null_check */
false, /* is_null */
false /* is_array_op */
));
}
}
continue;
Expand Down Expand Up @@ -2640,8 +2647,10 @@ process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates, L
collation,
opcode,
arg_value,
false, /* is_null_check */
false)); /* is_null */
false, /* is_null_check */
false, /* is_null */
false /* is_array_op */
));
*heap_filters = lappend(*heap_filters,
make_batchfilter(get_attname(settings->fd.relid,
max_attno,
Expand All @@ -2650,8 +2659,10 @@ process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates, L
collation,
opcode,
arg_value,
false, /* is_null_check */
false)); /* is_null */
false, /* is_null_check */
false, /* is_null */
false /* is_array_op */
));
}
break;
case BTLessStrategyNumber:
Expand All @@ -2666,8 +2677,10 @@ process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates, L
collation,
opcode,
arg_value,
false, /* is_null_check */
false)); /* is_null */
false, /* is_null_check */
false, /* is_null */
false /* is_array_op */
));
}
break;
case BTGreaterStrategyNumber:
Expand All @@ -2682,13 +2695,63 @@ process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates, L
collation,
opcode,
arg_value,
false, /* is_null_check */
false)); /* is_null */
false, /* is_null_check */
false, /* is_null */
false /* is_array_op */
));
}
}
}
}
break;
case T_ScalarArrayOpExpr:
{
ScalarArrayOpExpr *sa_expr = castNode(ScalarArrayOpExpr, node);
if (!ts_extract_expr_args(&sa_expr->xpr, &var, &expr, &opno, &opcode))
continue;

if (!IsA(expr, Const))
{
expr = (Expr *) estimate_expression_value(&root, (Node *) expr);
if (!IsA(expr, Const))
continue;
}

Const *arg_value = castNode(Const, expr);
collation = sa_expr->inputcollid;

column_name = get_attname(ch->table_id, var->varattno, false);
TypeCacheEntry *tce = lookup_type_cache(var->vartype, TYPECACHE_BTREE_OPFAMILY);
int op_strategy = get_op_opfamily_strategy(opno, tce->btree_opf);
if (ts_array_is_member(settings->fd.segmentby, column_name))
{
switch (op_strategy)
{
case BTEqualStrategyNumber:
case BTLessStrategyNumber:
case BTLessEqualStrategyNumber:
case BTGreaterStrategyNumber:
case BTGreaterEqualStrategyNumber:
{
/* save segment by column name and its corresponding value specified in
* WHERE */
*index_filters = lappend(*index_filters,
make_batchfilter(column_name,
op_strategy,
collation,
opcode,
arg_value,
false, /* is_null_check */
false, /* is_null */
true /* is_array_op */
));
}
}
continue;
}

break;
}
case T_NullTest:
{
NullTest *ntest = (NullTest *) node;
Expand All @@ -2709,7 +2772,9 @@ process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates, L
InvalidOid,
NULL,
true, /* is_null_check */
ntest->nulltesttype == IS_NULL)); /* is_null */
ntest->nulltesttype == IS_NULL, /* is_null */
false /* is_array_op */
));
if (ntest->nulltesttype == IS_NULL)
*is_null = lappend_int(*is_null, 1);
else
Expand Down Expand Up @@ -2763,7 +2828,8 @@ build_update_delete_scankeys(RowDecompressor *decompressor, List *heap_filters,
key_index,
null_columns,
filter->value ? filter->value->constvalue : 0,
filter->is_null_check);
filter->is_null_check,
filter->is_array_op);
}
*num_scankeys = key_index;
return scankeys;
Expand Down Expand Up @@ -2937,6 +3003,10 @@ build_index_scankeys(Relation index_rel, List *index_filters, int *num_scankeys)
{
flags = SK_ISNULL | (filter->is_null ? SK_SEARCHNULL : SK_SEARCHNOTNULL);
}
if (filter->is_array_op)
{
flags |= SK_SEARCHARRAY;
}

ScanKeyEntryInitialize(&scankey[idx++],
flags,
Expand Down
1 change: 1 addition & 0 deletions tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ typedef struct BatchFilter
/* IS NULL or IS NOT NULL */
bool is_null_check;
bool is_null;
bool is_array_op;
} BatchFilter;

extern Datum tsl_compressed_data_decompress_forward(PG_FUNCTION_ARGS);
Expand Down
Loading

0 comments on commit 5f35538

Please sign in to comment.