runner/docker: rewrite sync to fix race conditions
laurentsenta committed Aug 1, 2022
1 parent 0626585 commit f84592a
Showing 2 changed files with 143 additions and 81 deletions.
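At a glance, the rewrite drops the shared doneCh error plumbing and instead waits on two independent completion signals, one for container exits and one for outcome collection, racing both against the run context. Below is a condensed, self-contained sketch of that pattern; the channel names, sleeps and printing are illustrative stand-ins, not the runner's actual code.

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        containersAreComplete := make(chan bool)
        outcomesAreComplete := make(chan bool)

        // Stand-in for waiting on the errgroup of container waiters.
        go func() {
            time.Sleep(100 * time.Millisecond)
            containersAreComplete <- true
        }()

        // Stand-in for the outcome collector draining sync-service events.
        go func() {
            time.Sleep(200 * time.Millisecond)
            outcomesAreComplete <- true
        }()

        waitingForContainers, waitingForOutcomes := true, true
        for waitingForContainers || waitingForOutcomes {
            select {
            case <-containersAreComplete:
                waitingForContainers = false
            case <-outcomesAreComplete:
                waitingForOutcomes = false
            case <-ctx.Done():
                fmt.Println("run canceled:", ctx.Err())
                return
            }
        }
        fmt.Println("all containers exited and all outcomes were collected")
    }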
27 changes: 27 additions & 0 deletions pkg/runner/common_result.go
@@ -30,3 +30,30 @@ func newResult(input *api.RunInput) *Result {

return result
}

func (r *Result) addOutcome(groupID string, outcome task.Outcome) {
switch outcome {
case task.OutcomeSuccess:
r.Outcomes[groupID].Ok++
default:
// skip
}
}

func (r *Result) countTotalInstances() int {
count := 0
for _, g := range r.Outcomes {
count += g.Total
}
return count
}

// TODO: this should be a getter instead of a mutation
func (r *Result) updateOutcome() {
for _, g := range r.Outcomes {
if g.Total != g.Ok {
r.Outcome = task.OutcomeFailure
return
}
}
r.Outcome = task.OutcomeSuccess
}
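For context, here is a hedged, stand-alone sketch of how these three helpers compose. The Outcome, GroupOutcome and Result types below are simplified stand-ins inferred only from the fields this diff touches; the real runner and task packages define richer types.

    package main

    import "fmt"

    type Outcome string

    const (
        OutcomeSuccess Outcome = "success"
        OutcomeFailure Outcome = "failure"
    )

    type GroupOutcome struct {
        Ok    int
        Total int
    }

    type Result struct {
        Outcome  Outcome
        Outcomes map[string]*GroupOutcome
    }

    func (r *Result) addOutcome(groupID string, outcome Outcome) {
        // Only successes are counted, mirroring the helper in this diff.
        if outcome == OutcomeSuccess {
            r.Outcomes[groupID].Ok++
        }
    }

    func (r *Result) countTotalInstances() int {
        count := 0
        for _, g := range r.Outcomes {
            count += g.Total
        }
        return count
    }

    func (r *Result) updateOutcome() {
        for _, g := range r.Outcomes {
            if g.Total != g.Ok {
                r.Outcome = OutcomeFailure
                return
            }
        }
        r.Outcome = OutcomeSuccess
    }

    func main() {
        r := &Result{Outcomes: map[string]*GroupOutcome{
            "miners":  {Total: 2},
            "clients": {Total: 1},
        }}
        fmt.Println("expecting", r.countTotalInstances(), "outcomes") // 3

        r.addOutcome("miners", OutcomeSuccess)
        r.addOutcome("miners", OutcomeSuccess)
        r.addOutcome("clients", OutcomeFailure) // not counted as Ok

        r.updateOutcome()
        fmt.Println("run outcome:", r.Outcome) // failure: the clients group has Total != Ok
    }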
197 changes: 116 additions & 81 deletions pkg/runner/local_docker.go
@@ -201,42 +201,43 @@ func (r *LocalDockerRunner) setupSyncClient() error {
return nil
}

// collectOutcomes listens to the sync service and collects the outcome for every test instance.
// It stops when all instances have submitted a result or the context was canceled.
func (r *LocalDockerRunner) collectOutcomes(ctx context.Context, result *Result, tpl *runtime.RunParams) (chan bool, error) {
eventsCh, err := r.syncClient.SubscribeEvents(ctx, tpl)
if err != nil {
return nil, err
}

// TODO: eventually we'll keep a trace of each test instance's status.
// Right now, if a container sends multiple events, it will mess up the outcomes.
// We have to pass its group id to the container so that it can send us back messages
// with its own id.
expectingOutcomes := result.countTotalInstances()
done := make(chan bool)
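// done is unbuffered and receives exactly one value: the goroutine below sends it after it has either
// collected every expected outcome or observed context cancellation, and after it has refreshed
// result.Outcome via updateOutcome.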

go func() {
running := true
for running && expectingOutcomes > 0 {
select {
case <-ctx.Done():
running = false
case e := <-eventsCh:
if e.SuccessEvent != nil {
result.addOutcome(e.SuccessEvent.TestGroupID, task.OutcomeSuccess)
expectingOutcomes -= 1
} else if e.FailureEvent != nil {
result.addOutcome(e.FailureEvent.TestGroupID, task.OutcomeFailure)
expectingOutcomes -= 1
} else if e.CrashEvent != nil {
result.addOutcome(e.CrashEvent.TestGroupID, task.OutcomeFailure)
expectingOutcomes -= 1
}
// else: skip
}
}

result.updateOutcome()
done <- true
}()

@@ -274,12 +275,6 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
Result: result,
}

defer func() {
if err != nil {
// TODO: maybe replace only if the outcome is a default value.
@@ -296,6 +291,12 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
return
}

// Grab a read lock. This will allow many runs to run simultaneously, but
// they will be exclusive of state-altering healthchecks.
// TODO: I'm not sure this is true anymore.
r.lk.RLock()
defer r.lk.RUnlock()

// ## Prepare Execution Context

// Create a docker client.
@@ -478,25 +479,23 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
return
}

// ## Start the containers & log their outputs.

// First we collect every container's outcome.
outcomesCollectIsCompleteCh, err := r.collectOutcomes(ctx, result, &template)
if err != nil {
log.Error(err)
return
}

// Second we start the containers
log.Infow("starting containers", "count", len(containers))
var (
startGroup, startGroupCtx = errgroup.WithContext(ctx)
ratelimit = make(chan struct{}, 16)
started = make(chan testContainerInstance, len(containers))
)
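// started is buffered to len(containers), so the start goroutines below never block while handing a
// container over to the log-attachment loop.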

for _, c := range containers {
c := c
f := func() error {
@@ -505,37 +504,27 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp

log.Infow("starting container", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx)

err := cli.ContainerStart(startGroupCtx, c.containerID, types.ContainerStartOptions{})
if err == nil {
log.Debugw("started container", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx)
select {
// TODO: this is a cancel signal. We should race it with the ContainerStart operation.
case <-startGroupCtx.Done():
default:
started <- c
}
}

return err
}
startGroup.Go(f)
}

// Third we start the pretty printer
if !cfg.Background {
pretty := NewPrettyPrinter(ow)

// Tail the sidecar container logs and append them to the pretty printer.
go func() {
t := time.Now().Add(time.Duration(-10) * time.Second) // sidecar is a long running daemon, so we care only about logs around the execution of our test run
stream, err := cli.ContainerLogs(ctx, "testground-sidecar", types.ContainerLogsOptions{
@@ -546,8 +535,8 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
})

if err != nil {
// TODO: handle
log.Errorw("failed to attach sidecar", "error", err)
}

rstdout, wstdout := io.Pipe()
Expand All @@ -561,26 +550,24 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
pretty.Append("sidecar ", rstdout, rstderr)
}()

// This goroutine takes started containers and attaches them to the pretty printer.
go func() {
for {
select {
case c := <-started:
log.Infow("attaching container", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx)
stream, err := cli.ContainerLogs(ctx, c.containerID, types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Since: "2019-01-01T00:00:00",
Follow: true,
})

if err != nil {
// TODO: handle
log.Errorw("failed to attach container", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx, "error", err)
continue
}

rstdout, wstdout := io.Pipe()
@@ -592,37 +579,85 @@ func (r *LocalDockerRunner) Run(ctx context.Context, input *api.RunInput, ow *rp
}()

// instance tag in output: << group[zero_padded_i] >> (container_id[0:6]), e.g. << miner[003] (a1b2c3) >>
tag := fmt.Sprintf("%s[%03d] (%s)", c.groupID, c.groupIdx, c.containerID[0:6])
pretty.Manage(tag, rstdout, rstderr)

case <-ctx.Done():
// Exit
return
}
}
}()
}
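The two goroutines above hand io.Pipe readers to the pretty printer; the step that actually splits Docker's multiplexed log stream into those pipes is elided in this hunk. Below is a self-contained sketch of that demultiplexing, assuming the Docker SDK's stdcopy helper (which the elided copy step presumably relies on) and an illustrative container name.

    package main

    import (
        "context"
        "io"
        "os"

        "github.com/docker/docker/api/types"
        "github.com/docker/docker/client"
        "github.com/docker/docker/pkg/stdcopy"
    )

    func main() {
        cli, err := client.NewClientWithOpts(client.FromEnv)
        if err != nil {
            panic(err)
        }
        ctx := context.Background()

        // "some-container" is a placeholder name.
        stream, err := cli.ContainerLogs(ctx, "some-container", types.ContainerLogsOptions{
            ShowStdout: true,
            ShowStderr: true,
            Follow:     true,
        })
        if err != nil {
            panic(err)
        }
        defer stream.Close()

        rstdout, wstdout := io.Pipe()
        rstderr, wstderr := io.Pipe()

        go func() {
            // StdCopy demultiplexes the combined Docker log stream into the two writers.
            _, _ = stdcopy.StdCopy(wstdout, wstderr, stream)
            _ = wstdout.Close()
            _ = wstderr.Close()
        }()

        // In the runner these readers go to the pretty printer; here we just print them.
        go func() { _, _ = io.Copy(os.Stdout, rstdout) }()
        _, _ = io.Copy(os.Stderr, rstderr)
    }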

// Wait for all container to have started
err = startGroup.Wait()
if err != nil {
log.Error(err)
return
}

// Finally, we're going to follow our containers until they are done
runGroup, runGroupCtx := errgroup.WithContext(ctx)

for _, c := range containers {
c := c
f := func() error {
log.Infow("waiting for container", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx)

// TODO: Make sure we wait on a context that will be canceled if the group is canceled.
statusCh, errCh := cli.ContainerWait(ctx, c.containerID, container.WaitConditionNotRunning)
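// statusCh receives a single message once the container is no longer running; errCh reports errors
// encountered while waiting on the Docker daemon.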

select {
case err := <-errCh:
log.Infow("container failed", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx, "error", err)
if err != nil {
return err
}
return nil
case status := <-statusCh:
// TODO: statusCh carries both an error field and the container's exit status;
// make sure we don't miss any error, and use the exit status to provide more logging.
log.Infow("container exited", "id", c.containerID, "group", c.groupID, "group_index", c.groupIdx, "status", status.StatusCode)
return nil
case <-runGroupCtx.Done(): // race with the group
log.Infow("container group exited", runGroupCtx.Err())
return nil
}
}
runGroup.Go(f)
}

// When we're here, our containers are started and the outcomes are being collected.
// We wait until either:
// - all containers are done and all outcomes have been received
// - we reach a timeout or the run is canceled.

containersAreCompleteCh := make(chan bool)

// Wait for the containers
go func() {
err = runGroup.Wait()
containersAreCompleteCh <- true
}()

waitingForContainers := true
waitingForOutcomes := true

log.Info("Containers started, waiting for containers and outcome signals")
for waitingForContainers || waitingForOutcomes {
select {
case <-containersAreCompleteCh:
log.Infow("all containers are complete")
waitingForContainers = false
case <-outcomesCollectIsCompleteCh:
log.Infow("all outcomes are complete")
waitingForOutcomes = false
case <-ctx.Done():
log.Infow("test run is canceled")
return
}
}
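// Both signals have fired: every container has exited and collectOutcomes has already called
// result.updateOutcome, so result.Outcome reflects the final state of the run.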

return
}
