Skip to content

Commit

Permalink
fix: proper synchronisation for start/stop log production (#2576)
Browse files Browse the repository at this point in the history
* fix: proper synchronisation for start/stop log production

* fix: protect channel close

* fix: properly close the log channel

* chore: do not close the channel
  • Loading branch information
mdelapenya committed Jun 17, 2024
1 parent 9a93db4 commit 77da736
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 26 deletions.
36 changes: 10 additions & 26 deletions docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,10 @@ type DockerContainer struct {

// logProductionWaitGroup is used to signal when the log production has stopped.
// This allows stopLogProduction to safely set logProductionStop to nil.
// See simplification in https://go.dev/play/p/x0pOElF2Vjf
logProductionWaitGroup sync.WaitGroup

// logProductionMutex protects logProductionStop channel so it can be started again.
logProductionMutex sync.Mutex
logProductionStop chan struct{}
logProductionStop chan struct{}

logProductionTimeout *time.Duration
logger Logging
Expand Down Expand Up @@ -697,17 +696,8 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProdu
// Use functional option WithLogProductionTimeout() to override default timeout. If it's
// lower than 5s and greater than 60s it will be set to 5s or 60s respectively.
func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogProductionOption) error {
{
c.logProductionMutex.Lock()
defer c.logProductionMutex.Unlock()

if c.logProductionStop != nil {
return errors.New("log production already started")
}

c.logProductionStop = make(chan struct{})
c.logProductionWaitGroup.Add(1)
}
c.logProductionStop = make(chan struct{}, 1) // buffered channel to avoid blocking
c.logProductionWaitGroup.Add(1)

for _, opt := range opts {
opt(c)
Expand Down Expand Up @@ -828,18 +818,12 @@ func (c *DockerContainer) StopLogProducer() error {
// stopLogProduction will stop the concurrent process that is reading logs
// and sending them to each added LogConsumer
func (c *DockerContainer) stopLogProduction() error {
// TODO: Remove locking and wait group once StartLogProducer and StopLogProducer
// have been removed and hence logging can only be started / stopped once.
c.logProductionMutex.Lock()
defer c.logProductionMutex.Unlock()
if c.logProductionStop != nil {
close(c.logProductionStop)
c.logProductionWaitGroup.Wait()
// Set c.logProductionStop to nil so that it can be started again.
c.logProductionStop = nil
return <-c.logProductionError
}
return nil
// signal the log production to stop
c.logProductionStop <- struct{}{}

c.logProductionWaitGroup.Wait()

return <-c.logProductionError
}

// GetLogProductionErrorChannel exposes the only way for the consumer
Expand Down
55 changes: 55 additions & 0 deletions logconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,3 +638,58 @@ func Test_MultiContainerLogConsumer_CancelledContext(t *testing.T) {
// the multiple containers.
assert.False(t, strings.Contains(actual, logStoppedForOutOfSyncMessage))
}

type FooLogConsumer struct {
LogChannel chan string
}

func (c FooLogConsumer) Accept(rawLog Log) {
log := string(rawLog.Content)
c.LogChannel <- log
}

func NewFooLogConsumer() *FooLogConsumer {
return &FooLogConsumer{
LogChannel: make(chan string),
}
}

func TestRestartContainerWithLogConsumer(t *testing.T) {
logConsumer := NewFooLogConsumer()

ctx := context.Background()
container, err := GenericContainer(ctx, GenericContainerRequest{
ContainerRequest: ContainerRequest{
Image: "hello-world",
AlwaysPullImage: true,
LogConsumerCfg: &LogConsumerConfig{
Consumers: []LogConsumer{logConsumer},
},
},
Started: false,
})
if err != nil {
t.Fatalf("Cant create container: %s", err.Error())
}

err = container.Start(ctx)
if err != nil {
t.Fatalf("Cant start container: %s", err.Error())
}

d := 30 * time.Second
err = container.Stop(ctx, &d)
if err != nil {
t.Fatalf("Cant stop container: %s", err.Error())
}
err = container.Start(ctx)
if err != nil {
t.Fatalf("Cant start container: %s", err.Error())
}

for s := range logConsumer.LogChannel {
if strings.Contains(s, "Hello from Docker!") {
break
}
}
}

0 comments on commit 77da736

Please sign in to comment.