Skip to content

Commit

Permalink
Handle blocked I/O of exec'd processes
Browse files Browse the repository at this point in the history
This is the second part to
containerd/containerd#3361 and will help process
delete not block forever when the process exists but the I/O was
inherited by a subprocess that lives on.

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
  • Loading branch information
crosbymichael committed Jun 21, 2019
1 parent 384c782 commit b5f2886
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 114 deletions.
2 changes: 1 addition & 1 deletion container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ func (i *rio) Close() error {
}

func (i *rio) Wait() {
i.sc.Wait()
i.sc.Wait(context.Background())

i.IO.Wait()
}
29 changes: 26 additions & 3 deletions container/stream/streams.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stream // import "github.com/docker/docker/container/stream"

import (
"context"
"fmt"
"io"
"io/ioutil"
Expand All @@ -24,11 +25,12 @@ import (
// copied and delivered to all StdoutPipe and StderrPipe consumers, using
// a kind of "broadcaster".
type Config struct {
sync.WaitGroup
wg sync.WaitGroup
stdout *broadcaster.Unbuffered
stderr *broadcaster.Unbuffered
stdin io.ReadCloser
stdinPipe io.WriteCloser
dio *cio.DirectIO
}

// NewConfig creates a stream config and initializes
Expand Down Expand Up @@ -115,14 +117,15 @@ func (c *Config) CloseStreams() error {

// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
func (c *Config) CopyToPipe(iop *cio.DirectIO) {
c.dio = iop
copyFunc := func(w io.Writer, r io.ReadCloser) {
c.Add(1)
c.wg.Add(1)
go func() {
if _, err := pools.Copy(w, r); err != nil {
logrus.Errorf("stream copy error: %v", err)
}
r.Close()
c.Done()
c.wg.Done()
}()
}

Expand All @@ -144,3 +147,23 @@ func (c *Config) CopyToPipe(iop *cio.DirectIO) {
}
}
}

// Wait for the stream to close
// Wait supports timeouts via the context to unblock and forcefully
// close the io streams
func (c *Config) Wait(ctx context.Context) {
done := make(chan struct{}, 1)
go func() {
c.wg.Wait()
close(done)
}()
select {
case <-done:
case <-ctx.Done():
if c.dio != nil {
c.dio.Cancel()
c.dio.Wait()
c.dio.Close()
}
}
}
3 changes: 2 additions & 1 deletion daemon/exec/exec.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package exec // import "github.com/docker/docker/daemon/exec"

import (
"context"
"runtime"
"sync"

Expand Down Expand Up @@ -58,7 +59,7 @@ func (i *rio) Close() error {
}

func (i *rio) Wait() {
i.sc.Wait()
i.sc.Wait(context.Background())

i.IO.Wait()
}
Expand Down
8 changes: 6 additions & 2 deletions daemon/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
if err != nil {
logrus.WithError(err).Warnf("failed to delete container %s from containerd", c.ID)
}
ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)

c.StreamConfig.Wait()
c.StreamConfig.Wait(ctx)
c.Reset(false)

exitStatus := container.ExitStatus{
Expand Down Expand Up @@ -123,7 +124,10 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
defer execConfig.Unlock()
execConfig.ExitCode = &ec
execConfig.Running = false
execConfig.StreamConfig.Wait()

ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
execConfig.StreamConfig.Wait(ctx)

if err := execConfig.CloseStreams(); err != nil {
logrus.Errorf("failed to cleanup exec %s streams: %s", c.ID, err)
}
Expand Down
99 changes: 0 additions & 99 deletions integration-cli/docker_cli_exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@ import (
"reflect"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/docker/docker/client"
"github.com/docker/docker/integration-cli/cli"
"github.com/docker/docker/integration-cli/cli/build"
"github.com/docker/docker/pkg/parsers/kernel"
"github.com/go-check/check"
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
Expand Down Expand Up @@ -534,100 +532,3 @@ func (s *DockerSuite) TestExecEnvLinksHost(c *check.C) {
assert.Check(c, is.Contains(out, "HOSTNAME=myhost"))
assert.Check(c, is.Contains(out, "DB_NAME=/bar/db"))
}

func (s *DockerSuite) TestExecWindowsOpenHandles(c *check.C) {
testRequires(c, DaemonIsWindows)

if runtime.GOOS == "windows" {
v, err := kernel.GetKernelVersion()
assert.NilError(c, err)
build, _ := strconv.Atoi(strings.Split(strings.SplitN(v.String(), " ", 3)[2][1:], ".")[0])
if build >= 17743 {
c.Skip("Temporarily disabled on RS5 17743+ builds due to platform bug")

// This is being tracked internally. @jhowardmsft. Summary of failure
// from an email in early July 2018 below:
//
// Platform regression. In cmd.exe by the look of it. I can repro
// it outside of CI. It fails the same on 17681, 17676 and even as
// far back as 17663, over a month old. From investigating, I can see
// what's happening in the container, but not the reason. The test
// starts a long-running container based on the Windows busybox image.
// It then adds another process (docker exec) to that container to
// sleep. It loops waiting for two instances of busybox.exe running,
// and cmd.exe to quit. What's actually happening is that the second
// exec hangs indefinitely, and from docker top, I can see
// "OpenWith.exe" running.

//Manual repro would be
//# Start the first long-running container
//docker run --rm -d --name test busybox sleep 300

//# In another window, docker top test. There should be a single instance of busybox.exe running
//# In a third window, docker exec test cmd /c start sleep 10 NOTE THIS HANGS UNTIL 5 MIN TIMEOUT
//# In the second window, run docker top test. Note that OpenWith.exe is running, one cmd.exe and only one busybox. I would expect no "OpenWith" and two busybox.exe's.
}
}

runSleepingContainer(c, "-d", "--name", "test")
exec := make(chan bool)
go func() {
dockerCmd(c, "exec", "test", "cmd", "/c", "start sleep 10")
exec <- true
}()

count := 0
for {
top := make(chan string)
var out string
go func() {
out, _ := dockerCmd(c, "top", "test")
top <- out
}()

select {
case <-time.After(time.Second * 5):
c.Fatal("timed out waiting for top while exec is exiting")
case out = <-top:
break
}

if strings.Count(out, "busybox.exe") == 2 && !strings.Contains(out, "cmd.exe") {
// The initial exec process (cmd.exe) has exited, and both sleeps are currently running
break
}
count++
if count >= 30 {
c.Fatal("too many retries")
}
time.Sleep(1 * time.Second)
}

inspect := make(chan bool)
go func() {
dockerCmd(c, "inspect", "test")
inspect <- true
}()

select {
case <-time.After(time.Second * 5):
c.Fatal("timed out waiting for inspect while exec is exiting")
case <-inspect:
break
}

// Ensure the background sleep is still running
out, _ := dockerCmd(c, "top", "test")
assert.Equal(c, strings.Count(out, "busybox.exe"), 2)

// The exec should exit when the background sleep exits
select {
case <-time.After(time.Second * 15):
c.Fatal("timed out waiting for async exec to exit")
case <-exec:
// Ensure the background sleep has actually exited
out, _ := dockerCmd(c, "top", "test")
assert.Equal(c, strings.Count(out, "busybox.exe"), 1)
break
}
}
16 changes: 8 additions & 8 deletions libcontainerd/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,13 +652,6 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy
}).Error("exit event")
return
}
_, err = p.Delete(context.Background())
if err != nil {
c.logger.WithError(err).WithFields(logrus.Fields{
"container": ei.ContainerID,
"process": ei.ProcessID,
}).Warn("failed to delete process")
}

ctr, err := c.getContainer(ctx, ei.ContainerID)
if err != nil {
Expand All @@ -672,11 +665,18 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy
c.logger.WithFields(logrus.Fields{
"container": ei.ContainerID,
"error": err,
}).Error("failed to find container")
}).Error("failed to get container labels")
return
}
newFIFOSet(labels[DockerContainerBundlePath], ei.ProcessID, true, false).Close()
}
_, err = p.Delete(context.Background())
if err != nil {
c.logger.WithError(err).WithFields(logrus.Fields{
"container": ei.ContainerID,
"process": ei.ProcessID,
}).Warn("failed to delete process")
}
}
})
}
Expand Down

0 comments on commit b5f2886

Please sign in to comment.