Skip to content

Commit

Permalink
Fix min/max(x, n) (facebookincubator#8311)
Browse files Browse the repository at this point in the history
Summary:

Velox has an optimization for the Window operation with incremental aggregation when there are peer 
rows with the same frame. In this situation, the aggregation result is computed only at the first row 
of the peer group and the remaining rows simply copy this result. This optimization assumes that the results of 
incremental aggregation in the Window operation are the same for all peer rows. However, min/max(x, n) 
in Velox break this assumption because their extractValues() method causes the accumulator to 
be cleared, making the peer rows after the first row expect a different result. This diff fixes min/max(x, n) 
so that the extraction method does not clear the accumulator. The behavior after the fix also aligns with 
Presto's.

This diff also adds a method testIncrementalAggregation, called from testAggregations, to check that extractValues() 
doesn't change the accumulator, for all aggregation functions. After this fix, only min_by/max_by(x, y, n) 
don't pass testIncrementalAggregation.

This diff fixes facebookincubator#8138.

Differential Revision: D52638334
  • Loading branch information
kagamiori authored and facebook-github-bot committed Jan 11, 2024
1 parent 265cd94 commit bf002e6
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 39 deletions.
168 changes: 145 additions & 23 deletions velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/exec/tests/utils/TempFilePath.h"
#include "velox/expression/Expr.h"
#include "velox/expression/SignatureBinder.h"

using facebook::velox::exec::Spiller;
Expand Down Expand Up @@ -621,6 +622,37 @@ void AggregationTestBase::testAggregations(
testWithTableScan);
}

namespace {

/// Collects, in argument order, the vectors that feed the arguments of
/// 'aggregateExpr' when it is evaluated over 'input':
///  - a field-access argument contributes the input column of the same name;
///  - a constant argument contributes a constant vector allocated from 'pool';
///  - a lambda argument contributes every input column whose name matches one
///    of the names in the lambda's signature.
/// Arguments of any other kind contribute nothing.
std::vector<VectorPtr> extractArgColumns(
    const core::CallTypedExprPtr& aggregateExpr,
    const RowVectorPtr& input,
    memory::MemoryPool* pool) {
  // Bind by const reference so the row type is not copied; 'input' keeps the
  // type alive for the duration of this call.
  const auto& rowType = input->type()->as<TypeKind::ROW>();
  std::vector<VectorPtr> columns;
  for (const auto& arg : aggregateExpr->inputs()) {
    if (const auto* field =
            dynamic_cast<const core::FieldAccessTypedExpr*>(arg.get())) {
      auto channel = rowType.getChildIdx(field->name());
      columns.push_back(input->childAt(channel));
    }
    // Cast once and reuse the result instead of casting a second time inside
    // the branch.
    if (const auto* constant =
            dynamic_cast<const core::ConstantTypedExpr*>(arg.get())) {
      columns.push_back(constant->toConstantVector(pool));
    }
    if (const auto* lambda =
            dynamic_cast<const core::LambdaTypedExpr*>(arg.get())) {
      for (const auto& name : lambda->signature()->names()) {
        // Names that do not exist in the input row are skipped.
        if (auto captureIndex = rowType.getChildIdxIfExists(name)) {
          columns.push_back(input->childAt(captureIndex.value()));
        }
      }
    }
  }
  return columns;
}

} // namespace

