Skip to content

Commit

Permalink
Add RowsStreamingWindowBuild to avoid OOM in Window operator
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf authored and glutenperfbot committed Apr 17, 2024
1 parent 4f990b2 commit 8657ebf
Show file tree
Hide file tree
Showing 20 changed files with 554 additions and 26 deletions.
3 changes: 2 additions & 1 deletion velox/exec/AggregateWindow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,8 @@ void registerAggregateWindowFunction(const std::string& name) {
pool,
stringAllocator,
config);
});
},
{exec::ProcessingUnit::kRows, false});
}
}
} // namespace facebook::velox::exec
2 changes: 2 additions & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ add_library(
PlanNodeStats.cpp
PrefixSort.cpp
ProbeOperatorState.cpp
RowsStreamingWindowBuild.cpp
RowsStreamingWindowPartition.cpp
RowContainer.cpp
RowNumber.cpp
SortBuffer.cpp
Expand Down
94 changes: 94 additions & 0 deletions velox/exec/RowsStreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/exec/RowsStreamingWindowBuild.h"
#include "velox/exec/RowsStreamingWindowPartition.h"

namespace facebook::velox::exec {

RowsStreamingWindowBuild::RowsStreamingWindowBuild(
const std::shared_ptr<const core::WindowNode>& windowNode,
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection)
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {}

void RowsStreamingWindowBuild::buildNextInputOrPartition(bool isFinished) {
if (windowPartitions_.size() <= inputCurrentPartition_) {
windowPartitions_.push_back(std::make_shared<RowsStreamingWindowPartition>(
data_.get(),
folly::Range<char**>(nullptr, nullptr),
inversedInputChannels_,
sortKeyInfo_));
}

windowPartitions_[inputCurrentPartition_]->addNewRows(inputRows_);

if (isFinished) {
windowPartitions_[inputCurrentPartition_]->setInputRowsFinished();
inputCurrentPartition_++;
}

inputRows_.clear();
}

void RowsStreamingWindowBuild::addInput(RowVectorPtr input) {
for (auto i = 0; i < inputChannels_.size(); ++i) {
decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i]));
}

for (auto row = 0; row < input->size(); ++row) {
char* newRow = data_->newRow();

for (auto col = 0; col < input->childrenSize(); ++col) {
data_->store(decodedInputVectors_[col], row, newRow, col);
}

if (previousRow_ != nullptr &&
compareRowsWithKeys(previousRow_, newRow, partitionKeyInfo_)) {
buildNextInputOrPartition(true);
}

// Wait for the peers to be ready in single partition; these peers are the
// rows that have identical values in the ORDER BY clause.
if (previousRow_ != nullptr && inputRows_.size() >= numRowsPerOutput_ &&
compareRowsWithKeys(previousRow_, newRow, sortKeyInfo_)) {
buildNextInputOrPartition(false);
}

inputRows_.push_back(newRow);
previousRow_ = newRow;
}
}

void RowsStreamingWindowBuild::noMoreInput() {
buildNextInputOrPartition(true);
}

std::shared_ptr<WindowPartition> RowsStreamingWindowBuild::nextPartition() {
if (outputCurrentPartition_ > 0) {
windowPartitions_[outputCurrentPartition_].reset();
}

return windowPartitions_[++outputCurrentPartition_];
}

bool RowsStreamingWindowBuild::hasNextPartition() {
return windowPartitions_.size() > 0 &&
outputCurrentPartition_ <= int(windowPartitions_.size() - 2);
}

} // namespace facebook::velox::exec
80 changes: 80 additions & 0 deletions velox/exec/RowsStreamingWindowBuild.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/exec/WindowBuild.h"

namespace facebook::velox::exec {

/// Unlike StreamingWindowBuild, RowsStreamingWindowBuild is capable of
/// processing window functions as rows arrive within a single partition,
/// without the need to wait for the entire partition to be ready. This approach
/// can significantly reduce memory usage, especially when a single partition
/// contains a large amount of data. It is particularly suited for optimizing
/// rank and row_number functions, as well as aggregate window functions with a
/// default frame.
class RowsStreamingWindowBuild : public WindowBuild {
public:
RowsStreamingWindowBuild(
const std::shared_ptr<const core::WindowNode>& windowNode,
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection);

void addInput(RowVectorPtr input) override;

void spill() override {
VELOX_UNREACHABLE();
}

std::optional<common::SpillStats> spilledStats() const override {
return std::nullopt;
}

void noMoreInput() override;

bool hasNextPartition() override;

std::shared_ptr<WindowPartition> nextPartition() override;

bool needsInput() override {
// No partitions are available or the currentPartition is the last available
// one, so can consume input rows.
return windowPartitions_.size() == 0 ||
outputCurrentPartition_ == windowPartitions_.size() - 1;
}

private:
void buildNextInputOrPartition(bool isFinished);

// Holds input rows within the current partition.
std::vector<char*> inputRows_;

// Used to compare rows based on partitionKeys.
char* previousRow_ = nullptr;

// Current partition being output. Used to return the WidnowPartitions.
vector_size_t outputCurrentPartition_ = -1;

// Current partition when adding input. Used to construct WindowPartitions.
vector_size_t inputCurrentPartition_ = 0;

// Holds all the WindowPartitions.
std::vector<std::shared_ptr<WindowPartition>> windowPartitions_;
};

} // namespace facebook::velox::exec
58 changes: 58 additions & 0 deletions velox/exec/RowsStreamingWindowPartition.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/exec/RowsStreamingWindowPartition.h"

