diff --git a/docker.go b/docker.go
index c946103556..6d10cf92a0 100644
--- a/docker.go
+++ b/docker.go
@@ -63,16 +63,26 @@ type DockerContainer struct {
 	isRunning     bool
 	imageWasBuilt bool
 	// keepBuiltImage makes Terminate not remove the image if imageWasBuilt.
-	keepBuiltImage      bool
-	provider            *DockerProvider
-	sessionID           string
-	terminationSignal   chan bool
-	consumers           []LogConsumer
-	raw                 *types.ContainerJSON
-	stopLogProductionCh chan bool
-	logProductionDone   chan bool
-	logProductionError  chan error
-	logProductionMutex  sync.Mutex
+	keepBuiltImage     bool
+	provider           *DockerProvider
+	sessionID          string
+	terminationSignal  chan bool
+	consumers          []LogConsumer
+	raw                *types.ContainerJSON
+	logProductionError chan error
+
+	// TODO: Remove locking and wait group once the deprecated StartLogProducer and
+	// StopLogProducer have been removed and hence logging can only be started and
+	// stopped once.
+
+	// logProductionWaitGroup is used to signal when the log production has stopped.
+	// This allows stopLogProduction to safely set logProductionStop to nil.
+	logProductionWaitGroup sync.WaitGroup
+
+	// logProductionMutex protects logProductionStop channel so it can be started again.
+	logProductionMutex sync.Mutex
+	logProductionStop  chan struct{}
+
 	logProductionTimeout *time.Duration
 	logger               Logging
 	lifecycleHooks       []ContainerLifecycleHooks
@@ -264,22 +274,13 @@ func (c *DockerContainer) Terminate(ctx context.Context) error {
 	defer c.provider.client.Close()
 
-	err := c.terminatingHook(ctx)
-	if err != nil {
-		return err
-	}
-
-	err = c.provider.client.ContainerRemove(ctx, c.GetContainerID(), container.RemoveOptions{
-		RemoveVolumes: true,
-		Force:         true,
-	})
-	if err != nil {
-		return err
-	}
-
-	err = c.terminatedHook(ctx)
-	if err != nil {
-		return err
+	errs := []error{
+		c.terminatingHook(ctx),
+		c.provider.client.ContainerRemove(ctx, c.GetContainerID(), container.RemoveOptions{
+			RemoveVolumes: true,
+			Force:         true,
+		}),
+		c.terminatedHook(ctx),
 	}
 
 	if c.imageWasBuilt && !c.keepBuiltImage {
@@ -287,14 +288,12 @@ func (c *DockerContainer) Terminate(ctx context.Context) error {
 			Force:         true,
 			PruneChildren: true,
 		})
-		if err != nil {
-			return err
-		}
+		errs = append(errs, err)
 	}
 
 	c.sessionID = ""
 	c.isRunning = false
-	return nil
+	return errors.Join(errs...)
 }
 
 // update container raw info
@@ -675,9 +674,12 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
 		c.logProductionMutex.Lock()
 		defer c.logProductionMutex.Unlock()
 
-		if c.stopLogProductionCh != nil {
+		if c.logProductionStop != nil {
 			return errors.New("log production already started")
 		}
+
+		c.logProductionStop = make(chan struct{})
+		c.logProductionWaitGroup.Add(1)
 	}
 
 	for _, opt := range opts {
@@ -699,21 +701,12 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
 		c.logProductionTimeout = &maxLogProductionTimeout
 	}
 
-	c.stopLogProductionCh = make(chan bool)
-	c.logProductionDone = make(chan bool)
 	c.logProductionError = make(chan error, 1)
 
-	go func(stop <-chan bool, done chan<- bool, errorCh chan error) {
-		// signal the log production is done once go routine exits, this prevents race conditions around start/stop
-		// set c.stopLogProductionCh to nil so that it can be started again
+	go func() {
 		defer func() {
-			defer c.logProductionMutex.Unlock()
-			close(done)
-			close(errorCh)
-			{
-				c.logProductionMutex.Lock()
-				c.stopLogProductionCh = nil
-			}
+			close(c.logProductionError)
+			c.logProductionWaitGroup.Done()
 		}()
 
 		since := ""
@@ -731,15 +724,15 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
 
 		r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
 		if err != nil {
-			errorCh <- err
+			c.logProductionError <- err
 			return
 		}
 		defer c.provider.Close()
 
 		for {
 			select {
-			case <-stop:
-				errorCh <- r.Close()
+			case <-c.logProductionStop:
+				c.logProductionError <- r.Close()
 				return
 			default:
 				h := make([]byte, 8)
@@ -795,7 +788,7 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
 				}
 			}
 		}
-	}(c.stopLogProductionCh, c.logProductionDone, c.logProductionError)
+	}()
 
 	return nil
 }
@@ -805,17 +798,18 @@ func (c *DockerContainer) StopLogProducer() error {
 	return c.stopLogProduction()
 }
 
-// StopLogProducer will stop the concurrent process that is reading logs
+// stopLogProduction will stop the concurrent process that is reading logs
 // and sending them to each added LogConsumer
 func (c *DockerContainer) stopLogProduction() error {
+	// TODO: Remove locking and wait group once StartLogProducer and StopLogProducer
+	// have been removed and hence logging can only be started / stopped once.
 	c.logProductionMutex.Lock()
 	defer c.logProductionMutex.Unlock()
-	if c.stopLogProductionCh != nil {
-		c.stopLogProductionCh <- true
-		// block until the log production is actually done in order to avoid strange races
-		<-c.logProductionDone
-		c.stopLogProductionCh = nil
-		c.logProductionDone = nil
+	if c.logProductionStop != nil {
+		close(c.logProductionStop)
+		c.logProductionWaitGroup.Wait()
+		// Set c.logProductionStop to nil so that it can be started again.
+		c.logProductionStop = nil
 		return <-c.logProductionError
 	}
 	return nil
 }
@@ -1122,17 +1116,16 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
 	}
 
 	c := &DockerContainer{
-		ID:                  resp.ID,
-		WaitingFor:          req.WaitingFor,
-		Image:               imageName,
-		imageWasBuilt:       req.ShouldBuildImage(),
-		keepBuiltImage:      req.ShouldKeepBuiltImage(),
-		sessionID:           core.SessionID(),
-		provider:            p,
-		terminationSignal:   termSignal,
-		stopLogProductionCh: nil,
-		logger:              p.Logger,
-		lifecycleHooks:      req.LifecycleHooks,
+		ID:                resp.ID,
+		WaitingFor:        req.WaitingFor,
+		Image:             imageName,
+		imageWasBuilt:     req.ShouldBuildImage(),
+		keepBuiltImage:    req.ShouldKeepBuiltImage(),
+		sessionID:         core.SessionID(),
+		provider:          p,
+		terminationSignal: termSignal,
+		logger:            p.Logger,
+		lifecycleHooks:    req.LifecycleHooks,
 	}
 
 	err = c.createdHook(ctx)
@@ -1225,15 +1218,14 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain
 	}
 
 	dc := &DockerContainer{
-		ID:                  c.ID,
-		WaitingFor:          req.WaitingFor,
-		Image:               c.Image,
-		sessionID:           sessionID,
-		provider:            p,
-		terminationSignal:   termSignal,
-		stopLogProductionCh: nil,
-		logger:              p.Logger,
-		lifecycleHooks:      []ContainerLifecycleHooks{combineContainerHooks(defaultHooks, req.LifecycleHooks)},
+		ID:                c.ID,
+		WaitingFor:        req.WaitingFor,
+		Image:             c.Image,
+		sessionID:         sessionID,
+		provider:          p,
+		terminationSignal: termSignal,
+		logger:            p.Logger,
+		lifecycleHooks:    []ContainerLifecycleHooks{combineContainerHooks(defaultHooks, req.LifecycleHooks)},
 	}
 
 	err = dc.startedHook(ctx)
@@ -1545,7 +1537,6 @@ func containerFromDockerResponse(ctx context.Context, response types.Container)
 	container.sessionID = core.SessionID()
 	container.consumers = []LogConsumer{}
-	container.stopLogProductionCh = nil
 	container.isRunning = response.State == "running"
 
 	// the termination signal should be obtained from the reaper
diff --git a/lifecycle.go b/lifecycle.go
index 4a10a90842..1cd0a8beb7 100644
--- a/lifecycle.go
+++ b/lifecycle.go
@@ -2,6 +2,7 @@ package testcontainers
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"strings"
@@ -225,65 +226,40 @@ var defaultReadinessHook = func() ContainerLifecycleHooks {
 
 // creatingHook is a hook that will be called before a container is created.
 func (req ContainerRequest) creatingHook(ctx context.Context) error {
-	for _, lifecycleHooks := range req.LifecycleHooks {
-		err := lifecycleHooks.Creating(ctx)(req)
-		if err != nil {
-			return err
-		}
+	errs := make([]error, len(req.LifecycleHooks))
+	for i, lifecycleHooks := range req.LifecycleHooks {
+		errs[i] = lifecycleHooks.Creating(ctx)(req)
 	}
 
-	return nil
+	return errors.Join(errs...)
 }
 
-// createdHook is a hook that will be called after a container is created
+// createdHook is a hook that will be called after a container is created.
 func (c *DockerContainer) createdHook(ctx context.Context) error {
-	for _, lifecycleHooks := range c.lifecycleHooks {
-		err := containerHookFn(ctx, lifecycleHooks.PostCreates)(c)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
+	return c.applyLifecycleHooks(ctx, false, func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook {
+		return lifecycleHooks.PostCreates
+	})
 }
 
-// startingHook is a hook that will be called before a container is started
+// startingHook is a hook that will be called before a container is started.
 func (c *DockerContainer) startingHook(ctx context.Context) error {
-	for _, lifecycleHooks := range c.lifecycleHooks {
-		err := containerHookFn(ctx, lifecycleHooks.PreStarts)(c)
-		if err != nil {
-			c.printLogs(ctx, err)
-			return err
-		}
-	}
-
-	return nil
+	return c.applyLifecycleHooks(ctx, true, func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook {
+		return lifecycleHooks.PreStarts
+	})
 }
 
-// startedHook is a hook that will be called after a container is started
+// startedHook is a hook that will be called after a container is started.
 func (c *DockerContainer) startedHook(ctx context.Context) error {
-	for _, lifecycleHooks := range c.lifecycleHooks {
-		err := containerHookFn(ctx, lifecycleHooks.PostStarts)(c)
-		if err != nil {
-			c.printLogs(ctx, err)
-			return err
-		}
-	}
-
-	return nil
+	return c.applyLifecycleHooks(ctx, true, func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook {
+		return lifecycleHooks.PostStarts
+	})
 }
 
-// readiedHook is a hook that will be called after a container is ready
+// readiedHook is a hook that will be called after a container is ready.
 func (c *DockerContainer) readiedHook(ctx context.Context) error {
-	for _, lifecycleHooks := range c.lifecycleHooks {
-		err := containerHookFn(ctx, lifecycleHooks.PostReadies)(c)
-		if err != nil {
-			c.printLogs(ctx, err)
-			return err
-		}
-	}
-
-	return nil
+	return c.applyLifecycleHooks(ctx, true, func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook {
+		return lifecycleHooks.PostReadies
+	})
 }
 
 // printLogs is a helper function that will print the logs of a Docker container
@@ -304,49 +280,47 @@ func (c *DockerContainer) printLogs(ctx context.Context, cause error) {
 	c.logger.Printf("container logs (%s):\n%s", cause, b)
 }
 
-// stoppingHook is a hook that will be called before a container is stopped
+// stoppingHook is a hook that will be called before a container is stopped.
 func (c *DockerContainer) stoppingHook(ctx context.Context) error {
-	for _, lifecycleHooks := range c.lifecycleHooks {
-		err := containerHookFn(ctx, lifecycleHooks.PreStops)(c)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
+	return c.applyLifecycleHooks(ctx, false, func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook {
+		return lifecycleHooks.PreStops
+	})
 }
 
-// stoppedHook is a hook that will be called after a container is stopped
+// stoppedHook is a hook that will be called after a container is stopped.
 func (c *DockerContainer) stoppedHook(ctx context.Context) error {
-	for _, lifecycleHooks := range c.lifecycleHooks {
-		err := containerHookFn(ctx, lifecycleHooks.PostStops)(c)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
+	return c.applyLifecycleHooks(ctx, false, func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook {
		return lifecycleHooks.PostStops
+	})
 }
 
-// terminatingHook is a hook that will be called before a container is terminated
+// terminatingHook is a hook that will be called before a container is terminated.
 func (c *DockerContainer) terminatingHook(ctx context.Context) error {
-	for _, lifecycleHooks := range c.lifecycleHooks {
-		err := containerHookFn(ctx, lifecycleHooks.PreTerminates)(c)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
+	return c.applyLifecycleHooks(ctx, false, func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook {
+		return lifecycleHooks.PreTerminates
+	})
 }
 
-// terminatedHook is a hook that will be called after a container is terminated
+// terminatedHook is a hook that will be called after a container is terminated.
 func (c *DockerContainer) terminatedHook(ctx context.Context) error {
-	for _, lifecycleHooks := range c.lifecycleHooks {
-		err := containerHookFn(ctx, lifecycleHooks.PostTerminates)(c)
-		if err != nil {
-			return err
+	return c.applyLifecycleHooks(ctx, false, func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook {
+		return lifecycleHooks.PostTerminates
+	})
+}
+
+// applyLifecycleHooks applies all lifecycle hooks reporting the container logs on error if logError is true.
+func (c *DockerContainer) applyLifecycleHooks(ctx context.Context, logError bool, hooks func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook) error {
+	errs := make([]error, len(c.lifecycleHooks))
+	for i, lifecycleHooks := range c.lifecycleHooks {
+		errs[i] = containerHookFn(ctx, hooks(lifecycleHooks))(c)
+	}
+
+	if err := errors.Join(errs...); err != nil {
+		if logError {
+			c.printLogs(ctx, err)
 		}
+
+		return err
 	}
 
 	return nil
 }
@@ -369,13 +343,12 @@ func (c ContainerLifecycleHooks) Creating(ctx context.Context) func(req Containe
 // container lifecycle hooks. The created function will iterate over all the hooks and call them one by one.
 func containerHookFn(ctx context.Context, containerHook []ContainerHook) func(container Container) error {
 	return func(container Container) error {
-		for _, hook := range containerHook {
-			if err := hook(ctx, container); err != nil {
-				return err
-			}
+		errs := make([]error, len(containerHook))
+		for i, hook := range containerHook {
+			errs[i] = hook(ctx, container)
 		}
 
-		return nil
+		return errors.Join(errs...)
 	}
 }
diff --git a/logconsumer_test.go b/logconsumer_test.go
index 9c9b25fa09..192a00f954 100644
--- a/logconsumer_test.go
+++ b/logconsumer_test.go
@@ -9,6 +9,7 @@ import (
 	"net/http"
 	"os"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -23,8 +24,9 @@ import (
 const lastMessage = "DONE"
 
 type TestLogConsumer struct {
-	Msgs []string
-	Done chan bool
+	mtx  sync.Mutex
+	msgs []string
+	Done chan struct{}
 
 	// Accepted provides a blocking way of ensuring the logs messages have been consumed.
 	// This allows for proper synchronization during Test_StartStop in particular.
@@ -35,11 +37,21 @@ type TestLogConsumer struct {
 func (g *TestLogConsumer) Accept(l Log) {
 	s := string(l.Content)
 	if s == fmt.Sprintf("echo %s\n", lastMessage) {
-		g.Done <- true
+		close(g.Done)
 		return
 	}
 	g.Accepted <- s
-	g.Msgs = append(g.Msgs, s)
+
+	g.mtx.Lock()
+	defer g.mtx.Unlock()
+	g.msgs = append(g.msgs, s)
+}
+
+func (g *TestLogConsumer) Msgs() []string {
+	g.mtx.Lock()
+	defer g.mtx.Unlock()
+
+	return g.msgs
 }
 
 // devNullAcceptorChan returns string channel that essentially sends all strings to dev null
@@ -57,8 +69,8 @@ func Test_LogConsumerGetsCalled(t *testing.T) {
 	ctx := context.Background()
 
 	g := TestLogConsumer{
-		Msgs:     []string{},
-		Done:     make(chan bool),
+		msgs:     []string{},
+		Done:     make(chan struct{}),
 		Accepted: devNullAcceptorChan(),
 	}
 
@@ -100,7 +112,7 @@ func Test_LogConsumerGetsCalled(t *testing.T) {
 		t.Fatal("never received final log message")
 	}
 
-	assert.Equal(t, []string{"ready\n", "echo hello\n", "echo there\n"}, g.Msgs)
+	assert.Equal(t, []string{"ready\n", "echo hello\n", "echo there\n"}, g.Msgs())
 
 	terminateContainerOnEnd(t, ctx, c)
 }
@@ -172,13 +184,13 @@ func Test_MultipleLogConsumers(t *testing.T) {
 	ctx := context.Background()
 
 	first := TestLogConsumer{
-		Msgs:     []string{},
-		Done:     make(chan bool),
+		msgs:     []string{},
+		Done:     make(chan struct{}),
 		Accepted: devNullAcceptorChan(),
 	}
 
 	second := TestLogConsumer{
-		Msgs:     []string{},
-		Done:     make(chan bool),
+		msgs:     []string{},
+		Done:     make(chan struct{}),
 		Accepted: devNullAcceptorChan(),
 	}
 
@@ -214,13 +226,13 @@ func Test_MultipleLogConsumers(t *testing.T) {
 	<-first.Done
 	<-second.Done
 
-	assert.Equal(t, []string{"ready\n", "echo mlem\n"}, first.Msgs)
-	assert.Equal(t, []string{"ready\n", "echo mlem\n"}, second.Msgs)
+	assert.Equal(t, []string{"ready\n", "echo mlem\n"}, first.Msgs())
+	assert.Equal(t, []string{"ready\n", "echo mlem\n"}, second.Msgs())
 
 	require.NoError(t, c.Terminate(ctx))
 }
 
 func TestContainerLogWithErrClosed(t *testing.T) {
-	if os.Getenv("XDG_RUNTIME_DIR") != "" {
+	if os.Getenv("GITHUB_RUN_ID") != "" {
 		t.Skip("Skipping as flaky on GitHub Actions, Please see https://github.com/testcontainers/testcontainers-go/issues/1924")
 	}
@@ -290,8 +302,8 @@ func TestContainerLogWithErrClosed(t *testing.T) {
 	}
 
 	consumer := TestLogConsumer{
-		Msgs:     []string{},
-		Done:     make(chan bool),
+		msgs:     []string{},
+		Done:     make(chan struct{}),
 		Accepted: devNullAcceptorChan(),
 	}
 
@@ -317,7 +329,7 @@ func TestContainerLogWithErrClosed(t *testing.T) {
 	// Gather the initial container logs
 	time.Sleep(time.Second * 1)
-	existingLogs := len(consumer.Msgs)
+	existingLogs := len(consumer.Msgs())
 
 	hitNginx := func() {
 		i, _, err := dind.Exec(ctx, []string{"wget", "--spider", "localhost:" + port.Port()})
@@ -328,10 +340,11 @@ func TestContainerLogWithErrClosed(t *testing.T) {
 	hitNginx()
 	time.Sleep(time.Second * 1)
-	if len(consumer.Msgs)-existingLogs != 1 {
-		t.Fatalf("logConsumer should have 1 new log message, instead has: %v", consumer.Msgs[existingLogs:])
+	msgs := consumer.Msgs()
+	if len(msgs)-existingLogs != 1 {
+		t.Fatalf("logConsumer should have 1 new log message, instead has: %v", msgs[existingLogs:])
 	}
-	existingLogs = len(consumer.Msgs)
+	existingLogs = len(consumer.Msgs())
 
 	iptableArgs := []string{
 		"INPUT", "-p", "tcp", "--dport", "2375",
@@ -351,10 +364,11 @@ func TestContainerLogWithErrClosed(t *testing.T) {
 	hitNginx()
 	hitNginx()
 	time.Sleep(time.Second * 1)
-	if len(consumer.Msgs)-existingLogs != 2 {
+	msgs = consumer.Msgs()
+	if len(msgs)-existingLogs != 2 {
 		t.Fatalf(
 			"LogConsumer should have 2 new log messages after detecting closed connection and"+
-				" re-requesting logs. Instead has:\n%s", consumer.Msgs[existingLogs:],
+				" re-requesting logs. Instead has:\n%s", msgs[existingLogs:],
 		)
 	}
 }
@@ -389,8 +403,8 @@ func TestContainerLogsShouldBeWithoutStreamHeader(t *testing.T) {
 func TestContainerLogsEnableAtStart(t *testing.T) {
 	ctx := context.Background()
 	g := TestLogConsumer{
-		Msgs:     []string{},
-		Done:     make(chan bool),
+		msgs:     []string{},
+		Done:     make(chan struct{}),
 		Accepted: devNullAcceptorChan(),
 	}
 
@@ -434,7 +448,7 @@ func TestContainerLogsEnableAtStart(t *testing.T) {
 	case <-time.After(10 * time.Second):
 		t.Fatal("never received final log message")
 	}
-	assert.Equal(t, []string{"ready\n", "echo hello\n", "echo there\n"}, g.Msgs)
+	assert.Equal(t, []string{"ready\n", "echo hello\n", "echo there\n"}, g.Msgs())
 
 	terminateContainerOnEnd(t, ctx, c)
 }
@@ -443,8 +457,8 @@ func Test_StartLogProductionStillStartsWithTooLowTimeout(t *testing.T) {
 	ctx := context.Background()
 
 	g := TestLogConsumer{
-		Msgs:     []string{},
-		Done:     make(chan bool),
+		msgs:     []string{},
+		Done:     make(chan struct{}),
 		Accepted: devNullAcceptorChan(),
 	}
 
@@ -475,8 +489,8 @@ func Test_StartLogProductionStillStartsWithTooHighTimeout(t *testing.T) {
 	ctx := context.Background()
 
 	g := TestLogConsumer{
-		Msgs:     []string{},
-		Done:     make(chan bool),
+		msgs:     []string{},
+		Done:     make(chan struct{}),
 		Accepted: devNullAcceptorChan(),
 	}
 
@@ -518,10 +532,11 @@ func Test_MultiContainerLogConsumer_CancelledContext(t *testing.T) {
 	// Context with cancellation functionality for simulating user interruption
 	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel() // Ensure it gets called.
 
 	first := TestLogConsumer{
-		Msgs:     []string{},
-		Done:     make(chan bool),
+		msgs:     []string{},
+		Done:     make(chan struct{}),
 		Accepted: devNullAcceptorChan(),
 	}
 
@@ -555,8 +570,8 @@ func Test_MultiContainerLogConsumer_CancelledContext(t *testing.T) {
 	require.NoError(t, err)
 
 	second := TestLogConsumer{
-		Msgs:     []string{},
-		Done:     make(chan bool),
+		msgs:     []string{},
+		Done:     make(chan struct{}),
 		Accepted: devNullAcceptorChan(),
 	}
 
@@ -592,7 +607,7 @@ func Test_MultiContainerLogConsumer_CancelledContext(t *testing.T) {
 	// Handling the termination of the containers
 	defer func() {
 		shutdownCtx, shutdownCancel := context.WithTimeout(
-			context.Background(), 60*time.Second,
+			context.Background(), 10*time.Second,
 		)
 		defer shutdownCancel()
 		_ = c.Terminate(shutdownCtx)
@@ -604,8 +619,8 @@ func Test_MultiContainerLogConsumer_CancelledContext(t *testing.T) {
 	// We check log size due to context cancellation causing
 	// varying message counts, leading to test failure.
-	assert.GreaterOrEqual(t, len(first.Msgs), 2)
-	assert.GreaterOrEqual(t, len(second.Msgs), 2)
+	assert.GreaterOrEqual(t, len(first.Msgs()), 2)
+	assert.GreaterOrEqual(t, len(second.Msgs()), 2)
 
 	// Restore stderr
 	w.Close()
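For reviewers, a small standalone sketch of the error-aggregation pattern that `Terminate`, the lifecycle hooks, and `containerHookFn` switch to in this diff: run every step, collect the results, and let `errors.Join` (Go 1.20+) drop the nils. The step functions below are hypothetical stand-ins, not the repository's API.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical cleanup steps standing in for terminatingHook, ContainerRemove
// and terminatedHook; each may fail independently.
func stopService() error    { return nil }
func removeResource() error { return errors.New("remove failed") }
func notifyDone() error     { return nil }

func terminate() error {
	// Run every step and collect the results instead of returning early,
	// so a failure in one step does not skip the remaining cleanup.
	errs := []error{
		stopService(),
		removeResource(),
		notifyDone(),
	}

	// errors.Join discards nil entries and returns nil if all entries are nil,
	// so callers still get a single error value to check.
	return errors.Join(errs...)
}

func main() {
	if err := terminate(); err != nil {
		fmt.Println("terminate failed:", err) // prints the joined "remove failed"
	}
}
```

The practical difference from the early-return version is that a failing middle step no longer skips the remaining cleanup, while callers still receive a single non-nil error whenever anything failed.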
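The log-production rework replaces the `stopLogProductionCh`/`logProductionDone` bool channels with a closed `chan struct{}` plus a `sync.WaitGroup`. A minimal sketch of that start/stop pattern, using hypothetical names rather than the `DockerContainer` fields:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// producer is a hypothetical stand-in for the container's log-production state.
type producer struct {
	mu   sync.Mutex
	stop chan struct{}
	wg   sync.WaitGroup
}

func (p *producer) Start() error {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.stop != nil {
		return fmt.Errorf("already started")
	}
	p.stop = make(chan struct{})
	p.wg.Add(1)

	stop := p.stop // capture the channel so the goroutine never reads the field
	go func() {
		defer p.wg.Done()
		for {
			select {
			case <-stop: // a closed channel unblocks every receiver
				return
			default:
				// ... read and dispatch one chunk of logs ...
				time.Sleep(10 * time.Millisecond)
			}
		}
	}()
	return nil
}

func (p *producer) Stop() {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.stop == nil {
		return
	}
	close(p.stop) // broadcast the shutdown signal
	p.wg.Wait()   // block until the goroutine has actually exited
	p.stop = nil  // allow Start to be called again
}

func main() {
	var p producer
	_ = p.Start()
	time.Sleep(50 * time.Millisecond)
	p.Stop()
	fmt.Println("stopped cleanly")
}
```

Closing the channel acts as a broadcast that every receiver observes, and `wg.Wait()` guarantees the goroutine has fully exited before the channel is reset to nil, which is what makes restarting safe.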
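Finally, the test changes make `TestLogConsumer` safe for concurrent use by keeping the message slice private behind a mutex and exposing an accessor. A simplified sketch of the same idea (not the test file itself):

```go
package main

import (
	"fmt"
	"sync"
)

// logCollector is an illustrative consumer: one goroutine appends messages
// while another reads them, so both paths go through the same mutex.
type logCollector struct {
	mu   sync.Mutex
	msgs []string
}

func (c *logCollector) Accept(msg string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.msgs = append(c.msgs, msg)
}

// Msgs returns the messages collected so far.
func (c *logCollector) Msgs() []string {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.msgs
}

func main() {
	var c logCollector
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			c.Accept(fmt.Sprintf("message %d", i))
		}(i)
	}
	wg.Wait()
	fmt.Println(len(c.Msgs()), "messages collected")
}
```

Assertions then call the accessor instead of reading the slice directly, so reads and the producer goroutine's appends are serialized by the same lock.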