fix: logging deadlock
Fix a logging deadlock that was causing many test timeouts.

This refactors how logging shutdown is handled: it eliminates unnecessary
captures, uses an idiomatic wait group to signal processor completion, and
removes unnecessary nil initialisation.
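
As a minimal sketch of that wait-group pattern (illustrative names, not the
actual fields in docker.go): stop is signalled by closing a chan struct{},
which never blocks even when nobody is receiving, and a sync.WaitGroup lets
the stopper wait for the processor goroutine to exit before resetting state
for a restart.

    package main

    import (
        "sync"
        "time"
    )

    type producer struct {
        mu   sync.Mutex     // protects stop so production can be restarted
        stop chan struct{}  // closed to request shutdown
        wg   sync.WaitGroup // signals processor completion
    }

    func (p *producer) start() {
        p.mu.Lock()
        defer p.mu.Unlock()
        if p.stop != nil {
            return // already running
        }
        p.stop = make(chan struct{})
        p.wg.Add(1)
        go func() {
            defer p.wg.Done() // completion signal; no mutex needed here
            for {
                select {
                case <-p.stop:
                    return
                default:
                    time.Sleep(10 * time.Millisecond) // stand-in for reading logs
                }
            }
        }()
    }

    func (p *producer) stopAndWait() {
        p.mu.Lock()
        defer p.mu.Unlock()
        if p.stop == nil {
            return
        }
        close(p.stop) // never blocks, unlike a send on an unbuffered channel
        p.wg.Wait()   // the goroutine never takes mu, so this cannot deadlock
        p.stop = nil  // allow start to be called again
    }

    func main() {
        p := &producer{}
        p.start()
        time.Sleep(30 * time.Millisecond)
        p.stopAndWait()
    }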

Fix a race condition in the log tests, which were reading Msg while the
processor was still running.
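
The test-side race is in the second changed file, which is not shown here. A
hedged sketch of the usual shape of such a fix, with a hypothetical consumer
type standing in for the repository's actual test code: guard the accumulated
messages with a mutex so the test can read them while the producer goroutine
is still appending.

    package main

    import (
        "fmt"
        "sync"
    )

    // Log approximates the shape of a consumed log entry (an assumption,
    // not the library's actual type definition).
    type Log struct {
        Content []byte
    }

    // testConsumer is a hypothetical test log consumer.
    type testConsumer struct {
        mu   sync.Mutex
        msgs []string
    }

    // Accept is called from the log-production goroutine.
    func (t *testConsumer) Accept(l Log) {
        t.mu.Lock()
        defer t.mu.Unlock()
        t.msgs = append(t.msgs, string(l.Content))
    }

    // Msgs is called from the test; it returns a copy under the lock so the
    // caller never observes a slice the producer is still appending to.
    func (t *testConsumer) Msgs() []string {
        t.mu.Lock()
        defer t.mu.Unlock()
        return append([]string(nil), t.msgs...)
    }

    func main() {
        c := &testConsumer{}
        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.Accept(Log{Content: []byte("hello")})
        }()
        fmt.Println(c.Msgs()) // safe even while the goroutine runs
        wg.Wait()
    }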

Switch to checking the GITHUB_RUN_ID environment variable to detect GitHub,
as XDG_RUNTIME_DIR can be present in other situations.
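
GITHUB_RUN_ID is set by GitHub Actions for every workflow run, so its
presence is a reliable marker, whereas XDG_RUNTIME_DIR exists on any Linux
session with a user runtime directory. A minimal check (the helper name is
illustrative):

    package main

    import (
        "fmt"
        "os"
    )

    // inGitHubActions reports whether the process appears to be running in
    // GitHub Actions, which sets GITHUB_RUN_ID for every workflow run.
    func inGitHubActions() bool {
        return os.Getenv("GITHUB_RUN_ID") != ""
    }

    func main() {
        fmt.Println("GitHub Actions:", inGitHubActions())
    }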
stevenh committed Mar 13, 2024
1 parent fe0d3a8 commit bdd8886
Showing 2 changed files with 108 additions and 91 deletions.
docker.go: 112 changes (57 additions, 55 deletions)
@@ -62,16 +62,26 @@ type DockerContainer struct {
     isRunning     bool
     imageWasBuilt bool
     // keepBuiltImage makes Terminate not remove the image if imageWasBuilt.
-    keepBuiltImage      bool
-    provider            *DockerProvider
-    sessionID           string
-    terminationSignal   chan bool
-    consumers           []LogConsumer
-    raw                 *types.ContainerJSON
-    stopLogProductionCh chan bool
-    logProductionDone   chan bool
-    logProductionError  chan error
-    logProductionMutex  sync.Mutex
+    keepBuiltImage     bool
+    provider           *DockerProvider
+    sessionID          string
+    terminationSignal  chan bool
+    consumers          []LogConsumer
+    raw                *types.ContainerJSON
+    logProductionError chan error
+
+    // TODO: Remove locking and wait group once the deprecated StartLogProducer and
+    // StopLogProducer have been removed and hence logging can only be started and
+    // stopped once.
+
+    // logProductionWaitGroup is used to signal when the log production has stopped.
+    // This allows stopLogProduction to safely set logProductionStop to nil.
+    logProductionWaitGroup sync.WaitGroup
+
+    // logProductionMutex protects logProductionStop channel so it can be started again.
+    logProductionMutex sync.Mutex
+    logProductionStop  chan struct{}
+
     logProductionTimeout *time.Duration
     logger               Logging
     lifecycleHooks       []ContainerLifecycleHooks
@@ -652,9 +662,12 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
         c.logProductionMutex.Lock()
         defer c.logProductionMutex.Unlock()
 
-        if c.stopLogProductionCh != nil {
+        if c.logProductionStop != nil {
             return errors.New("log production already started")
         }
+
+        c.logProductionStop = make(chan struct{})
+        c.logProductionWaitGroup.Add(1)
     }
 
     for _, opt := range opts {
@@ -676,21 +689,12 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
         c.logProductionTimeout = &maxLogProductionTimeout
     }
 
-    c.stopLogProductionCh = make(chan bool)
-    c.logProductionDone = make(chan bool)
     c.logProductionError = make(chan error, 1)
 
-    go func(stop <-chan bool, done chan<- bool, errorCh chan error) {
-        // signal the log production is done once go routine exits, this prevents race conditions around start/stop
-        // set c.stopLogProductionCh to nil so that it can be started again
+    go func() {
         defer func() {
-            defer c.logProductionMutex.Unlock()
-            close(done)
-            close(errorCh)
-            {
-                c.logProductionMutex.Lock()
-                c.stopLogProductionCh = nil
-            }
+            close(c.logProductionError)
+            c.logProductionWaitGroup.Done()
         }()
 
         since := ""
@@ -708,15 +712,15 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
 
         r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
         if err != nil {
-            errorCh <- err
+            c.logProductionError <- err
             return
         }
         defer c.provider.Close()
 
         for {
             select {
-            case <-stop:
-                errorCh <- r.Close()
+            case <-c.logProductionStop:
+                c.logProductionError <- r.Close()
                 return
             default:
                 h := make([]byte, 8)
@@ -772,7 +776,7 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
                 }
             }
         }
-    }(c.stopLogProductionCh, c.logProductionDone, c.logProductionError)
+    }()
 
     return nil
 }
@@ -782,17 +786,18 @@ func (c *DockerContainer) StopLogProducer() error {
     return c.stopLogProduction()
 }
 
-// StopLogProducer will stop the concurrent process that is reading logs
+// stopLogProduction will stop the concurrent process that is reading logs
 // and sending them to each added LogConsumer
 func (c *DockerContainer) stopLogProduction() error {
+    // TODO: Remove locking and wait group once StartLogProducer and StopLogProducer
+    // have been removed and hence logging can only be started / stopped once.
     c.logProductionMutex.Lock()
     defer c.logProductionMutex.Unlock()
-    if c.stopLogProductionCh != nil {
-        c.stopLogProductionCh <- true
-        // block until the log production is actually done in order to avoid strange races
-        <-c.logProductionDone
-        c.stopLogProductionCh = nil
-        c.logProductionDone = nil
+    if c.logProductionStop != nil {
+        close(c.logProductionStop)
+        c.logProductionWaitGroup.Wait()
+        // Set c.logProductionStop to nil so that it can be started again.
+        c.logProductionStop = nil
         return <-c.logProductionError
     }
     return nil
@@ -1113,17 +1118,16 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
     }
 
     c := &DockerContainer{
-        ID:                  resp.ID,
-        WaitingFor:          req.WaitingFor,
-        Image:               imageName,
-        imageWasBuilt:       req.ShouldBuildImage(),
-        keepBuiltImage:      req.ShouldKeepBuiltImage(),
-        sessionID:           core.SessionID(),
-        provider:            p,
-        terminationSignal:   termSignal,
-        stopLogProductionCh: nil,
-        logger:              p.Logger,
-        lifecycleHooks:      req.LifecycleHooks,
+        ID:                resp.ID,
+        WaitingFor:        req.WaitingFor,
+        Image:             imageName,
+        imageWasBuilt:     req.ShouldBuildImage(),
+        keepBuiltImage:    req.ShouldKeepBuiltImage(),
+        sessionID:         core.SessionID(),
+        provider:          p,
+        terminationSignal: termSignal,
+        logger:            p.Logger,
+        lifecycleHooks:    req.LifecycleHooks,
     }
 
     err = c.createdHook(ctx)
@@ -1216,15 +1220,14 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain
     }
 
     dc := &DockerContainer{
-        ID:                  c.ID,
-        WaitingFor:          req.WaitingFor,
-        Image:               c.Image,
-        sessionID:           sessionID,
-        provider:            p,
-        terminationSignal:   termSignal,
-        stopLogProductionCh: nil,
-        logger:              p.Logger,
-        lifecycleHooks:      []ContainerLifecycleHooks{combineContainerHooks(defaultHooks, req.LifecycleHooks)},
+        ID:                c.ID,
+        WaitingFor:        req.WaitingFor,
+        Image:             c.Image,
+        sessionID:         sessionID,
+        provider:          p,
+        terminationSignal: termSignal,
+        logger:            p.Logger,
+        lifecycleHooks:    []ContainerLifecycleHooks{combineContainerHooks(defaultHooks, req.LifecycleHooks)},
     }
 
     err = dc.startedHook(ctx)
@@ -1526,7 +1529,6 @@ func containerFromDockerResponse(ctx context.Context, response types.Container)
 
     container.sessionID = core.SessionID()
     container.consumers = []LogConsumer{}
-    container.stopLogProductionCh = nil
     container.isRunning = response.State == "running"
 
     // the termination signal should be obtained from the reaper
