diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index f8171b42a200e..a70aaf9dc1140 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -271,7 +271,7 @@ jobs: export LINKERD2_VERSION=stable-2.11.4 curl --proto '=https' --tlsv1.2 -sSfL https://run.linkerd.io/install | sh pytest -v -s --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml ./tests/k8s/test_k8s.py ./tests/k8s/test_graceful_request_handling.py - timeout-minutes: 30 + timeout-minutes: 45 env: JINA_K8S_USE_TEST_PIP: 1 - name: Check codecov file @@ -307,13 +307,13 @@ jobs: export JINA_LOG_LEVEL="ERROR" env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Test k8s + - name: Test k8s failures run: | export LINKERD2_VERSION=stable-2.11.4 curl --proto '=https' --tlsv1.2 -sSfL https://run.linkerd.io/install | sh curl --proto '=https' --tlsv1.2 -sSfL https://linkerd.github.io/linkerd-smi/install | sh pytest -v -s --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml ./tests/k8s/test_k8s_failures.py - timeout-minutes: 30 + timeout-minutes: 45 env: JINA_K8S_USE_TEST_PIP: 1 - name: Check codecov file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dd0b3d88e034e..e8963715b8459 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -194,7 +194,7 @@ jobs: export LINKERD2_VERSION=stable-2.11.4 curl --proto '=https' --tlsv1.2 -sSfL https://run.linkerd.io/install | sh pytest -v -s --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml ./tests/k8s/test_k8s.py ./tests/k8s/test_graceful_request_handling.py - timeout-minutes: 30 + timeout-minutes: 45 env: JINA_K8S_USE_TEST_PIP: 1 - name: Check codecov file @@ -230,13 +230,13 @@ jobs: export JINA_LOG_LEVEL="ERROR" env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Test k8s + - name: Test k8s failures run: | export LINKERD2_VERSION=stable-2.11.4 curl --proto '=https' --tlsv1.2 -sSfL https://run.linkerd.io/install | sh curl --proto '=https' --tlsv1.2 -sSfL https://linkerd.github.io/linkerd-smi/install | sh pytest -v -s --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml ./tests/k8s/test_k8s_failures.py - timeout-minutes: 30 + timeout-minutes: 45 env: JINA_K8S_USE_TEST_PIP: 1 - name: Check codecov file diff --git a/extra-requirements.txt b/extra-requirements.txt index f21a2b8e06d84..6326801caec69 100644 --- a/extra-requirements.txt +++ b/extra-requirements.txt @@ -27,9 +27,9 @@ numpy: core protobuf>=3.20.0: core -grpcio>=1.46.0,<1.48.1: core -grpcio-reflection>=1.46.0,<1.48.1: core -grpcio-health-checking>=1.46.0,<1.48.1: core +grpcio>=1.46.0,<=1.47.0: core +grpcio-reflection>=1.46.0,<=1.47.0: core +grpcio-health-checking>=1.46.0,<=1.47.0: core pyyaml>=5.3.1: core packaging>=20.0: core docarray>=0.16.4: core diff --git a/jina/resources/extra-requirements.txt b/jina/resources/extra-requirements.txt index f21a2b8e06d84..6326801caec69 100644 --- a/jina/resources/extra-requirements.txt +++ b/jina/resources/extra-requirements.txt @@ -27,9 +27,9 @@ numpy: core protobuf>=3.20.0: core -grpcio>=1.46.0,<1.48.1: core -grpcio-reflection>=1.46.0,<1.48.1: core -grpcio-health-checking>=1.46.0,<1.48.1: core +grpcio>=1.46.0,<=1.47.0: core +grpcio-reflection>=1.46.0,<=1.47.0: core +grpcio-health-checking>=1.46.0,<=1.47.0: core pyyaml>=5.3.1: core packaging>=20.0: core docarray>=0.16.4: core diff --git a/jina/serve/executors/__init__.py b/jina/serve/executors/__init__.py index 8cc9f2d24441a..c09a25170c21c 100644 --- a/jina/serve/executors/__init__.py +++ b/jina/serve/executors/__init__.py @@ -2,9 +2,12 @@ import copy import inspect import multiprocessing +import functools import os +import asyncio import threading import warnings +import contextlib from types import SimpleNamespace from typing import TYPE_CHECKING, Any, Dict, Optional, Type, Union @@ -21,6 +24,7 @@ from jina.serve.executors.metas import get_executor_taboo from jina.serve.helper import store_init_kwargs, wrap_func from jina.serve.instrumentation import MetricsTimer +from jina.helper import get_or_reuse_loop if TYPE_CHECKING: # pragma: no cover from opentelemetry.context.context import Context @@ -60,7 +64,7 @@ def register_class(cls): arg_spec = inspect.getfullargspec(cls.__init__) if not arg_spec.varkw and not __args_executor_init__.issubset( - arg_spec.args + arg_spec.args ): raise TypeError( f'{cls.__init__} does not follow the full signature of `Executor.__init__`, ' @@ -120,12 +124,12 @@ def __init__(awesomeness=5): """ def __init__( - self, - metas: Optional[Dict] = None, - requests: Optional[Dict] = None, - runtime_args: Optional[Dict] = None, - workspace: Optional[str] = None, - **kwargs, + self, + metas: Optional[Dict] = None, + requests: Optional[Dict] = None, + runtime_args: Optional[Dict] = None, + workspace: Optional[str] = None, + **kwargs, ): """`metas` and `requests` are always auto-filled with values from YAML config. @@ -152,17 +156,22 @@ def __init__( if type(self) == BaseExecutor: self.requests[__default_endpoint__] = self._dry_run_func + try: + self._lock = asyncio.Lock() # Lock to run in Executor non async methods in a way that does not block the event loop to do health checks without the fear of having race conditions or multithreading issues. + except RuntimeError: + self._lock = contextlib.AsyncExitStack() + def _dry_run_func(self, *args, **kwargs): pass def _init_monitoring(self): if ( - hasattr(self.runtime_args, 'metrics_registry') - and self.runtime_args.metrics_registry + hasattr(self.runtime_args, 'metrics_registry') + and self.runtime_args.metrics_registry ): with ImportExtensions( - required=True, - help_text='You need to install the `prometheus_client` to use the montitoring functionality of jina', + required=True, + help_text='You need to install the `prometheus_client` to use the montitoring functionality of jina', ): from prometheus_client import Summary @@ -293,7 +302,7 @@ def _add_metas(self, _metas: Optional[Dict]): if not hasattr(target, k): if isinstance(v, str): if not ( - env_var_regex.findall(v) or internal_var_regex.findall(v) + env_var_regex.findall(v) or internal_var_regex.findall(v) ): setattr(target, k, v) else: @@ -345,18 +354,21 @@ async def __acall__(self, req_endpoint: str, **kwargs): return await self.__acall_endpoint__(__default_endpoint__, **kwargs) async def __acall_endpoint__( - self, req_endpoint, tracing_context: Optional['Context'], **kwargs + self, req_endpoint, tracing_context: Optional['Context'], **kwargs ): + func = self.requests[req_endpoint] + async def exec_func( - summary, histogram, histogram_metric_labels, tracing_context + summary, histogram, histogram_metric_labels, tracing_context ): with MetricsTimer(summary, histogram, histogram_metric_labels): if iscoroutinefunction(func): return await func(self, tracing_context=tracing_context, **kwargs) else: - return func(self, tracing_context=tracing_context, **kwargs) - - func = self.requests[req_endpoint] + async with self._lock: + return await get_or_reuse_loop().run_in_executor(None, functools.partial(func, self, + tracing_context=tracing_context, + **kwargs)) runtime_name = ( self.runtime_args.name if hasattr(self.runtime_args, 'name') else None @@ -377,7 +389,7 @@ async def exec_func( if self.tracer: with self.tracer.start_as_current_span( - req_endpoint, context=tracing_context + req_endpoint, context=tracing_context ): from opentelemetry.propagate import extract from opentelemetry.trace.propagation.tracecontext import ( @@ -408,10 +420,10 @@ def workspace(self) -> Optional[str]: :return: returns the workspace of the current shard of this Executor. """ workspace = ( - getattr(self.runtime_args, 'workspace', None) - or getattr(self.metas, 'workspace') - or self._init_workspace - or __cache_path__ + getattr(self.runtime_args, 'workspace', None) + or getattr(self.metas, 'workspace') + or self._init_workspace + or __cache_path__ ) if workspace: complete_workspace = os.path.join(workspace, self.metas.name) @@ -434,13 +446,13 @@ def __exit__(self, exc_type, exc_val, exc_tb): @classmethod def from_hub( - cls: Type[T], - uri: str, - context: Optional[Dict[str, Any]] = None, - uses_with: Optional[Dict] = None, - uses_metas: Optional[Dict] = None, - uses_requests: Optional[Dict] = None, - **kwargs, + cls: Type[T], + uri: str, + context: Optional[Dict[str, Any]] = None, + uses_with: Optional[Dict] = None, + uses_metas: Optional[Dict] = None, + uses_requests: Optional[Dict] = None, + **kwargs, ) -> T: """Construct an Executor from Hub. @@ -492,12 +504,12 @@ def from_hub( @classmethod def serve( - cls, - uses_with: Optional[Dict] = None, - uses_metas: Optional[Dict] = None, - uses_requests: Optional[Dict] = None, - stop_event: Optional[Union[threading.Event, multiprocessing.Event]] = None, - **kwargs, + cls, + uses_with: Optional[Dict] = None, + uses_metas: Optional[Dict] = None, + uses_requests: Optional[Dict] = None, + stop_event: Optional[Union[threading.Event, multiprocessing.Event]] = None, + **kwargs, ): """Serve this Executor in a temporary Flow. Useful in testing an Executor in remote settings. @@ -530,16 +542,16 @@ class StandaloneExecutorType(BetterEnum): @staticmethod def to_kubernetes_yaml( - uses: str, - output_base_path: str, - k8s_namespace: Optional[str] = None, - executor_type: Optional[ - StandaloneExecutorType - ] = StandaloneExecutorType.EXTERNAL, - uses_with: Optional[Dict] = None, - uses_metas: Optional[Dict] = None, - uses_requests: Optional[Dict] = None, - **kwargs, + uses: str, + output_base_path: str, + k8s_namespace: Optional[str] = None, + executor_type: Optional[ + StandaloneExecutorType + ] = StandaloneExecutorType.EXTERNAL, + uses_with: Optional[Dict] = None, + uses_metas: Optional[Dict] = None, + uses_requests: Optional[Dict] = None, + **kwargs, ): """ Converts the Executor into a set of yaml deployments to deploy in Kubernetes. @@ -567,23 +579,23 @@ def to_kubernetes_yaml( output_base_path=output_base_path, k8s_namespace=k8s_namespace, include_gateway=executor_type - == BaseExecutor.StandaloneExecutorType.EXTERNAL, + == BaseExecutor.StandaloneExecutorType.EXTERNAL, ) to_k8s_yaml = to_kubernetes_yaml @staticmethod def to_docker_compose_yaml( - uses: str, - output_path: Optional[str] = None, - network_name: Optional[str] = None, - executor_type: Optional[ - StandaloneExecutorType - ] = StandaloneExecutorType.EXTERNAL, - uses_with: Optional[Dict] = None, - uses_metas: Optional[Dict] = None, - uses_requests: Optional[Dict] = None, - **kwargs, + uses: str, + output_path: Optional[str] = None, + network_name: Optional[str] = None, + executor_type: Optional[ + StandaloneExecutorType + ] = StandaloneExecutorType.EXTERNAL, + uses_with: Optional[Dict] = None, + uses_metas: Optional[Dict] = None, + uses_requests: Optional[Dict] = None, + **kwargs, ): """ Converts the Executor into a yaml file to run with `docker-compose up` @@ -608,11 +620,11 @@ def to_docker_compose_yaml( output_path=output_path, network_name=network_name, include_gateway=executor_type - == BaseExecutor.StandaloneExecutorType.EXTERNAL, + == BaseExecutor.StandaloneExecutorType.EXTERNAL, ) def monitor( - self, name: Optional[str] = None, documentation: Optional[str] = None + self, name: Optional[str] = None, documentation: Optional[str] = None ) -> Optional[MetricsTimer]: """ Get a given prometheus metric, if it does not exist yet, it will create it and store it in a buffer. diff --git a/jina/serve/runtimes/worker/__init__.py b/jina/serve/runtimes/worker/__init__.py index 47c14a7195a1f..554cb04a5e520 100644 --- a/jina/serve/runtimes/worker/__init__.py +++ b/jina/serve/runtimes/worker/__init__.py @@ -135,14 +135,10 @@ async def _async_setup_grpc_server(self): reflection.SERVICE_NAME, ) # Mark all services as healthy. - health_pb2_grpc.add_HealthServicer_to_server( - self._health_servicer, self._grpc_server - ) + health_pb2_grpc.add_HealthServicer_to_server(self._health_servicer, self._grpc_server) for service in service_names: - await self._health_servicer.set( - service, health_pb2.HealthCheckResponse.SERVING - ) + await self._health_servicer.set(service, health_pb2.HealthCheckResponse.SERVING) reflection.enable_server_reflection(service_names, self._grpc_server) bind_addr = f'{self.args.host}:{self.args.port}' self.logger.debug(f'start listening on {bind_addr}') @@ -187,11 +183,11 @@ async def endpoint_discovery(self, empty, context) -> jina_pb2.EndpointsProto: :returns: the response request """ self.logger.debug('got an endpoint discovery request') - endpointsProto = jina_pb2.EndpointsProto() - endpointsProto.endpoints.extend( + endpoints_proto = jina_pb2.EndpointsProto() + endpoints_proto.endpoints.extend( list(self._request_handler._executor.requests.keys()) ) - return endpointsProto + return endpoints_proto def _extract_tracing_context( self, metadata: grpc.aio.Metadata @@ -268,10 +264,10 @@ async def _status(self, empty, context) -> jina_pb2.JinaInfoProto: :param context: grpc context :returns: the response request """ - infoProto = jina_pb2.JinaInfoProto() + info_proto = jina_pb2.JinaInfoProto() version, env_info = get_full_version() for k, v in version.items(): - infoProto.jina[k] = str(v) + info_proto.jina[k] = str(v) for k, v in env_info.items(): - infoProto.envs[k] = str(v) - return infoProto + info_proto.envs[k] = str(v) + return info_proto diff --git a/tests/integration/runtimes/test_runtimes.py b/tests/integration/runtimes/test_runtimes.py index 6337a5df7f4ee..d2b28055dd954 100644 --- a/tests/integration/runtimes/test_runtimes.py +++ b/tests/integration/runtimes/test_runtimes.py @@ -115,7 +115,7 @@ def complete_graph_dict(): @pytest.mark.parametrize('uses_after', [True, False]) # test gateway, head and worker runtime by creating them manually in a more Flow like topology with branching/merging async def test_runtimes_flow_topology( - complete_graph_dict, uses_before, uses_after, port_generator + complete_graph_dict, uses_before, uses_after, port_generator ): pods = [ pod_name for pod_name in complete_graph_dict.keys() if 'gateway' not in pod_name @@ -443,7 +443,7 @@ async def test_runtimes_with_executor(port_generator): assert len(response_list) == 20 assert ( - len(response_list[0]) == (1 + 1 + 1) * 10 + 1 + len(response_list[0]) == (1 + 1 + 1) * 10 + 1 ) # 1 starting doc + 1 uses_before + every exec adds 1 * 10 shards + 1 doc uses_after doc_texts = [doc.text for doc in response_list[0]] @@ -693,13 +693,13 @@ def _create_worker_runtime(port, name='', executor=None): def _create_head_runtime( - port, - connection_list_dict, - name='', - polling='ANY', - uses_before=None, - uses_after=None, - retries=-1, + port, + connection_list_dict, + name='', + polling='ANY', + uses_before=None, + uses_after=None, + retries=-1, ): args = set_pod_parser().parse_args([]) args.port = port @@ -717,23 +717,23 @@ def _create_head_runtime( def _create_gateway_runtime( - graph_description, pod_addresses, port, protocol='grpc', retries=-1 + graph_description, pod_addresses, port, protocol='grpc', retries=-1 ): with GatewayRuntime( - set_gateway_parser().parse_args( - [ - '--graph-description', - graph_description, - '--deployments-addresses', - pod_addresses, - '--port', - str(port), - '--retries', - str(retries), - '--protocol', - protocol, - ] - ) + set_gateway_parser().parse_args( + [ + '--graph-description', + graph_description, + '--deployments-addresses', + pod_addresses, + '--port', + str(port), + '--retries', + str(retries), + '--protocol', + protocol, + ] + ) ) as runtime: runtime.run_forever() @@ -784,8 +784,8 @@ async def test_head_runtime_with_offline_shards(port_generator): ) with grpc.insecure_channel( - f'0.0.0.0:{head_port}', - options=GrpcConnectionPool.get_default_grpc_options(), + f'0.0.0.0:{head_port}', + options=GrpcConnectionPool.get_default_grpc_options(), ) as channel: stub = jina_pb2_grpc.JinaSingleDataRequestRPCStub(channel) _, call = stub.process_single_data.with_call( @@ -804,3 +804,53 @@ async def test_head_runtime_with_offline_shards(port_generator): head_process.join() for shard_process in shard_processes: shard_process.join() + + +def test_runtime_slow_processing_readiness(port_generator): + class SlowProcessingExecutor(Executor): + @requests + def foo(self, **kwargs): + time.sleep(10) + + worker_port = port_generator() + # create a single worker runtime + worker_process = multiprocessing.Process( + target=_create_worker_runtime, args=(worker_port, f'pod0', 'SlowProcessingExecutor') + ) + try: + worker_process.start() + AsyncNewLoopRuntime.wait_for_ready_or_shutdown( + timeout=5.0, + ctrl_address=f'0.0.0.0:{worker_port}', + ready_or_shutdown_event=multiprocessing.Event(), + ) + + def _send_messages(): + with grpc.insecure_channel( + f'0.0.0.0:{worker_port}', + options=GrpcConnectionPool.get_default_grpc_options(), + ) as channel: + stub = jina_pb2_grpc.JinaSingleDataRequestRPCStub(channel) + resp, _ = stub.process_single_data.with_call( + list(request_generator('/', DocumentArray([Document(text='abc')])))[0] + ) + assert resp.docs[0].text == 'abc' + + send_message_process = multiprocessing.Process( + target=_send_messages + ) + send_message_process.start() + + for _ in range(50): + is_ready = WorkerRuntime.is_ready(f'0.0.0.0:{worker_port}') + assert is_ready + time.sleep(0.5) + except Exception: + raise + finally: + worker_process.terminate() + send_message_process.terminate() + worker_process.join() + send_message_process.join() + assert worker_process.exitcode == 0 + assert send_message_process.exitcode == 0 diff --git a/tests/k8s/slow-process-executor/debug_executor.py b/tests/k8s/slow-process-executor/debug_executor.py index fb4c4e7068fda..c52d962449ffd 100644 --- a/tests/k8s/slow-process-executor/debug_executor.py +++ b/tests/k8s/slow-process-executor/debug_executor.py @@ -5,15 +5,13 @@ class SlowProcessExecutor(Executor): - def __init__(self, *args, **kwargs): + def __init__(self, time_sleep=1.0, *args, **kwargs): super().__init__(*args, **kwargs) - from jina.logging.logger import JinaLogger - - self.logger = JinaLogger(self.__class__.__name__) + self.time_sleep = time_sleep @requests def process(self, docs: DocumentArray, *args, **kwargs): - time.sleep(1.0) + time.sleep(self.time_sleep) for doc in docs: doc.tags['replica_uid'] = os.environ['POD_UID'] doc.tags['time'] = time.time() diff --git a/tests/k8s/test_k8s.py b/tests/k8s/test_k8s.py index d4f355c18e0cb..6893a395bf67c 100644 --- a/tests/k8s/test_k8s.py +++ b/tests/k8s/test_k8s.py @@ -1140,9 +1140,55 @@ async def test_flow_with_stateful_executor( core_client=core_client, endpoint='/len', ) + except Exception as exc: + logger.error(f' Exception raised {exc}') + raise exc + +@pytest.mark.asyncio +@pytest.mark.parametrize( + 'docker_images', [['slow-process-executor', 'jinaai/jina']], indirect=True +) +async def test_slow_executor_readiness_probe_works(docker_images, tmpdir, logger): + try: + dump_path = os.path.join(str(tmpdir), 'test-flow-slow-process-executor') + namespace = f'test-flow-slow-process-executor'.lower() + flow = Flow(name='test-flow-slow-process-executor',).add( + name='slow_process_executor', + uses=f'docker://{docker_images[0]}', + uses_with={'time_sleep': 20}, + replicas=2, + ) + + flow.to_kubernetes_yaml(dump_path, k8s_namespace=namespace) + + from kubernetes import client + + api_client = client.ApiClient() + core_client = client.CoreV1Api(api_client=api_client) + app_client = client.AppsV1Api(api_client=api_client) + await create_all_flow_deployments_and_wait_ready( + dump_path, + namespace=namespace, + api_client=api_client, + app_client=app_client, + core_client=core_client, + deployment_replicas_expected={ + 'gateway': 1, + 'slow-process-executor': 2, + }, + logger=logger + ) + + resp = await run_test( + flow=flow, + namespace=namespace, + core_client=core_client, + n_docs=10, + request_size=1, + endpoint='/', + ) - assert len(resp) == 1 - assert resp[0].parameters == {'__results__': {'statefulexecutor': {'length': 10.0}}} + assert len(resp) == 10 except Exception as exc: logger.error(f' Exception raised {exc}') raise exc diff --git a/tests/k8s/test_k8s_failures.py b/tests/k8s/test_k8s_failures.py index ff6f1c7a9bbc2..bff2ed02a0f85 100644 --- a/tests/k8s/test_k8s_failures.py +++ b/tests/k8s/test_k8s_failures.py @@ -3,7 +3,6 @@ import functools import os import subprocess -import time from typing import Set import pytest @@ -16,12 +15,12 @@ async def scale( - deployment_name: str, - desired_replicas: int, - app_client, - k8s_namespace, - core_client, - logger, + deployment_name: str, + desired_replicas: int, + app_client, + k8s_namespace, + core_client, + logger, ): app_client.patch_namespaced_deployment_scale( deployment_name, @@ -46,7 +45,7 @@ async def scale( async def restart_deployment( - deployment, app_client, core_client, k8s_namespace, logger + deployment, app_client, core_client, k8s_namespace, logger ): now = datetime.datetime.utcnow() now = str(now.isoformat("T") + "Z") @@ -85,7 +84,7 @@ async def delete_pod(deployment, core_client, k8s_namespace, logger): namespace=k8s_namespace, label_selector=f'app={deployment}', ) - api_response = core_client.delete_namespaced_pod( + _ = core_client.delete_namespaced_pod( pods.items[0].metadata.name, k8s_namespace ) while True: @@ -119,7 +118,7 @@ async def delete_pod(deployment, core_client, k8s_namespace, logger): async def run_test_until_event( - flow, core_client, namespace, endpoint, stop_event, logger, sleep_time=0.05 + flow, core_client, namespace, endpoint, stop_event, logger, sleep_time=0.05 ): # start port forwarding from jina.clients import Client @@ -128,14 +127,14 @@ async def run_test_until_event( core_client.list_namespaced_pod( namespace=namespace, label_selector='app=gateway' ) - .items[0] - .metadata.name + .items[0] + .metadata.name ) config_path = os.environ['KUBECONFIG'] import portforward with portforward.forward( - namespace, gateway_pod_name, flow.port, flow.port, config_path + namespace, gateway_pod_name, flow.port, flow.port, config_path ): client_kwargs = dict( host='localhost', @@ -162,10 +161,10 @@ async def async_inputs(sent_ids: Set[int], sleep_time: float = 0.05): responses = [] sent_ids = set() async for resp in client.post( - endpoint, - inputs=functools.partial(async_inputs, sent_ids, sleep_time), - request_size=1, - return_responses=True + endpoint, + inputs=functools.partial(async_inputs, sent_ids, sleep_time), + request_size=1, + return_responses=True ): responses.append(resp) @@ -240,6 +239,7 @@ async def test_failure_scenarios(logger, docker_images, tmpdir, k8s_cluster): sleep_time=None, ) ) + logger.info(f' Sending task has been scheduled') await asyncio.sleep(5.0) # Scale down the Executor to 2 replicas await scale( @@ -250,6 +250,7 @@ async def test_failure_scenarios(logger, docker_images, tmpdir, k8s_cluster): k8s_namespace=namespace, logger=logger, ) + logger.info(f' Scaling to 2 replicas has been done') # Scale back up to 3 replicas await scale( deployment_name='executor0', @@ -259,6 +260,7 @@ async def test_failure_scenarios(logger, docker_images, tmpdir, k8s_cluster): k8s_namespace=namespace, logger=logger, ) + logger.info(f' Scaling to 3 replicas has been done') await asyncio.sleep(5.0) # restart all pods in the deployment await restart_deployment( @@ -268,6 +270,7 @@ async def test_failure_scenarios(logger, docker_images, tmpdir, k8s_cluster): k8s_namespace=namespace, logger=logger, ) + logger.info(f' Restarting deployment has been done') await asyncio.sleep(5.0) await delete_pod( deployment='executor0', @@ -275,10 +278,13 @@ async def test_failure_scenarios(logger, docker_images, tmpdir, k8s_cluster): k8s_namespace=namespace, logger=logger, ) + logger.info(f'Deleting pod has been done') await asyncio.sleep(5.0) stop_event.set() responses, sent_ids = await send_task + logger.info(f'Sending tag has finished') + logger.info(f'Sending tag has finished: {len(sent_ids)} vs {len(responses)}') assert len(sent_ids) == len(responses) doc_ids = set() pod_ids = set() @@ -313,4 +319,4 @@ async def test_failure_scenarios(logger, docker_images, tmpdir, k8s_cluster): assert len(sent_ids) == len(responses) except Exception as exc: logger.error(f' Exception raised {exc}') - raise exc + raise exc \ No newline at end of file