Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Vectorize filters that use scalar array operations #6272

Merged
merged 1 commit into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions tsl/src/nodes/decompress_chunk/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/decompress_chunk.c
${CMAKE_CURRENT_SOURCE_DIR}/exec.c
${CMAKE_CURRENT_SOURCE_DIR}/planner.c
${CMAKE_CURRENT_SOURCE_DIR}/pred_vector_array.c
${CMAKE_CURRENT_SOURCE_DIR}/qual_pushdown.c
${CMAKE_CURRENT_SOURCE_DIR}/vector_predicates.c)
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
74 changes: 62 additions & 12 deletions tsl/src/nodes/decompress_chunk/compressed_batch.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,21 +183,54 @@ compute_vector_quals(DecompressChunkState *chunk_state, DecompressBatchState *ba
const int bitmap_bytes = sizeof(uint64) * ((batch_state->total_batch_rows + 63) / 64);
batch_state->vector_qual_result = palloc(bitmap_bytes);
memset(batch_state->vector_qual_result, 0xFF, bitmap_bytes);
if (batch_state->total_batch_rows % 64 != 0)
{
/*
* We have to zero out the bits for past-the-end elements in the last
* bitmap word. Since all predicates are ANDed to the result bitmap,
* we can do it here once instead of doing it in each predicate.
*/
const uint64 mask = ((uint64) -1) >> (64 - batch_state->total_batch_rows % 64);
batch_state->vector_qual_result[batch_state->total_batch_rows / 64] = mask;
}

/*
* Compute the quals.
*/
ListCell *lc;
foreach (lc, chunk_state->vectorized_quals_constified)
{
/* For now we only support "Var ? Const" predicates. */
OpExpr *oe = castNode(OpExpr, lfirst(lc));
Var *var = castNode(Var, linitial(oe->args));
Const *constnode = castNode(Const, lsecond(oe->args));
/*
* For now we support "Var ? Const" predicates and
* ScalarArrayOperations.
*/
List *args = NULL;
RegProcedure vector_const_opcode = InvalidOid;
ScalarArrayOpExpr *saop = NULL;
OpExpr *opexpr = NULL;
if (IsA(lfirst(lc), ScalarArrayOpExpr))
{
saop = castNode(ScalarArrayOpExpr, lfirst(lc));
args = saop->args;
vector_const_opcode = get_opcode(saop->opno);
}
else
{
opexpr = castNode(OpExpr, lfirst(lc));
args = opexpr->args;
vector_const_opcode = get_opcode(opexpr->opno);
}

/*
* Find the vector_const predicate.
*/
VectorPredicate *vector_const_predicate = get_vector_const_predicate(vector_const_opcode);
Assert(vector_const_predicate != NULL);

/*
* Find the compressed column referred to by the Var.
*/
Var *var = castNode(Var, linitial(args));
DecompressChunkColumnDescription *column_description = NULL;
int column_index = 0;
for (; column_index < chunk_state->num_total_columns; column_index++)
Expand Down Expand Up @@ -273,20 +306,37 @@ compute_vector_quals(DecompressChunkState *chunk_state, DecompressBatchState *ba
predicate_result = &default_value_predicate_result;
}

/* Find and compute the predicate. */
void (*predicate)(const ArrowArray *, Datum, uint64 *restrict) =
get_vector_const_predicate(get_opcode(oe->opno));
Ensure(predicate != NULL,
"vectorized predicate not found for postgres predicate %d",
get_opcode(oe->opno));

/*
* The vectorizable predicates should be STRICT, so we shouldn't see null
* constants here.
*/
Const *constnode = castNode(Const, lsecond(args));
Ensure(!constnode->constisnull, "vectorized predicate called for a null value");

predicate(vector, constnode->constvalue, predicate_result);
/*
* At last, compute the predicate.
*/
if (saop)
{
vector_array_predicate(vector_const_predicate,
saop->useOr,
vector,
constnode->constvalue,
predicate_result);
}
else
{
vector_const_predicate(vector, constnode->constvalue, predicate_result);
}

/* Account for nulls which shouldn't pass the predicate. */
const size_t n = vector->length;
const size_t n_words = (n + 63) / 64;
const uint64 *restrict validity = (uint64 *restrict) vector->buffers[0];
for (size_t i = 0; i < n_words; i++)
{
predicate_result[i] &= validity[i];
}

/* Process the result. */
if (column_values->arrow == NULL)
Expand Down
12 changes: 10 additions & 2 deletions tsl/src/nodes/decompress_chunk/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,16 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
}
}

