Skip to content

Commit

Permalink
cache: replace context.WithCancel with WithCancelCause
Browse files Browse the repository at this point in the history
Keep stack traces for cancellation errors where possible.

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
  • Loading branch information
tonistiigi committed Dec 5, 2023
1 parent 06c971f commit 2b3a949
Show file tree
Hide file tree
Showing 46 changed files with 187 additions and 165 deletions.
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ linters-settings:
forbid:
- '^fmt\.Errorf(# use errors\.Errorf instead)?$'
- '^logrus\.(Trace|Debug|Info|Warn|Warning|Error|Fatal)(f|ln)?(# use bklog\.G or bklog\.L instead of logrus directly)?$'
- '^context\.WithCancel(# use context\.WithCancelCause instead)?$'
- '^ctx\.Err(# use context\.Cause instead)?$'
importas:
alias:
- pkg: "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down
2 changes: 1 addition & 1 deletion cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1258,7 +1258,7 @@ func (cm *cacheManager) prune(ctx context.Context, ch chan client.UsageInfo, opt

select {
case <-ctx.Done():
return ctx.Err()
return context.Cause(ctx)
default:
return cm.prune(ctx, ch, opt)
}
Expand Down
6 changes: 3 additions & 3 deletions client/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1266,8 +1266,8 @@ func testClientGatewayContainerCancelExecTty(t *testing.T, sb integration.Sandbo
defer pid1.Wait()
defer ctr.Release(ctx)

execCtx, cancel := context.WithCancel(ctx)
defer cancel()
execCtx, cancel := context.WithCancelCause(ctx)
defer cancel(errors.WithStack(context.Canceled))

prompt := newTestPrompt(execCtx, t, inputW, output)
pid2, err := ctr.Start(execCtx, client.StartRequest{
Expand All @@ -1281,7 +1281,7 @@ func testClientGatewayContainerCancelExecTty(t *testing.T, sb integration.Sandbo
require.NoError(t, err)

prompt.SendExpect("echo hi", "hi")
cancel()
cancel(errors.WithStack(context.Canceled))

err = pid2.Wait()
require.ErrorIs(t, err, context.Canceled)
Expand Down
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (c *Client) Wait(ctx context.Context) error {

select {
case <-ctx.Done():
return ctx.Err()
return context.Cause(ctx)
case <-time.After(time.Second):
}
c.conn.ResetConnectBackoff()
Expand Down
4 changes: 2 additions & 2 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7407,8 +7407,8 @@ func testInvalidExporter(t *testing.T, sb integration.Sandbox) {

// moby/buildkit#492
func testParallelLocalBuilds(t *testing.T, sb integration.Sandbox) {
ctx, cancel := context.WithCancel(sb.Context())
defer cancel()
ctx, cancel := context.WithCancelCause(sb.Context())
defer cancel(errors.WithStack(context.Canceled))

c, err := New(ctx, sb.Address())
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion client/llb/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (as *asyncState) Do(ctx context.Context, c *Constraints) error {
if err != nil {
select {
case <-ctx.Done():
if errors.Is(err, ctx.Err()) {
if errors.Is(err, context.Cause(ctx)) {
return res, err
}
default:
Expand Down
12 changes: 6 additions & 6 deletions client/solve.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runG
}
eg, ctx := errgroup.WithContext(ctx)

statusContext, cancelStatus := context.WithCancel(context.Background())
defer cancelStatus()
statusContext, cancelStatus := context.WithCancelCause(context.Background())
defer cancelStatus(errors.WithStack(context.Canceled))

if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
statusContext = trace.ContextWithSpan(statusContext, span)
Expand Down Expand Up @@ -230,16 +230,16 @@ func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runG
frontendAttrs[k] = v
}

solveCtx, cancelSolve := context.WithCancel(ctx)
solveCtx, cancelSolve := context.WithCancelCause(ctx)
var res *SolveResponse
eg.Go(func() error {
ctx := solveCtx
defer cancelSolve()
defer cancelSolve(errors.WithStack(context.Canceled))

defer func() { // make sure the Status ends cleanly on build errors
go func() {
<-time.After(3 * time.Second)
cancelStatus()
cancelStatus(errors.WithStack(context.Canceled))
}()
if !opt.SessionPreInitialized {
bklog.G(ctx).Debugf("stopping session")
Expand Down Expand Up @@ -298,7 +298,7 @@ func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runG
select {
case <-solveCtx.Done():
case <-time.After(5 * time.Second):
cancelSolve()
cancelSolve(errors.WithStack(context.Canceled))
}

return err
Expand Down
14 changes: 7 additions & 7 deletions cmd/buildkitd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ func main() {
if os.Geteuid() > 0 {
return errors.New("rootless mode requires to be executed as the mapped root in a user namespace; you may use RootlessKit for setting up the namespace")
}
ctx, cancel := context.WithCancel(appcontext.Context())
defer cancel()
ctx, cancel := context.WithCancelCause(appcontext.Context())
defer cancel(errors.WithStack(context.Canceled))

cfg, err := config.LoadFile(c.GlobalString("config"))
if err != nil {
Expand Down Expand Up @@ -344,9 +344,9 @@ func main() {
select {
case serverErr := <-errCh:
err = serverErr
cancel()
cancel(err)
case <-ctx.Done():
err = ctx.Err()
err = context.Cause(ctx)
}

bklog.G(ctx).Infof("stopping server")
Expand Down Expand Up @@ -634,14 +634,14 @@ func unaryInterceptor(globalCtx context.Context, tp trace.TracerProvider) grpc.U
withTrace := otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(tp), otelgrpc.WithPropagators(propagators))

return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(errors.WithStack(context.Canceled))

go func() {
select {
case <-ctx.Done():
case <-globalCtx.Done():
cancel()
cancel(context.Cause(globalCtx))
}
}()

Expand Down
4 changes: 2 additions & 2 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,10 +505,10 @@ func (c *Controller) Session(stream controlapi.Control_SessionServer) error {
conn, closeCh, opts := grpchijack.Hijack(stream)
defer conn.Close()

ctx, cancel := context.WithCancel(stream.Context())
ctx, cancel := context.WithCancelCause(stream.Context())
go func() {
<-closeCh
cancel()
cancel(errors.WithStack(context.Canceled))
}()

err := c.opt.SessionManager.HandleConn(ctx, conn, opts)
Expand Down
8 changes: 4 additions & 4 deletions executor/containerdexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (w *containerdExecutor) Exec(ctx context.Context, id string, process execut
}
select {
case <-ctx.Done():
return ctx.Err()
return context.Cause(ctx)
case err, ok := <-details.done:
if !ok || err == nil {
return errors.Errorf("container %s has stopped", id)
Expand Down Expand Up @@ -336,8 +336,8 @@ func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Proces

// handle signals (and resize) in separate go loop so it does not
// potentially block the container cancel/exit status loop below.
eventCtx, eventCancel := context.WithCancel(ctx)
defer eventCancel()
eventCtx, eventCancel := context.WithCancelCause(ctx)
defer eventCancel(errors.WithStack(context.Canceled))
go func() {
for {
select {
Expand Down Expand Up @@ -403,7 +403,7 @@ func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Proces
}
select {
case <-ctx.Done():
exitErr.Err = errors.Wrap(ctx.Err(), exitErr.Error())
exitErr.Err = errors.Wrap(context.Cause(ctx), exitErr.Error())
default:
}
return exitErr
Expand Down
14 changes: 7 additions & 7 deletions executor/runcexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func exitError(ctx context.Context, err error) error {
)
select {
case <-ctx.Done():
exitErr.Err = errors.Wrapf(ctx.Err(), exitErr.Error())
exitErr.Err = errors.Wrapf(context.Cause(ctx), exitErr.Error())
return exitErr
default:
return stack.Enable(exitErr)
Expand Down Expand Up @@ -402,7 +402,7 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro
}
select {
case <-ctx.Done():
return ctx.Err()
return context.Cause(ctx)
case err, ok := <-done:
if !ok || err == nil {
return errors.Errorf("container %s has stopped", id)
Expand Down Expand Up @@ -580,7 +580,7 @@ type procHandle struct {
monitorProcess *os.Process
ready chan struct{}
ended chan struct{}
shutdown func()
shutdown func(error)
// this this only used when the request context is canceled and we need
// to kill the in-container process.
killer procKiller
Expand All @@ -594,7 +594,7 @@ type procHandle struct {
// The goal is to allow for runc to gracefully shutdown when the request context
// is cancelled.
func runcProcessHandle(ctx context.Context, killer procKiller) (*procHandle, context.Context) {
runcCtx, cancel := context.WithCancel(context.Background())
runcCtx, cancel := context.WithCancelCause(context.Background())
p := &procHandle{
ready: make(chan struct{}),
ended: make(chan struct{}),
Expand All @@ -620,7 +620,7 @@ func runcProcessHandle(ctx context.Context, killer procKiller) (*procHandle, con
select {
case <-killCtx.Done():
timeout()
cancel()
cancel(errors.WithStack(context.Cause(ctx)))
return
default:
}
Expand Down Expand Up @@ -653,7 +653,7 @@ func (p *procHandle) Release() {
// goroutines.
func (p *procHandle) Shutdown() {
if p.shutdown != nil {
p.shutdown()
p.shutdown(errors.WithStack(context.Canceled))
}
}

Expand All @@ -663,7 +663,7 @@ func (p *procHandle) Shutdown() {
func (p *procHandle) WaitForReady(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
return context.Cause(ctx)
case <-p.ready:
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions frontend/gateway/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Mount struct {
}

func NewContainer(ctx context.Context, w worker.Worker, sm *session.Manager, g session.Group, req NewContainerRequest) (client.Container, error) {
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancelCause(ctx)
eg, ctx := errgroup.WithContext(ctx)
platform := opspb.Platform{
OS: runtime.GOOS,
Expand Down Expand Up @@ -300,7 +300,7 @@ type gatewayContainer struct {
mu sync.Mutex
cleanup []func() error
ctx context.Context
cancel func()
cancel func(error)
}

func (gwCtr *gatewayContainer) Start(ctx context.Context, req client.StartRequest) (client.ContainerProcess, error) {
Expand Down Expand Up @@ -408,7 +408,7 @@ func (gwCtr *gatewayContainer) loadSecretEnv(ctx context.Context, secretEnv []*p
func (gwCtr *gatewayContainer) Release(ctx context.Context) error {
gwCtr.mu.Lock()
defer gwCtr.mu.Unlock()
gwCtr.cancel()
gwCtr.cancel(errors.WithStack(context.Canceled))
err1 := gwCtr.errGroup.Wait()

var err2 error
Expand Down
8 changes: 4 additions & 4 deletions frontend/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func newBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridg
}

func serveLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) (*llbBridgeForwarder, context.Context, error) {
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancelCause(ctx)
lbf := newBridgeForwarder(ctx, llbBridge, workers, inputs, sid, sm)
server := grpc.NewServer(grpc.UnaryInterceptor(grpcerrors.UnaryServerInterceptor), grpc.StreamInterceptor(grpcerrors.StreamServerInterceptor))
grpc_health_v1.RegisterHealthServer(server, health.NewServer())
Expand All @@ -469,7 +469,7 @@ func serveLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLB
default:
lbf.isErrServerClosed = true
}
cancel()
cancel(errors.WithStack(context.Canceled))
}()

return lbf, ctx, nil
Expand Down Expand Up @@ -1322,8 +1322,8 @@ func (lbf *llbBridgeForwarder) ExecProcess(srv pb.LLBBridge_ExecProcessServer) e
return stack.Enable(status.Errorf(codes.NotFound, "container %q previously released or not created", id))
}

initCtx, initCancel := context.WithCancel(context.Background())
defer initCancel()
initCtx, initCancel := context.WithCancelCause(context.Background())
defer initCancel(errors.WithStack(context.Canceled))

pio := newProcessIO(pid, init.Fds)
pios[pid] = pio
Expand Down
10 changes: 5 additions & 5 deletions frontend/gateway/grpcclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ func (b *procMessageForwarder) Close() {
type messageForwarder struct {
client pb.LLBBridgeClient
ctx context.Context
cancel func()
cancel func(error)
eg *errgroup.Group
mu sync.Mutex
pids map[string]*procMessageForwarder
Expand All @@ -630,7 +630,7 @@ type messageForwarder struct {
}

func newMessageForwarder(ctx context.Context, client pb.LLBBridgeClient) *messageForwarder {
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancelCause(ctx)
eg, ctx := errgroup.WithContext(ctx)
return &messageForwarder{
client: client,
Expand Down Expand Up @@ -719,7 +719,7 @@ func (m *messageForwarder) Send(msg *pb.ExecMessage) error {
}

func (m *messageForwarder) Release() error {
m.cancel()
m.cancel(errors.WithStack(context.Canceled))
return m.eg.Wait()
}

Expand Down Expand Up @@ -949,7 +949,7 @@ func (ctr *container) Start(ctx context.Context, req client.StartRequest) (clien
closeDoneOnce.Do(func() {
close(done)
})
return ctx.Err()
return context.Cause(ctx)
}

if file := msg.GetFile(); file != nil {
Expand Down Expand Up @@ -1145,7 +1145,7 @@ func grpcClientConn(ctx context.Context) (context.Context, *grpc.ClientConn, err
return nil, nil, errors.Wrap(err, "failed to create grpc client")
}

ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancelCause(ctx)
_ = cancel
// go monitorHealth(ctx, cc, cancel)

Expand Down
4 changes: 2 additions & 2 deletions session/filesync/filesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ func FSSync(ctx context.Context, c session.Caller, opt FSSendRequestOpt) error {

opts[keyDirName] = []string{opt.Name}

ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(errors.WithStack(context.Canceled))

client := NewFileSyncClient(c.Conn())

Expand Down
6 changes: 3 additions & 3 deletions session/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ func grpcClientConn(ctx context.Context, conn net.Conn) (context.Context, *grpc.
return nil, nil, errors.Wrap(err, "failed to create grpc client")
}

ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancelCause(ctx)
go monitorHealth(ctx, cc, cancel)

return ctx, cc, nil
}

func monitorHealth(ctx context.Context, cc *grpc.ClientConn, cancelConn func()) {
defer cancelConn()
func monitorHealth(ctx context.Context, cc *grpc.ClientConn, cancelConn func(error)) {
defer cancelConn(errors.WithStack(context.Canceled))
defer cc.Close()

ticker := time.NewTicker(5 * time.Second)
Expand Down

0 comments on commit 2b3a949

Please sign in to comment.