namespace facebook::velox::exec {

RowsStreamingWindowPartition::RowsStreamingWindowPartition(
RowContainer* data,
const folly::Range<char**>& rows,
const std::vector<column_index_t>& inputMapping,
const std::vector<std::pair<column_index_t, core::SortOrder>>& sortKeyInfo)
: WindowPartition(data, rows, inputMapping, sortKeyInfo) {
partitionStartRows_.push_back(0);
}

void RowsStreamingWindowPartition::addNewRows(std::vector<char*> rows) {
partitionStartRows_.push_back(partitionStartRows_.back() + rows.size());

sortedRows_.insert(sortedRows_.end(), rows.begin(), rows.end());
}

bool RowsStreamingWindowPartition::buildNextRows() {
if (currentPartition_ >= int(partitionStartRows_.size() - 2))
return false;

currentPartition_++;

// Erase previous rows in current partition.
if (currentPartition_ > 0) {
auto numPreviousPartitionRows = partitionStartRows_[currentPartition_] -
partitionStartRows_[currentPartition_ - 1];
data_->eraseRows(
folly::Range<char**>(sortedRows_.data(), numPreviousPartitionRows));
sortedRows_.erase(
sortedRows_.begin(), sortedRows_.begin() + numPreviousPartitionRows);
}

auto partitionSize = partitionStartRows_[currentPartition_ + 1] -
partitionStartRows_[currentPartition_];

partition_ = folly::Range(sortedRows_.data(), partitionSize);
return true;
}

} // namespace facebook::velox::exec
89 changes: 89 additions & 0 deletions velox/exec/RowsStreamingWindowPartition.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "velox/exec/RowContainer.h"
#include "velox/exec/WindowPartition.h"

namespace facebook::velox::exec {

/// RowsStreamingWindowPartition is to facilitate RowsStreamingWindowBuild by
/// processing rows within WindowPartition in a streaming manner.
class RowsStreamingWindowPartition : public WindowPartition {
public:
RowsStreamingWindowPartition(
RowContainer* data,
const folly::Range<char**>& rows,
const std::vector<column_index_t>& inputMapping,
const std::vector<std::pair<column_index_t, core::SortOrder>>&
sortKeyInfo);

// Returns the number of rows in the current partial window partition,
// including the offset within the full partition.
vector_size_t numRows() const override {
if (currentPartition_ == -1) {
return 0;
} else {
return partition_.size() + partitionStartRows_[currentPartition_];
}
}

// Returns the starting offset of the current partial window partition within
// the full partition.
vector_size_t offsetInPartition() const override {
return partitionStartRows_[currentPartition_];
}

// Indicates support for row-level streaming processing.
bool supportRowLevelStreaming() const override {
return true;
}

// Sets the flag indicating that all input rows have been processed on the
// producer side.
void setInputRowsFinished() override {
inputRowsFinished_ = true;
}

// Adds new rows to the partition using a streaming approach on the producer
// side.
void addNewRows(std::vector<char*> rows) override;

// Builds the next set of available rows on the consumer side.
bool buildNextRows() override;

// Determines if the current partition is complete and then proceed to the
// next partition.
bool processFinished() const override {
return (
inputRowsFinished_ &&
currentPartition_ == partitionStartRows_.size() - 2);
}

private:
// Indicates whether all input rows have been added to sortedRows_
bool inputRowsFinished_ = false;

// Stores new rows added to the WindowPartition.
std::vector<char*> sortedRows_;

// Indices of the start row (in sortedRows_) of each partitial partition.
std::vector<vector_size_t> partitionStartRows_;

// Current partial partition being output.
vector_size_t currentPartition_ = -1;
};
} // namespace facebook::velox::exec
6 changes: 3 additions & 3 deletions velox/exec/SortWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,11 @@ void SortWindowBuild::loadNextPartitionFromSpill() {
}
}

std::unique_ptr<WindowPartition> SortWindowBuild::nextPartition() {
std::shared_ptr<WindowPartition> SortWindowBuild::nextPartition() {
if (merge_ != nullptr) {
VELOX_CHECK(!sortedRows_.empty(), "No window partitions available")
auto partition = folly::Range(sortedRows_.data(), sortedRows_.size());
return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

Expand All @@ -313,7 +313,7 @@ std::unique_ptr<WindowPartition> SortWindowBuild::nextPartition() {
auto partition = folly::Range(
sortedRows_.data() + partitionStartRows_[currentPartition_],
partitionSize);
return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/SortWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class SortWindowBuild : public WindowBuild {

bool hasNextPartition() override;

std::unique_ptr<WindowPartition> nextPartition() override;
std::shared_ptr<WindowPartition> nextPartition() override;

private:
void ensureInputFits(const RowVectorPtr& input);
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/StreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void StreamingWindowBuild::noMoreInput() {
partitionStartRows_.push_back(sortedRows_.size());
}

std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
std::shared_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
VELOX_CHECK_GT(
partitionStartRows_.size(), 0, "No window partitions available")

Expand Down Expand Up @@ -89,7 +89,7 @@ std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
sortedRows_.data() + partitionStartRows_[currentPartition_],
partitionSize);

return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

Expand Down
Loading

0 comments on commit 8657ebf

Please sign in to comment.