diff --git a/testcontainers/src/core/containers/async_container.rs b/testcontainers/src/core/containers/async_container.rs index 3b497219..267df147 100644 --- a/testcontainers/src/core/containers/async_container.rs +++ b/testcontainers/src/core/containers/async_container.rs @@ -204,13 +204,13 @@ where match cmd_ready_condition { CmdWaitFor::StdOutMessage { message } => { exec.stdout() - .wait_for_message(&message) + .wait_for_message(&message, 1) .await .map_err(ExecError::from)?; } CmdWaitFor::StdErrMessage { message } => { exec.stderr() - .wait_for_message(&message) + .wait_for_message(&message, 1) .await .map_err(ExecError::from)?; } diff --git a/testcontainers/src/core/image.rs b/testcontainers/src/core/image.rs index 95e68d92..e0235761 100644 --- a/testcontainers/src/core/image.rs +++ b/testcontainers/src/core/image.rs @@ -39,9 +39,7 @@ where /// up. /// /// The conditions returned from this method are evaluated **in the order** they are returned. Therefore - /// you most likely want to start with a [`WaitFor::StdOutMessage`] or [`WaitFor::StdErrMessage`] and - /// potentially follow up with a [`WaitFor::Duration`] in case the container usually needs a little - /// more time before it is ready. + /// you most likely want to start with a [`WaitFor::Log`] or [`WaitFor::Http`]. fn ready_conditions(&self) -> Vec; /// Returns the environment variables that needs to be set when a container is created. diff --git a/testcontainers/src/core/logs.rs b/testcontainers/src/core/logs.rs index 611f7221..769f58e2 100644 --- a/testcontainers/src/core/logs.rs +++ b/testcontainers/src/core/logs.rs @@ -26,7 +26,7 @@ pub enum WaitLogError { #[derive(Copy, Clone, Debug, parse_display::Display)] #[display(style = "lowercase")] -pub(crate) enum LogSource { +pub enum LogSource { StdOut, StdErr, } @@ -42,6 +42,13 @@ impl LogSource { } impl LogFrame { + pub fn source(&self) -> LogSource { + match self { + LogFrame::StdOut(_) => LogSource::StdOut, + LogFrame::StdErr(_) => LogSource::StdErr, + } + } + pub fn bytes(&self) -> &Bytes { match self { LogFrame::StdOut(bytes) => bytes, @@ -74,23 +81,29 @@ impl WaitingStreamWrapper { pub(crate) async fn wait_for_message( &mut self, message: impl AsRef<[u8]>, + times: usize, ) -> Result<(), WaitLogError> { let msg_finder = Finder::new(message.as_ref()); let mut messages = Vec::new(); + let mut found_times = 0; while let Some(message) = self.inner.next().await.transpose()? { messages.push(message.clone()); if self.enable_cache { self.cache.push(Ok(message.clone())); } let match_found = msg_finder.find(message.as_ref()).is_some(); - if match_found { - log::debug!("Found message after comparing {} lines", messages.len()); + found_times += usize::from(match_found); // can't overflow, because of check below + if found_times == times { + log::debug!( + "Message found {times} times after comparing {} lines", + messages.len() + ); return Ok(()); } } log::warn!( - "Failed to find message '{}' after comparing {} lines.", + "Failed to find message '{}' {times} times after comparing {} lines.", String::from_utf8_lossy(message.as_ref()), messages.len() ); @@ -121,15 +134,17 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn given_logs_when_line_contains_message_should_find_it() { - let mut log_stream = WaitingStreamWrapper::new(Box::pin(futures::stream::iter([Ok(r" - Message one - Message two - Message three - " - .into())]))); - - let result = log_stream.wait_for_message("Message three").await; - - assert!(result.is_ok()) + let mut log_stream = WaitingStreamWrapper::new(Box::pin(futures::stream::iter([ + Ok("Message one".into()), + Ok("Message two".into()), + Ok("Message three".into()), + Ok("Message three".into()), + ]))); + + let result = log_stream.wait_for_message("Message three", 2).await; + assert!(result.is_ok()); + + let result = log_stream.wait_for_message("Message two", 2).await; + assert!(result.is_err()); } } diff --git a/testcontainers/src/core/wait/log_strategy.rs b/testcontainers/src/core/wait/log_strategy.rs new file mode 100644 index 00000000..6347bdbf --- /dev/null +++ b/testcontainers/src/core/wait/log_strategy.rs @@ -0,0 +1,68 @@ +use bytes::Bytes; + +use crate::{ + core::{ + client::Client, + error::WaitContainerError, + logs::{LogSource, WaitingStreamWrapper}, + wait::WaitStrategy, + }, + ContainerAsync, Image, +}; + +#[derive(Debug, Clone)] +pub struct LogWaitStrategy { + source: LogSource, + message: Bytes, + times: usize, +} + +impl LogWaitStrategy { + /// Create a new [`LogWaitStrategy`] that waits for the given message to appear in the standard output logs. + /// Shortcut for `LogWaitStrategy::new(LogSource::StdOut, message)`. + pub fn stdout(message: impl AsRef<[u8]>) -> Self { + Self::new(LogSource::StdOut, message) + } + + /// Create a new [`LogWaitStrategy`] that waits for the given message to appear in the standard error logs. + /// Shortcut for `LogWaitStrategy::new(LogSource::StdErr, message)`. + pub fn stderr(message: impl AsRef<[u8]>) -> Self { + Self::new(LogSource::StdErr, message) + } + + /// Create a new `LogWaitStrategy` with the given log source and message. + /// The message is expected to appear in the logs exactly once by default. + pub fn new(source: LogSource, message: impl AsRef<[u8]>) -> Self { + Self { + source, + message: Bytes::from(message.as_ref().to_vec()), + times: 1, + } + } + + /// Set the number of times the message should appear in the logs. + pub fn with_times(mut self, times: usize) -> Self { + self.times = times; + self + } +} + +impl WaitStrategy for LogWaitStrategy { + async fn wait_until_ready( + self, + client: &Client, + container: &ContainerAsync, + ) -> crate::core::error::Result<()> { + let log_stream = match self.source { + LogSource::StdOut => client.stdout_logs(container.id(), true), + LogSource::StdErr => client.stderr_logs(container.id(), true), + }; + + WaitingStreamWrapper::new(log_stream) + .wait_for_message(self.message, self.times) + .await + .map_err(WaitContainerError::from)?; + + Ok(()) + } +} diff --git a/testcontainers/src/core/wait/mod.rs b/testcontainers/src/core/wait/mod.rs index 9d081ca8..603b17e9 100644 --- a/testcontainers/src/core/wait/mod.rs +++ b/testcontainers/src/core/wait/mod.rs @@ -1,17 +1,18 @@ use std::{env::var, fmt::Debug, time::Duration}; -use bytes::Bytes; pub use health_strategy::HealthWaitStrategy; pub use http_strategy::HttpWaitStrategy; +pub use log_strategy::LogWaitStrategy; use crate::{ - core::{client::Client, error::WaitContainerError, logs::WaitingStreamWrapper}, + core::{client::Client, logs::LogSource}, ContainerAsync, Image, }; pub(crate) mod cmd_wait; pub(crate) mod health_strategy; pub(crate) mod http_strategy; +pub(crate) mod log_strategy; pub(crate) trait WaitStrategy { async fn wait_until_ready( @@ -26,10 +27,8 @@ pub(crate) trait WaitStrategy { pub enum WaitFor { /// An empty condition. Useful for default cases or fallbacks. Nothing, - /// Wait for a message on the stdout stream of the container's logs. - StdOutMessage { message: Bytes }, - /// Wait for a message on the stderr stream of the container's logs. - StdErrMessage { message: Bytes }, + /// Wait for a certain message to appear in the container's logs. + Log(LogWaitStrategy), /// Wait for a certain amount of time. Duration { length: Duration }, /// Wait for the container's status to become `healthy`. @@ -41,16 +40,17 @@ pub enum WaitFor { impl WaitFor { /// Wait for the message to appear on the container's stdout. pub fn message_on_stdout(message: impl AsRef<[u8]>) -> WaitFor { - WaitFor::StdOutMessage { - message: Bytes::from(message.as_ref().to_vec()), - } + Self::log(LogWaitStrategy::new(LogSource::StdOut, message)) } /// Wait for the message to appear on the container's stderr. pub fn message_on_stderr(message: impl AsRef<[u8]>) -> WaitFor { - WaitFor::StdErrMessage { - message: Bytes::from(message.as_ref().to_vec()), - } + Self::log(LogWaitStrategy::new(LogSource::StdOut, message)) + } + + /// Wait for the message to appear on the container's stdout. + pub fn log(log_strategy: LogWaitStrategy) -> WaitFor { + WaitFor::Log(log_strategy) } /// Wait for the container to become healthy. @@ -114,18 +114,7 @@ impl WaitStrategy for WaitFor { container: &ContainerAsync, ) -> crate::core::error::Result<()> { match self { - WaitFor::StdOutMessage { message } => { - WaitingStreamWrapper::new(client.stdout_logs(container.id(), true)) - .wait_for_message(message) - .await - .map_err(WaitContainerError::from)? - } - WaitFor::StdErrMessage { message } => { - WaitingStreamWrapper::new(client.stderr_logs(container.id(), true)) - .wait_for_message(message) - .await - .map_err(WaitContainerError::from)? - } + WaitFor::Log(strategy) => strategy.wait_until_ready(client, container).await?, WaitFor::Duration { length } => { tokio::time::sleep(length).await; } @@ -135,8 +124,7 @@ impl WaitStrategy for WaitFor { WaitFor::Http(strategy) => { strategy.wait_until_ready(client, container).await?; } - // WaitFor::Nothing => {} - _ => {} + WaitFor::Nothing => {} } Ok(()) } diff --git a/testcontainers/tests/async_runner.rs b/testcontainers/tests/async_runner.rs index 3c52f782..9ff13b1f 100644 --- a/testcontainers/tests/async_runner.rs +++ b/testcontainers/tests/async_runner.rs @@ -5,7 +5,7 @@ use reqwest::StatusCode; use testcontainers::{ core::{ logs::{consumer::logging_consumer::LoggingConsumer, LogFrame}, - wait::HttpWaitStrategy, + wait::{HttpWaitStrategy, LogWaitStrategy}, CmdWaitFor, ExecCommand, IntoContainerPort, WaitFor, }, runners::AsyncRunner, @@ -98,7 +98,9 @@ async fn async_run_exec() -> anyhow::Result<()> { let _ = pretty_env_logger::try_init(); let image = GenericImage::new("simple_web_server", "latest") - .with_wait_for(WaitFor::message_on_stdout("server is ready")) + .with_wait_for(WaitFor::log( + LogWaitStrategy::stdout("server is ready").with_times(2), + )) .with_wait_for(WaitFor::seconds(1)); let container = image.start().await?; diff --git a/testcontainers/tests/sync_runner.rs b/testcontainers/tests/sync_runner.rs index b882b4fb..75b71d8b 100644 --- a/testcontainers/tests/sync_runner.rs +++ b/testcontainers/tests/sync_runner.rs @@ -4,7 +4,7 @@ use reqwest::StatusCode; use testcontainers::{ core::{ logs::{consumer::logging_consumer::LoggingConsumer, LogFrame}, - wait::HttpWaitStrategy, + wait::{HttpWaitStrategy, LogWaitStrategy}, CmdWaitFor, ExecCommand, Host, IntoContainerPort, WaitFor, }, runners::SyncRunner, @@ -150,7 +150,9 @@ fn sync_run_exec() -> anyhow::Result<()> { let _ = pretty_env_logger::try_init(); let image = GenericImage::new("simple_web_server", "latest") - .with_wait_for(WaitFor::message_on_stdout("server is ready")) + .with_wait_for(WaitFor::log( + LogWaitStrategy::stdout("server is ready").with_times(2), + )) .with_wait_for(WaitFor::seconds(1)); let container = image.start()?; diff --git a/testimages/src/bin/simple_web_server.rs b/testimages/src/bin/simple_web_server.rs index f004a433..7127e6c2 100644 --- a/testimages/src/bin/simple_web_server.rs +++ b/testimages/src/bin/simple_web_server.rs @@ -13,6 +13,7 @@ async fn main() { let addr = SocketAddr::from(([0, 0, 0, 0], 80)); let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); println!("server is ready"); + println!("server is ready"); // duplicate line to test `times` parameter of `WaitFor::Log` axum::serve(listener, app.into_make_service()) .with_graceful_shutdown(shutdown_signal()) .await