fix: logging deadlock #2346

Merged · 2 commits · Apr 4, 2024
141 changes: 66 additions & 75 deletions docker.go
@@ -63,16 +63,26 @@ type DockerContainer struct {
isRunning bool
imageWasBuilt bool
// keepBuiltImage makes Terminate not remove the image if imageWasBuilt.
keepBuiltImage bool
provider *DockerProvider
sessionID string
terminationSignal chan bool
consumers []LogConsumer
raw *types.ContainerJSON
stopLogProductionCh chan bool
logProductionDone chan bool
logProductionError chan error
logProductionMutex sync.Mutex
keepBuiltImage bool
provider *DockerProvider
sessionID string
terminationSignal chan bool
consumers []LogConsumer
raw *types.ContainerJSON
logProductionError chan error

// TODO: Remove locking and wait group once the deprecated StartLogProducer and
// StopLogProducer have been removed and hence logging can only be started and
// stopped once.

// logProductionWaitGroup is used to signal when the log production has stopped.
// This allows stopLogProduction to safely set logProductionStop to nil.
logProductionWaitGroup sync.WaitGroup

// logProductionMutex protects logProductionStop channel so it can be started again.
logProductionMutex sync.Mutex
logProductionStop chan struct{}

logProductionTimeout *time.Duration
logger Logging
lifecycleHooks []ContainerLifecycleHooks
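For orientation, here is a minimal, self-contained sketch of the start/stop pattern these fields support. The type and names (`producer`, `Start`, `Stop`) are hypothetical simplifications, not the library's API; the point is the invariant: a non-nil stop channel means production is running, the mutex guards that channel, and the WaitGroup lets the stop path wait for the worker without the worker ever needing the mutex.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// producer models, in simplified hypothetical form, the fields above:
// mu guards stop, a non-nil stop channel means the worker is running, and
// wg lets Stop wait for the worker to exit before stop is reset to nil so
// production can be started again.
type producer struct {
	mu   sync.Mutex
	stop chan struct{}
	wg   sync.WaitGroup
	errs chan error
}

func (p *producer) Start() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.stop != nil {
		return fmt.Errorf("already started")
	}
	p.stop = make(chan struct{})
	p.errs = make(chan error, 1)
	p.wg.Add(1)

	go func(stop <-chan struct{}, errs chan<- error) {
		// Crucially, the worker never locks p.mu; it only closes errs and
		// signals the WaitGroup, so Stop may hold the lock while waiting.
		defer func() {
			close(errs)
			p.wg.Done()
		}()
		for {
			select {
			case <-stop:
				return
			default:
				time.Sleep(10 * time.Millisecond) // stand-in for reading one log entry
			}
		}
	}(p.stop, p.errs)

	return nil
}

func (p *producer) Stop() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.stop == nil {
		return nil
	}
	close(p.stop) // broadcast; no paired receive required
	p.wg.Wait()   // safe because the worker does not touch p.mu
	p.stop = nil  // allow Start to be called again
	return <-p.errs
}

func main() {
	var p producer
	if err := p.Start(); err != nil {
		panic(err)
	}
	time.Sleep(30 * time.Millisecond)
	fmt.Println("stop:", p.Stop())
}
```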
@@ -264,37 +274,26 @@ func (c *DockerContainer) Terminate(ctx context.Context) error {

defer c.provider.client.Close()

err := c.terminatingHook(ctx)
if err != nil {
return err
}

err = c.provider.client.ContainerRemove(ctx, c.GetContainerID(), container.RemoveOptions{
RemoveVolumes: true,
Force: true,
})
if err != nil {
return err
}

err = c.terminatedHook(ctx)
if err != nil {
return err
errs := []error{
c.terminatingHook(ctx),
c.provider.client.ContainerRemove(ctx, c.GetContainerID(), container.RemoveOptions{
RemoveVolumes: true,
Force: true,
}),
c.terminatedHook(ctx),
}

if c.imageWasBuilt && !c.keepBuiltImage {
_, err := c.provider.client.ImageRemove(ctx, c.Image, types.ImageRemoveOptions{
Force: true,
PruneChildren: true,
})
if err != nil {
return err
}
errs = append(errs, err)
}

c.sessionID = ""
c.isRunning = false
return nil
return errors.Join(errs...)
}

// update container raw info
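As a side note, the refactored Terminate relies on errors.Join (Go 1.20+), which drops nil arguments and returns nil when all of them are nil, so the hook, remove, and image-cleanup results can be collected without early returns. A small standalone illustration:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	// nil entries are dropped; if every entry is nil the result is nil,
	// so the happy path of Terminate still returns nil.
	fmt.Println(errors.Join(nil, nil, nil)) // <nil>

	errA := errors.New("terminating hook failed")
	errB := errors.New("container remove failed")
	joined := errors.Join(nil, errA, errB)
	fmt.Println(joined)                  // both messages, one per line
	fmt.Println(errors.Is(joined, errA)) // true: wrapped errors stay inspectable
}
```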
@@ -675,9 +674,12 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
c.logProductionMutex.Lock()
defer c.logProductionMutex.Unlock()

if c.stopLogProductionCh != nil {
if c.logProductionStop != nil {
return errors.New("log production already started")
}

c.logProductionStop = make(chan struct{})
c.logProductionWaitGroup.Add(1)
}

for _, opt := range opts {
@@ -699,21 +701,12 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
c.logProductionTimeout = &maxLogProductionTimeout
}

c.stopLogProductionCh = make(chan bool)
c.logProductionDone = make(chan bool)
c.logProductionError = make(chan error, 1)

go func(stop <-chan bool, done chan<- bool, errorCh chan error) {
// signal the log production is done once go routine exits, this prevents race conditions around start/stop
// set c.stopLogProductionCh to nil so that it can be started again
go func() {
defer func() {
defer c.logProductionMutex.Unlock()
close(done)
close(errorCh)
{
c.logProductionMutex.Lock()
c.stopLogProductionCh = nil
}
close(c.logProductionError)
c.logProductionWaitGroup.Done()
}()

since := ""
@@ -731,15 +724,15 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro

r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
if err != nil {
errorCh <- err
c.logProductionError <- err
return
}
defer c.provider.Close()

for {
select {
case <-stop:
errorCh <- r.Close()
case <-c.logProductionStop:
c.logProductionError <- r.Close()
return
default:
h := make([]byte, 8)
@@ -795,7 +788,7 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
}
}
}
}(c.stopLogProductionCh, c.logProductionDone, c.logProductionError)
}()

return nil
}
@@ -805,17 +798,18 @@ func (c *DockerContainer) StopLogProducer() error {
return c.stopLogProduction()
}

// StopLogProducer will stop the concurrent process that is reading logs
// stopLogProduction will stop the concurrent process that is reading logs
// and sending them to each added LogConsumer
func (c *DockerContainer) stopLogProduction() error {
// TODO: Remove locking and wait group once StartLogProducer and StopLogProducer
// have been removed and hence logging can only be started / stopped once.
c.logProductionMutex.Lock()
defer c.logProductionMutex.Unlock()
if c.stopLogProductionCh != nil {
c.stopLogProductionCh <- true
// block until the log production is actually done in order to avoid strange races
<-c.logProductionDone
c.stopLogProductionCh = nil
c.logProductionDone = nil
if c.logProductionStop != nil {
close(c.logProductionStop)
c.logProductionWaitGroup.Wait()
// Set c.logProductionStop to nil so that it can be started again.
c.logProductionStop = nil
return <-c.logProductionError
}
return nil
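The stop side above is what actually removes the deadlock: the previous stop path could block (on the send to stopLogProductionCh, and then on logProductionDone) while still holding logProductionMutex, and the producer goroutine's deferred cleanup needed that same mutex to reset the channel, so the two could wait on each other forever. Closing a channel, by contrast, releases every receiver without a cooperating receive, so stopLogProduction can simply close, wait on the WaitGroup, and reset the field. A tiny standalone demonstration of that property (not library code):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	stop := make(chan struct{})
	var wg sync.WaitGroup

	// Several blocked receivers, all released by a single close; this is why
	// the new stop path needs no dedicated "done" channel handshake.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-stop // unblocks as soon as the channel is closed
			fmt.Printf("worker %d stopped\n", id)
		}(i)
	}

	close(stop) // never blocks, unlike a send on an unbuffered channel
	wg.Wait()
}
```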
@@ -1122,17 +1116,16 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
}

c := &DockerContainer{
ID: resp.ID,
WaitingFor: req.WaitingFor,
Image: imageName,
imageWasBuilt: req.ShouldBuildImage(),
keepBuiltImage: req.ShouldKeepBuiltImage(),
sessionID: core.SessionID(),
provider: p,
terminationSignal: termSignal,
stopLogProductionCh: nil,
logger: p.Logger,
lifecycleHooks: req.LifecycleHooks,
ID: resp.ID,
WaitingFor: req.WaitingFor,
Image: imageName,
imageWasBuilt: req.ShouldBuildImage(),
keepBuiltImage: req.ShouldKeepBuiltImage(),
sessionID: core.SessionID(),
provider: p,
terminationSignal: termSignal,
logger: p.Logger,
lifecycleHooks: req.LifecycleHooks,
}

err = c.createdHook(ctx)
@@ -1225,15 +1218,14 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain
}

dc := &DockerContainer{
ID: c.ID,
WaitingFor: req.WaitingFor,
Image: c.Image,
sessionID: sessionID,
provider: p,
terminationSignal: termSignal,
stopLogProductionCh: nil,
logger: p.Logger,
lifecycleHooks: []ContainerLifecycleHooks{combineContainerHooks(defaultHooks, req.LifecycleHooks)},
ID: c.ID,
WaitingFor: req.WaitingFor,
Image: c.Image,
sessionID: sessionID,
provider: p,
terminationSignal: termSignal,
logger: p.Logger,
lifecycleHooks: []ContainerLifecycleHooks{combineContainerHooks(defaultHooks, req.LifecycleHooks)},
}

err = dc.startedHook(ctx)
@@ -1545,7 +1537,6 @@ func containerFromDockerResponse(ctx context.Context, response types.Container)

container.sessionID = core.SessionID()
container.consumers = []LogConsumer{}
container.stopLogProductionCh = nil
container.isRunning = response.State == "running"

// the termination signal should be obtained from the reaper