diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 3a1c520551a4..8d3d0d01f49a 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -264,8 +264,17 @@ func (ds *ServerImpl) setupFlow( var evalCtx *tree.EvalContext var leafTxn *kv.Txn + var onFlowCleanup func() if localState.EvalContext != nil { evalCtx = localState.EvalContext + // We're about to mutate the evalCtx and we want to restore its original + // state once the flow cleans up. Note that we could have made a copy of + // the whole evalContext, but that isn't free, so we choose to restore + // the original state in order to avoid performance regressions. + origMon := evalCtx.Mon + onFlowCleanup = func() { + evalCtx.Mon = origMon + } evalCtx.Mon = monitor } else { if localState.IsLocal { @@ -328,7 +337,7 @@ func (ds *ServerImpl) setupFlow( // itself when the vectorize mode needs to be changed because we would need // to restore the original value which can have data races under stress. isVectorized := req.EvalContext.SessionData.VectorizeMode != sessiondatapb.VectorizeOff - f := newFlow(flowCtx, ds.flowRegistry, syncFlowConsumer, localState.LocalProcs, isVectorized) + f := newFlow(flowCtx, ds.flowRegistry, syncFlowConsumer, localState.LocalProcs, isVectorized, onFlowCleanup) opt := flowinfra.FuseNormally if localState.IsLocal { // If there's no remote flows, fuse everything. This is needed in order for @@ -345,6 +354,9 @@ func (ds *ServerImpl) setupFlow( // and finish the span manually. monitor.Stop(ctx) sp.Finish() + if onFlowCleanup != nil { + onFlowCleanup() + } ctx = tracing.ContextWithSpan(ctx, nil) return ctx, nil, err } @@ -445,8 +457,9 @@ func newFlow( syncFlowConsumer execinfra.RowReceiver, localProcessors []execinfra.LocalProcessor, isVectorized bool, + onFlowCleanup func(), ) flowinfra.Flow { - base := flowinfra.NewFlowBase(flowCtx, flowReg, syncFlowConsumer, localProcessors) + base := flowinfra.NewFlowBase(flowCtx, flowReg, syncFlowConsumer, localProcessors, onFlowCleanup) if isVectorized { return colflow.NewVectorizedFlow(base) } diff --git a/pkg/sql/distsql/vectorized_panic_propagation_test.go b/pkg/sql/distsql/vectorized_panic_propagation_test.go index 9f95e9a12ab9..579335cd6af3 100644 --- a/pkg/sql/distsql/vectorized_panic_propagation_test.go +++ b/pkg/sql/distsql/vectorized_panic_propagation_test.go @@ -47,6 +47,7 @@ func TestNonVectorizedPanicDoesntHangServer(t *testing.T) { nil, /* flowReg */ nil, /* syncFlowConsumer */ nil, /* localProcessors */ + nil, /* onFlowCleanup */ ) flow := colflow.NewVectorizedFlow(base) diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index 18e782433fea..c5be157b62b4 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -161,6 +161,8 @@ type FlowBase struct { // - outboxes waitGroup sync.WaitGroup + onFlowCleanup func() + doneFn func() status flowStatus @@ -212,12 +214,14 @@ func NewFlowBase( flowReg *FlowRegistry, syncFlowConsumer execinfra.RowReceiver, localProcessors []execinfra.LocalProcessor, + onFlowCleanup func(), ) *FlowBase { base := &FlowBase{ FlowCtx: flowCtx, flowRegistry: flowReg, syncFlowConsumer: syncFlowConsumer, localProcessors: localProcessors, + onFlowCleanup: onFlowCleanup, } base.status = FlowNotStarted return base @@ -466,6 +470,9 @@ func (f *FlowBase) Cleanup(ctx context.Context) { } f.status = FlowFinished f.ctxCancel() + if f.onFlowCleanup != nil { + f.onFlowCleanup() + } if f.doneFn != nil { f.doneFn() }