Skip to content

Commit

Permalink
fix: return responses (#4343)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM committed Feb 15, 2022
1 parent aebabad commit 2efe175
Show file tree
Hide file tree
Showing 50 changed files with 238 additions and 151 deletions.
2 changes: 1 addition & 1 deletion cli/autocomplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@
'--port',
'--https',
'--asyncio',
'--results-as-docarray',
'--return-responses',
'--protocol',
],
'export-api': ['--help', '--yaml-path', '--json-path', '--schema-path'],
Expand Down
4 changes: 2 additions & 2 deletions jina/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def Client(
port: Optional[int] = None,
protocol: Optional[str] = 'GRPC',
proxy: Optional[bool] = False,
results_as_docarray: Optional[bool] = False,
return_responses: Optional[bool] = False,
**kwargs
) -> Union[
'AsyncWebSocketClient',
Expand All @@ -40,7 +40,7 @@ def Client(
:param port: The port of the Gateway, which the client should connect to.
:param protocol: Communication protocol between server and client.
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param results_as_docarray: If set, return results as DocArray instead of Request.
:param return_responses: If set, return results as List of Requests instead of a reduced DocArray.
:return: the new Client object
.. # noqa: DAR202
Expand Down
2 changes: 1 addition & 1 deletion jina/clients/mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async def _get_results(*args, **kwargs):
result.append(resp)

if return_results:
if c.args.results_as_docarray:
if not c.args.return_responses:
docs = [r.data.docs for r in result]
if len(docs) < 1:
return docs
Expand Down
4 changes: 2 additions & 2 deletions jina/orchestrate/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def __init__(
port: Optional[int] = None,
protocol: Optional[str] = 'GRPC',
proxy: Optional[bool] = False,
results_as_docarray: Optional[bool] = False,
return_responses: Optional[bool] = False,
**kwargs,
):
"""Create a Flow. Flow is how Jina streamlines and scales Executors. This overloaded method provides arguments from `jina client` CLI.
Expand All @@ -108,7 +108,7 @@ def __init__(
:param port: The port of the Gateway, which the client should connect to.
:param protocol: Communication protocol between server and client.
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param results_as_docarray: If set, return results as DocArray instead of Request.
:param return_responses: If set, return results as List of Requests instead of a reduced DocArray.
.. # noqa: DAR202
.. # noqa: DAR101
Expand Down
4 changes: 2 additions & 2 deletions jina/parsers/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ def mixin_client_features_parser(parser):
)

parser.add_argument(
'--results-as-docarray',
'--return-responses',
action='store_true',
default=False,
help="If set, return results as DocArray instead of Request.",
help="If set, return results as List of Requests instead of a reduced DocArray.",
)
2 changes: 1 addition & 1 deletion tests/daemon/unit/api/endpoints/partial/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def test_flow_api(monkeypatch, partial_flow_client):

get_response = partial_flow_client.get(api)

endpoint_responses = Client(port=56789).post(
endpoint_responses = Client(port=56789, return_responses=True).post(
on='/any_endpoint', inputs=Document(), return_results=True
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def test_remote_executor_via_config_yaml(upload_files, config_yml):
host=CLOUD_HOST, uses=config_yml, upload_files=upload_files
)
with f:
resp = Client(port=exposed_port).post(
resp = Client(port=exposed_port, return_responses=True).post(
on='/',
inputs=Document(text=config_yml),
return_results=True,
Expand Down Expand Up @@ -126,7 +126,7 @@ def test_remote_executor_via_pymodules(upload_files, uses, py_modules):
upload_files=upload_files,
)
with f:
resp = Client(port=exposed_port).post(
resp = Client(port=exposed_port, return_responses=True).post(
on='/',
inputs=Document(text=py_modules),
return_results=True,
Expand Down
2 changes: 1 addition & 1 deletion tests/distributed/test_dir_structures/test_remote_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def test_remote_flow_with_directory(directory, filename, mul):
filename=filename,
envs={'PORT_EXPOSE': 12345},
):
resp = Client(port=12345).post(
resp = Client(port=12345, return_responses=True).post(
on='/',
inputs=Document(text=directory),
return_results=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ def doc_to_index():

@pytest.fixture
def client():
return Client(host='localhost', port=45678)
return Client(host='localhost', port=45678, return_responses=True)


@pytest.fixture
def grpc_client():
args = set_client_cli_parser().parse_args(
['--host', 'localhost', '--port', '45678']
['--host', 'localhost', '--port', '45678', '--return-responses']
)

return GRPCClient(args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def test_local_flow_use_external_executor(
local_flow, documents_to_index, docker_compose
):
with local_flow as f:
responses = Client(port=exposed_port).index(
responses = Client(port=exposed_port, return_responses=True).index(
inputs=documents_to_index, return_results=True, request_size=100
)
assert len(responses) == 2
Expand Down
26 changes: 22 additions & 4 deletions tests/distributed/test_remote_flows/test_remote_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ def test_remote_jinad_flow(jinad_client, flow_envs):
remote_flow_args = remote_flow_args['arguments']['object']['arguments']
assert remote_flow_args['port_expose'] == MINI_FLOW1_PORT
assert remote_flow_args['protocol'] == PROTOCOL
resp = Client(host=HOST, port=MINI_FLOW1_PORT, protocol='http').post(
resp = Client(
host=HOST, port=MINI_FLOW1_PORT, protocol='http', return_responses=True
).post(
on='/',
inputs=[Document(id=str(idx)) for idx in range(NUM_DOCS)],
return_results=True,
Expand Down Expand Up @@ -109,7 +111,13 @@ async def test_remote_jinad_flow_async(async_jinad_client, flow_envs):
remote_flow_args = remote_flow_args['arguments']['object']['arguments']
assert remote_flow_args['port_expose'] == MINI_FLOW1_PORT
assert remote_flow_args['protocol'] == PROTOCOL
resp = Client(host=HOST, port=MINI_FLOW1_PORT, protocol='http', asyncio=True).post(
resp = Client(
host=HOST,
port=MINI_FLOW1_PORT,
protocol='http',
asyncio=True,
return_responses=True,
).post(
on='/',
inputs=[Document(id=str(idx)) for idx in range(NUM_DOCS)],
return_results=True,
Expand Down Expand Up @@ -143,7 +151,13 @@ async def test_remote_jinad_flow_get_delete_all(async_jinad_client):
# get all flows
remote_flow_args = await async_jinad_client.flows.list()
assert len(remote_flow_args.keys()) == 2
resp = Client(host=HOST, port=MINI_FLOW2_PORT, protocol='http', asyncio=True).post(
resp = Client(
host=HOST,
port=MINI_FLOW2_PORT,
protocol='http',
asyncio=True,
return_responses=True,
).post(
on='/',
inputs=[Document(id=str(idx)) for idx in range(NUM_DOCS)],
return_results=True,
Expand Down Expand Up @@ -176,7 +190,11 @@ async def test_jinad_flow_container_runtime(async_jinad_client, executor_image):
remote_flow_args = await async_jinad_client.flows.get(DaemonID(flow_id))
assert remote_flow_args
resp = Client(
host=HOST, port=CONTAINER_FLOW_PORT, protocol='http', asyncio=True
host=HOST,
port=CONTAINER_FLOW_PORT,
protocol='http',
asyncio=True,
return_responses=True,
).post(
on='/',
inputs=[Document(id=str(idx)) for idx in range(NUM_DOCS)],
Expand Down
6 changes: 4 additions & 2 deletions tests/distributed/test_remote_flows/test_remote_flow_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def test_remote_flow_local_executors(replicas, jinad_client):
},
jinad_client=jinad_client,
):
resp = Client(host=HOST, port=FLOW_PORT).post(
resp = Client(host=HOST, port=FLOW_PORT, return_responses=True).post(
on='/',
inputs=[Document(id=str(idx)) for idx in range(NUM_DOCS)],
return_results=True,
Expand All @@ -69,7 +69,9 @@ def test_port_expose_env_var(port_expose, func, jinad_client):
envs={'PORT_EXPOSE': port_expose, 'FUNC': func},
)

r = Client(host=HOST, port=port_expose, protocol='http').post(
r = Client(
host=HOST, port=port_expose, protocol='http', return_responses=True
).post(
on='/blah',
inputs=(Document(text=f'text {i}') for i in range(2)),
return_results=True,
Expand Down
16 changes: 12 additions & 4 deletions tests/distributed/test_remote_flows/test_remote_flow_scale.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ def test_scale_remote_flow(docker_image_built, jinad_client, deployment_params):
)
assert flow_id

ret1 = Client(host=HOST, port=FLOW_PORT, protocol='http', asyncio=False).index(
ret1 = Client(
host=HOST, port=FLOW_PORT, protocol='http', asyncio=False, return_responses=True
).index(
inputs=DocumentArray([Document() for _ in range(200)]),
return_results=True,
request_size=10,
Expand All @@ -81,7 +83,9 @@ def test_scale_remote_flow(docker_image_built, jinad_client, deployment_params):
id=flow_id, deployment_name=SCALE_EXECUTOR, replicas=scale_to
)

ret2 = Client(host=HOST, port=FLOW_PORT, protocol='http', asyncio=False).index(
ret2 = Client(
host=HOST, port=FLOW_PORT, protocol='http', asyncio=False, return_responses=True
).index(
inputs=DocumentArray([Document() for _ in range(200)]),
return_results=True,
request_size=10,
Expand Down Expand Up @@ -117,7 +121,9 @@ async def test_scale_remote_flow_async(
)
assert flow_id

ret1 = Client(host=HOST, port=FLOW_PORT, protocol='http', asyncio=True).index(
ret1 = Client(
host=HOST, port=FLOW_PORT, protocol='http', asyncio=True, return_responses=True
).index(
inputs=DocumentArray([Document() for _ in range(1000)]),
return_results=True,
request_size=10,
Expand All @@ -134,7 +140,9 @@ async def test_scale_remote_flow_async(
id=flow_id, deployment_name=SCALE_EXECUTOR, replicas=scale_to
)

ret2 = Client(host=HOST, port=FLOW_PORT, protocol='http', asyncio=True).index(
ret2 = Client(
host=HOST, port=FLOW_PORT, protocol='http', asyncio=True, return_responses=True
).index(
inputs=DocumentArray([Document() for _ in range(1000)]),
return_results=True,
request_size=10,
Expand Down
10 changes: 6 additions & 4 deletions tests/distributed/test_remote_peas/test_remote_peas.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ async def test_pseudo_remote_pods_topologies(gateway, head, worker):
)

# send requests to the gateway
c = Client(host='127.0.0.1', port=port_expose, asyncio=True)
c = Client(
host='127.0.0.1', port=port_expose, asyncio=True, return_responses=True
)
responses = c.post(
'/', inputs=async_inputs, request_size=1, return_results=True
)
Expand Down Expand Up @@ -295,7 +297,7 @@ async def test_pseudo_remote_pods_shards(gateway, head, worker, polling):

await asyncio.sleep(1.0)

c = Client(host='localhost', port=port_expose, asyncio=True)
c = Client(host='localhost', port=port_expose, asyncio=True, return_responses=True)
responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True)
response_list = []
async for response in responses:
Expand Down Expand Up @@ -363,7 +365,7 @@ async def test_pseudo_remote_pods_replicas(gateway, head, worker):

await asyncio.sleep(1.0)

c = Client(host='localhost', port=port_expose, asyncio=True)
c = Client(host='localhost', port=port_expose, asyncio=True, return_responses=True)
responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True)
response_list = []
async for response in responses:
Expand Down Expand Up @@ -483,7 +485,7 @@ async def test_pseudo_remote_pods_executor(

await asyncio.sleep(1.0)

c = Client(host=HOST, port=port_expose, asyncio=True)
c = Client(host=HOST, port=port_expose, asyncio=True, return_responses=True)
responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True)
response_list = []
async for response in responses:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,18 @@ def test_dump_dbms_remote(executor_images, docker_compose):
dbms_flow_id, query_flow_id, workspace_id = _create_flows()

# check that there are no matches in Query Flow
r = Client(host=HOST, port=REST_PORT_QUERY, protocol='http').search(
inputs=[doc for doc in docs[:nr_search]], return_results=True
)
r = Client(
host=HOST, port=REST_PORT_QUERY, protocol='http', return_responses=True
).search(inputs=[doc for doc in docs[:nr_search]], return_results=True)
assert r[0].data.docs[0].matches is None or len(r[0].data.docs[0].matches) == 0

# index on DBMS flow
Client(host=HOST, port=REST_PORT_DBMS, protocol='http').index(
inputs=docs, return_results=True
)
Client(
host=HOST, port=REST_PORT_DBMS, protocol='http', return_responses=True
).index(inputs=docs, return_results=True)

# dump data for DBMS flow
Client(host=HOST, port=REST_PORT_DBMS, protocol='http').post(
Client(host=HOST, port=REST_PORT_DBMS, protocol='http', return_responses=True).post(
on='/dump',
parameters={'shards': SHARDS, 'dump_path': DUMP_PATH},
target_executor='indexer_dbms',
Expand All @@ -97,7 +97,9 @@ def test_dump_dbms_remote(executor_images, docker_compose):
)

# validate that there are matches now
r = Client(host=HOST, port=REST_PORT_QUERY, protocol='http').search(
r = Client(
host=HOST, port=REST_PORT_QUERY, protocol='http', return_responses=True
).search(
inputs=[doc for doc in docs[:nr_search]],
return_results=True,
parameters={'top_k': 10},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ def remote_flow_with_runtime(request):
def test_scale_success(remote_flow_with_runtime: Flow, deployment_params):
num_replicas, scale_to, shards = deployment_params
with remote_flow_with_runtime as f:
ret1 = Client(port=exposed_port).index(
ret1 = Client(port=exposed_port, return_responses=True).index(
inputs=DocumentArray([Document() for _ in range(200)]),
return_results=True,
request_size=10,
)
f.scale(deployment_name='executor', replicas=scale_to)
ret2 = Client(port=exposed_port).index(
ret2 = Client(port=exposed_port, return_responses=True).index(
inputs=DocumentArray([Document() for _ in range(200)]),
return_results=True,
request_size=10,
Expand Down Expand Up @@ -114,7 +114,7 @@ def test_scale_with_concurrent_client(
remote_flow_with_runtime: Flow, deployment_params, protocol
):
def peer_client(port, protocol, peer_hash, queue):
rv = Client(protocol=protocol, port=port).index(
rv = Client(protocol=protocol, port=port, return_responses=True).index(
[Document(text=peer_hash) for _ in range(NUM_DOCS_SENT_BY_CLIENTS)],
request_size=5,
return_results=True,
Expand Down Expand Up @@ -145,7 +145,7 @@ def peer_client(port, protocol, peer_hash, queue):
for t in thread_pool:
t.join()

c = Client(protocol=protocol, port=port_expose)
c = Client(protocol=protocol, port=port_expose, return_responses=True)
rv = c.index(
[Document() for _ in range(5)], request_size=1, return_results=True
)
Expand Down
6 changes: 5 additions & 1 deletion tests/distributed/test_topologies/test_topologies.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def test_remote_flow_local_executors(mocker, replicas):
host=__default_host__,
port=args['port_expose'],
protocol=args['protocol'],
return_responses=True,
).post(
on='/',
inputs=(
Expand Down Expand Up @@ -203,7 +204,10 @@ def test_remote_workspace_value():
)
args = client.flows.get(flow_id)['arguments']['object']['arguments']
response = Client(
host=HOST, port=args['port_expose'], protocol=args['protocol']
host=HOST,
port=args['port_expose'],
protocol=args['protocol'],
return_responses=True,
).post(on='/', inputs=[Document()], show_progress=True, return_results=True)
assert (
response[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def test_remote_flow_containerized_executors(docker_image, mocker):
host=__default_host__,
port=args['port_expose'],
protocol=args['protocol'],
return_responses=True,
).post(
on='/',
inputs=(
Expand Down

0 comments on commit 2efe175

Please sign in to comment.