OpExpr *opexpr = castNode(OpExpr, constified);
Ensure(IsA(lsecond(opexpr->args), Const),
List *args;
if (IsA(constified, OpExpr))
{
args = castNode(OpExpr, constified)->args;
}
else
{
args = castNode(ScalarArrayOpExpr, constified)->args;
}
Ensure(IsA(lsecond(args), Const),
"failed to evaluate runtime constant in vectorized filter");
chunk_state->vectorized_quals_constified =
lappend(chunk_state->vectorized_quals_constified, constified);
Expand Down
92 changes: 67 additions & 25 deletions tsl/src/nodes/decompress_chunk/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -431,47 +431,70 @@
static Node *
make_vectorized_qual(DecompressChunkPath *path, Node *qual)
{
/* Only simple "Var op Const" binary predicates for now. */
if (!IsA(qual, OpExpr))
/*
* Currently we vectorize some "Var op Const" binary predicates,
* and scalar array operations with these predicates.
*/
if (!IsA(qual, OpExpr) && !IsA(qual, ScalarArrayOpExpr))
{
return NULL;
}

OpExpr *o = castNode(OpExpr, qual);
List *args = NIL;
OpExpr *opexpr = NULL;
Oid opno = InvalidOid;
ScalarArrayOpExpr *saop = NULL;
if (IsA(qual, OpExpr))
{
opexpr = castNode(OpExpr, qual);
args = opexpr->args;
opno = opexpr->opno;
}
else
{
saop = castNode(ScalarArrayOpExpr, qual);
args = saop->args;
opno = saop->opno;
}

if (list_length(o->args) != 2)
if (list_length(args) != 2)
{
return NULL;
}

if (IsA(lsecond(o->args), Var))
if (opexpr && IsA(lsecond(args), Var))
{
/* Try to commute the operator if the constant is on the right. */
Oid commutator_opno = get_commutator(o->opno);
if (OidIsValid(commutator_opno))
/*
* Try to commute the operator if we have Var on the right.
*/
opno = get_commutator(opno);
if (!OidIsValid(opno))
{
o = (OpExpr *) copyObject(o);
o->opno = commutator_opno;
/*
* opfuncid is a cache, we can set it to InvalidOid like the
* CommuteOpExpr() does.
*/
o->opfuncid = InvalidOid;
o->args = list_make2(lsecond(o->args), linitial(o->args));
return NULL;
}

opexpr = (OpExpr *) copyObject(opexpr);
opexpr->opno = opno;
/*
* opfuncid is a cache, we can set it to InvalidOid like the
* CommuteOpExpr() does.
*/
opexpr->opfuncid = InvalidOid;
args = list_make2(lsecond(args), linitial(args));
opexpr->args = args;
}

/*
* We can vectorize the operation where the left side is a Var and the right
* side is a constant or can be evaluated to a constant at run time (e.g.
* contains stable functions).
*/
if (!IsA(linitial(o->args), Var) || is_not_runtime_constant(lsecond(o->args)))
if (!IsA(linitial(args), Var) || is_not_runtime_constant(lsecond(args)))
{
return NULL;
}

Var *var = castNode(Var, linitial(o->args));
Var *var = castNode(Var, linitial(args));
Assert((Index) var->varno == path->info->chunk_rel->relid);

/*
Expand All @@ -485,13 +508,26 @@
return NULL;
}

Oid opcode = get_opcode(o->opno);
if (get_vector_const_predicate(opcode))
Oid opcode = get_opcode(opno);
if (!get_vector_const_predicate(opcode))
{
return (Node *) o;
return NULL;
}

return NULL;
#if PG14_GE
if (saop)
{
if (saop->hashfuncid)
{
/*
* Don't vectorize if the planner decided to build a hash table.
*/
return NULL;

Check warning on line 525 in tsl/src/nodes/decompress_chunk/planner.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/nodes/decompress_chunk/planner.c#L525

Added line #L525 was not covered by tests
}
}
#endif

return opexpr ? (Node *) opexpr : (Node *) saop;
}

/*
Expand Down Expand Up @@ -861,10 +897,16 @@
{
elog(ERROR, "debug: encountered vector quals when they are disabled");
}
else if (ts_guc_debug_require_vector_qual == RVQ_Only &&
list_length(decompress_plan->scan.plan.qual) > 0)
else if (ts_guc_debug_require_vector_qual == RVQ_Only)
{
elog(ERROR, "debug: encountered non-vector quals when they are disabled");
if (list_length(decompress_plan->scan.plan.qual) > 0)
{
elog(ERROR, "debug: encountered non-vector quals when they are disabled");
}

Check warning on line 905 in tsl/src/nodes/decompress_chunk/planner.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/nodes/decompress_chunk/planner.c#L905

Added line #L905 was not covered by tests
if (list_length(vectorized_quals) == 0)
{
elog(ERROR, "debug: did not encounter vector quals when they are required");
}

Check warning on line 909 in tsl/src/nodes/decompress_chunk/planner.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/nodes/decompress_chunk/planner.c#L909

Added line #L909 was not covered by tests
}
#endif

Expand Down