Skip to content

Commit

Permalink
runtime: Add more debug logs for container io stream copy
Browse files Browse the repository at this point in the history
This can help debugging container lifecycle issues

Fixes: #3913

Signed-off-by: Feng Wang <feng.wang@databricks.com>
  • Loading branch information
fengwang-db committed Mar 25, 2022
1 parent 853dd98 commit 19f372b
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 5 deletions.
14 changes: 12 additions & 2 deletions src/runtime/pkg/containerd-shim-v2/start.go
Expand Up @@ -8,12 +8,14 @@ package containerdshim
import (
"context"
"fmt"
"github.com/sirupsen/logrus"

"github.com/containerd/containerd/api/types/task"
"github.com/kata-containers/kata-containers/src/runtime/pkg/katautils"
)

func startContainer(ctx context.Context, s *service, c *container) (retErr error) {
shimLog.WithField("container", c.id).Debug("start container")
defer func() {
if retErr != nil {
// notify the wait goroutine to continue
Expand Down Expand Up @@ -78,7 +80,8 @@ func startContainer(ctx context.Context, s *service, c *container) (retErr error
return err
}
c.ttyio = tty
go ioCopy(c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr)

go ioCopy(shimLog.WithField("container", c.id), c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr)
} else {
// close the io exit channel, since there is no io for this container,
// otherwise the following wait goroutine will hang on this channel.
Expand All @@ -94,6 +97,10 @@ func startContainer(ctx context.Context, s *service, c *container) (retErr error
}

func startExec(ctx context.Context, s *service, containerID, execID string) (e *exec, retErr error) {
shimLog.WithFields(logrus.Fields{
"container": containerID,
"exec": execID,
}).Debug("start container execution")
// start an exec
c, err := s.getContainer(containerID)
if err != nil {
Expand Down Expand Up @@ -140,7 +147,10 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (e *
}
execs.ttyio = tty

go ioCopy(execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr)
go ioCopy(shimLog.WithFields(logrus.Fields{
"container": c.id,
"exec": execID,
}), execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr)

go wait(ctx, s, c, execID)

Expand Down
10 changes: 9 additions & 1 deletion src/runtime/pkg/containerd-shim-v2/stream.go
Expand Up @@ -12,6 +12,7 @@ import (
"syscall"

"github.com/containerd/fifo"
"github.com/sirupsen/logrus"
)

// The buffer size used to specify the buffer for IO streams copy
Expand Down Expand Up @@ -86,25 +87,28 @@ func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (
return ttyIO, nil
}

func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
func ioCopy(shimLog *logrus.Entry, exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
var wg sync.WaitGroup

if tty.Stdin != nil {
wg.Add(1)
go func() {
shimLog.Debug("stdin io stream copy started")
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(stdinPipe, tty.Stdin, *p)
// notify that we can close process's io safely.
close(stdinCloser)
wg.Done()
shimLog.Debug("stdin io stream copy exited")
}()
}

if tty.Stdout != nil {
wg.Add(1)

go func() {
shimLog.Debug("stdout io stream copy started")
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(tty.Stdout, stdoutPipe, *p)
Expand All @@ -113,20 +117,24 @@ func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteClo
// close stdin to make the other routine stop
tty.Stdin.Close()
}
shimLog.Debug("stdout io stream copy exited")
}()
}

if tty.Stderr != nil && stderrPipe != nil {
wg.Add(1)
go func() {
shimLog.Debug("stderr io stream copy started")
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(tty.Stderr, stderrPipe, *p)
wg.Done()
shimLog.Debug("stderr io stream copy exited")
}()
}

wg.Wait()
tty.close()
close(exitch)
shimLog.Debug("all io stream copy goroutines exited")
}
3 changes: 2 additions & 1 deletion src/runtime/pkg/containerd-shim-v2/stream_test.go
Expand Up @@ -7,6 +7,7 @@ package containerdshim

import (
"context"
"github.com/sirupsen/logrus"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -179,7 +180,7 @@ func TestIoCopy(t *testing.T) {
defer tty.close()

// start the ioCopy threads : copy from src to dst
go ioCopy(exitioch, stdinCloser, tty, dstInW, srcOutR, srcErrR)
go ioCopy(logrus.WithContext(context.Background()), exitioch, stdinCloser, tty, dstInW, srcOutR, srcErrR)

var firstW, secondW, thirdW io.WriteCloser
var firstR, secondR, thirdR io.Reader
Expand Down
11 changes: 10 additions & 1 deletion src/runtime/pkg/containerd-shim-v2/wait.go
Expand Up @@ -31,12 +31,17 @@ func wait(ctx context.Context, s *service, c *container, execID string) (int32,
if execID == "" {
//wait until the io closed, then wait the container
<-c.exitIOch
shimLog.WithField("container", c.id).Debug("The container io streams closed")
} else {
execs, err = c.getExec(execID)
if err != nil {
return exitCode255, err
}
<-execs.exitIOch
shimLog.WithFields(logrus.Fields{
"container": c.id,
"exec": execID,
}).Debug("The container process io streams closed")
//This wait could be triggered before exec start which
//will get the exec's id, thus this assignment must after
//the exec exit, to make sure it get the exec's id.
Expand Down Expand Up @@ -82,13 +87,17 @@ func wait(ctx context.Context, s *service, c *container, execID string) (int32,
c.exitTime = timeStamp

c.exitCh <- uint32(ret)

shimLog.WithField("container", c.id).Debug("The container status is StatusStopped")
} else {
execs.status = task.StatusStopped
execs.exitCode = ret
execs.exitTime = timeStamp

execs.exitCh <- uint32(ret)
shimLog.WithFields(logrus.Fields{
"container": c.id,
"exec": execID,
}).Debug("The container exec status is StatusStopped")
}
s.mu.Unlock()

Expand Down

0 comments on commit 19f372b

Please sign in to comment.