Skip to content

Commit

Permalink
Fix min/max(x, n) (facebookincubator#8311)
Browse files Browse the repository at this point in the history
Summary:

Velox has an optimization for the Window operator with incremental aggregation when there are peer
rows with the same frame. In this situation, the aggregation result is computed only at the first row
of the peer group and the remaining rows simply copy this result. This optimization assumes that the
results of incremental aggregation in the Window operator are the same for all peer rows. However, min/max(x, n)
in Velox breaks this assumption because their extractValues() method causes the accumulator to
be cleared, making the peer rows after the first row expect a different result. This diff fixes min/max(x, n)
so that the extraction method does not clear the accumulator. The behavior after the fix also aligns with
Presto's.

This diff also adds a method testIncrementalAggregation in testAggregations to check that extractValues()
doesn't change the accumulator afterwards, for all aggregation functions. After this fix, only min_by/max_by(x, y, n)
still fail testIncrementalAggregation.

This diff fixes facebookincubator#8138.

Reviewed By: mbasmanova

Differential Revision: D52638334
  • Loading branch information
kagamiori authored and facebook-github-bot committed Jan 22, 2024
1 parent d543b54 commit 48f2bf1
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 17 deletions.
34 changes: 18 additions & 16 deletions velox/functions/prestosql/aggregates/MinMaxAggregates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,17 +545,17 @@ std::pair<vector_size_t*, vector_size_t*> rawOffsetAndSizes(
template <typename T, typename Compare>
struct MinMaxNAccumulator {
int64_t n{0};
std::priority_queue<T, std::vector<T, StlAllocator<T>>, Compare> topValues;
std::vector<T, StlAllocator<T>> heapValues;

/// Creates an empty accumulator. The container that stores the values is
/// constructed with a StlAllocator<T> wrapping 'allocator'.
explicit MinMaxNAccumulator(HashStringAllocator* allocator)
: topValues{Compare{}, StlAllocator<T>(allocator)} {}
: heapValues{StlAllocator<T>(allocator)} {}

int64_t getN() const {
return n;
}

/// Returns the number of values currently stored in the accumulator.
size_t size() const {
return topValues.size();
return heapValues.size();
}

void checkAndSetN(DecodedVector& decodedN, vector_size_t row) {
Expand Down Expand Up @@ -584,25 +584,27 @@ struct MinMaxNAccumulator {
}

/// Offers 'value' to the accumulator. While fewer than 'n' values are stored,
/// 'value' is always added. Once 'n' values are stored, 'value' replaces the
/// current top value only if comparator(value, top) returns true; otherwise it
/// is dropped.
void compareAndAdd(T value, Compare& comparator) {
if (topValues.size() < n) {
topValues.push(value);
if (heapValues.size() < n) {
heapValues.push_back(value);
std::push_heap(heapValues.begin(), heapValues.end(), comparator);
} else {
const auto& topValue = topValues.top();
const auto& topValue = heapValues.front();
if (comparator(value, topValue)) {
topValues.pop();
topValues.push(value);
// pop_heap moves the top element to the back of the vector, where it is
// overwritten by 'value'; push_heap then restores the heap order.
std::pop_heap(heapValues.begin(), heapValues.end(), comparator);
heapValues.back() = value;
std::push_heap(heapValues.begin(), heapValues.end(), comparator);
}
}
}

/// Moves all values from 'topValues' into 'rawValues' buffer. The queue of
/// 'topValues' will be empty after this call.
void extractValues(T* rawValues, vector_size_t offset) {
const vector_size_t size = topValues.size();
for (auto i = size - 1; i >= 0; --i) {
rawValues[offset + i] = topValues.top();
topValues.pop();
/// Copies all values from 'heapValues' into the 'rawValues' buffer, starting
/// at index 'offset', in the order produced by std::sort_heap with
/// 'comparator'. The heap is rebuilt before returning, so the accumulator
/// still holds every value after the call.
void extractValues(T* rawValues, vector_size_t offset, Compare& comparator) {
// sort_heap turns the heap into a sorted range that no longer satisfies the
// heap property; make_heap at the end of this method restores it.
std::sort_heap(heapValues.begin(), heapValues.end(), comparator);
for (int64_t i = heapValues.size() - 1; i >= 0; --i) {
rawValues[offset + i] = heapValues[i];
}
std::make_heap(heapValues.begin(), heapValues.end(), comparator);
}
};

Expand Down Expand Up @@ -775,7 +777,7 @@ class MinMaxNAggregateBase : public exec::Aggregate {
if (rawNs != nullptr) {
rawNs[i] = accumulator->n;
}
accumulator->extractValues(rawValues, offset);
accumulator->extractValues(rawValues, offset, comparator_);

offset += size;
}
Expand Down
34 changes: 33 additions & 1 deletion velox/functions/prestosql/aggregates/tests/MinMaxTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,6 @@ class MinMaxNTest : public functions::aggregate::test::AggregationTestBase {
/// Per-test fixture setup: runs the base-class SetUp() and then adjusts the
/// AggregationTestBase options used by the tests in this fixture.
void SetUp() override {
AggregationTestBase::SetUp();
allowInputShuffle();
// NOTE(review): this hunk records one deleted line in this file; this call
// appears to be the line the commit removes — confirm against the raw diff.
AggregationTestBase::disableTestIncremental();
}

template <typename T>
Expand Down Expand Up @@ -767,4 +766,37 @@ TEST_F(MinMaxNTest, double) {
testNumericGroupBy<double>();
}

TEST_F(MinMaxNTest, incrementalWindow) {
  // Runs max(x, n) as a window function over a single partition and checks
  // that the second row's result still contains the first row's value. The
  // plan below is equivalent to:
  //
  //   SELECT
  //     c0, c1, c2, c3,
  //     max(c0, c1) over (partition by c2 order by c3 asc)
  //   FROM (
  //     VALUES
  //       (1, 10, false, 0),
  //       (2, 10, false, 1)
  //   ) AS t(c0, c1, c2, c3)
  auto input = makeRowVector({
      makeFlatVector<int64_t>({1, 2}),
      makeFlatVector<int64_t>({10, 10}),
      makeFlatVector<bool>({false, false}),
      makeFlatVector<int64_t>({0, 1}),
  });

  // The four input columns followed by the window output:
  //   {1, 10, false, 0, [1]}, {2, 10, false, 1, [2, 1]}.
  auto expectedRows = makeRowVector({
      makeFlatVector<int64_t>({1, 2}),
      makeFlatVector<int64_t>({10, 10}),
      makeFlatVector<bool>({false, false}),
      makeFlatVector<int64_t>({0, 1}),
      makeArrayVector<int64_t>({{1}, {2, 1}}),
  });

  auto windowPlan =
      PlanBuilder()
          .values({input})
          .window({"max(c0, c1) over (partition by c2 order by c3 asc)"})
          .planNode();

  AssertQueryBuilder(windowPlan).assertResults(expectedRows);
}

} // namespace

0 comments on commit 48f2bf1

Please sign in to comment.