From 4c3d760f511da2c8716bd01c2f4566040b8d500e Mon Sep 17 00:00:00 2001 From: Joan Fontanals Date: Wed, 3 Aug 2022 14:23:42 +0200 Subject: [PATCH] fix: do not send target_executor to Executors (#5041) --- .../runtimes/gateway/request_handling.py | 6 +- .../test_external_deployment.py | 80 ++++++++++++------- 2 files changed, 55 insertions(+), 31 deletions(-) diff --git a/jina/serve/runtimes/gateway/request_handling.py b/jina/serve/runtimes/gateway/request_handling.py index 0028ef9f83916..3eb6b261d2cbd 100644 --- a/jina/serve/runtimes/gateway/request_handling.py +++ b/jina/serve/runtimes/gateway/request_handling.py @@ -140,6 +140,10 @@ def _handle_request(request: 'Request') -> 'Tuple[Future, Optional[Future]]': has_specific_params = True break + target_executor = request.header.target_executor + # reset it in case we send to an external gateway + request.header.target_executor = '' + for origin_node in request_graph.origin_nodes: leaf_tasks = origin_node.get_leaf_tasks( connection_pool=connection_pool, @@ -147,7 +151,7 @@ def _handle_request(request: 'Request') -> 'Tuple[Future, Optional[Future]]': previous_task=None, endpoint=endpoint, executor_endpoint_mapping=self._executor_endpoint_mapping, - target_executor_pattern=request.header.target_executor, + target_executor_pattern=target_executor or None, request_input_parameters=request_input_parameters, request_input_has_specific_params=has_specific_params, copy_request_at_send=num_outgoing_nodes > 1 and has_specific_params diff --git a/tests/integration/external_deployment/test_external_deployment.py b/tests/integration/external_deployment/test_external_deployment.py index a52c293939cf6..5c02498e4f01d 100644 --- a/tests/integration/external_deployment/test_external_deployment.py +++ b/tests/integration/external_deployment/test_external_deployment.py @@ -58,7 +58,7 @@ def foo(self, docs, *args, **kwargs): @pytest.mark.parametrize('num_shards', [1, 2], indirect=True) def test_flow_with_external_deployment( - external_deployment, external_deployment_args, input_docs, num_shards + external_deployment, external_deployment_args, input_docs, num_shards ): with external_deployment: external_args = vars(external_deployment_args) @@ -80,7 +80,7 @@ def test_flow_with_external_deployment( @pytest.mark.parametrize('num_shards', [2], indirect=True) def test_two_flow_with_shared_external_deployment( - external_deployment, external_deployment_args, input_docs, num_shards + external_deployment, external_deployment_args, input_docs, num_shards ): external_deployment.head_args.disable_reduce = True with external_deployment: @@ -96,8 +96,8 @@ def test_two_flow_with_shared_external_deployment( flow2 = ( Flow() - .add(name='foo') - .add( + .add(name='foo') + .add( **external_args, name='external_fake', external=True, @@ -161,12 +161,12 @@ def external_deployment_shards_2(external_deployment_shards_2_args): @pytest.mark.parametrize('num_shards', [1, 2], indirect=True) def test_flow_with_external_deployment_shards( - external_deployment_shards_1, - external_deployment_shards_2, - external_deployment_shards_1_args, - external_deployment_shards_2_args, - input_docs, - num_shards, + external_deployment_shards_1, + external_deployment_shards_2, + external_deployment_shards_1_args, + external_deployment_shards_2_args, + input_docs, + num_shards, ): with external_deployment_shards_1, external_deployment_shards_2: external_args_1 = vars(external_deployment_shards_1_args) @@ -179,20 +179,20 @@ def test_flow_with_external_deployment_shards( del external_args_2['deployment_role'] flow = ( Flow() - .add(name='executor1') - .add( + .add(name='executor1') + .add( **external_args_1, name='external_fake_1', external=True, needs=['executor1'], ) - .add( + .add( **external_args_2, name='external_fake_2', external=True, needs=['executor1'], ) - .needs(needs=['external_fake_1', 'external_fake_2'], port=random_port()) + .needs(needs=['external_fake_1', 'external_fake_2'], port=random_port()) ) with flow: @@ -226,10 +226,10 @@ def external_deployment_pre_shards(external_deployment_pre_shards_args): @pytest.mark.parametrize('num_shards', [1, 2], indirect=True) def test_flow_with_external_deployment_pre_shards( - external_deployment_pre_shards, - external_deployment_pre_shards_args, - input_docs, - num_shards, + external_deployment_pre_shards, + external_deployment_pre_shards_args, + input_docs, + num_shards, ): with external_deployment_pre_shards: external_args = vars(external_deployment_pre_shards_args) @@ -238,20 +238,20 @@ def test_flow_with_external_deployment_pre_shards( del external_args['deployment_role'] flow = ( Flow() - .add( + .add( **external_args, name='external_fake', external=True, ) - .add( + .add( name='executor1', needs=['external_fake'], ) - .add( + .add( name='executor2', needs=['external_fake'], ) - .needs(['executor1', 'executor2']) + .needs(['executor1', 'executor2']) ) with flow: resp = flow.index(inputs=input_docs) @@ -286,10 +286,10 @@ def external_deployment_join(external_deployment_join_args): @pytest.mark.parametrize('num_shards', [1, 2], indirect=True) def test_flow_with_external_deployment_join( - external_deployment_join, - external_deployment_join_args, - input_docs, - num_shards, + external_deployment_join, + external_deployment_join_args, + input_docs, + num_shards, ): with external_deployment_join: external_args = vars(external_deployment_join_args) @@ -298,19 +298,19 @@ def test_flow_with_external_deployment_join( del external_args['deployment_role'] flow = ( Flow() - .add( + .add( **external_args, external=True, ) - .add( + .add( name='executor1', needs=['executor0'], ) - .add( + .add( name='executor2', needs=['executor0'], ) - .needs( + .needs( **external_args, external=True, needs=['executor1', 'executor2'], @@ -321,3 +321,23 @@ def test_flow_with_external_deployment_join( # Reducing applied everywhere, expect 50 docs, same as the input validate_response(resp, len(input_docs)) + + +def test_external_flow_with_target_executor(): + class ExtExecutor(Executor): + + @requests + def foo(self, docs, **kwargs): + for doc in docs: + doc.text = 'external' + + external_flow = Flow().add(uses=ExtExecutor) + + with external_flow: + d = Document(text='sunset with green landscape by the river') + f = Flow().add(port=external_flow.port, external=True, name='external_executor') + with f: + response = f.post(on='/', inputs=d, target_executor='external_executor') + + + assert response[0].text == 'external'