Skip to content

Commit

Permalink
runner/docker: use a cancelable context with containers & printers
Browse files Browse the repository at this point in the history
  • Loading branch information
laurentsenta committed Aug 3, 2022
1 parent e8f2bc6 commit 6d665fc
Showing 1 changed file with 15 additions and 17 deletions.
32 changes: 15 additions & 17 deletions pkg/runner/local_docker.go
Expand Up @@ -483,14 +483,14 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
}

// ## Start the containers & log their outputs.
collectCtx, cancelCollect := context.WithCancel(ctx)
runCtx, cancelRun := context.WithCancel(ctx)

defer func() {
cancelCollect()
cancelRun()
}()

// First we collect every container outcomes.
outcomesCollectIsCompleteCh, err := r.collectOutcomes(collectCtx, result, &template)
outcomesCollectIsCompleteCh, err := r.collectOutcomes(runCtx, result, &template)
if err != nil {
log.Error(err)
return
Expand All @@ -499,7 +499,8 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
// Second we start the containers
log.Infow("starting containers", "count", len(containers))
var (
startGroup, startGroupCtx = errgroup.WithContext(ctx)
startGroup, startGroupCtx = errgroup.WithContext(runCtx)
runGroup, runGroupCtx = errgroup.WithContext(runCtx)
ratelimit = make(chan struct{}, 16)
started = make(chan testContainerInstance, len(containers))
)
Expand Down Expand Up @@ -534,16 +535,17 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
// Tail the sidecar container logs and appends them to the pretty printer.
go func() {
t := time.Now().Add(time.Duration(-10) * time.Second) // sidecar is a long running daemon, so we care only about logs around the execution of our test run
stream, err := cli.ContainerLogs(ctx, "testground-sidecar", types.ContainerLogsOptions{
stream, err := cli.ContainerLogs(runCtx, "testground-sidecar", types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: false,
Since: t.Format("2006-01-02T15:04:05"),
Follow: true,
})

if err != nil {
// TODO: handle
log.Errorw("failed to attach sidecar", "error", err)
cancelRun()
return
}

rstdout, wstdout := io.Pipe()
Expand All @@ -564,17 +566,17 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
select {
case c := <-started:
log.Infow("attaching container", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx)
stream, err := cli.ContainerLogs(ctx, c.containerID, types.ContainerLogsOptions{
stream, err := cli.ContainerLogs(runCtx, c.containerID, types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Since: "2019-01-01T00:00:00",
Follow: true,
})

if err != nil {
// TODO: handle
log.Errorw("failed to attach container", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx, "error", err)
continue
cancelRun()
return
}

rstdout, wstdout := io.Pipe()
Expand All @@ -588,7 +590,7 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
// instance tag in output: << group[zero_padded_i] >> (container_id[0:6]), e.g. << miner[003] (a1b2c3) >>
tag := fmt.Sprintf("%s[%03d] (%s)", c.groupID, c.groupIdx, c.containerID[0:6])
pretty.Manage(tag, rstdout, rstderr)
case <-ctx.Done():
case <-runCtx.Done():
// Exit
return
}
Expand All @@ -604,15 +606,13 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
}

// Finally, we're going to follow our containers until they are done
runGroup, runGroupCtx := errgroup.WithContext(ctx)

for _, c := range containers {
c := c
f := func() error {
log.Infow("waiting for container", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx)

// TODO: Make sure we wait on a context that will be canceled if the group is canceled.
statusCh, errCh := cli.ContainerWait(ctx, c.containerID, container.WaitConditionNotRunning)
statusCh, errCh := cli.ContainerWait(runCtx, c.containerID, container.WaitConditionNotRunning)

select {
case err := <-errCh:
Expand All @@ -622,8 +622,6 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
}
return nil
case status := <-statusCh:
// TODO: This channel output and error field and the container exit status,
// make sure we don't miss any error. And use the container's exit status to provide more logging.
log.Infow("container exited", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx, "status", status.StatusCode)
return nil
case <-runGroupCtx.Done(): // race with the group
Expand Down Expand Up @@ -669,8 +667,8 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
case <-outcomesCollectTimeout:
log.Infow("we timeout'd waiting for outcomes")
waitingForOutcomes = false
case <-ctx.Done():
log.Infow("test run is canceled")
case <-runCtx.Done():
log.Infow("the test run ended early")
return
}
}
Expand Down

0 comments on commit 6d665fc

Please sign in to comment.