Skip to content

Commit

Permalink
Fix merge join memory expand during subplan filter rescan
Browse files Browse the repository at this point in the history
  • Loading branch information
hezuojiao authored and ob-robot committed Aug 28, 2023
1 parent 0c5ffaf commit fbc4711
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 20 deletions.
47 changes: 28 additions & 19 deletions src/sql/engine/join/ob_merge_join_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -963,28 +963,22 @@ int ObMergeJoinOp::ChildBatchFetcher::init(
int ObMergeJoinOp::ChildBatchFetcher::get_next_batch(const int64_t max_row_cnt)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(0 != backup_rows_cnt_)) {
const int64_t remain_backup_rows = backup_rows_cnt_ - backup_rows_used_;
if (OB_UNLIKELY(0 != remain_backup_rows)) {
if (OB_UNLIKELY(backup_datums_.count() != all_exprs_->count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("store datums cnt and child output cnt not equal", K(ret),
K(backup_datums_.count()), K(all_exprs_->count()));
} else {
const int64_t restore_cnt = MIN(max_row_cnt, backup_rows_cnt_);
const int64_t restore_cnt = MIN(max_row_cnt, remain_backup_rows);
for (int64_t i = 0; i < backup_datums_.count(); i++) {
ObDatum *datum = all_exprs_->at(i)->locate_batch_datums(merge_join_op_.eval_ctx_);
MEMCPY(datum, backup_datums_.at(i), sizeof(ObDatum) * restore_cnt);
MEMCPY(datum, backup_datums_.at(i) + backup_rows_used_, sizeof(ObDatum) * restore_cnt);
}
brs_.size_ = restore_cnt;
brs_.end_ = false;
brs_.skip_ = NULL;
backup_rows_cnt_ -= restore_cnt;
if (OB_LIKELY(0 == backup_rows_cnt_)) {
backup_datums_.reset();
} else {
for (int64_t i = 0; i < backup_datums_.count(); i++) {
backup_datums_.at(i) = backup_datums_.at(i) + restore_cnt;
}
}
backup_rows_used_ += restore_cnt;
}
} else {
if (brs_.end_) {
Expand Down Expand Up @@ -1021,21 +1015,36 @@ int ObMergeJoinOp::ChildBatchFetcher::backup_remain_rows()
if (OB_UNLIKELY(cur_idx_ >= brs_.size_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("no remain rows", K(ret), K(cur_idx_), K(brs_.size_));
} else {
int64_t alloc_size = sizeof(ObDatum) * (brs_.size_ - cur_idx_);
} else if (backup_datums_.empty()) {
int64_t alloc_size = sizeof(ObDatum) * merge_join_op_.spec_.max_batch_size_;
for (int64_t i = 0; i < all_exprs_->count() && OB_SUCC(ret); i++) {
backup_rows_cnt_ = 0;
ObDatum *datum = NULL;
ObDatumVector src_datum = all_exprs_->at(i)->locate_expr_datumvector(merge_join_op_.eval_ctx_);
if (OB_ISNULL(datum = static_cast<ObDatum *>(allocator.alloc(alloc_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));
} else if (OB_FAIL(backup_datums_.push_back(datum))) {
LOG_WARN("push back failed", K(ret));
} else {
for (int64_t j = cur_idx_; j < brs_.size_ && OB_SUCC(ret); j++) {
if (!brs_.skip_->contain(j)) {
datum[backup_rows_cnt_++] = *src_datum.at(j);
}
}
}
if (OB_SUCC(ret)) {
if (OB_UNLIKELY(all_exprs_->count() != backup_datums_.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("count mismatch", K(ret), K(all_exprs_->count()), K(backup_datums_.count()));
} else {
for (int64_t i = 0; i < all_exprs_->count() && OB_SUCC(ret); i++) {
backup_rows_cnt_ = 0;
backup_rows_used_ = 0;
ObDatumVector src_datum = all_exprs_->at(i)->locate_expr_datumvector(merge_join_op_.eval_ctx_);
ObDatum *datum = backup_datums_.at(i);
if (OB_ISNULL(datum)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("backup datums memory is null", K(ret), K(i), K(all_exprs_->count()));
} else {
for (int64_t j = cur_idx_; j < brs_.size_ && OB_SUCC(ret); j++) {
if (!brs_.skip_->contain(j)) {
datum[backup_rows_cnt_++] = *src_datum.at(j);
}
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/sql/engine/join/ob_merge_join_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ class ObMergeJoinOp: public ObJoinOp
cur_idx_(0), brs_(), batch_size_(0), child_(NULL),
match_groups_(match_groups), merge_join_op_(merge_join_op),
all_exprs_(NULL), datum_store_(), backup_datums_(),
backup_rows_cnt_(0), brs_holder_(),
backup_rows_cnt_(0), backup_rows_used_(0), brs_holder_(),
equal_param_idx_(allocator)
{}
int init(const uint64_t tenant_id, bool is_left, ObOperator *child,
Expand Down Expand Up @@ -242,6 +242,7 @@ class ObMergeJoinOp: public ObJoinOp
datum_store_.reuse();
backup_datums_.reuse();
backup_rows_cnt_ = 0;
backup_rows_used_ = 0;
brs_holder_.reset();
}
// for destroy
Expand All @@ -254,6 +255,7 @@ class ObMergeJoinOp: public ObJoinOp
datum_store_.reset();
backup_datums_.reset();
backup_rows_cnt_ = 0;
backup_rows_used_ = 0;
brs_holder_.reset();
}
int64_t cur_idx_;
Expand All @@ -269,6 +271,7 @@ class ObMergeJoinOp: public ObJoinOp
// We need store these rows and output first, then get batch from child and output directly.
ObSEArray<ObDatum *, 256> backup_datums_;
int64_t backup_rows_cnt_;
int64_t backup_rows_used_;
ObBatchResultHolder brs_holder_;

common::ObFixedArray<int64_t, common::ObIAllocator> equal_param_idx_;
Expand Down

0 comments on commit fbc4711

Please sign in to comment.