Skip to content

Commit

Permalink
executor: fix mppIterator memory leak when got error (#53106)
Browse files Browse the repository at this point in the history
close #52406
  • Loading branch information
guo-shaoge committed May 9, 2024
1 parent f0a7447 commit 9bf3500
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
3 changes: 0 additions & 3 deletions pkg/executor/internal/mpp/local_mpp_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,6 @@ func (c *localMppCoordinator) cancelMppTasks() {
func (c *localMppCoordinator) receiveResults(req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta, bo *backoff.Backoffer) {
stream, err := c.sessionCtx.GetMPPClient().EstablishMPPConns(kv.EstablishMPPConnsParam{Ctx: bo.GetCtx(), Req: req, TaskMeta: taskMeta})
if err != nil {
if stream != nil {
stream.Close()
}
// if NeedTriggerFallback is true, we return timeout to trigger tikv's fallback
if c.needTriggerFallback {
c.sendError(derr.ErrTiFlashServerTimeout)
Expand Down
11 changes: 9 additions & 2 deletions pkg/store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,16 +255,23 @@ func (c *MPPClient) EstablishMPPConns(param kv.EstablishMPPConnsParam) (*tikvrpc
// We don't need to process any special error. When we meet errors, just let it fail.
rpcResp, err := c.store.GetTiKVClient().SendRequest(param.Ctx, req.Meta.GetAddress(), wrappedReq, TiFlashReadTimeoutUltraLong)

var stream *tikvrpc.MPPStreamResponse
if rpcResp != nil && rpcResp.Resp != nil {
stream = rpcResp.Resp.(*tikvrpc.MPPStreamResponse)
}

if err != nil {
if stream != nil {
stream.Close()
}
logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId), zap.Int64("mpp-version", taskMeta.MppVersion))
if config.GetGlobalConfig().DisaggregatedTiFlash && !config.GetGlobalConfig().UseAutoScaler {
c.store.GetRegionCache().InvalidateTiFlashComputeStores()
}
return nil, err
}

streamResponse := rpcResp.Resp.(*tikvrpc.MPPStreamResponse)
return streamResponse, nil
return stream, nil
}

// CheckVisibility checks if it is safe to read using given ts.
Expand Down

0 comments on commit 9bf3500

Please sign in to comment.