Skip to content

Commit

Permalink
Merge pull request #3754 from coryb/issue-3751
Browse files Browse the repository at this point in the history
runc worker: fix sigkill handling
  • Loading branch information
coryb committed Mar 31, 2023
2 parents 86c3b26 + 6073f58 commit 3187d2d
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 45 deletions.
171 changes: 142 additions & 29 deletions executor/runcexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"os/exec"
"path/filepath"
"strconv"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -429,25 +430,129 @@ func (s *forwardIO) Stderr() io.ReadCloser {
return nil
}

// procHandle is to track the os process so we can send signals to it.
// newRuncProcKiller returns an abstraction for sending SIGKILL to the
// process inside the container initiated from `runc run`.
func newRunProcKiller(runC *runc.Runc, id string) procKiller {
return procKiller{runC: runC, id: id}
}

// newExecProcKiller returns an abstraction for sending SIGKILL to the
// process inside the container initiated from `runc exec`.
func newExecProcKiller(runC *runc.Runc, id string) (procKiller, error) {
// for `runc exec` we need to create a pidfile and read it later to kill
// the process
tdir, err := os.MkdirTemp("", "runc")
if err != nil {
return procKiller{}, errors.Wrap(err, "failed to create directory for runc pidfile")
}

return procKiller{
runC: runC,
id: id,
pidfile: filepath.Join(tdir, "pidfile"),
cleanup: func() {
os.RemoveAll(tdir)
},
}, nil
}

type procKiller struct {
runC *runc.Runc
id string
pidfile string
cleanup func()
}

// Cleanup will delete any tmp files created for the pidfile allocation
// if this killer was for a `runc exec` process.
func (k procKiller) Cleanup() {
if k.cleanup != nil {
k.cleanup()
}
}

// Kill will send SIGKILL to the process running inside the container.
// If the process was created by `runc run` then we will use `runc kill`,
// otherwise for `runc exec` we will read the pid from a pidfile and then
// send the signal directly that process.
func (k procKiller) Kill(ctx context.Context) (err error) {
bklog.G(ctx).Debugf("sending sigkill to process in container %s", k.id)
defer func() {
if err != nil {
bklog.G(ctx).Errorf("failed to kill process in container id %s: %+v", k.id, err)
}
}()

// this timeout is generally a no-op, the Kill ctx should already have a
// shorter timeout but here as a fail-safe for future refactoring.
ctx, timeout := context.WithTimeout(ctx, 10*time.Second)
defer timeout()

if k.pidfile == "" {
// for `runc run` process we use `runc kill` to terminate the process
return k.runC.Kill(ctx, k.id, int(syscall.SIGKILL), nil)
}

// `runc exec` will write the pidfile a few milliseconds after we
// get the runc pid via the startedCh, so we might need to retry until
// it appears in the edge case where we want to kill a process
// immediately after it was created.
var pidData []byte
for {
pidData, err = os.ReadFile(k.pidfile)
if err != nil {
if os.IsNotExist(err) {
select {
case <-ctx.Done():
return errors.New("context cancelled before runc wrote pidfile")
case <-time.After(10 * time.Millisecond):
continue
}
}
return errors.Wrap(err, "failed to read pidfile from runc")
}
break
}
pid, err := strconv.Atoi(string(pidData))
if err != nil {
return errors.Wrap(err, "read invalid pid from pidfile")
}
process, err := os.FindProcess(pid)
if err != nil {
// error only possible on non-unix hosts
return errors.Wrapf(err, "failed to find process for pid %d from pidfile", pid)
}
defer process.Release()
return process.Signal(syscall.SIGKILL)
}

// procHandle is to track the process so we can send signals to it
// and handle graceful shutdown.
type procHandle struct {
Process *os.Process
ready chan struct{}
ended chan struct{}
shutdown func()
// this is for the runc process (not the process in-container)
monitorProcess *os.Process
ready chan struct{}
ended chan struct{}
shutdown func()
// this this only used when the request context is canceled and we need
// to kill the in-container process.
killer procKiller
}

