Skip to content

Commit

Permalink
feat(wait)!: enhance LogWaitStrategy to wait for message appearance…
Browse files Browse the repository at this point in the history
… multiple times (#683)

The interface has been changed a bit to support advanced configuration
of the strategy. For example, to wait for a message to appear twice in
`stdout`:

```rs
WaitFor::log(
    LogWaitStrategy::stdout("server is ready")
    .with_times(2),
)
```

Closes #675
  • Loading branch information
DDtKey committed Jul 7, 2024
1 parent 59831b0 commit b7c5bbf
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 49 deletions.
4 changes: 2 additions & 2 deletions testcontainers/src/core/containers/async_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,13 @@ where
match cmd_ready_condition {
CmdWaitFor::StdOutMessage { message } => {
exec.stdout()
.wait_for_message(&message)
.wait_for_message(&message, 1)
.await
.map_err(ExecError::from)?;
}
CmdWaitFor::StdErrMessage { message } => {
exec.stderr()
.wait_for_message(&message)
.wait_for_message(&message, 1)
.await
.map_err(ExecError::from)?;
}
Expand Down
4 changes: 1 addition & 3 deletions testcontainers/src/core/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ where
/// up.
///
/// The conditions returned from this method are evaluated **in the order** they are returned. Therefore
/// you most likely want to start with a [`WaitFor::StdOutMessage`] or [`WaitFor::StdErrMessage`] and
/// potentially follow up with a [`WaitFor::Duration`] in case the container usually needs a little
/// more time before it is ready.
/// you most likely want to start with a [`WaitFor::Log`] or [`WaitFor::Http`].
fn ready_conditions(&self) -> Vec<WaitFor>;

/// Returns the environment variables that needs to be set when a container is created.
Expand Down
43 changes: 29 additions & 14 deletions testcontainers/src/core/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub enum WaitLogError {

#[derive(Copy, Clone, Debug, parse_display::Display)]
#[display(style = "lowercase")]
pub(crate) enum LogSource {
pub enum LogSource {
StdOut,
StdErr,
}
Expand All @@ -42,6 +42,13 @@ impl LogSource {
}

impl LogFrame {
pub fn source(&self) -> LogSource {
match self {
LogFrame::StdOut(_) => LogSource::StdOut,
LogFrame::StdErr(_) => LogSource::StdErr,
}
}

pub fn bytes(&self) -> &Bytes {
match self {
LogFrame::StdOut(bytes) => bytes,
Expand Down Expand Up @@ -74,23 +81,29 @@ impl WaitingStreamWrapper {
pub(crate) async fn wait_for_message(
&mut self,
message: impl AsRef<[u8]>,
times: usize,
) -> Result<(), WaitLogError> {
let msg_finder = Finder::new(message.as_ref());
let mut messages = Vec::new();
let mut found_times = 0;
while let Some(message) = self.inner.next().await.transpose()? {
messages.push(message.clone());
if self.enable_cache {
self.cache.push(Ok(message.clone()));
}
let match_found = msg_finder.find(message.as_ref()).is_some();
if match_found {
log::debug!("Found message after comparing {} lines", messages.len());
found_times += usize::from(match_found); // can't overflow, because of check below
if found_times == times {
log::debug!(
"Message found {times} times after comparing {} lines",
messages.len()
);
return Ok(());
}
}

log::warn!(
"Failed to find message '{}' after comparing {} lines.",
"Failed to find message '{}' {times} times after comparing {} lines.",
String::from_utf8_lossy(message.as_ref()),
messages.len()
);
Expand Down Expand Up @@ -121,15 +134,17 @@ mod tests {

#[tokio::test(flavor = "multi_thread")]
async fn given_logs_when_line_contains_message_should_find_it() {
let mut log_stream = WaitingStreamWrapper::new(Box::pin(futures::stream::iter([Ok(r"
Message one
Message two
Message three
"
.into())])));

let result = log_stream.wait_for_message("Message three").await;

assert!(result.is_ok())
let mut log_stream = WaitingStreamWrapper::new(Box::pin(futures::stream::iter([
Ok("Message one".into()),
Ok("Message two".into()),
Ok("Message three".into()),
Ok("Message three".into()),
])));

let result = log_stream.wait_for_message("Message three", 2).await;
assert!(result.is_ok());

let result = log_stream.wait_for_message("Message two", 2).await;
assert!(result.is_err());
}
}
68 changes: 68 additions & 0 deletions testcontainers/src/core/wait/log_strategy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use bytes::Bytes;

use crate::{
core::{
client::Client,
error::WaitContainerError,
logs::{LogSource, WaitingStreamWrapper},
wait::WaitStrategy,
},
ContainerAsync, Image,
};

#[derive(Debug, Clone)]
pub struct LogWaitStrategy {
source: LogSource,
message: Bytes,
times: usize,
}

impl LogWaitStrategy {
/// Create a new [`LogWaitStrategy`] that waits for the given message to appear in the standard output logs.
/// Shortcut for `LogWaitStrategy::new(LogSource::StdOut, message)`.
pub fn stdout(message: impl AsRef<[u8]>) -> Self {
Self::new(LogSource::StdOut, message)
}

/// Create a new [`LogWaitStrategy`] that waits for the given message to appear in the standard error logs.
/// Shortcut for `LogWaitStrategy::new(LogSource::StdErr, message)`.
pub fn stderr(message: impl AsRef<[u8]>) -> Self {
Self::new(LogSource::StdErr, message)
}

/// Create a new `LogWaitStrategy` with the given log source and message.
/// The message is expected to appear in the logs exactly once by default.
pub fn new(source: LogSource, message: impl AsRef<[u8]>) -> Self {
Self {
source,
message: Bytes::from(message.as_ref().to_vec()),
times: 1,
}
}

/// Set the number of times the message should appear in the logs.
pub fn with_times(mut self, times: usize) -> Self {
self.times = times;
self
}
}

impl WaitStrategy for LogWaitStrategy {
async fn wait_until_ready<I: Image>(
self,
client: &Client,
container: &ContainerAsync<I>,
) -> crate::core::error::Result<()> {
let log_stream = match self.source {
LogSource::StdOut => client.stdout_logs(container.id(), true),
LogSource::StdErr => client.stderr_logs(container.id(), true),
};

WaitingStreamWrapper::new(log_stream)
.wait_for_message(self.message, self.times)
.await
.map_err(WaitContainerError::from)?;

Ok(())
}
}
40 changes: 14 additions & 26 deletions testcontainers/src/core/wait/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::{env::var, fmt::Debug, time::Duration};

use bytes::Bytes;
pub use health_strategy::HealthWaitStrategy;
pub use http_strategy::HttpWaitStrategy;
pub use log_strategy::LogWaitStrategy;

use crate::{
core::{client::Client, error::WaitContainerError, logs::WaitingStreamWrapper},
core::{client::Client, logs::LogSource},
ContainerAsync, Image,
};

pub(crate) mod cmd_wait;
pub(crate) mod health_strategy;
pub(crate) mod http_strategy;
pub(crate) mod log_strategy;

pub(crate) trait WaitStrategy {
async fn wait_until_ready<I: Image>(
Expand All @@ -26,10 +27,8 @@ pub(crate) trait WaitStrategy {
pub enum WaitFor {
/// An empty condition. Useful for default cases or fallbacks.
Nothing,
/// Wait for a message on the stdout stream of the container's logs.
StdOutMessage { message: Bytes },
/// Wait for a message on the stderr stream of the container's logs.
StdErrMessage { message: Bytes },
/// Wait for a certain message to appear in the container's logs.
Log(LogWaitStrategy),
/// Wait for a certain amount of time.
Duration { length: Duration },
/// Wait for the container's status to become `healthy`.
Expand All @@ -41,16 +40,17 @@ pub enum WaitFor {
impl WaitFor {
/// Wait for the message to appear on the container's stdout.
pub fn message_on_stdout(message: impl AsRef<[u8]>) -> WaitFor {
WaitFor::StdOutMessage {
message: Bytes::from(message.as_ref().to_vec()),
}
Self::log(LogWaitStrategy::new(LogSource::StdOut, message))
}

/// Wait for the message to appear on the container's stderr.
pub fn message_on_stderr(message: impl AsRef<[u8]>) -> WaitFor {
WaitFor::StdErrMessage {
message: Bytes::from(message.as_ref().to_vec()),
}
Self::log(LogWaitStrategy::new(LogSource::StdOut, message))
}

/// Wait for the message to appear on the container's stdout.
pub fn log(log_strategy: LogWaitStrategy) -> WaitFor {
WaitFor::Log(log_strategy)
}

/// Wait for the container to become healthy.
Expand Down Expand Up @@ -114,18 +114,7 @@ impl WaitStrategy for WaitFor {
container: &ContainerAsync<I>,
) -> crate::core::error::Result<()> {
match self {
WaitFor::StdOutMessage { message } => {
WaitingStreamWrapper::new(client.stdout_logs(container.id(), true))
.wait_for_message(message)
.await
.map_err(WaitContainerError::from)?
}
WaitFor::StdErrMessage { message } => {
WaitingStreamWrapper::new(client.stderr_logs(container.id(), true))
.wait_for_message(message)
.await
.map_err(WaitContainerError::from)?
}
WaitFor::Log(strategy) => strategy.wait_until_ready(client, container).await?,
WaitFor::Duration { length } => {
tokio::time::sleep(length).await;
}
Expand All @@ -135,8 +124,7 @@ impl WaitStrategy for WaitFor {
WaitFor::Http(strategy) => {
strategy.wait_until_ready(client, container).await?;
}
// WaitFor::Nothing => {}
_ => {}
WaitFor::Nothing => {}
}
Ok(())
}
Expand Down
6 changes: 4 additions & 2 deletions testcontainers/tests/async_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use reqwest::StatusCode;
use testcontainers::{
core::{
logs::{consumer::logging_consumer::LoggingConsumer, LogFrame},
wait::HttpWaitStrategy,
wait::{HttpWaitStrategy, LogWaitStrategy},
CmdWaitFor, ExecCommand, IntoContainerPort, WaitFor,
},
runners::AsyncRunner,
Expand Down Expand Up @@ -98,7 +98,9 @@ async fn async_run_exec() -> anyhow::Result<()> {
let _ = pretty_env_logger::try_init();

let image = GenericImage::new("simple_web_server", "latest")
.with_wait_for(WaitFor::message_on_stdout("server is ready"))
.with_wait_for(WaitFor::log(
LogWaitStrategy::stdout("server is ready").with_times(2),
))
.with_wait_for(WaitFor::seconds(1));
let container = image.start().await?;

Expand Down
6 changes: 4 additions & 2 deletions testcontainers/tests/sync_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use reqwest::StatusCode;
use testcontainers::{
core::{
logs::{consumer::logging_consumer::LoggingConsumer, LogFrame},
wait::HttpWaitStrategy,
wait::{HttpWaitStrategy, LogWaitStrategy},
CmdWaitFor, ExecCommand, Host, IntoContainerPort, WaitFor,
},
runners::SyncRunner,
Expand Down Expand Up @@ -150,7 +150,9 @@ fn sync_run_exec() -> anyhow::Result<()> {
let _ = pretty_env_logger::try_init();

let image = GenericImage::new("simple_web_server", "latest")
.with_wait_for(WaitFor::message_on_stdout("server is ready"))
.with_wait_for(WaitFor::log(
LogWaitStrategy::stdout("server is ready").with_times(2),
))
.with_wait_for(WaitFor::seconds(1));
let container = image.start()?;

Expand Down
1 change: 1 addition & 0 deletions testimages/src/bin/simple_web_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ async fn main() {
let addr = SocketAddr::from(([0, 0, 0, 0], 80));
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
println!("server is ready");
println!("server is ready"); // duplicate line to test `times` parameter of `WaitFor::Log`
axum::serve(listener, app.into_make_service())
.with_graceful_shutdown(shutdown_signal())
.await
Expand Down

0 comments on commit b7c5bbf

Please sign in to comment.