Skip to content

Commit

Permalink
[BugFix] Fix them mem statistics bug of MemoryScratchSink (backport S…
Browse files Browse the repository at this point in the history
…tarRocks#30751) (StarRocks#30779)

Co-authored-by: trueeyu <lxhhust350@qq.com>
  • Loading branch information
mergify[bot] and trueeyu committed Sep 12, 2023
1 parent 1d2d64f commit 9e9dfcf
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
3 changes: 3 additions & 0 deletions be/src/exec/pipeline/sink/memory_scratch_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ StatusOr<ChunkPtr> MemoryScratchSinkOperator::pull_chunk(RuntimeState* state) {
}

Status MemoryScratchSinkOperator::push_chunk(RuntimeState* state, const ChunkPtr& chunk) {
// Same as ResultSinkOperator, The memory of the output result set should not be counted in the query memory,
// otherwise it will cause memory statistics errors.
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(nullptr);
if (nullptr == chunk || 0 == chunk->num_rows()) {
return Status::OK();
}
Expand Down
5 changes: 4 additions & 1 deletion be/src/runtime/memory_scratch_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@

#include "common/logging.h"
#include "exprs/expr.h"
#include "runtime/current_thread.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "util/arrow/row_batch.h"
#include "util/arrow/starrocks_column_to_arrow.h"
#include "util/date_func.h"

namespace starrocks {

Expand Down Expand Up @@ -97,6 +97,9 @@ Status MemoryScratchSink::prepare(RuntimeState* state) {
}

Status MemoryScratchSink::send_chunk(RuntimeState* state, Chunk* chunk) {
// Same as ResultSinkOperator, The memory of the output result set should not be counted in the query memory,
// otherwise it will cause memory statistics errors.
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(nullptr);
if (nullptr == chunk || 0 == chunk->num_rows()) {
return Status::OK();
}
Expand Down
13 changes: 9 additions & 4 deletions be/src/util/arrow/starrocks_column_to_arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,11 +327,13 @@ Status convert_chunk_to_arrow_batch(Chunk* chunk, std::vector<ExprContext*>& _ou
int result_num_column = _output_expr_ctxs.size();
std::vector<std::shared_ptr<arrow::Array>> arrays(result_num_column);

size_t num_rows = chunk->num_rows();
for (auto i = 0; i < result_num_column; ++i) {
ASSIGN_OR_RETURN(ColumnPtr column, _output_expr_ctxs[i]->evaluate(chunk))
Expr* expr = _output_expr_ctxs[i]->root();
if (column->is_constant()) {
column = ColumnHelper::unfold_const_column(expr->type(), chunk->num_rows(), column);
// Don't modify the column of src chunk, otherwise the memory statistics of query is invalid.
column = ColumnHelper::copy_and_unfold_const_column(expr->type(), column->is_nullable(), column, num_rows);
}
auto& array = arrays[i];
ColumnToArrowArrayConverter converter(column, pool, expr->type().type, array);
Expand All @@ -340,7 +342,7 @@ Status convert_chunk_to_arrow_batch(Chunk* chunk, std::vector<ExprContext*>& _ou
return Status::InvalidArgument(arrow_st.ToString());
}
}
*result = arrow::RecordBatch::Make(schema, chunk->num_rows(), std::move(arrays));
*result = arrow::RecordBatch::Make(schema, num_rows, std::move(arrays));
return Status::OK();
}

Expand All @@ -354,10 +356,13 @@ Status convert_chunk_to_arrow_batch(Chunk* chunk, const std::vector<const TypeDe

std::vector<std::shared_ptr<arrow::Array>> arrays(slot_types.size());

size_t num_rows = chunk->num_rows();
for (auto i = 0; i < slot_types.size(); ++i) {
auto column = chunk->get_column_by_slot_id(slot_ids[i]);
if (column->is_constant()) {
column = ColumnHelper::unfold_const_column(*slot_types[i], chunk->num_rows(), column);
// Don't modify the column of src chunk, otherwise the memory statistics of query is invalid.
column =
ColumnHelper::copy_and_unfold_const_column(*slot_types[i], column->is_nullable(), column, num_rows);
}
auto& array = arrays[i];
ColumnToArrowArrayConverter converter(column, pool, slot_types[i]->type, array);
Expand All @@ -366,7 +371,7 @@ Status convert_chunk_to_arrow_batch(Chunk* chunk, const std::vector<const TypeDe
return Status::InvalidArgument(arrow_st.ToString());
}
}
*result = arrow::RecordBatch::Make(schema, chunk->num_rows(), std::move(arrays));
*result = arrow::RecordBatch::Make(schema, num_rows, std::move(arrays));
return Status::OK();
}
} // namespace starrocks

0 comments on commit 9e9dfcf

Please sign in to comment.