Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle blocked I/O of exec'd processes, and remove flaky TestExecWindowsOpenHandles #39383

Merged
merged 1 commit into from Jul 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion container/container.go
Expand Up @@ -730,7 +730,7 @@ func (i *rio) Close() error {
}

func (i *rio) Wait() {
i.sc.Wait()
i.sc.Wait(context.Background())

i.IO.Wait()
}
29 changes: 26 additions & 3 deletions container/stream/streams.go
@@ -1,6 +1,7 @@
package stream // import "github.com/docker/docker/container/stream"

import (
"context"
"fmt"
"io"
"io/ioutil"
Expand All @@ -24,11 +25,12 @@ import (
// copied and delivered to all StdoutPipe and StderrPipe consumers, using
// a kind of "broadcaster".
type Config struct {
sync.WaitGroup
wg sync.WaitGroup
stdout *broadcaster.Unbuffered
stderr *broadcaster.Unbuffered
stdin io.ReadCloser
stdinPipe io.WriteCloser
dio *cio.DirectIO
}

// NewConfig creates a stream config and initializes
Expand Down Expand Up @@ -115,14 +117,15 @@ func (c *Config) CloseStreams() error {

// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
func (c *Config) CopyToPipe(iop *cio.DirectIO) {
c.dio = iop
copyFunc := func(w io.Writer, r io.ReadCloser) {
c.Add(1)
c.wg.Add(1)
go func() {
if _, err := pools.Copy(w, r); err != nil {
logrus.Errorf("stream copy error: %v", err)
}
r.Close()
c.Done()
c.wg.Done()
}()
}

Expand All @@ -144,3 +147,23 @@ func (c *Config) CopyToPipe(iop *cio.DirectIO) {
}
}
}

// Wait for the stream to close
// Wait supports timeouts via the context to unblock and forcefully
// close the io streams
func (c *Config) Wait(ctx context.Context) {
crosbymichael marked this conversation as resolved.
Show resolved Hide resolved
done := make(chan struct{}, 1)
go func() {
c.wg.Wait()
close(done)
}()
select {
case <-done:
case <-ctx.Done():
if c.dio != nil {
c.dio.Cancel()
c.dio.Wait()
c.dio.Close()
}
}
}
3 changes: 2 additions & 1 deletion daemon/exec/exec.go
@@ -1,6 +1,7 @@
package exec // import "github.com/docker/docker/daemon/exec"

import (
"context"
"runtime"
"sync"

Expand Down Expand Up @@ -58,7 +59,7 @@ func (i *rio) Close() error {
}

func (i *rio) Wait() {
i.sc.Wait()
i.sc.Wait(context.Background())

i.IO.Wait()
}
Expand Down
8 changes: 6 additions & 2 deletions daemon/monitor.go
Expand Up @@ -55,8 +55,9 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
if err != nil {
logrus.WithError(err).Warnf("failed to delete container %s from containerd", c.ID)
}
ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)

c.StreamConfig.Wait()
c.StreamConfig.Wait(ctx)
c.Reset(false)

exitStatus := container.ExitStatus{
Expand Down Expand Up @@ -123,7 +124,10 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
defer execConfig.Unlock()
execConfig.ExitCode = &ec
execConfig.Running = false
execConfig.StreamConfig.Wait()

ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
execConfig.StreamConfig.Wait(ctx)

if err := execConfig.CloseStreams(); err != nil {
logrus.Errorf("failed to cleanup exec %s streams: %s", c.ID, err)
}
Expand Down
99 changes: 0 additions & 99 deletions integration-cli/docker_cli_exec_test.go
Expand Up @@ -11,15 +11,13 @@ import (
"reflect"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/docker/docker/client"
"github.com/docker/docker/integration-cli/cli"
"github.com/docker/docker/integration-cli/cli/build"
"github.com/docker/docker/pkg/parsers/kernel"
"github.com/go-check/check"
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
Expand Down Expand Up @@ -534,100 +532,3 @@ func (s *DockerSuite) TestExecEnvLinksHost(c *check.C) {
assert.Check(c, is.Contains(out, "HOSTNAME=myhost"))
assert.Check(c, is.Contains(out, "DB_NAME=/bar/db"))
}

func (s *DockerSuite) TestExecWindowsOpenHandles(c *check.C) {
testRequires(c, DaemonIsWindows)

if runtime.GOOS == "windows" {
v, err := kernel.GetKernelVersion()
assert.NilError(c, err)
build, _ := strconv.Atoi(strings.Split(strings.SplitN(v.String(), " ", 3)[2][1:], ".")[0])
if build >= 17743 {
c.Skip("Temporarily disabled on RS5 17743+ builds due to platform bug")

// This is being tracked internally. @jhowardmsft. Summary of failure
// from an email in early July 2018 below:
//
// Platform regression. In cmd.exe by the look of it. I can repro
// it outside of CI. It fails the same on 17681, 17676 and even as
// far back as 17663, over a month old. From investigating, I can see
// what's happening in the container, but not the reason. The test
// starts a long-running container based on the Windows busybox image.
// It then adds another process (docker exec) to that container to
// sleep. It loops waiting for two instances of busybox.exe running,
// and cmd.exe to quit. What's actually happening is that the second
// exec hangs indefinitely, and from docker top, I can see
// "OpenWith.exe" running.

//Manual repro would be
//# Start the first long-running container
//docker run --rm -d --name test busybox sleep 300

//# In another window, docker top test. There should be a single instance of busybox.exe running
//# In a third window, docker exec test cmd /c start sleep 10 NOTE THIS HANGS UNTIL 5 MIN TIMEOUT
//# In the second window, run docker top test. Note that OpenWith.exe is running, one cmd.exe and only one busybox. I would expect no "OpenWith" and two busybox.exe's.
}
}

runSleepingContainer(c, "-d", "--name", "test")
exec := make(chan bool)
go func() {
dockerCmd(c, "exec", "test", "cmd", "/c", "start sleep 10")
exec <- true
}()

count := 0
for {
top := make(chan string)
var out string
go func() {
out, _ := dockerCmd(c, "top", "test")
top <- out
}()

select {
case <-time.After(time.Second * 5):
c.Fatal("timed out waiting for top while exec is exiting")
case out = <-top:
break
}

if strings.Count(out, "busybox.exe") == 2 && !strings.Contains(out, "cmd.exe") {
// The initial exec process (cmd.exe) has exited, and both sleeps are currently running
break
}
count++
if count >= 30 {
c.Fatal("too many retries")
}
time.Sleep(1 * time.Second)
}

inspect := make(chan bool)
go func() {
dockerCmd(c, "inspect", "test")
inspect <- true
}()

select {
case <-time.After(time.Second * 5):
c.Fatal("timed out waiting for inspect while exec is exiting")
case <-inspect:
break
}

// Ensure the background sleep is still running
out, _ := dockerCmd(c, "top", "test")
assert.Equal(c, strings.Count(out, "busybox.exe"), 2)

// The exec should exit when the background sleep exits
select {
case <-time.After(time.Second * 15):
c.Fatal("timed out waiting for async exec to exit")
case <-exec:
// Ensure the background sleep has actually exited
out, _ := dockerCmd(c, "top", "test")
assert.Equal(c, strings.Count(out, "busybox.exe"), 1)
break
}
}
16 changes: 8 additions & 8 deletions libcontainerd/remote/client.go
Expand Up @@ -652,13 +652,6 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy
}).Error("exit event")
return
}
_, err = p.Delete(context.Background())
if err != nil {
c.logger.WithError(err).WithFields(logrus.Fields{
"container": ei.ContainerID,
"process": ei.ProcessID,
}).Warn("failed to delete process")
}

ctr, err := c.getContainer(ctx, ei.ContainerID)
if err != nil {
Expand All @@ -672,11 +665,18 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy
c.logger.WithFields(logrus.Fields{
"container": ei.ContainerID,
"error": err,
}).Error("failed to find container")
}).Error("failed to get container labels")
return
}
newFIFOSet(labels[DockerContainerBundlePath], ei.ProcessID, true, false).Close()
}
_, err = p.Delete(context.Background())
if err != nil {
c.logger.WithError(err).WithFields(logrus.Fields{
"container": ei.ContainerID,
"process": ei.ProcessID,
}).Warn("failed to delete process")
}
}
})
}
Expand Down