Skip to content

Commit

Permalink
fix: do not send target_executor to Executors (#5041)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM committed Aug 3, 2022
1 parent a51b42c commit 4c3d760
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 31 deletions.
6 changes: 5 additions & 1 deletion jina/serve/runtimes/gateway/request_handling.py
Expand Up @@ -140,14 +140,18 @@ def _handle_request(request: 'Request') -> 'Tuple[Future, Optional[Future]]':
has_specific_params = True
break

target_executor = request.header.target_executor
# reset it in case we send to an external gateway
request.header.target_executor = ''

for origin_node in request_graph.origin_nodes:
leaf_tasks = origin_node.get_leaf_tasks(
connection_pool=connection_pool,
request_to_send=request,
previous_task=None,
endpoint=endpoint,
executor_endpoint_mapping=self._executor_endpoint_mapping,
target_executor_pattern=request.header.target_executor,
target_executor_pattern=target_executor or None,
request_input_parameters=request_input_parameters,
request_input_has_specific_params=has_specific_params,
copy_request_at_send=num_outgoing_nodes > 1 and has_specific_params
Expand Down
80 changes: 50 additions & 30 deletions tests/integration/external_deployment/test_external_deployment.py
Expand Up @@ -58,7 +58,7 @@ def foo(self, docs, *args, **kwargs):

@pytest.mark.parametrize('num_shards', [1, 2], indirect=True)
def test_flow_with_external_deployment(
external_deployment, external_deployment_args, input_docs, num_shards
external_deployment, external_deployment_args, input_docs, num_shards
):
with external_deployment:
external_args = vars(external_deployment_args)
Expand All @@ -80,7 +80,7 @@ def test_flow_with_external_deployment(

@pytest.mark.parametrize('num_shards', [2], indirect=True)
def test_two_flow_with_shared_external_deployment(
external_deployment, external_deployment_args, input_docs, num_shards
external_deployment, external_deployment_args, input_docs, num_shards
):
external_deployment.head_args.disable_reduce = True
with external_deployment:
Expand All @@ -96,8 +96,8 @@ def test_two_flow_with_shared_external_deployment(

flow2 = (
Flow()
.add(name='foo')
.add(
.add(name='foo')
.add(
**external_args,
name='external_fake',
external=True,
Expand Down Expand Up @@ -161,12 +161,12 @@ def external_deployment_shards_2(external_deployment_shards_2_args):

@pytest.mark.parametrize('num_shards', [1, 2], indirect=True)
def test_flow_with_external_deployment_shards(
external_deployment_shards_1,
external_deployment_shards_2,
external_deployment_shards_1_args,
external_deployment_shards_2_args,
input_docs,
num_shards,
external_deployment_shards_1,
external_deployment_shards_2,
external_deployment_shards_1_args,
external_deployment_shards_2_args,
input_docs,
num_shards,
):
with external_deployment_shards_1, external_deployment_shards_2:
external_args_1 = vars(external_deployment_shards_1_args)
Expand All @@ -179,20 +179,20 @@ def test_flow_with_external_deployment_shards(
del external_args_2['deployment_role']
flow = (
Flow()
.add(name='executor1')
.add(
.add(name='executor1')
.add(
**external_args_1,
name='external_fake_1',
external=True,
needs=['executor1'],
)
.add(
.add(
**external_args_2,
name='external_fake_2',
external=True,
needs=['executor1'],
)
.needs(needs=['external_fake_1', 'external_fake_2'], port=random_port())
.needs(needs=['external_fake_1', 'external_fake_2'], port=random_port())
)

with flow:
Expand Down Expand Up @@ -226,10 +226,10 @@ def external_deployment_pre_shards(external_deployment_pre_shards_args):

@pytest.mark.parametrize('num_shards', [1, 2], indirect=True)
def test_flow_with_external_deployment_pre_shards(
external_deployment_pre_shards,
external_deployment_pre_shards_args,
input_docs,
num_shards,
external_deployment_pre_shards,
external_deployment_pre_shards_args,
input_docs,
num_shards,
):
with external_deployment_pre_shards:
external_args = vars(external_deployment_pre_shards_args)
Expand All @@ -238,20 +238,20 @@ def test_flow_with_external_deployment_pre_shards(
del external_args['deployment_role']
flow = (
Flow()
.add(
.add(
**external_args,
name='external_fake',
external=True,
)
.add(
.add(
name='executor1',
needs=['external_fake'],
)
.add(
.add(
name='executor2',
needs=['external_fake'],
)
.needs(['executor1', 'executor2'])
.needs(['executor1', 'executor2'])
)
with flow:
resp = flow.index(inputs=input_docs)
Expand Down Expand Up @@ -286,10 +286,10 @@ def external_deployment_join(external_deployment_join_args):

@pytest.mark.parametrize('num_shards', [1, 2], indirect=True)
def test_flow_with_external_deployment_join(
external_deployment_join,
external_deployment_join_args,
input_docs,
num_shards,
external_deployment_join,
external_deployment_join_args,
input_docs,
num_shards,
):
with external_deployment_join:
external_args = vars(external_deployment_join_args)
Expand All @@ -298,19 +298,19 @@ def test_flow_with_external_deployment_join(
del external_args['deployment_role']
flow = (
Flow()
.add(
.add(
**external_args,
external=True,
)
.add(
.add(
name='executor1',
needs=['executor0'],
)
.add(
.add(
name='executor2',
needs=['executor0'],
)
.needs(
.needs(
**external_args,
external=True,
needs=['executor1', 'executor2'],
Expand All @@ -321,3 +321,23 @@ def test_flow_with_external_deployment_join(

# Reducing applied everywhere, expect 50 docs, same as the input
validate_response(resp, len(input_docs))


def test_external_flow_with_target_executor():
class ExtExecutor(Executor):

@requests
def foo(self, docs, **kwargs):
for doc in docs:
doc.text = 'external'

external_flow = Flow().add(uses=ExtExecutor)

with external_flow:
d = Document(text='sunset with green landscape by the river')
f = Flow().add(port=external_flow.port, external=True, name='external_executor')
with f:
response = f.post(on='/', inputs=d, target_executor='external_executor')


assert response[0].text == 'external'

0 comments on commit 4c3d760

Please sign in to comment.