// runcProcessHandle will create a procHandle that will be monitored, where
// on ctx.Done the process will be killed. If the kill fails, then the cancel
// will be called. This is to allow for runc to go through its normal shutdown
// procedure if the ctx is canceled and to ensure there are no zombie processes
// left by runc.
func runcProcessHandle(ctx context.Context, id string) (*procHandle, context.Context) {
// on ctx.Done the in-container process will receive a SIGKILL. The returned
// context should be used for the go-runc.(Run|Exec) invocations. The returned
// context will only be canceled in the case where the request context is
// canceled and we are unable to send the SIGKILL to the in-container process.
// The goal is to allow for runc to gracefully shutdown when the request context
// is cancelled.
func runcProcessHandle(ctx context.Context, killer procKiller) (*procHandle, context.Context) {
runcCtx, cancel := context.WithCancel(context.Background())
p := &procHandle{
ready: make(chan struct{}),
ended: make(chan struct{}),
shutdown: cancel,
killer: killer,
}
// preserve the logger on the context used for the runc process handling
runcCtx = bklog.WithLogger(runcCtx, bklog.G(ctx))
Expand All @@ -464,8 +569,7 @@ func runcProcessHandle(ctx context.Context, id string) (*procHandle, context.Con
select {
case <-ctx.Done():
killCtx, timeout := context.WithTimeout(context.Background(), 7*time.Second)
if err := p.Process.Kill(); err != nil {
bklog.G(ctx).Errorf("failed to kill runc %s: %+v", id, err)
if err := p.killer.Kill(killCtx); err != nil {
select {
case <-killCtx.Done():
timeout()
Expand All @@ -492,8 +596,8 @@ func runcProcessHandle(ctx context.Context, id string) (*procHandle, context.Con
// Release will free resources with a procHandle.
func (p *procHandle) Release() {
close(p.ended)
if p.Process != nil {
p.Process.Release()
if p.monitorProcess != nil {
p.monitorProcess.Release()
}
}

Expand All @@ -506,9 +610,9 @@ func (p *procHandle) Shutdown() {
}
}

// WaitForReady will wait until the Process has been populated or the
// provided context was cancelled. This should be called before using
// the Process field.
// WaitForReady will wait until we have received the runc pid via the go-runc
// Started channel, or until the request context is canceled. This should
// return without errors before attempting to send signals to the runc process.
func (p *procHandle) WaitForReady(ctx context.Context) error {
select {
case <-ctx.Done():
Expand All @@ -518,34 +622,36 @@ func (p *procHandle) WaitForReady(ctx context.Context) error {
}
}

// WaitForStart will record the pid reported by Runc via the channel.
// We wait for up to 10s for the runc process to start. If the started
// WaitForStart will record the runc pid reported by go-runc via the channel.
// We wait for up to 10s for the runc pid to be reported. If the started
// callback is non-nil it will be called after receiving the pid.
func (p *procHandle) WaitForStart(ctx context.Context, startedCh <-chan int, started func()) error {
startedCtx, timeout := context.WithTimeout(ctx, 10*time.Second)
defer timeout()
var err error
select {
case <-startedCtx.Done():
return errors.New("runc started message never received")
case pid, ok := <-startedCh:
return errors.New("go-runc started message never received")
case runcPid, ok := <-startedCh:
if !ok {
return errors.New("runc process failed to send pid")
return errors.New("go-runc failed to send pid")
}
if started != nil {
started()
}
p.Process, err = os.FindProcess(pid)
var err error
p.monitorProcess, err = os.FindProcess(runcPid)
if err != nil {
return errors.Wrapf(err, "unable to find runc process for pid %d", pid)
// error only possible on non-unix hosts
return errors.Wrapf(err, "failed to find runc process %d", runcPid)
}
close(p.ready)
}
return nil
}

// handleSignals will wait until the runcProcess is ready then will
// send each signal received on the channel to the process.
// handleSignals will wait until the procHandle is ready then will
// send each signal received on the channel to the runc process (not directly
// to the in-container process)
func handleSignals(ctx context.Context, runcProcess *procHandle, signals <-chan syscall.Signal) error {
if signals == nil {
return nil
Expand All @@ -559,8 +665,15 @@ func handleSignals(ctx context.Context, runcProcess *procHandle, signals <-chan
case <-ctx.Done():
return nil
case sig := <-signals:
err := runcProcess.Process.Signal(sig)
if err != nil {
if sig == syscall.SIGKILL {
// never send SIGKILL directly to runc, it needs to go to the
// process in-container
if err := runcProcess.killer.Kill(ctx); err != nil {
return err
}
continue
}
if err := runcProcess.monitorProcess.Signal(sig); err != nil {
bklog.G(ctx).Errorf("failed to signal %s to process: %s", sig, err)
return err
}
Expand Down
28 changes: 21 additions & 7 deletions executor/runcexecutor/executor_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

runc "github.com/containerd/go-runc"
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/util/bklog"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
Expand All @@ -21,7 +22,8 @@ func (w *runcExecutor) run(ctx context.Context, id, bundle string, process execu
if process.Meta.Tty {
return unsupportedConsoleError
}
return w.commonCall(ctx, id, bundle, process, started, func(ctx context.Context, started chan<- int, io runc.IO) error {
killer := newRunProcKiller(w.runc, id)
return w.commonCall(ctx, id, bundle, process, started, killer, func(ctx context.Context, started chan<- int, io runc.IO, pidfile string) error {
_, err := w.runc.Run(ctx, id, bundle, &runc.CreateOpts{
NoPivot: w.noPivot,
Started: started,
Expand All @@ -35,25 +37,37 @@ func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess
if process.Meta.Tty {
return unsupportedConsoleError
}
return w.commonCall(ctx, id, bundle, process, started, func(ctx context.Context, started chan<- int, io runc.IO) error {

killer, err := newExecProcKiller(w.runc, id)
if err != nil {
return errors.Wrap(err, "failed to initialize process killer")
}
defer killer.Cleanup()

return w.commonCall(ctx, id, bundle, process, started, killer, func(ctx context.Context, started chan<- int, io runc.IO, pidfile string) error {
return w.runc.Exec(ctx, id, *specsProcess, &runc.ExecOpts{
Started: started,
IO: io,
PidFile: pidfile,
})
})
}

type runcCall func(ctx context.Context, started chan<- int, io runc.IO) error
type runcCall func(ctx context.Context, started chan<- int, io runc.IO, pidfile string) error

// commonCall is the common run/exec logic used for non-linux runtimes. A tty
// is only supported for linux, so this really just handles signal propagation
// to the started runc process.
func (w *runcExecutor) commonCall(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func(), call runcCall) error {
runcProcess, ctx := runcProcessHandle(ctx, id)
func (w *runcExecutor) commonCall(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func(), killer procKiller, call runcCall) error {
runcProcess, ctx := runcProcessHandle(ctx, killer)
defer runcProcess.Release()

eg, ctx := errgroup.WithContext(ctx)
defer eg.Wait()
defer func() {
if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
bklog.G(ctx).Errorf("runc process monitoring error: %s", err)
}
}()
defer runcProcess.Shutdown()

startedCh := make(chan int, 1)
Expand All @@ -65,5 +79,5 @@ func (w *runcExecutor) commonCall(ctx context.Context, id, bundle string, proces
return handleSignals(ctx, runcProcess, process.Signal)
})

return call(ctx, startedCh, &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr})
return call(ctx, startedCh, &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr}, killer.pidfile)
}
32 changes: 23 additions & 9 deletions executor/runcexecutor/executor_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ func updateRuncFieldsForHostOS(runtime *runc.Runc) {
}

func (w *runcExecutor) run(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func()) error {
return w.callWithIO(ctx, id, bundle, process, started, func(ctx context.Context, started chan<- int, io runc.IO) error {
killer := newRunProcKiller(w.runc, id)
return w.callWithIO(ctx, id, bundle, process, started, killer, func(ctx context.Context, started chan<- int, io runc.IO, pidfile string) error {
_, err := w.runc.Run(ctx, id, bundle, &runc.CreateOpts{
NoPivot: w.noPivot,
Started: started,
Expand All @@ -33,22 +34,33 @@ func (w *runcExecutor) run(ctx context.Context, id, bundle string, process execu
}

func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess *specs.Process, process executor.ProcessInfo, started func()) error {
return w.callWithIO(ctx, id, bundle, process, started, func(ctx context.Context, started chan<- int, io runc.IO) error {
killer, err := newExecProcKiller(w.runc, id)
if err != nil {
return errors.Wrap(err, "failed to initialize process killer")
}
defer killer.Cleanup()

return w.callWithIO(ctx, id, bundle, process, started, killer, func(ctx context.Context, started chan<- int, io runc.IO, pidfile string) error {
return w.runc.Exec(ctx, id, *specsProcess, &runc.ExecOpts{
Started: started,
IO: io,
PidFile: pidfile,
})
})
}

type runcCall func(ctx context.Context, started chan<- int, io runc.IO) error
type runcCall func(ctx context.Context, started chan<- int, io runc.IO, pidfile string) error

func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func(), call runcCall) error {
runcProcess, ctx := runcProcessHandle(ctx, id)
func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func(), killer procKiller, call runcCall) error {
runcProcess, ctx := runcProcessHandle(ctx, killer)
defer runcProcess.Release()

eg, ctx := errgroup.WithContext(ctx)
defer eg.Wait()
defer func() {
if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
bklog.G(ctx).Errorf("runc process monitoring error: %s", err)
}
}()
defer runcProcess.Shutdown()

startedCh := make(chan int, 1)
Expand All @@ -61,7 +73,7 @@ func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, proces
})

if !process.Meta.Tty {
return call(ctx, startedCh, &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr})
return call(ctx, startedCh, &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr}, killer.pidfile)
}

ptm, ptsName, err := console.NewPty()
Expand Down Expand Up @@ -132,7 +144,9 @@ func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, proces
if err != nil {
bklog.G(ctx).Errorf("failed to resize ptm: %s", err)
}
err = runcProcess.Process.Signal(signal.SIGWINCH)
// SIGWINCH must be sent to the runc monitor process, as
// terminal resizing is done in runc.
err = runcProcess.monitorProcess.Signal(signal.SIGWINCH)
if err != nil {
bklog.G(ctx).Errorf("failed to send SIGWINCH to process: %s", err)
}
Expand All @@ -151,5 +165,5 @@ func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, proces
runcIO.stderr = pts
}

return call(ctx, startedCh, runcIO)
return call(ctx, startedCh, runcIO, killer.pidfile)
}

0 comments on commit 3187d2d

Please sign in to comment.