Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
distsql: restore EvalCtx.Mon on the flow cleanup
In `setupFlow`, if we're setting up a flow on the gateway, we're using
`LocalState` to save on deserialization of some state. Notably, we pass
the eval context that we used during the physical planning. That eval
context can be mutated (in particular, we're updating its `Mon` field to
the "flow" memory monitor), and previously this could cause issues when
automatically retrying stats collection jobs (possibly there could be
other issues).

This commit introduces a callback to restore the local eval context to
its original state which is done on the flow cleanup.

Release note (bug fix): Previously, table stats collection issued via
`ANALYZE` statement or via `CREATE STATISTICS` statement without
specifying `AS OF SYSTEM TIME` option could run into
`flow: memory budget exceeded`, and this has been fixed.

Release justification: fix to a long standing bug.
  • Loading branch information
yuzefovich committed Aug 30, 2021
1 parent 6859359 commit 034c189
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
17 changes: 15 additions & 2 deletions pkg/sql/distsql/server.go
Expand Up @@ -264,8 +264,17 @@ func (ds *ServerImpl) setupFlow(

var evalCtx *tree.EvalContext
var leafTxn *kv.Txn
var onFlowCleanup func()
if localState.EvalContext != nil {
evalCtx = localState.EvalContext
// We're about to mutate the evalCtx and we want to restore its original
// state once the flow cleans up. Note that we could have made a copy of
// the whole evalContext, but that isn't free, so we choose to restore
// the original state in order to avoid performance regressions.
origMon := evalCtx.Mon
onFlowCleanup = func() {
evalCtx.Mon = origMon
}
evalCtx.Mon = monitor
} else {
if localState.IsLocal {
Expand Down Expand Up @@ -328,7 +337,7 @@ func (ds *ServerImpl) setupFlow(
// itself when the vectorize mode needs to be changed because we would need
// to restore the original value which can have data races under stress.
isVectorized := req.EvalContext.SessionData.VectorizeMode != sessiondatapb.VectorizeOff
f := newFlow(flowCtx, ds.flowRegistry, syncFlowConsumer, localState.LocalProcs, isVectorized)
f := newFlow(flowCtx, ds.flowRegistry, syncFlowConsumer, localState.LocalProcs, isVectorized, onFlowCleanup)
opt := flowinfra.FuseNormally
if localState.IsLocal {
// If there's no remote flows, fuse everything. This is needed in order for
Expand All @@ -345,6 +354,9 @@ func (ds *ServerImpl) setupFlow(
// and finish the span manually.
monitor.Stop(ctx)
sp.Finish()
if onFlowCleanup != nil {
onFlowCleanup()
}
ctx = tracing.ContextWithSpan(ctx, nil)
return ctx, nil, err
}
Expand Down Expand Up @@ -445,8 +457,9 @@ func newFlow(
syncFlowConsumer execinfra.RowReceiver,
localProcessors []execinfra.LocalProcessor,
isVectorized bool,
onFlowCleanup func(),
) flowinfra.Flow {
base := flowinfra.NewFlowBase(flowCtx, flowReg, syncFlowConsumer, localProcessors)
base := flowinfra.NewFlowBase(flowCtx, flowReg, syncFlowConsumer, localProcessors, onFlowCleanup)
if isVectorized {
return colflow.NewVectorizedFlow(base)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsql/vectorized_panic_propagation_test.go
Expand Up @@ -47,6 +47,7 @@ func TestNonVectorizedPanicDoesntHangServer(t *testing.T) {
nil, /* flowReg */
nil, /* syncFlowConsumer */
nil, /* localProcessors */
nil, /* onFlowCleanup */
)
flow := colflow.NewVectorizedFlow(base)

Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/flowinfra/flow.go
Expand Up @@ -161,6 +161,8 @@ type FlowBase struct {
// - outboxes
waitGroup sync.WaitGroup

onFlowCleanup func()

doneFn func()

status flowStatus
Expand Down Expand Up @@ -212,12 +214,14 @@ func NewFlowBase(
flowReg *FlowRegistry,
syncFlowConsumer execinfra.RowReceiver,
localProcessors []execinfra.LocalProcessor,
onFlowCleanup func(),
) *FlowBase {
base := &FlowBase{
FlowCtx: flowCtx,
flowRegistry: flowReg,
syncFlowConsumer: syncFlowConsumer,
localProcessors: localProcessors,
onFlowCleanup: onFlowCleanup,
}
base.status = FlowNotStarted
return base
Expand Down Expand Up @@ -466,6 +470,9 @@ func (f *FlowBase) Cleanup(ctx context.Context) {
}
f.status = FlowFinished
f.ctxCancel()
if f.onFlowCleanup != nil {
f.onFlowCleanup()
}
if f.doneFn != nil {
f.doneFn()
}
Expand Down

0 comments on commit 034c189

Please sign in to comment.