From f84592a18f559a1524f0633b662a94ac5cc09ac5 Mon Sep 17 00:00:00 2001
From: Laurent Senta
Date: Wed, 27 Jul 2022 16:40:25 +0200
Subject: [PATCH] runner/docker: rewrite sync to fix race conditions

---
 pkg/runner/common_result.go |  27 +++++
 pkg/runner/local_docker.go  | 197 +++++++++++++++++++++---------------
 2 files changed, 143 insertions(+), 81 deletions(-)

diff --git a/pkg/runner/common_result.go b/pkg/runner/common_result.go
index 77ec1c15d..29d947e04 100644
--- a/pkg/runner/common_result.go
+++ b/pkg/runner/common_result.go
@@ -30,3 +30,30 @@ func newResult(input *api.RunInput) *Result {
 
 	return result
 }
+
+func (r *Result) addOutcome(groupID string, outcome task.Outcome) {
+	switch outcome {
+	case task.OutcomeSuccess:
+		r.Outcomes[groupID].Ok++
+	default:
+		// skip
+	}
+}
+func (r *Result) countTotalInstances() int {
+	count := 0
+	for _, g := range r.Outcomes {
+		count += g.Total
+	}
+	return count
+}
+
+// TODO: this should be a getter instead of a mutation
+func (r *Result) updateOutcome() {
+	for _, g := range r.Outcomes {
+		if g.Total != g.Ok {
+			r.Outcome = task.OutcomeFailure
+			return
+		}
+	}
+	r.Outcome = task.OutcomeSuccess
+}
diff --git a/pkg/runner/local_docker.go b/pkg/runner/local_docker.go
index b1aaeab2f..17dd605de 100644
--- a/pkg/runner/local_docker.go
+++ b/pkg/runner/local_docker.go
@@ -201,42 +201,43 @@ func (r *LocalDockerRunner) setupSyncClient() error {
 	return nil
 }
 
+// collectOutcomes listens to the sync service and collects the outcome for every test instance.
+// It stops when all instances have submitted a result or the context is canceled.
 func (r *LocalDockerRunner) collectOutcomes(ctx context.Context, result *Result, tpl *runtime.RunParams) (chan bool, error) {
 	eventsCh, err := r.syncClient.SubscribeEvents(ctx, tpl)
 	if err != nil {
 		return nil, err
 	}
 
+	// TODO: eventually we'll keep a trace of each test instance's status.
+	// Right now, if a container sends multiple events, it will mess up the outcomes.
+	// We have to pass an id to each container, so that it can send us back messages
+	// tagged with its own id.
+	expectingOutcomes := result.countTotalInstances()
-	done := make(chan bool)
+	done := make(chan bool, 1) // buffered: the caller may return before reading.
 	go func() {
 		running := true
-		for running {
+		for running && expectingOutcomes > 0 {
 			select {
 			case <-ctx.Done():
 				running = false
 			case e := <-eventsCh:
-				// for now we emit only outcome OK events, so no need for more checks
 				if e.SuccessEvent != nil {
-					se := e.SuccessEvent
-					o := result.Outcomes[se.TestGroupID]
-					o.Ok = o.Ok + 1
+					result.addOutcome(e.SuccessEvent.TestGroupID, task.OutcomeSuccess)
+					expectingOutcomes--
+				} else if e.FailureEvent != nil {
+					result.addOutcome(e.FailureEvent.TestGroupID, task.OutcomeFailure)
+					expectingOutcomes--
+				} else if e.CrashEvent != nil {
+					result.addOutcome(e.CrashEvent.TestGroupID, task.OutcomeFailure)
+					expectingOutcomes--
 				}
+				// else: skip
 			}
 		}
 
-		result.Outcome = task.OutcomeSuccess
-		if len(result.Outcomes) == 0 {
-			result.Outcome = task.OutcomeFailure
-		}
-
-		for g := range result.Outcomes {
-			if result.Outcomes[g].Total != result.Outcomes[g].Ok {
-				result.Outcome = task.OutcomeFailure
-				break
-			}
-		}
-
+		result.updateOutcome()
 		done <- true
 	}()
@@ -274,12 +275,6 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
 		Result: result,
 	}
 
-	// Grab a read lock. This will allow many runs to run simultaneously, but
-	// they will be exclusive of state-altering healthchecks.
-	// TODO: I'm not sure this is true anymore.
-	r.lk.RLock()
-	defer r.lk.RUnlock()
-
 	defer func() {
 		if err != nil {
 			// TODO: maybe replace only if the outcome is a default value.
 			return
 		}
@@ -296,6 +291,12 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
 		return
 	}
 
+	// Grab a read lock. This will allow many runs to run simultaneously, but
+	// they will be exclusive of state-altering healthchecks.
+	// TODO: I'm not sure this is true anymore.
+	r.lk.RLock()
+	defer r.lk.RUnlock()
+
 	// ## Prepare Execution Context
 
 	// Create a docker client.
@@ -478,25 +479,23 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
 		return
 	}
 
-	var (
-		doneCh    = make(chan error, 2)
-		started   = make(chan testContainerInstance, len(containers))
-		ratelimit = make(chan struct{}, 16)
-	)
-
-	ctxContainers, cancel := context.WithCancel(ctx)
-	defer cancel()
+	// ## Start the containers & log their outputs.
 
-	// collect the outcomes in parallel while the process runs.
-	outcomesDoneCh, err := r.collectOutcomes(ctxContainers, result, &template)
+	// First, we collect the outcome of every container.
+	outcomesCollectIsCompleteCh, err := r.collectOutcomes(ctx, result, &template)
 	if err != nil {
 		log.Error(err)
 		return
 	}
 
+	// Second, we start the containers.
 	log.Infow("starting containers", "count", len(containers))
+	var (
+		startGroup, startGroupCtx = errgroup.WithContext(ctx)
+		ratelimit                 = make(chan struct{}, 16)
+		started                   = make(chan testContainerInstance, len(containers))
+	)
 
-	g, gctx := errgroup.WithContext(ctxContainers)
 	for _, c := range containers {
 		c := c
 		f := func() error {
@@ -505,37 +504,27 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
 
 			log.Infow("starting container", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx)
 
-			err := cli.ContainerStart(ctx, c.containerID, types.ContainerStartOptions{})
+			err := cli.ContainerStart(startGroupCtx, c.containerID, types.ContainerStartOptions{})
 			if err == nil {
 				log.Debugw("started container", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx)
 				select {
-				case <-gctx.Done():
+				// TODO: this is a cancel signal. We should race it with the ContainerStart operation.
+				case <-startGroupCtx.Done():
 				default:
 					started <- c
 				}
 			}
+
 			return err
 		}
-		g.Go(f)
+		startGroup.Go(f)
 	}
 
-	// Wait until we're done to close the started channel.
-	go func() {
-		err := g.Wait()
-		close(started)
-
-		if err != nil {
-			log.Error(err)
-			doneCh <- err
-		} else {
-			log.Infow("started containers", "count", len(containers))
-		}
-	}()
-
+	// Third, we start the pretty printer.
 	if !cfg.Background {
 		pretty := NewPrettyPrinter(ow)
 
-		// This goroutine tails the sidecar container logs and appends them to the pretty printer.
+		// Tail the sidecar container logs and append them to the pretty printer.
 		go func() {
 			t := time.Now().Add(time.Duration(-10) * time.Second) // sidecar is a long running daemon, so we care only about logs around the execution of our test run
 			stream, err := cli.ContainerLogs(ctx, "testground-sidecar", types.ContainerLogsOptions{
@@ -546,8 +535,8 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
 			})
 
 			if err != nil {
-				doneCh <- err
-				return
+				log.Errorw("failed to attach sidecar logs", "error", err)
+				return
 			}
 
 			rstdout, wstdout := io.Pipe()
@@ -561,17 +550,14 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
 			pretty.Append("sidecar ", rstdout, rstderr)
 		}()
 
+		// Tail the other container logs and append them to the pretty printer.
 		// This goroutine takes started containers and attaches them to the pretty printer.
 		go func() {
-		Outer:
 			for {
 				select {
-				case tc, more := <-started:
-					if !more {
-						break Outer
-					}
-
-					stream, err := cli.ContainerLogs(ctx, tc.containerID, types.ContainerLogsOptions{
+				case c := <-started:
+					log.Infow("attaching container", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx)
+					stream, err := cli.ContainerLogs(ctx, c.containerID, types.ContainerLogsOptions{
 						ShowStdout: true,
 						ShowStderr: true,
 						Since:      "2019-01-01T00:00:00",
 						Follow:     true,
 					})
 
 					if err != nil {
-						doneCh <- err
-						return
+						// TODO: handle
+						log.Errorw("failed to attach container", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx, "error", err)
+						continue
 					}
 
 					rstdout, wstdout := io.Pipe()
@@ -592,37 +579,85 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
 					}()
 
 					// instance tag in output: << group[zero_padded_i] >> (container_id[0:6]), e.g. << miner[003] (a1b2c3) >>
-					tag := fmt.Sprintf("%s[%03d] (%s)", tc.groupID, tc.groupIdx, tc.containerID[0:6])
+					tag := fmt.Sprintf("%s[%03d] (%s)", c.groupID, c.groupIdx, c.containerID[0:6])
 					pretty.Manage(tag, rstdout, rstderr)
 				case <-ctx.Done():
-					// yield if we've been cancelled.
-					doneCh <- ctx.Err()
+					// Exit
 					return
 				}
 			}
 		}()
+	}
+
+	// Wait for all containers to have started.
+	err = startGroup.Wait()
+	if err != nil {
+		log.Error(err)
+		return
+	}
+
+	// Finally, we're going to follow our containers until they are done.
+	runGroup, runGroupCtx := errgroup.WithContext(ctx)
+
+	for _, c := range containers {
+		c := c
+		f := func() error {
+			log.Infow("waiting for container", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx)
+
+			// TODO: Make sure we wait on a context that will be canceled if the group is canceled.
+			statusCh, errCh := cli.ContainerWait(ctx, c.containerID, container.WaitConditionNotRunning)
 			select {
-			case err := <-pretty.Wait():
-				doneCh <- err
-			case <-ctx.Done():
-				log.Error(ctx) // yield if we're been cancelled.
-				doneCh <- ctx.Err()
+			case err := <-errCh:
+				if err != nil {
+					log.Infow("container failed", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx, "error", err)
+					return err
+				}
+				return nil
+			case status := <-statusCh:
+				// TODO: the status struct also carries an error field and an exit status;
+				// make sure we don't miss any error, and use the exit status to provide more logging.
+				log.Infow("container exited", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx, "status", status.StatusCode)
+				return nil
+			case <-runGroupCtx.Done(): // race with the group
+				log.Infow("container group exited", "error", runGroupCtx.Err())
+				return nil
 			}
-		}()
 		}
+		runGroup.Go(f)
 	}
 
-	select {
-	case err = <-doneCh:
-	case <-ctx.Done():
-		err = ctx.Err()
-	}
+	// When we're here, our containers are started and the outcomes are being collected.
+	// We wait until either:
+	// - all containers are done and all outcomes have been received,
+	// - or the run context is canceled (or times out).
+
+	containersErrCh := make(chan error, 1)
 
-	cancel()
+	// Wait for the containers in the background; the result goes over a channel
+	// because assigning to err here would race with the deferred error handler.
+	go func() {
+		containersErrCh <- runGroup.Wait()
+	}()
+
+	waitingForContainers := true
+	waitingForOutcomes := true
+
+	log.Info("containers started, waiting for containers and outcome signals")
+	for waitingForContainers || waitingForOutcomes {
+		select {
+		case err = <-containersErrCh:
+			log.Infow("all containers are complete", "error", err)
+			waitingForContainers = false
+		case <-outcomesCollectIsCompleteCh:
+			log.Infow("all outcomes are complete")
+			waitingForOutcomes = false
+		case <-ctx.Done():
+			log.Infow("test run is canceled")
+			return
+		}
+	}
 
-	// NOTE: we wait for the outcome done channel here,
-	// but it doesn't really wait for our containers results.
-	<-outcomesDoneCh
 	return
 }
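
---

Note for reviewers: the synchronization shape this patch lands on -- N container-wait goroutines under an errgroup, one outcome collector, and a single select loop racing both completion signals against the run context -- can be reduced to the standalone sketch below. Every name in it (containersErrCh, outcomesDoneCh, the simulated containers and their timings) is an illustrative assumption, not the runner's real API; the only real dependency is golang.org/x/sync/errgroup.

// Standalone sketch of the patch's dual-wait pattern. Hypothetical stand-ins
// throughout; only errgroup is the real library used by the patch.
package main

import (
	"context"
	"log"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Stands in for the per-container ContainerWait goroutines: the first
	// error cancels runGroupCtx, which the other waiters race against.
	runGroup, runGroupCtx := errgroup.WithContext(ctx)
	for i := 0; i < 3; i++ {
		i := i
		runGroup.Go(func() error {
			select {
			case <-time.After(100 * time.Millisecond): // "container" exits
				log.Printf("container %d exited", i)
				return nil
			case <-runGroupCtx.Done(): // race with the group, as in the patch
				return runGroupCtx.Err()
			}
		})
	}

	// Buffered, so the sender cannot leak if we return early on cancellation.
	containersErrCh := make(chan error, 1)
	go func() { containersErrCh <- runGroup.Wait() }()

	// Stands in for collectOutcomes: signals once every instance has reported.
	outcomesDoneCh := make(chan bool, 1)
	go func() {
		time.Sleep(150 * time.Millisecond)
		outcomesDoneCh <- true
	}()

	waitingForContainers, waitingForOutcomes := true, true
	for waitingForContainers || waitingForOutcomes {
		select {
		case err := <-containersErrCh:
			log.Println("all containers are complete, err:", err)
			waitingForContainers = false
		case <-outcomesDoneCh:
			log.Println("all outcomes are complete")
			waitingForOutcomes = false
		case <-ctx.Done():
			log.Println("run canceled:", ctx.Err())
			return
		}
	}
}

The buffered channels are the load-bearing detail: each helper goroutine can complete its single send and exit even when the select loop returns early on cancellation, which matters whenever the receiving side is not guaranteed to drain its channels.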