RowVectorPtr AggregationTestBase::validateStreamingInTestAggregations(
const std::function<void(PlanBuilder&)>& makeSource,
const std::vector<std::string>& aggregates,
Expand Down Expand Up @@ -921,6 +953,10 @@ void AggregationTestBase::testAggregations(
assertResults,
config);

if (testIncremental_) {
testIncrementalAggregation(makeSource, aggregates, config);
}

if (testWithTableScan) {
SCOPED_TRACE("Test reading input from table scan");
testReadFromFiles(
Expand Down Expand Up @@ -975,6 +1011,102 @@ VectorPtr AggregationTestBase::testStreaming(
config);
}

namespace {

// Layout constants for the scratch group buffer used by the aggregate test
// helpers in this file.

// Passed as the last argument of Aggregate::setOffsets().
// NOTE(review): presumably the byte offset of the row-size field within a
// group - confirm against Aggregate::setOffsets().
constexpr int kRowSizeOffset = 8;
// Passed as the first argument of Aggregate::setOffsets(). Group buffers are
// allocated with kOffset + accumulatorFixedWidthSize() bytes, so the
// accumulator presumably starts at this offset - TODO confirm.
constexpr int kOffset = kRowSizeOffset + 8;

std::unique_ptr<exec::Aggregate> createAggregateFunction(
const std::string& functionName,
const std::vector<TypePtr>& inputTypes,
HashStringAllocator& allocator,
const std::unordered_map<std::string, std::string>& config) {
auto [intermediateType, finalType] = getResultTypes(functionName, inputTypes);
core::QueryConfig queryConfig({config});
auto func = exec::Aggregate::create(
functionName,
core::AggregationNode::Step::kSingle,
inputTypes,
finalType,
queryConfig);
func->setAllocator(&allocator);
func->setOffsets(kOffset, 0, 1, kRowSizeOffset);

VELOX_CHECK(intermediateType->equivalent(
*func->intermediateType(functionName, inputTypes)));
VELOX_CHECK(finalType->equivalent(*func->resultType()));

return func;
}

} // namespace

void AggregationTestBase::testIncrementalAggregation(
const std::function<void(exec::test::PlanBuilder&)>& makeSource,
const std::vector<std::string>& aggregates,
const std::unordered_map<std::string, std::string>& config) {
PlanBuilder builder(pool());
makeSource(builder);
auto data = AssertQueryBuilder(builder.planNode())
.configs(config)
.copyResults(pool());
auto inputSize = data->size();
if (inputSize == 0) {
return;
}

auto& aggregationNode = static_cast<const core::AggregationNode&>(
*builder.singleAggregation({}, aggregates).planNode());
for (int i = 0; i < aggregationNode.aggregates().size(); ++i) {
const auto& aggregate = aggregationNode.aggregates()[i];
const auto& aggregateExpr = aggregate.call;
const auto& functionName = aggregateExpr->name();
auto input = extractArgColumns(aggregateExpr, data, pool());

HashStringAllocator allocator(pool());
std::vector<core::LambdaTypedExprPtr> lambdas;
for (const auto& arg : aggregate.call->inputs()) {
if (auto lambda =
std::dynamic_pointer_cast<const core::LambdaTypedExpr>(arg)) {
lambdas.push_back(lambda);
}
}
auto queryCtxConfig = config;
auto func = createAggregateFunction(
functionName, aggregate.rawInputTypes, allocator, config);
auto queryCtx = std::make_shared<core::QueryCtx>(
nullptr, core::QueryConfig{queryCtxConfig});

std::shared_ptr<core::ExpressionEvaluator> expressionEvaluator;
if (!lambdas.empty()) {
expressionEvaluator = std::make_shared<exec::SimpleExpressionEvaluator>(
queryCtx.get(), allocator.pool());
func->setLambdaExpressions(lambdas, expressionEvaluator);
}

std::vector<char> group(kOffset + func->accumulatorFixedWidthSize());
std::vector<char*> groups(inputSize, group.data());
std::vector<vector_size_t> indices(1, 0);
func->initializeNewGroups(groups.data(), indices);

// Extract values from the same accumulator twice and expect results to be
// the same.
auto result1 = BaseVector::create(func->resultType(), 1, pool());
auto result2 = BaseVector::create(func->resultType(), 1, pool());
func->addSingleGroupRawInput(
group.data(), SelectivityVector(inputSize), input, false);
func->extractValues(groups.data(), 1, &result1);
func->extractValues(groups.data(), 1, &result2);

velox::test::assertEqualVectors(result1, result2);

// Destroy accumulators to avoid memory leak.
if (func->accumulatorUsesExternalMemory()) {
func->destroy(folly::Range(groups.data(), 1));
}
}
}

VectorPtr AggregationTestBase::testStreaming(
const std::string& functionName,
bool testGlobal,
Expand All @@ -983,28 +1115,16 @@ VectorPtr AggregationTestBase::testStreaming(
const std::vector<VectorPtr>& rawInput2,
vector_size_t rawInput2Size,
const std::unordered_map<std::string, std::string>& config) {
constexpr int kRowSizeOffset = 8;
constexpr int kOffset = kRowSizeOffset + 8;
std::vector<TypePtr> rawInputTypes(rawInput1.size());
std::transform(
rawInput1.begin(),
rawInput1.end(),
rawInputTypes.begin(),
[](const VectorPtr& vec) { return vec->type(); });

HashStringAllocator allocator(pool());
std::vector<TypePtr> rawInputTypes;
for (auto& vec : rawInput1) {
rawInputTypes.push_back(vec->type());
}
auto [intermediateType, finalType] =
getResultTypes(functionName, rawInputTypes);
auto createFunction = [&, &finalType = finalType] {
core::QueryConfig queryConfig({config});
auto func = exec::Aggregate::create(
functionName,
core::AggregationNode::Step::kSingle,
rawInputTypes,
finalType,
queryConfig);
func->setAllocator(&allocator);
func->setOffsets(kOffset, 0, 1, kRowSizeOffset);
return func;
};
auto func = createFunction();
auto func =
createAggregateFunction(functionName, rawInputTypes, allocator, config);
int maxRowCount = std::max(rawInput1Size, rawInput2Size);
std::vector<char> group(kOffset + func->accumulatorFixedWidthSize());
std::vector<char*> groups(maxRowCount, group.data());
Expand All @@ -1017,6 +1137,7 @@ VectorPtr AggregationTestBase::testStreaming(
func->addRawInput(
groups.data(), SelectivityVector(rawInput1Size), rawInput1, false);
}
auto intermediateType = func->intermediateType(functionName, rawInputTypes);
auto intermediate = BaseVector::create(intermediateType, 1, pool());
func->extractAccumulators(groups.data(), 1, &intermediate);
// Destroy accumulators to avoid memory leak.
Expand All @@ -1025,7 +1146,8 @@ VectorPtr AggregationTestBase::testStreaming(
}

// Create a new function picking up the intermediate result.
auto func2 = createFunction();
auto func2 =
createAggregateFunction(functionName, rawInputTypes, allocator, config);
func2->initializeNewGroups(groups.data(), indices);
if (testGlobal) {
func2->addSingleGroupIntermediateResults(
Expand All @@ -1042,7 +1164,7 @@ VectorPtr AggregationTestBase::testStreaming(
func2->addRawInput(
groups.data(), SelectivityVector(rawInput2Size), rawInput2, false);
}
auto result = BaseVector::create(finalType, 1, pool());
auto result = BaseVector::create(func2->resultType(), 1, pool());
func2->extractValues(groups.data(), 1, &result);
// Destroy accumulators to avoid memory leak.
if (func2->accumulatorUsesExternalMemory()) {
Expand Down
16 changes: 16 additions & 0 deletions velox/functions/lib/aggregates/tests/utils/AggregationTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,16 @@ class AggregationTestBase : public exec::test::OperatorTestBase {
testStreaming_ = true;
}

/// Turns off the testIncrementalAggregation() check in testAggregations() by
/// clearing testIncremental_, which is true by default.
void disableTestIncremental() {
testIncremental_ = false;
}

/// Whether testStreaming should be called in testAggregations.
bool testStreaming_{true};

/// Whether testIncrementalAggregation should be called in testAggregations.
bool testIncremental_{true};

private:
// Test streaming use case where raw inputs are added after intermediate
// results. Return the result of aggregates if successful.
Expand All @@ -265,6 +272,15 @@ class AggregationTestBase : public exec::test::OperatorTestBase {
vector_size_t rawInput2Size,
const std::unordered_map<std::string, std::string>& config = {});

// Test to ensure that when extractValues() is called twice on the same
// accumulator, the extracted results are the same. This ensures that for
// incremental window aggregation, we can copy the result to peer rows
// instead of re-extracting it for each of them.
// 'makeSource' builds the plan producing the input rows, 'aggregates' lists
// the aggregate calls to check and 'config' holds query config entries.
void testIncrementalAggregation(
const std::function<void(exec::test::PlanBuilder&)>& makeSource,
const std::vector<std::string>& aggregates,
const std::unordered_map<std::string, std::string>& config = {});

void testAggregationsImpl(
std::function<void(exec::test::PlanBuilder&)> makeSource,
const std::vector<std::string>& groupingKeys,
Expand Down
34 changes: 18 additions & 16 deletions velox/functions/prestosql/aggregates/MinMaxAggregates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,17 +545,17 @@ std::pair<vector_size_t*, vector_size_t*> rawOffsetAndSizes(
template <typename T, typename Compare>
struct MinMaxNAccumulator {
int64_t n{0};
std::priority_queue<T, std::vector<T, StlAllocator<T>>, Compare> topValues;
std::vector<T, StlAllocator<T>> heapValues;

explicit MinMaxNAccumulator(HashStringAllocator* allocator)
: topValues{Compare{}, StlAllocator<T>(allocator)} {}
: heapValues{StlAllocator<T>(allocator)} {}

int64_t getN() const {
return n;
}

size_t size() const {
return topValues.size();
return heapValues.size();
}

void checkAndSetN(DecodedVector& decodedN, vector_size_t row) {
Expand Down Expand Up @@ -584,25 +584,27 @@ struct MinMaxNAccumulator {
}

void compareAndAdd(T value, Compare& comparator) {
if (topValues.size() < n) {
topValues.push(value);
if (heapValues.size() < n) {
heapValues.push_back(value);
std::push_heap(heapValues.begin(), heapValues.end(), comparator);
} else {
const auto& topValue = topValues.top();
const auto& topValue = heapValues.front();
if (comparator(value, topValue)) {
topValues.pop();
topValues.push(value);
std::pop_heap(heapValues.begin(), heapValues.end(), comparator);
heapValues.back() = value;
std::push_heap(heapValues.begin(), heapValues.end(), comparator);
}
}
}

/// Moves all values from 'topValues' into 'rawValues' buffer. The queue of
/// 'topValues' will be empty after this call.
void extractValues(T* rawValues, vector_size_t offset) {
const vector_size_t size = topValues.size();
for (auto i = size - 1; i >= 0; --i) {
rawValues[offset + i] = topValues.top();
topValues.pop();
/// Copies all values from 'heapValues' into 'rawValues' buffer in sorted
/// order. The heap is sorted in place and then re-heapified, so it holds the
/// same values after the call.
void extractValues(T* rawValues, vector_size_t offset, Compare& comparator) {
std::sort_heap(heapValues.begin(), heapValues.end(), comparator);
for (int64_t i = heapValues.size() - 1; i >= 0; --i) {
rawValues[offset + i] = heapValues[i];
}
std::make_heap(heapValues.begin(), heapValues.end(), comparator);
}
};

Expand Down Expand Up @@ -775,7 +777,7 @@ class MinMaxNAggregateBase : public exec::Aggregate {
if (rawNs != nullptr) {
rawNs[i] = accumulator->n;
}
accumulator->extractValues(rawValues, offset);
accumulator->extractValues(rawValues, offset, comparator_);

offset += size;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,7 @@ class MinMaxByNTest : public AggregationTestBase {
AggregationTestBase::SetUp();
AggregationTestBase::allowInputShuffle();
AggregationTestBase::enableTestStreaming();
AggregationTestBase::disableTestIncremental();
}
};

Expand Down
35 changes: 35 additions & 0 deletions velox/functions/prestosql/aggregates/tests/MinMaxTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -766,4 +766,39 @@ TEST_F(MinMaxNTest, double) {
testNumericGroupBy<double>();
}

// Runs max(x, n) as a window function with a per-row growing frame and checks
// the per-row results against hard-coded expected arrays.
TEST_F(MinMaxNTest, incrementalWindow) {
  // Equivalent SQL:
  //   SELECT
  //     c0, c1, c2, c3,
  //     max(c0, c1) over (partition by c2 order by c3 asc)
  //   FROM (
  //     VALUES
  //       (1, 10, false, 0),
  //       (2, 10, false, 1)
  //   ) AS t(c0, c1, c2, c3)
  auto input = makeRowVector({
      makeFlatVector<int64_t>({1, 2}),
      makeFlatVector<int64_t>({10, 10}),
      makeFlatVector<bool>({false, false}),
      makeFlatVector<int64_t>({0, 1}),
  });

  PlanBuilder planBuilder;
  planBuilder.values({input});
  planBuilder.window({"max(c0, c1) over (partition by c2 order by c3 asc)"});
  auto windowPlan = planBuilder.planNode();

  auto actual = AssertQueryBuilder(windowPlan).copyResults(pool());

  // Rows expected back: {1, 10, false, 0, [1]} and {2, 10, false, 1, [2, 1]}.
  auto expected = makeRowVector({
      makeFlatVector<int64_t>({1, 2}),
      makeFlatVector<int64_t>({10, 10}),
      makeFlatVector<bool>({false, false}),
      makeFlatVector<int64_t>({0, 1}),
      makeArrayVector<int64_t>({{1}, {2, 1}}),
  });
  facebook::velox::test::assertEqualVectors(expected, actual);
}

} // namespace

0 comments on commit bf002e6

Please sign in to comment.