From d8826953735d7731fdba357e956bc7c84ea79c9d Mon Sep 17 00:00:00 2001 From: Jan Katins Date: Sat, 18 Oct 2025 17:02:50 +0200 Subject: [PATCH 1/2] feat(core): Add RunFunctionWaitStrategy This is usefull for converting old wait_container_is_ready which use container.exec() to check for a condition to become True. --- core/testcontainers/core/wait_strategies.py | 39 +++++++++++++++ core/tests/test_wait_strategies.py | 55 +++++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/core/testcontainers/core/wait_strategies.py b/core/testcontainers/core/wait_strategies.py index cac2a2ef..d93b1703 100644 --- a/core/testcontainers/core/wait_strategies.py +++ b/core/testcontainers/core/wait_strategies.py @@ -703,6 +703,44 @@ def _get_status_compose_container(container: DockerCompose) -> str: raise NotImplementedError +class RunFunctionWaitStrategy(WaitStrategy): + """Runs a functions and waits until it succeeds. + + The function must take a single argument, the WaitStrategyTarget (= DockerContainer) + (use a lambda to capture other arguments) and must return a Boolean or raise an Exception. + + Args: + func: The function to run. It must return True when the wait is over. + """ + + def __init__( + self, + func: Callable[[WaitStrategyTarget], bool], + ): + super().__init__() + self.func = func + + def wait_until_ready(self, container: WaitStrategyTarget) -> Any: + start_time = time.time() + last_exception = None + while True: + try: + result = self.func(container) + if result: + return result + except tuple(self._transient_exceptions) as e: + logger.debug(f"Check attempt failed: {e!s}") + last_exception = str(e) + if time.time() - start_time > self._startup_timeout: + raise TimeoutError( + f"Wait time ({self._startup_timeout}s) exceeded for {self.func.__name__}" + f"Exception: {last_exception}. " + f"Hint: Check if the container is ready, " + f"and the expected conditions are met for the function to succeed." + ) + time.sleep(self._poll_interval) + + class CompositeWaitStrategy(WaitStrategy): """ Wait for multiple conditions to be satisfied in sequence. @@ -787,6 +825,7 @@ def wait_until_ready(self, container: WaitStrategyTarget) -> None: "HttpWaitStrategy", "LogMessageWaitStrategy", "PortWaitStrategy", + "RunFunctionWaitStrategy", "WaitStrategy", "WaitStrategyTarget", ] diff --git a/core/tests/test_wait_strategies.py b/core/tests/test_wait_strategies.py index da62f1fb..1eb198c8 100644 --- a/core/tests/test_wait_strategies.py +++ b/core/tests/test_wait_strategies.py @@ -15,6 +15,7 @@ LogMessageWaitStrategy, PortWaitStrategy, WaitStrategy, + RunFunctionWaitStrategy, ) @@ -550,6 +551,60 @@ def test_wait_until_ready(self, mock_sleep, mock_time, mock_is_file, file_exists strategy.wait_until_ready(mock_container) +class TestRunFunctionWaitStrategy: + """Test the RunFunctionWaitStrategy class.""" + + def test_run_function_wait_strategy_initialization(self): + func = lambda x: True + strategy = RunFunctionWaitStrategy(func) + assert strategy.func == func + + def test_run_function_wait_strategy_wait_until_ready(self): + returns = [False, False, True] + mock_container = object() + + def func(target) -> bool: + assert target is mock_container + return returns.pop(0) + + strategy = RunFunctionWaitStrategy(func).with_poll_interval(0) + strategy.wait_until_ready(mock_container) # type: ignore[arg-type] + + def test_run_function_wait_strategy_wait_until_ready_with_unknown_exception(self): + mock_container = object() + + def func(target) -> bool: + assert target is mock_container + raise RuntimeError("Unknown error, abort!") + + strategy = RunFunctionWaitStrategy(func).with_poll_interval(0) + with pytest.raises(RuntimeError, match="Unknown error, abort!"): + strategy.wait_until_ready(mock_container) # type: ignore[arg-type] + + @pytest.mark.parametrize("transient_exception", [ConnectionError, NotImplementedError]) + def test_run_function_wait_strategy_wait_until_ready_with_transient_exception(self, transient_exception): + mock_container = object() + returns = [False, False, True] + + def func(target) -> bool: + assert target is mock_container + if returns.pop(0): + return True + raise transient_exception("Go on") + + # ConnectionError should be in the default transient exceptions, but NotImplementedError ist not + strategy = ( + RunFunctionWaitStrategy(func).with_poll_interval(0.001).with_transient_exceptions(NotImplementedError) + ) + strategy.wait_until_ready(mock_container) # type: ignore[arg-type] + + def test_run_function_wait_strategy_wait_until_ready_with_timeout(self): + mock_container = object() + strategy = RunFunctionWaitStrategy(lambda x: False).with_poll_interval(0).with_startup_timeout(0) + with pytest.raises(TimeoutError, match=r"Wait time (.*) exceeded for"): + strategy.wait_until_ready(mock_container) # type: ignore[arg-type] + + class TestCompositeWaitStrategy: """Test the CompositeWaitStrategy class.""" From 531934a2574d344e08104c7911b373f0fab02641 Mon Sep 17 00:00:00 2001 From: Jan Katins Date: Sat, 18 Oct 2025 17:05:12 +0200 Subject: [PATCH 2/2] fix(postgres): Convert postgres to new RunFunctionWaitStrategy This gets rid of a annoying depcreation warning. --- .../postgres/testcontainers/postgres/__init__.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/modules/postgres/testcontainers/postgres/__init__.py b/modules/postgres/testcontainers/postgres/__init__.py index bde21d5b..b92b259c 100644 --- a/modules/postgres/testcontainers/postgres/__init__.py +++ b/modules/postgres/testcontainers/postgres/__init__.py @@ -15,7 +15,8 @@ from testcontainers.core.generic import DbContainer from testcontainers.core.utils import raise_for_deprecated_parameter -from testcontainers.core.waiting_utils import wait_container_is_ready +from testcontainers.core.wait_strategies import RunFunctionWaitStrategy +from testcontainers.core.waiting_utils import WaitStrategyTarget _UNSET = object() @@ -64,6 +65,7 @@ def __init__( self.driver = f"+{driver}" if driver else "" self.with_exposed_ports(self.port) + self.waiting_for(RunFunctionWaitStrategy(_check_postgres_ready)) def _configure(self) -> None: self.with_env("POSTGRES_USER", self.username) @@ -87,7 +89,6 @@ def get_connection_url(self, host: Optional[str] = None, driver: Optional[str] = port=self.port, ) - @wait_container_is_ready() def _connect(self) -> None: escaped_single_password = self.password.replace("'", "'\"'\"'") result = self.exec( @@ -99,3 +100,11 @@ def _connect(self) -> None: ) if result.exit_code: raise ConnectionError("pg_isready is not ready yet") + + +def _check_postgres_ready(container: WaitStrategyTarget) -> bool: + if not isinstance(container, PostgresContainer): + raise AssertionError("This check can only wait for postgres containers to start up") + # This raises a ConnectionError if not ready yet + container._connect() + return True