From 16c689a9fcafddfa7205db585a55f1e3ebb5a4f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Espino?= Date: Thu, 25 Jan 2024 09:37:08 +0100 Subject: [PATCH] Adding LogConsumers start as part of the ContainerRequest (#2073) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Adding LogConsumers and LogProducer start as part of the GenericRequest * Apply suggestions from code review Co-authored-by: Manuel de la Peña * Addressing PR review comments * Adding docs and global option * Moving deprecation comments to the right place * chore: define a StdoutLogConsumer to be exposed * feat: support configuring the producer at the container request * chore: deprecate StartLogProducer * chore: deprecate StopLogProducer and FollowOutput * docs: document the follow logs feature * chore: update tests * doc: add warning regarding timeouts * chore: move to the container lifecycle * chore: typo * chore: remove StartStop test The lifecycle will be removed * docs: wording * fix: handle error * docs: replace producer word with production * chore: remove the concept of log producer * fix: lint * fix: wrong assertion * chore: use fixed mysql 8.0.36 version in test * chore: use NotEmpty assertion --------- Co-authored-by: Manuel de la Peña Co-authored-by: Manuel de la Peña --- container.go | 25 ++- docker.go | 215 ++++++++++-------- docs/features/common_functional_options.md | 18 ++ docs/features/follow_logs.md | 119 ++++++---- docs/modules/pulsar.md | 18 -- logconsumer.go | 12 + logconsumer_test.go | 242 ++++++++++----------- modules/pulsar/pulsar.go | 3 +- modules/pulsar/pulsar_test.go | 34 +-- options.go | 11 + options_test.go | 34 +++ testing.go | 12 + 12 files changed, 435 insertions(+), 308 deletions(-) diff --git a/container.go b/container.go index 8497c86d8b..7168e72370 100644 --- a/container.go +++ b/container.go @@ -43,17 +43,17 @@ type Container interface { Ports(context.Context) (nat.PortMap, error) // get all exposed ports SessionID() string // get session id IsRunning() bool - Start(context.Context) error // start the container - Stop(context.Context, *time.Duration) error // stop the container - Terminate(context.Context) error // terminate the container - Logs(context.Context) (io.ReadCloser, error) // Get logs of the container - FollowOutput(LogConsumer) - StartLogProducer(context.Context, ...LogProducerOption) error - StopLogProducer() error - Name(context.Context) (string, error) // get container name - State(context.Context) (*types.ContainerState, error) // returns container's running state - Networks(context.Context) ([]string, error) // get container networks - NetworkAliases(context.Context) (map[string][]string, error) // get container network aliases for a network + Start(context.Context) error // start the container + Stop(context.Context, *time.Duration) error // stop the container + Terminate(context.Context) error // terminate the container + Logs(context.Context) (io.ReadCloser, error) // Get logs of the container + FollowOutput(LogConsumer) // Deprecated: it will be removed in the next major release + StartLogProducer(context.Context, ...LogProductionOption) error // Deprecated: Use the ContainerRequest instead + StopLogProducer() error // Deprecated: it will be removed in the next major release + Name(context.Context) (string, error) // get container name + State(context.Context) (*types.ContainerState, error) // returns container's running state + Networks(context.Context) ([]string, error) // get container networks + 
NetworkAliases(context.Context) (map[string][]string, error) // get container network aliases for a network Exec(ctx context.Context, cmd []string, options ...tcexec.ProcessOption) (int, io.Reader, error) ContainerIP(context.Context) (string, error) // get container ip ContainerIPs(context.Context) ([]string, error) // get all container IPs @@ -61,7 +61,7 @@ type Container interface { CopyDirToContainer(ctx context.Context, hostDirPath string, containerParentPath string, fileMode int64) error CopyFileToContainer(ctx context.Context, hostFilePath string, containerFilePath string, fileMode int64) error CopyFileFromContainer(ctx context.Context, filePath string) (io.ReadCloser, error) - GetLogProducerErrorChannel() <-chan error + GetLogProductionErrorChannel() <-chan error } // ImageBuildInfo defines what is needed to build an image @@ -143,6 +143,7 @@ type ContainerRequest struct { HostConfigModifier func(*container.HostConfig) // Modifier for the host config before container creation EnpointSettingsModifier func(map[string]*network.EndpointSettings) // Modifier for the network settings before container creation LifecycleHooks []ContainerLifecycleHooks // define hooks to be executed during container lifecycle + LogConsumerCfg *LogConsumerConfig // define the configuration for the log producer and its log consumers to follow the logs } // containerOptions functional options for a container diff --git a/docker.go b/docker.go index 9657f28f55..9af552c3dd 100644 --- a/docker.go +++ b/docker.go @@ -59,19 +59,19 @@ type DockerContainer struct { isRunning bool imageWasBuilt bool // keepBuiltImage makes Terminate not remove the image if imageWasBuilt. - keepBuiltImage bool - provider *DockerProvider - sessionID string - terminationSignal chan bool - consumers []LogConsumer - raw *types.ContainerJSON - stopProducer chan bool - producerDone chan bool - producerError chan error - producerMutex sync.Mutex - producerTimeout *time.Duration - logger Logging - lifecycleHooks []ContainerLifecycleHooks + keepBuiltImage bool + provider *DockerProvider + sessionID string + terminationSignal chan bool + consumers []LogConsumer + raw *types.ContainerJSON + stopLogProductionCh chan bool + logProductionDone chan bool + logProductionError chan error + logProductionMutex sync.Mutex + logProductionTimeout *time.Duration + logger Logging + lifecycleHooks []ContainerLifecycleHooks } // SetLogger sets the logger for the container @@ -258,11 +258,6 @@ func (c *DockerContainer) Terminate(ctx context.Context) error { return err } - err = c.StopLogProducer() - if err != nil { - return err - } - err = c.provider.client.ContainerRemove(ctx, c.GetContainerID(), container.RemoveOptions{ RemoveVolumes: true, Force: true, @@ -367,9 +362,14 @@ func (c *DockerContainer) Logs(ctx context.Context) (io.ReadCloser, error) { return pr, nil } -// FollowOutput adds a LogConsumer to be sent logs from the container's -// STDOUT and STDERR +// Deprecated: use the ContainerRequest.LogConsumerConfig field instead. 
func (c *DockerContainer) FollowOutput(consumer LogConsumer) { + c.followOutput(consumer) +} + +// followOutput adds a LogConsumer to be sent logs from the container's +// STDOUT and STDERR +func (c *DockerContainer) followOutput(consumer LogConsumer) { c.consumers = append(c.consumers, consumer) } @@ -615,30 +615,35 @@ func (c *DockerContainer) CopyToContainer(ctx context.Context, fileContent []byt return nil } -type LogProducerOption func(*DockerContainer) +type LogProductionOption func(*DockerContainer) -// WithLogProducerTimeout is a functional option that sets the timeout for the log producer. +// WithLogProductionTimeout is a functional option that sets the timeout for the log production. // If the timeout is lower than 5s or greater than 60s it will be set to 5s or 60s respectively. -func WithLogProducerTimeout(timeout time.Duration) LogProducerOption { +func WithLogProductionTimeout(timeout time.Duration) LogProductionOption { return func(c *DockerContainer) { - c.producerTimeout = &timeout + c.logProductionTimeout = &timeout } } -// StartLogProducer will start a concurrent process that will continuously read logs +// Deprecated: use the ContainerRequest.LogConsumerConfig field instead. +func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProductionOption) error { + return c.startLogProduction(ctx, opts...) +} + +// startLogProduction will start a concurrent process that will continuously read logs // from the container and will send them to each added LogConsumer. -// Default log producer timeout is 5s. It is used to set the context timeout +// Default log production timeout is 5s. It is used to set the context timeout // which means that each log-reading loop will last at least the specified timeout // and that it cannot be cancelled earlier. -// Use functional option WithLogProducerTimeout() to override default timeout. If it's +// Use functional option WithLogProductionTimeout() to override default timeout. If it's // lower than 5s and greater than 60s it will be set to 5s or 60s respectively. 
-func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProducerOption) error { +func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogProductionOption) error { { - c.producerMutex.Lock() - defer c.producerMutex.Unlock() + c.logProductionMutex.Lock() + defer c.logProductionMutex.Unlock() - if c.stopProducer != nil { - return errors.New("log producer already started") + if c.stopLogProductionCh != nil { + return errors.New("log production already started") } } @@ -646,35 +651,35 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProdu opt(c) } - minProducerTimeout := time.Duration(5 * time.Second) - maxProducerTimeout := time.Duration(60 * time.Second) + minLogProductionTimeout := time.Duration(5 * time.Second) + maxLogProductionTimeout := time.Duration(60 * time.Second) - if c.producerTimeout == nil { - c.producerTimeout = &minProducerTimeout + if c.logProductionTimeout == nil { + c.logProductionTimeout = &minLogProductionTimeout } - if *c.producerTimeout < minProducerTimeout { - c.producerTimeout = &minProducerTimeout + if *c.logProductionTimeout < minLogProductionTimeout { + c.logProductionTimeout = &minLogProductionTimeout } - if *c.producerTimeout > maxProducerTimeout { - c.producerTimeout = &maxProducerTimeout + if *c.logProductionTimeout > maxLogProductionTimeout { + c.logProductionTimeout = &maxLogProductionTimeout } - c.stopProducer = make(chan bool) - c.producerDone = make(chan bool) - c.producerError = make(chan error, 1) + c.stopLogProductionCh = make(chan bool) + c.logProductionDone = make(chan bool) + c.logProductionError = make(chan error, 1) go func(stop <-chan bool, done chan<- bool, errorCh chan error) { - // signal the producer is done once go routine exits, this prevents race conditions around start/stop - // set c.stopProducer to nil so that it can be started again + // signal the log production is done once go routine exits, this prevents race conditions around start/stop + // set c.stopLogProductionCh to nil so that it can be started again defer func() { - defer c.producerMutex.Unlock() + defer c.logProductionMutex.Unlock() close(done) close(errorCh) { - c.producerMutex.Lock() - c.stopProducer = nil + c.logProductionMutex.Lock() + c.stopLogProductionCh = nil } }() @@ -688,7 +693,7 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProdu Since: since, } - ctx, cancel := context.WithTimeout(ctx, *c.producerTimeout) + ctx, cancel := context.WithTimeout(ctx, *c.logProductionTimeout) defer cancel() r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options) @@ -757,31 +762,36 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProdu } } } - }(c.stopProducer, c.producerDone, c.producerError) + }(c.stopLogProductionCh, c.logProductionDone, c.logProductionError) return nil } +// Deprecated: it will be removed in the next major release. 
+func (c *DockerContainer) StopLogProducer() error { + return c.stopLogProduction() +} + // StopLogProducer will stop the concurrent process that is reading logs // and sending them to each added LogConsumer -func (c *DockerContainer) StopLogProducer() error { - c.producerMutex.Lock() - defer c.producerMutex.Unlock() - if c.stopProducer != nil { - c.stopProducer <- true - // block until the producer is actually done in order to avoid strange races - <-c.producerDone - c.stopProducer = nil - c.producerDone = nil - return <-c.producerError +func (c *DockerContainer) stopLogProduction() error { + c.logProductionMutex.Lock() + defer c.logProductionMutex.Unlock() + if c.stopLogProductionCh != nil { + c.stopLogProductionCh <- true + // block until the log production is actually done in order to avoid strange races + <-c.logProductionDone + c.stopLogProductionCh = nil + c.logProductionDone = nil + return <-c.logProductionError } return nil } -// GetLogProducerErrorChannel exposes the only way for the consumer +// GetLogProductionErrorChannel exposes the only way for the consumer // to be able to listen to errors and react to them. -func (c *DockerContainer) GetLogProducerErrorChannel() <-chan error { - return c.producerError +func (c *DockerContainer) GetLogProductionErrorChannel() <-chan error { + return c.logProductionError } // DockerNetwork represents a network started using Docker @@ -1076,7 +1086,25 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque }, }, PostStarts: []ContainerHook{ - // first post-start hook is to wait for the container to be ready + // first post-start hook is to produce logs and start log consumers + func(ctx context.Context, c Container) error { + dockerContainer := c.(*DockerContainer) + + logConsumerConfig := req.LogConsumerCfg + if logConsumerConfig == nil { + return nil + } + + for _, consumer := range logConsumerConfig.Consumers { + dockerContainer.followOutput(consumer) + } + + if len(logConsumerConfig.Consumers) > 0 { + return dockerContainer.startLogProduction(ctx, logConsumerConfig.Opts...) 
+ } + return nil + }, + // second post-start hook is to wait for the container to be ready func(ctx context.Context, c Container) error { dockerContainer := c.(*DockerContainer) @@ -1096,6 +1124,23 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque return nil }, }, + PreTerminates: []ContainerHook{ + // first pre-terminate hook is to stop the log production + func(ctx context.Context, c Container) error { + logConsumerConfig := req.LogConsumerCfg + + if logConsumerConfig == nil { + return nil + } + if len(logConsumerConfig.Consumers) == 0 { + return nil + } + + dockerContainer := c.(*DockerContainer) + + return dockerContainer.stopLogProduction() + }, + }, }, } @@ -1131,17 +1176,17 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque } c := &DockerContainer{ - ID: resp.ID, - WaitingFor: req.WaitingFor, - Image: imageName, - imageWasBuilt: req.ShouldBuildImage(), - keepBuiltImage: req.ShouldKeepBuiltImage(), - sessionID: core.SessionID(), - provider: p, - terminationSignal: termSignal, - stopProducer: nil, - logger: p.Logger, - lifecycleHooks: req.LifecycleHooks, + ID: resp.ID, + WaitingFor: req.WaitingFor, + Image: imageName, + imageWasBuilt: req.ShouldBuildImage(), + keepBuiltImage: req.ShouldKeepBuiltImage(), + sessionID: core.SessionID(), + provider: p, + terminationSignal: termSignal, + stopLogProductionCh: nil, + logger: p.Logger, + lifecycleHooks: req.LifecycleHooks, } err = c.createdHook(ctx) @@ -1200,15 +1245,15 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain } dc := &DockerContainer{ - ID: c.ID, - WaitingFor: req.WaitingFor, - Image: c.Image, - sessionID: sessionID, - provider: p, - terminationSignal: termSignal, - stopProducer: nil, - logger: p.Logger, - isRunning: c.State == "running", + ID: c.ID, + WaitingFor: req.WaitingFor, + Image: c.Image, + sessionID: sessionID, + provider: p, + terminationSignal: termSignal, + stopLogProductionCh: nil, + logger: p.Logger, + isRunning: c.State == "running", } return dc, nil @@ -1498,7 +1543,7 @@ func containerFromDockerResponse(ctx context.Context, response types.Container) container.sessionID = core.SessionID() container.consumers = []LogConsumer{} - container.stopProducer = nil + container.stopLogProductionCh = nil container.isRunning = response.State == "running" // the termination signal should be obtained from the reaper diff --git a/docs/features/common_functional_options.md b/docs/features/common_functional_options.md index 82205cb5c2..3cc5d902aa 100644 --- a/docs/features/common_functional_options.md +++ b/docs/features/common_functional_options.md @@ -15,6 +15,24 @@ _Testcontainers for Go_ exposes an interface to perform this operations: `ImageS Using the `WithImageSubstitutors` options, you could define your own substitutions to the container images. E.g. adding a prefix to the images so that they can be pulled from a Docker registry other than Docker Hub. This is the usual mechanism for using Docker image proxies, caches, etc. +#### WithLogConsumers + +- Not available until the next release of testcontainers-go :material-tag: main + +If you need to consume the logs of the container, you can use `testcontainers.WithLogConsumers` with a valid log consumer. 
An example of a log consumer is the following:
+
+```golang
+type TestLogConsumer struct {
+	Msgs []string
+}
+
+func (g *TestLogConsumer) Accept(l Log) {
+	g.Msgs = append(g.Msgs, string(l.Content))
+}
+```
+
+Please read the [Following Container Logs](/features/follow_logs) documentation for more information about creating log consumers.
+
 #### Wait Strategies
 
 If you need to set a different wait strategy for the container, you can use `testcontainers.WithWaitStrategy` with a valid wait strategy.
diff --git a/docs/features/follow_logs.md b/docs/features/follow_logs.md
index 4269e27162..6ba01cb734 100644
--- a/docs/features/follow_logs.md
+++ b/docs/features/follow_logs.md
@@ -1,30 +1,84 @@
 # Following Container Logs
 
-If you wish to follow container logs, you can set up `LogConsumer`s. The log
-following functionality follows a producer-consumer model. As logs are written to either `stdout`,
-or `stderr` (`stdin` is not supported) they will be forwarded (produced) to any
-associated `LogConsumer`s. You can associate `LogConsumer`s with the
-`.FollowOutput` function.
+The log-following functionality follows a producer-consumer model: the container produces logs, and your code consumes them.
+So if you wish to follow container logs, you have to do two things:
 
-For example, this consumer will just add logs to a slice
+1. set up log consumers.
+2. configure the log production of the container (e.g. timeout for the logs).
+
+As logs are written to either `stdout`, or `stderr` (`stdin` is not supported) they will be forwarded (produced) to any associated log consumer.
+
+## Creating a LogConsumer
+
+A `LogConsumer` must implement the `LogConsumer` interface, and it could be as simple as directly printing the log to `stdout`,
+as in the following example:
+
+
+[The LogConsumer Interface](../../logconsumer.go) inside_block:logConsumerInterface
+[The Log struct](../../logconsumer.go) inside_block:logStruct
+[Example LogConsumer](../../testing.go) inside_block:exampleLogConsumer
+
+
+You can associate `LogConsumer`s in two ways:
+
+1. as part of the `ContainerRequest` struct.
+2. with the `FollowOutput` function (deprecated).
+
+## Passing the LogConsumers in the ContainerRequest
+
+This is the recommended way of associating `LogConsumer`s. You simply define your consumers, and attach them as a slice to the `ContainerRequest` in the
+`LogConsumerCfg` field. See the following example, where `g` is an instance of a given `LogConsumer` struct.
+
+
+[Passing LogConsumers](../../logconsumer_test.go) inside_block:logConsumersAtRequest
+
+
+Note that it's possible to configure the log production with a slice of functional options. These options must be of the `LogProductionOption` type:
+
+```go
+type LogProductionOption func(*DockerContainer)
+```
+
+At the moment, _Testcontainers for Go_ exposes an option to set log production timeout, using the `WithLogProductionTimeout` function.
+
+_Testcontainers for Go_ will read this log producer/consumer configuration to automatically start producing logs if and only if the consumers slice contains at least one valid `LogConsumer`.
+
+## Manually using the FollowOutput function
+
+!!!warning
+    This method is not recommended, as it requires you to manually manage the `LogConsumer` lifecycle.
+    We recommend using the `ContainerRequest` struct to associate `LogConsumer`s, as it's the simplest and most straightforward method.
+    
+ If you use both methods, you can get an error, as the `StartLogProducer` function could be called twice, which is not allowed. + + As a consequence, this lifecycle (`StartLogProducer`, `FollowOutput` and `StopLogProducer) will be **deprecated** in the future, delegating the control to the library. + +Instead of passing the `LogConsumer` as part of the `ContainerRequest` struct, you can manually call the `FollowOutput` function on a `Container` instance. +This allows you to dynamically add `LogConsumer`s to a running container, although it forces you to manually manage the `LogConsumer` lifecycle, +calling `StartLogProducer` **after** the `FollowOutput` function, and do it just once. + +You can define a log consumer like so: ```go type TestLogConsumer struct { - Msgs []string + Msgs []string // store the logs as a slice of strings } func (g *TestLogConsumer) Accept(l Log) { g.Msgs = append(g.Msgs, string(l.Content)) } ``` -This can be used like so: + +And then associate it with a container like so: + ```go g := TestLogConsumer{ Msgs: []string{}, } +// Remember that this method will be deprecated in the future c.FollowOutput(&g) // must be called before StarLogProducer +// Remember that this method will be deprecated in the future err := c.StartLogProducer(ctx) if err != nil { // do something with err @@ -32,58 +86,47 @@ if err != nil { // some stuff happens... +// Remember that this method will be deprecated in the future err = c.StopLogProducer() if err != nil { // do something with err } ``` -`LogProducer` is stopped in `c.Terminate()`. It can be done manually during container lifecycle -using `c.StopLogProducer()`. For a particular container, only one `LogProducer` can be active at time. +## Stopping the Log Production -`StartLogProducer()` also accepts a functional parameter now used to set log producer timeout: -```golang -type LogProducerOption func(*DockerContainer) +The production of logs is automatically stopped in `c.Terminate()`, so you don't have to worry about that. -func WithLogProducerTimeout(timeout time.Duration) LogProducerOption { - return func(c *DockerContainer) { - c.producerTimeout = &timeout - } -} - -// usage -err := c.StartLogProducer(ctx, WithLogProducerTimeout(10*time.Second)) -if err != nil { - // do something with err -} -``` - -If no parameter is passed a default timeout of 5 seconds will be used. Values below 5 seconds and above 60 seconds will -be coerced to these boundary values. +!!! warning + It can be done manually during container lifecycle using `c.StopLogProducer()`, but it's not recommended, as it will be deprecated in the future. ## Listening to errors -When log producer fails to start within given timeout (causing a context deadline) or there's an error returned while closing the reader it will no longer panic, but instead will return an error over a channel. You can listen to it using `DockerContainer.GetLogProducerErrorChannel()` method: -```golang -func (c *DockerContainer) GetLogProducerErrorChannel() <-chan error { +When the log production fails to start within given timeout (causing a context deadline) or there's an error returned while closing the reader it will no longer panic, but instead will return an error over a channel. You can listen to it using `DockerContainer.GetLogProductionErrorChannel()` method: + +```go +func (c *DockerContainer) GetLogProductionErrorChannel() <-chan error { return c.producerError } ``` -This allows you to, for example, retry restarting log producer if it fails to start the first time. 
For example: +This allows you to, for example, retry restarting the log production if it fails to start the first time. -```golang -// start log producer normally -err = container.StartLogProducer(ctx, WithLogProducerTimeout(10*time.Second)) +For example, you would start the log production normally, defining the log production configuration at the `ContainerRequest` struct, and then: + +```go +// start log production normally, using the ContainerRequest struct, or +// using the deprecated c.StartLogProducer method. +// err = container.StartLogProducer(ctx, WithLogProductionTimeout(10*time.Second)) // listen to errors in a detached goroutine -go func(done chan struct{}, timeout time.Duration, retryLimit int) { +go func(done chan struct{}, timeout time.Duration) { for { select { - case logErr := <-container.GetLogProducerErrorChannel(): + case logErr := <-container.GetLogProductionErrorChannel(): if logErr != nil { // do something with error - // for example, retry starting log producer + // for example, retry starting the log production // (here we retry it once, in real life you might want to retry it more times) startErr := container.StartLogProducer(ctx, timeout) if startErr != nil { diff --git a/docs/modules/pulsar.md b/docs/modules/pulsar.md index a9a47a8501..f4d93621e4 100644 --- a/docs/modules/pulsar.md +++ b/docs/modules/pulsar.md @@ -81,24 +81,6 @@ If you need to test Pulsar Transactions you can enable the transactions feature: [Create a Pulsar container with transactions](../../modules/pulsar/pulsar_test.go) inside_block:withTransactions -#### Log consumers -If you need to collect the logs from the Pulsar container, you can add your own LogConsumer with the `WithLogConsumers` function, which accepts a variadic argument of LogConsumers. - - -[Adding LogConsumers](../../modules/pulsar/pulsar_test.go) inside_block:withLogConsumers - - -An example of a LogConsumer could be the following: - - -[Example LogConsumer](../../modules/pulsar/pulsar_test.go) inside_block:logConsumerForTesting - - -!!!warning - You will need to explicitly stop the producer in your tests. - -If you want to know more about LogConsumers, please check the [Following Container Logs](../features/follow_logs.md) documentation. 
- ### Container methods Once you have a Pulsar container, then you can retrieve the broker and the admin url: diff --git a/logconsumer.go b/logconsumer.go index c5a2e29cf9..0b9b1bd025 100644 --- a/logconsumer.go +++ b/logconsumer.go @@ -6,6 +6,7 @@ const StdoutLog = "STDOUT" // StderrLog is the log type for STDERR const StderrLog = "STDERR" +// logStruct { // Log represents a message that was created by a process, // LogType is either "STDOUT" or "STDERR", // Content is the byte contents of the message itself @@ -14,9 +15,20 @@ type Log struct { Content []byte } +// } + +// logConsumerInterface { // LogConsumer represents any object that can // handle a Log, it is up to the LogConsumer instance // what to do with the log type LogConsumer interface { Accept(Log) } + +// } + +// LogConsumerConfig is a configuration object for the producer/consumer pattern +type LogConsumerConfig struct { + Opts []LogProductionOption // options for the production of logs + Consumers []LogConsumer // consumers for the logs +} diff --git a/logconsumer_test.go b/logconsumer_test.go index 8dc067da07..3416c73f79 100644 --- a/logconsumer_test.go +++ b/logconsumer_test.go @@ -54,6 +54,13 @@ func devNullAcceptorChan() chan string { func Test_LogConsumerGetsCalled(t *testing.T) { ctx := context.Background() + + g := TestLogConsumer{ + Msgs: []string{}, + Done: make(chan bool), + Accepted: devNullAcceptorChan(), + } + req := ContainerRequest{ FromDockerfile: FromDockerfile{ Context: "./testdata/", @@ -61,6 +68,9 @@ func Test_LogConsumerGetsCalled(t *testing.T) { }, ExposedPorts: []string{"8080/tcp"}, WaitingFor: wait.ForLog("ready"), + LogConsumerCfg: &LogConsumerConfig{ + Consumers: []LogConsumer{&g}, + }, } gReq := GenericContainerRequest{ @@ -74,17 +84,6 @@ func Test_LogConsumerGetsCalled(t *testing.T) { ep, err := c.Endpoint(ctx, "http") require.NoError(t, err) - g := TestLogConsumer{ - Msgs: []string{}, - Done: make(chan bool), - Accepted: devNullAcceptorChan(), - } - - c.FollowOutput(&g) - - err = c.StartLogProducer(ctx) - require.NoError(t, err) - _, err = http.Get(ep + "/stdout?echo=hello") require.NoError(t, err) @@ -99,7 +98,7 @@ func Test_LogConsumerGetsCalled(t *testing.T) { case <-time.After(5 * time.Second): t.Fatal("never received final log message") } - require.NoError(t, c.StopLogProducer()) + assert.Equal(t, []string{"ready\n", "echo hello\n", "echo there\n"}, g.Msgs) terminateContainerOnEnd(t, ctx, c) @@ -121,6 +120,12 @@ func (t *TestLogTypeConsumer) Accept(l Log) { func Test_ShouldRecognizeLogTypes(t *testing.T) { ctx := context.Background() + + g := TestLogTypeConsumer{ + LogTypes: map[string]string{}, + Ack: make(chan bool), + } + req := ContainerRequest{ FromDockerfile: FromDockerfile{ Context: "./testdata/", @@ -128,6 +133,9 @@ func Test_ShouldRecognizeLogTypes(t *testing.T) { }, ExposedPorts: []string{"8080/tcp"}, WaitingFor: wait.ForLog("ready"), + LogConsumerCfg: &LogConsumerConfig{ + Consumers: []LogConsumer{&g}, + }, } gReq := GenericContainerRequest{ @@ -142,16 +150,6 @@ func Test_ShouldRecognizeLogTypes(t *testing.T) { ep, err := c.Endpoint(ctx, "http") require.NoError(t, err) - g := TestLogTypeConsumer{ - LogTypes: map[string]string{}, - Ack: make(chan bool), - } - - c.FollowOutput(&g) - - err = c.StartLogProducer(ctx) - require.NoError(t, err) - _, err = http.Get(ep + "/stdout?echo=this-is-stdout") require.NoError(t, err) @@ -162,7 +160,6 @@ func Test_ShouldRecognizeLogTypes(t *testing.T) { require.NoError(t, err) <-g.Ack - require.NoError(t, c.StopLogProducer()) assert.Equal(t, 
map[string]string{ StdoutLog: "echo this-is-stdout\n", @@ -172,25 +169,6 @@ func Test_ShouldRecognizeLogTypes(t *testing.T) { func Test_MultipleLogConsumers(t *testing.T) { ctx := context.Background() - req := ContainerRequest{ - FromDockerfile: FromDockerfile{ - Context: "./testdata/", - Dockerfile: "echoserver.Dockerfile", - }, - ExposedPorts: []string{"8080/tcp"}, - WaitingFor: wait.ForLog("ready"), - } - - gReq := GenericContainerRequest{ - ContainerRequest: req, - Started: true, - } - - c, err := GenericContainer(ctx, gReq) - require.NoError(t, err) - - ep, err := c.Endpoint(ctx, "http") - require.NoError(t, err) first := TestLogConsumer{ Msgs: []string{}, @@ -203,29 +181,6 @@ func Test_MultipleLogConsumers(t *testing.T) { Accepted: devNullAcceptorChan(), } - c.FollowOutput(&first) - c.FollowOutput(&second) - - err = c.StartLogProducer(ctx) - require.NoError(t, err) - - _, err = http.Get(ep + "/stdout?echo=mlem") - require.NoError(t, err) - - _, err = http.Get(ep + "/stdout?echo=" + lastMessage) - require.NoError(t, err) - - <-first.Done - <-second.Done - require.NoError(t, c.StopLogProducer()) - - assert.Equal(t, []string{"ready\n", "echo mlem\n"}, first.Msgs) - assert.Equal(t, []string{"ready\n", "echo mlem\n"}, second.Msgs) - require.NoError(t, c.Terminate(ctx)) -} - -func Test_StartStop(t *testing.T) { - ctx := context.Background() req := ContainerRequest{ FromDockerfile: FromDockerfile{ Context: "./testdata/", @@ -233,6 +188,9 @@ func Test_StartStop(t *testing.T) { }, ExposedPorts: []string{"8080/tcp"}, WaitingFor: wait.ForLog("ready"), + LogConsumerCfg: &LogConsumerConfig{ + Consumers: []LogConsumer{&first, &second}, + }, } gReq := GenericContainerRequest{ @@ -246,47 +204,17 @@ func Test_StartStop(t *testing.T) { ep, err := c.Endpoint(ctx, "http") require.NoError(t, err) - g := TestLogConsumer{ - Msgs: []string{}, - Done: make(chan bool), - Accepted: make(chan string), - } - c.FollowOutput(&g) - - require.NoError(t, c.StopLogProducer(), "nothing should happen even if the producer is not started") - - require.NoError(t, c.StartLogProducer(ctx)) - require.Equal(t, "ready\n", <-g.Accepted) - - require.Error(t, c.StartLogProducer(ctx), "log producer is already started") - _, err = http.Get(ep + "/stdout?echo=mlem") require.NoError(t, err) - require.Equal(t, "echo mlem\n", <-g.Accepted) - - require.NoError(t, c.StopLogProducer()) - - require.NoError(t, c.StartLogProducer(ctx)) - require.Equal(t, "ready\n", <-g.Accepted) - require.Equal(t, "echo mlem\n", <-g.Accepted) - - _, err = http.Get(ep + "/stdout?echo=mlem2") - require.NoError(t, err) - require.Equal(t, "echo mlem2\n", <-g.Accepted) _, err = http.Get(ep + "/stdout?echo=" + lastMessage) require.NoError(t, err) - <-g.Done - // Do not close producer here, let's delegate it to c.Terminate + <-first.Done + <-second.Done - assert.Equal(t, []string{ - "ready\n", - "echo mlem\n", - "ready\n", - "echo mlem\n", - "echo mlem2\n", - }, g.Msgs) + assert.Equal(t, []string{"ready\n", "echo mlem\n"}, first.Msgs) + assert.Equal(t, []string{"ready\n", "echo mlem\n"}, second.Msgs) require.NoError(t, c.Terminate(ctx)) } @@ -360,7 +288,19 @@ func TestContainerLogWithErrClosed(t *testing.T) { }, } - nginx, err := provider.CreateContainer(ctx, ContainerRequest{Image: "nginx", ExposedPorts: []string{"80/tcp"}}) + consumer := TestLogConsumer{ + Msgs: []string{}, + Done: make(chan bool), + Accepted: devNullAcceptorChan(), + } + + nginx, err := provider.CreateContainer(ctx, ContainerRequest{ + Image: "nginx", + ExposedPorts: []string{"80/tcp"}, + 
LogConsumerCfg: &LogConsumerConfig{ + Consumers: []LogConsumer{&consumer}, + }, + }) if err != nil { t.Fatal(err) } @@ -374,20 +314,6 @@ func TestContainerLogWithErrClosed(t *testing.T) { t.Fatal(err) } - consumer := TestLogConsumer{ - Msgs: []string{}, - Done: make(chan bool), - Accepted: devNullAcceptorChan(), - } - - if err = nginx.StartLogProducer(ctx); err != nil { - t.Fatal(err) - } - defer func() { - _ = nginx.StopLogProducer() - }() - nginx.FollowOutput(&consumer) - // Gather the initial container logs time.Sleep(time.Second * 1) existingLogs := len(consumer.Msgs) @@ -413,11 +339,11 @@ func TestContainerLogWithErrClosed(t *testing.T) { // Simulate a transient closed connection to the docker daemon i, _, err := dind.Exec(ctx, append([]string{"iptables", "-A"}, iptableArgs...)) if err != nil || i > 0 { - t.Fatalf("Failed to close connection to dind daemon") + t.Fatalf("Failed to close connection to dind daemon: i(%d), err %v", i, err) } i, _, err = dind.Exec(ctx, append([]string{"iptables", "-D"}, iptableArgs...)) if err != nil || i > 0 { - t.Fatalf("Failed to re-open connection to dind daemon") + t.Fatalf("Failed to re-open connection to dind daemon: i(%d), err %v", i, err) } time.Sleep(time.Second * 3) @@ -459,8 +385,15 @@ func TestContainerLogsShouldBeWithoutStreamHeader(t *testing.T) { assert.Equal(t, "0", strings.TrimSpace(string(b))) } -func Test_StartLogProducerStillStartsWithTooLowTimeout(t *testing.T) { +func TestContainerLogsEnableAtStart(t *testing.T) { ctx := context.Background() + g := TestLogConsumer{ + Msgs: []string{}, + Done: make(chan bool), + Accepted: devNullAcceptorChan(), + } + + // logConsumersAtRequest { req := ContainerRequest{ FromDockerfile: FromDockerfile{ Context: "./testdata/", @@ -468,7 +401,12 @@ func Test_StartLogProducerStillStartsWithTooLowTimeout(t *testing.T) { }, ExposedPorts: []string{"8080/tcp"}, WaitingFor: wait.ForLog("ready"), + LogConsumerCfg: &LogConsumerConfig{ + Opts: []LogProductionOption{WithLogProductionTimeout(10 * time.Second)}, + Consumers: []LogConsumer{&g}, + }, } + // } gReq := GenericContainerRequest{ ContainerRequest: req, @@ -477,7 +415,31 @@ func Test_StartLogProducerStillStartsWithTooLowTimeout(t *testing.T) { c, err := GenericContainer(ctx, gReq) require.NoError(t, err) + + ep, err := c.Endpoint(ctx, "http") + require.NoError(t, err) + + _, err = http.Get(ep + "/stdout?echo=hello") + require.NoError(t, err) + + _, err = http.Get(ep + "/stdout?echo=there") + require.NoError(t, err) + + _, err = http.Get(ep + "/stdout?echo=" + lastMessage) + require.NoError(t, err) + + select { + case <-g.Done: + case <-time.After(10 * time.Second): + t.Fatal("never received final log message") + } + assert.Equal(t, []string{"ready\n", "echo hello\n", "echo there\n"}, g.Msgs) + terminateContainerOnEnd(t, ctx, c) +} + +func Test_StartLogProductionStillStartsWithTooLowTimeout(t *testing.T) { + ctx := context.Background() g := TestLogConsumer{ Msgs: []string{}, @@ -485,14 +447,6 @@ func Test_StartLogProducerStillStartsWithTooLowTimeout(t *testing.T) { Accepted: devNullAcceptorChan(), } - c.FollowOutput(&g) - - err = c.StartLogProducer(ctx, WithLogProducerTimeout(4*time.Second)) - require.NoError(t, err, "should still start with too low timeout") -} - -func Test_StartLogProducerStillStartsWithTooHighTimeout(t *testing.T) { - ctx := context.Background() req := ContainerRequest{ FromDockerfile: FromDockerfile{ Context: "./testdata/", @@ -500,6 +454,10 @@ func Test_StartLogProducerStillStartsWithTooHighTimeout(t *testing.T) { }, ExposedPorts: 
[]string{"8080/tcp"}, WaitingFor: wait.ForLog("ready"), + LogConsumerCfg: &LogConsumerConfig{ + Opts: []LogProductionOption{WithLogProductionTimeout(4 * time.Second)}, + Consumers: []LogConsumer{&g}, + }, } gReq := GenericContainerRequest{ @@ -510,6 +468,10 @@ func Test_StartLogProducerStillStartsWithTooHighTimeout(t *testing.T) { c, err := GenericContainer(ctx, gReq) require.NoError(t, err) terminateContainerOnEnd(t, ctx, c) +} + +func Test_StartLogProductionStillStartsWithTooHighTimeout(t *testing.T) { + ctx := context.Background() g := TestLogConsumer{ Msgs: []string{}, @@ -517,8 +479,32 @@ func Test_StartLogProducerStillStartsWithTooHighTimeout(t *testing.T) { Accepted: devNullAcceptorChan(), } - c.FollowOutput(&g) + req := ContainerRequest{ + FromDockerfile: FromDockerfile{ + Context: "./testdata/", + Dockerfile: "echoserver.Dockerfile", + }, + ExposedPorts: []string{"8080/tcp"}, + WaitingFor: wait.ForLog("ready"), + LogConsumerCfg: &LogConsumerConfig{ + Opts: []LogProductionOption{WithLogProductionTimeout(61 * time.Second)}, + Consumers: []LogConsumer{&g}, + }, + } + + gReq := GenericContainerRequest{ + ContainerRequest: req, + Started: true, + } + + c, err := GenericContainer(ctx, gReq) + require.NoError(t, err) + require.NotNil(t, c) - err = c.StartLogProducer(ctx, WithLogProducerTimeout(61*time.Second)) - require.NoError(t, err, "should still start with too high timeout") + // because the log production timeout is too high, the container should have already been terminated + // so no need to terminate it again with "terminateContainerOnEnd(t, ctx, c)" + dc := c.(*DockerContainer) + require.NoError(t, dc.stopLogProduction()) + + terminateContainerOnEnd(t, ctx, c) } diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index 5044d8d3e2..f8ad59dfd9 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -32,7 +32,7 @@ var defaultWaitStrategies = wait.ForAll( type Container struct { testcontainers.Container - LogConsumers []testcontainers.LogConsumer // Needs to be exported to control the stop from the caller + LogConsumers []testcontainers.LogConsumer // Deprecated. Use the ContainerRequest instead. Needs to be exported to control the stop from the caller } func (c *Container) BrokerURL(ctx context.Context) (string, error) { @@ -84,6 +84,7 @@ func WithFunctionsWorker() testcontainers.CustomizeRequestOption { } } +// Deprecated: use the testcontainers.WithLogConsumers functional option instead // WithLogConsumers allows to add log consumers to the container. 
// They will be automatically started and they will follow the container logs, // but it's a responsibility of the caller to stop them calling StopLogProducer diff --git a/modules/pulsar/pulsar_test.go b/modules/pulsar/pulsar_test.go index 64f0ca39bd..30ef07aa74 100644 --- a/modules/pulsar/pulsar_test.go +++ b/modules/pulsar/pulsar_test.go @@ -21,17 +21,6 @@ import ( tcnetwork "github.com/testcontainers/testcontainers-go/network" ) -// logConsumerForTesting { -// logConsumer is a testcontainers.LogConsumer that prints the log to stdout -type testLogConsumer struct{} - -// Accept prints the log to stdout -func (lc *testLogConsumer) Accept(l testcontainers.Log) { - fmt.Print(string(l.Content)) -} - -// } - func TestPulsar(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -42,9 +31,8 @@ func TestPulsar(t *testing.T) { nwName := nw.Name tests := []struct { - name string - opts []testcontainers.ContainerCustomizer - logConsumers []testcontainers.LogConsumer + name string + opts []testcontainers.ContainerCustomizer }{ { name: "default", @@ -90,8 +78,12 @@ func TestPulsar(t *testing.T) { }, }, { - name: "with log consumers", - logConsumers: []testcontainers.LogConsumer{&testLogConsumer{}}, + name: "with log consumers", + opts: []testcontainers.ContainerCustomizer{ + // withLogconsumers { + testcontainers.WithLogConsumers(&testcontainers.StdoutLogConsumer{}), + // } + }, }, } @@ -107,16 +99,6 @@ func TestPulsar(t *testing.T) { require.NoError(t, err) }() - // withLogConsumers { - if len(c.LogConsumers) > 0 { - c.WithLogConsumers(ctx, tt.logConsumers...) - defer func() { - // not handling the error because it will never return an error: it's satisfying the current API - _ = c.StopLogProducer() - }() - } - // } - // getBrokerURL { brokerURL, err := c.BrokerURL(ctx) // } diff --git a/options.go b/options.go index 573499d6ea..40bf671bcc 100644 --- a/options.go +++ b/options.go @@ -128,6 +128,17 @@ func WithImageSubstitutors(fn ...ImageSubstitutor) CustomizeRequestOption { } } +// WithLogConsumers sets the log consumers for a container +func WithLogConsumers(consumer ...LogConsumer) CustomizeRequestOption { + return func(req *GenericContainerRequest) { + if req.LogConsumerCfg == nil { + req.LogConsumerCfg = &LogConsumerConfig{} + } + + req.LogConsumerCfg.Consumers = consumer + } +} + // Executable represents an executable command to be sent to a container, including options, // as part of the different lifecycle hooks. 
type Executable interface { diff --git a/options_test.go b/options_test.go index 7dee492435..f402d42b7c 100644 --- a/options_test.go +++ b/options_test.go @@ -67,6 +67,40 @@ func TestOverrideContainerRequest(t *testing.T) { assert.Equal(t, wait.ForLog("foo"), req.WaitingFor) } +type msgsLogConsumer struct { + msgs []string +} + +// Accept prints the log to stdout +func (lc *msgsLogConsumer) Accept(l testcontainers.Log) { + lc.msgs = append(lc.msgs, string(l.Content)) +} + +func TestWithLogConsumers(t *testing.T) { + req := testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: "mysql:8.0.36", + WaitingFor: wait.ForLog("port: 3306 MySQL Community Server - GPL"), + }, + Started: true, + } + + lc := &msgsLogConsumer{} + + testcontainers.WithLogConsumers(lc)(&req) + + c, err := testcontainers.GenericContainer(context.Background(), req) + // we expect an error because the MySQL environment variables are not set + // but this is expected because we just want to test the log consumer + require.Error(t, err) + defer func() { + err = c.Terminate(context.Background()) + require.NoError(t, err) + }() + + assert.NotEmpty(t, lc.msgs) +} + func TestWithStartupCommand(t *testing.T) { req := testcontainers.GenericContainerRequest{ ContainerRequest: testcontainers.ContainerRequest{ diff --git a/testing.go b/testing.go index 5cba53e79f..6d23952952 100644 --- a/testing.go +++ b/testing.go @@ -2,6 +2,7 @@ package testcontainers import ( "context" + "fmt" "testing" ) @@ -38,3 +39,14 @@ func SkipIfDockerDesktop(t *testing.T, ctx context.Context) { t.Skip("Skipping test that requires host network access when running in Docker Desktop") } } + +// exampleLogConsumer { +// StdoutLogConsumer is a LogConsumer that prints the log to stdout +type StdoutLogConsumer struct{} + +// Accept prints the log to stdout +func (lc *StdoutLogConsumer) Accept(l Log) { + fmt.Print(string(l.Content)) +} + +// }
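
For reference, a minimal end-to-end sketch of the feature introduced by this patch — the request-scoped `LogConsumerCfg`, the `LogProductionOption`/`WithLogProductionTimeout` option, and the exported `StdoutLogConsumer` — could look like the following. This is not part of the diff above; the image, exposed port and wait strategy are illustrative assumptions only.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/wait"
)

func main() {
	ctx := context.Background()

	req := testcontainers.ContainerRequest{
		Image:        "nginx:alpine", // any image; nginx is only an example
		ExposedPorts: []string{"80/tcp"},
		WaitingFor:   wait.ForListeningPort("80/tcp"),
		// New in this patch: consumers (and optional production options) are
		// attached to the request; a post-start hook wires them up and a
		// pre-terminate hook stops the log production automatically.
		LogConsumerCfg: &testcontainers.LogConsumerConfig{
			Opts:      []testcontainers.LogProductionOption{testcontainers.WithLogProductionTimeout(10 * time.Second)},
			Consumers: []testcontainers.LogConsumer{&testcontainers.StdoutLogConsumer{}},
		},
	}

	c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: req,
		Started:          true,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Terminate stops the log production via the new pre-terminate hook,
	// so no explicit StopLogProducer call is needed.
	defer func() {
		if err := c.Terminate(ctx); err != nil {
			log.Printf("terminate: %v", err)
		}
	}()

	// ... interact with the container; its stdout/stderr is streamed to the consumer.
}
```

The same configuration can be produced with the functional option added in `options.go`, e.g. `testcontainers.WithLogConsumers(&testcontainers.StdoutLogConsumer{})`, which simply fills the request's `LogConsumerCfg.Consumers` field.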