diff --git a/docs/fundamentals/flow/executor-args.md b/docs/fundamentals/flow/executor-args.md index 5c53ba39f89cd..155e4b27c68e0 100644 --- a/docs/fundamentals/flow/executor-args.md +++ b/docs/fundamentals/flow/executor-args.md @@ -16,6 +16,7 @@ | `host_in` | The host address for binding to, by default it is 0.0.0.0 | `string` | `0.0.0.0` | | `native` | If set, only native Executors is allowed, and the Executor is always run inside WorkerRuntime. | `boolean` | `False` | | `output_array_type` | The type of array `tensor` and `embedding` will be serialized to.

Supports the same types as `docarray.to_protobuf(.., ndarray_type=...)`, which can be found
`here <https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf>`.
Defaults to retaining whatever type is returned by the Executor. | `string` | `None` | +| `grpc_server_options` | Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} | `object` | `None` | | `entrypoint` | The entrypoint command overrides the ENTRYPOINT in Docker image. when not set then the Docker image ENTRYPOINT takes effective. | `string` | `None` | | `docker_kwargs` | Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker '
container.

More details can be found in the Docker SDK docs: https://docker-py.readthedocs.io/en/stable/ | `object` | `None` | | `volumes` | The path on the host to be mounted inside the container.

Note,
- If separated by `:`, then the first part will be considered as the local host path and the second part is the path in the container system.
- If no split provided, then the basename of that directory will be mounted into container's root path, e.g. `--volumes="/user/test/my-workspace"` will be mounted into `/my-workspace` inside the container.
- All volumes are mounted with read-write mode. | `array` | `None` | diff --git a/docs/fundamentals/flow/gateway-args.md b/docs/fundamentals/flow/gateway-args.md index 54e8c4df1616e..e94fca783b323 100644 --- a/docs/fundamentals/flow/gateway-args.md +++ b/docs/fundamentals/flow/gateway-args.md @@ -16,6 +16,7 @@ | `host_in` | The host address for binding to, by default it is 0.0.0.0 | `string` | `0.0.0.0` | | `native` | If set, only native Executors is allowed, and the Executor is always run inside WorkerRuntime. | `boolean` | `False` | | `output_array_type` | The type of array `tensor` and `embedding` will be serialized to.

Supports the same types as `docarray.to_protobuf(.., ndarray_type=...)`, which can be found
`here <https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf>`.
Defaults to retaining whatever type is returned by the Executor. | `string` | `None` | +| `grpc_server_options` | Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} | `object` | `None` | | `prefetch` | Number of requests fetched from the client before feeding into the first Executor.

Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default) | `number` | `1000` | | `title` | The title of this HTTP server. It will be used in automatics docs such as Swagger UI. | `string` | `None` | | `description` | The description of this HTTP server. It will be used in automatics docs such as Swagger UI. | `string` | `None` | @@ -24,7 +25,6 @@ | `no_crud_endpoints` | If set, `/index`, `/search`, `/update`, `/delete` endpoints are removed from HTTP interface.

Any executor that has `@requests(on=...)` bind with those values will receive data requests. | `boolean` | `False` | | `expose_endpoints` | A JSON string that represents a map from executor endpoints (`@requests(on=...)`) to HTTP endpoints. | `string` | `None` | | `uvicorn_kwargs` | Dictionary of kwargs arguments that will be passed to Uvicorn server when starting the server

More details can be found in Uvicorn docs: https://www.uvicorn.org/settings/ | `object` | `None` | -| `grpc_server_kwargs` | Dictionary of kwargs arguments that will be passed to the grpc server when starting the server # todo update | `object` | `None` | | `ssl_certfile` | the path to the certificate file | `string` | `None` | | `ssl_keyfile` | the path to the key file | `string` | `None` | | `expose_graphql_endpoint` | If set, /graphql endpoint is added to HTTP interface. | `boolean` | `False` | diff --git a/jina/helper.py b/jina/helper.py index 8c955d33f9c99..81c65f8b59eaa 100644 --- a/jina/helper.py +++ b/jina/helper.py @@ -17,7 +17,6 @@ from collections.abc import MutableMapping from datetime import datetime from itertools import islice -from pprint import pprint from socket import AF_INET, SOCK_STREAM, socket from types import SimpleNamespace from typing import ( diff --git a/jina/orchestrate/flow/base.py b/jina/orchestrate/flow/base.py index bff125dc79162..71f7af0243026 100644 --- a/jina/orchestrate/flow/base.py +++ b/jina/orchestrate/flow/base.py @@ -153,7 +153,7 @@ def __init__( floating: Optional[bool] = False, graph_conditions: Optional[str] = '{}', graph_description: Optional[str] = '{}', - grpc_server_kwargs: Optional[dict] = None, + grpc_server_options: Optional[dict] = None, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, @@ -203,7 +203,7 @@ def __init__( :param floating: If set, the current Pod/Deployment can not be further chained, and the next `.add()` will chain after the last Pod/Deployment not this current one. :param graph_conditions: Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents. 
:param graph_description: Routing graph for the gateway - :param grpc_server_kwargs: Dictionary of kwargs arguments that will be passed to the grpc server when starting the server # todo update + :param grpc_server_options: Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} :param host: The host address of the runtime, by default it is 0.0.0.0. :param host_in: The host address for binding to, by default it is 0.0.0.0 :param log_config: The YAML config of the logger used in this object. @@ -376,7 +376,7 @@ def __init__( :param floating: If set, the current Pod/Deployment can not be further chained, and the next `.add()` will chain after the last Pod/Deployment not this current one. :param graph_conditions: Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents. :param graph_description: Routing graph for the gateway - :param grpc_server_kwargs: Dictionary of kwargs arguments that will be passed to the grpc server when starting the server # todo update + :param grpc_server_options: Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} :param host: The host address of the runtime, by default it is 0.0.0.0. :param host_in: The host address for binding to, by default it is 0.0.0.0 :param log_config: The YAML config of the logger used in this object. 
@@ -793,6 +793,7 @@ def add( floating: Optional[bool] = False, force_update: Optional[bool] = False, gpus: Optional[str] = None, + grpc_server_options: Optional[dict] = None, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', install_requirements: Optional[bool] = False, @@ -853,6 +854,7 @@ def add( - To access specified gpus based on device id, use `--gpus device=[YOUR-GPU-DEVICE-ID]` - To access specified gpus based on multiple device id, use `--gpus device=[YOUR-GPU-DEVICE-ID1],device=[YOUR-GPU-DEVICE-ID2]` - To specify more parameters, use `--gpus device=[YOUR-GPU-DEVICE-ID],runtime=nvidia,capabilities=display + :param grpc_server_options: Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} :param host: The host address of the runtime, by default it is 0.0.0.0. :param host_in: The host address for binding to, by default it is 0.0.0.0 :param install_requirements: If set, install `requirements.txt` in the Hub Executor bundle to local @@ -999,6 +1001,7 @@ def add( - To access specified gpus based on device id, use `--gpus device=[YOUR-GPU-DEVICE-ID]` - To access specified gpus based on multiple device id, use `--gpus device=[YOUR-GPU-DEVICE-ID1],device=[YOUR-GPU-DEVICE-ID2]` - To specify more parameters, use `--gpus device=[YOUR-GPU-DEVICE-ID],runtime=nvidia,capabilities=display + :param grpc_server_options: Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} :param host: The host address of the runtime, by default it is 0.0.0.0. 
:param host_in: The host address for binding to, by default it is 0.0.0.0 :param install_requirements: If set, install `requirements.txt` in the Hub Executor bundle to local diff --git a/jina/parsers/orchestrate/runtimes/remote.py b/jina/parsers/orchestrate/runtimes/remote.py index f7e147ecf18b6..c3d4aab87e69b 100644 --- a/jina/parsers/orchestrate/runtimes/remote.py +++ b/jina/parsers/orchestrate/runtimes/remote.py @@ -197,16 +197,6 @@ def mixin_http_gateway_parser(parser=None): ''', ) - gp.add_argument( - '--grpc-server-kwargs', - action=KVAppendAction, - metavar='KEY: VALUE', - nargs='*', - help=''' - Dictionary of kwargs arguments that will be passed to the grpc server when starting the server # todo update - ''', - ) - gp.add_argument( '--ssl-certfile', type=str, diff --git a/jina/parsers/orchestrate/runtimes/worker.py b/jina/parsers/orchestrate/runtimes/worker.py index ceb5bedc251b2..c519463d8fbe5 100644 --- a/jina/parsers/orchestrate/runtimes/worker.py +++ b/jina/parsers/orchestrate/runtimes/worker.py @@ -103,3 +103,12 @@ def mixin_worker_runtime_parser(parser): Defaults to retaining whatever type is returned by the Executor. 
''', ) + + gp.add_argument( + '--grpc-server-options', + action=KVAppendAction, + metavar='KEY: VALUE', + nargs='*', + help="Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1}", + default=None, + ) diff --git a/jina/serve/runtimes/gateway/grpc/__init__.py b/jina/serve/runtimes/gateway/grpc/__init__.py index 18c3de6a26f3c..f2d62c7590986 100644 --- a/jina/serve/runtimes/gateway/grpc/__init__.py +++ b/jina/serve/runtimes/gateway/grpc/__init__.py @@ -9,8 +9,9 @@ from jina.excepts import PortAlreadyUsed from jina.helper import get_full_version, is_port_free from jina.proto import jina_pb2, jina_pb2_grpc -from jina.serve.runtimes.gateway import GatewayRuntime from jina.serve.bff import GatewayBFF +from jina.serve.runtimes.gateway import GatewayRuntime +from jina.serve.runtimes.helper import _get_grpc_server_options from jina.types.request.status import StatusMessage __all__ = ['GRPCGatewayRuntime'] @@ -20,9 +21,9 @@ class GRPCGatewayRuntime(GatewayRuntime): """Gateway Runtime for gRPC.""" def __init__( - self, - args: argparse.Namespace, - **kwargs, + self, + args: argparse.Namespace, + **kwargs, ): """Initialize the runtime :param args: args from CLI @@ -45,10 +46,7 @@ async def async_setup(self): raise PortAlreadyUsed(f'port:{self.args.port}') self.server = grpc.aio.server( - options=[ - ('grpc.max_send_message_length', -1), - ('grpc.max_receive_message_length', -1), - ] + options=_get_grpc_server_options(self.args.grpc_server_options) ) await self._async_setup_server() @@ -62,19 +60,23 @@ async def _async_setup_server(self): deployments_addresses = json.loads(self.args.deployments_addresses) deployments_disable_reduce = json.loads(self.args.deployments_disable_reduce) - self.gateway_bff = GatewayBFF(graph_representation=graph_description, - executor_addresses=deployments_addresses, - graph_conditions=graph_conditions, - 
deployments_disable_reduce=deployments_disable_reduce, - timeout_send=self.timeout_send, - retries=self.args.retries, - compression=self.args.compression, - runtime_name=self.name, - prefetch=self.args.prefetch, - logger=self.logger, - metrics_registry=self.metrics_registry) - - jina_pb2_grpc.add_JinaRPCServicer_to_server(self.gateway_bff._streamer, self.server) + self.gateway_bff = GatewayBFF( + graph_representation=graph_description, + executor_addresses=deployments_addresses, + graph_conditions=graph_conditions, + deployments_disable_reduce=deployments_disable_reduce, + timeout_send=self.timeout_send, + retries=self.args.retries, + compression=self.args.compression, + runtime_name=self.name, + prefetch=self.args.prefetch, + logger=self.logger, + metrics_registry=self.metrics_registry, + ) + + jina_pb2_grpc.add_JinaRPCServicer_to_server( + self.gateway_bff._streamer, self.server + ) jina_pb2_grpc.add_JinaGatewayDryRunRPCServicer_to_server(self, self.server) jina_pb2_grpc.add_JinaInfoRPCServicer_to_server(self, self.server) @@ -109,7 +111,7 @@ async def _async_setup_server(self): ) self.server.add_secure_port(bind_addr, server_credentials) elif ( - self.args.ssl_keyfile != self.args.ssl_certfile + self.args.ssl_keyfile != self.args.ssl_certfile ): # if we have only ssl_keyfile and not ssl_certfile or vice versa raise ValueError( f"you can't pass a ssl_keyfile without a ssl_certfile and vice versa" @@ -144,6 +146,7 @@ async def dry_run(self, empty, context) -> jina_pb2.StatusProto: :returns: the response request """ from docarray import DocumentArray + from jina.clients.request import request_generator from jina.enums import DataInputType from jina.serve.executors import __dry_run_endpoint__ diff --git a/jina/serve/runtimes/head/__init__.py b/jina/serve/runtimes/head/__init__.py index e44a6052fe618..8912c40db393e 100644 --- a/jina/serve/runtimes/head/__init__.py +++ b/jina/serve/runtimes/head/__init__.py @@ -18,6 +18,7 @@ from jina.proto import jina_pb2, 
jina_pb2_grpc from jina.serve.networking import GrpcConnectionPool from jina.serve.runtimes.asyncio import AsyncNewLoopRuntime +from jina.serve.runtimes.helper import _get_grpc_server_options from jina.serve.runtimes.request_handlers.data_request_handler import DataRequestHandler from jina.types.request.data import DataRequest, Response @@ -142,10 +143,7 @@ def _default_polling_dict(self, default_polling): async def async_setup(self): """Wait for the GRPC server to start""" self._grpc_server = grpc.aio.server( - options=[ - ('grpc.max_send_message_length', -1), - ('grpc.max_receive_message_length', -1), - ] + options=_get_grpc_server_options(self.args.grpc_server_options) ) jina_pb2_grpc.add_JinaSingleDataRequestRPCServicer_to_server( diff --git a/jina/serve/runtimes/helper.py b/jina/serve/runtimes/helper.py index 7a4a39235cd05..1aefad75f2e14 100644 --- a/jina/serve/runtimes/helper.py +++ b/jina/serve/runtimes/helper.py @@ -1,7 +1,5 @@ import copy -from typing import Dict, Tuple - -from jina.serve.runtimes.request_handlers.data_request_handler import DataRequestHandler +from typing import Any, Dict, List, Tuple _SPECIFIC_EXECUTOR_SEPARATOR = '__' @@ -77,3 +75,37 @@ def _parse_specific_params(parameters: Dict, executor_name: str): parsed_params.update(**specific_parameters) return parsed_params + + +_DEFAULT_GRPC_OPTION = { + 'grpc.max_send_message_length': -1, + 'grpc.max_receive_message_length': -1, + # for the following see this blog post for the choice of default value https://cs.mcgill.ca/~mxia3/2019/02/23/Using-gRPC-in-Production/ + 'grpc.keepalive_time_ms': 10000, + # send keepalive ping every 10 second, default is 2 hours. 
+ 'grpc.keepalive_timeout_ms': 5000, + # keepalive ping time out after 5 seconds, default is 20 seconds + 'grpc.keepalive_permit_without_calls': True, + # allow keepalive pings when there's no gRPC calls + 'grpc.http2.max_pings_without_data': 0, + # allow unlimited amount of keepalive pings without data + 'grpc.http2.min_time_between_pings_ms': 10000, + # allow grpc pings from client every 10 seconds + 'grpc.http2.min_ping_interval_without_data_ms': 5000, + # allow grpc pings from client without data every 5 seconds +} + + +def _get_grpc_server_options(option_from_args: Dict) -> List[Tuple[str, Any]]: + """transform dict of args into grpc option, will merge the args with the default args + :param option_from_args: a dict of argument + :return: grpc option i.e a list of tuple of key value + """ + + option_from_args = ( + {**_DEFAULT_GRPC_OPTION, **option_from_args} + if option_from_args + else _DEFAULT_GRPC_OPTION + ) # merge new and default args + + return list(option_from_args.items()) diff --git a/jina/serve/runtimes/worker/__init__.py b/jina/serve/runtimes/worker/__init__.py index 51fa86997d3ca..024f680a836c8 100644 --- a/jina/serve/runtimes/worker/__init__.py +++ b/jina/serve/runtimes/worker/__init__.py @@ -11,6 +11,7 @@ from jina.importer import ImportExtensions from jina.proto import jina_pb2, jina_pb2_grpc from jina.serve.runtimes.asyncio import AsyncNewLoopRuntime +from jina.serve.runtimes.helper import _get_grpc_server_options from jina.serve.runtimes.request_handlers.data_request_handler import DataRequestHandler from jina.types.request.data import DataRequest @@ -87,10 +88,7 @@ async def _async_setup_grpc_server(self): """ self._grpc_server = grpc.aio.server( - options=[ - ('grpc.max_send_message_length', -1), - ('grpc.max_receive_message_length', -1), - ] + options=_get_grpc_server_options(self.args.grpc_server_options) ) jina_pb2_grpc.add_JinaSingleDataRequestRPCServicer_to_server( diff --git a/jina_cli/autocomplete.py b/jina_cli/autocomplete.py index
ee903addc0291..f9283ad149b32 100644 --- a/jina_cli/autocomplete.py +++ b/jina_cli/autocomplete.py @@ -40,6 +40,7 @@ '--host-in', '--native', '--output-array-type', + '--grpc-server-options', '--entrypoint', '--docker-kwargs', '--volumes', @@ -112,6 +113,7 @@ '--host-in', '--native', '--output-array-type', + '--grpc-server-options', '--prefetch', '--title', '--description', @@ -120,7 +122,6 @@ '--no-crud-endpoints', '--expose-endpoints', '--uvicorn-kwargs', - '--grpc-server-kwargs', '--ssl-certfile', '--ssl-keyfile', '--expose-graphql-endpoint', @@ -231,6 +232,7 @@ '--host-in', '--native', '--output-array-type', + '--grpc-server-options', '--entrypoint', '--docker-kwargs', '--volumes', @@ -283,6 +285,7 @@ '--host-in', '--native', '--output-array-type', + '--grpc-server-options', '--entrypoint', '--docker-kwargs', '--volumes', diff --git a/tests/integration/clients_extra_kwargs/test_clients_post_extra_kwargs.py b/tests/integration/clients_extra_kwargs/test_clients_post_extra_kwargs.py index 8e325371081df..21032a8814a5d 100644 --- a/tests/integration/clients_extra_kwargs/test_clients_post_extra_kwargs.py +++ b/tests/integration/clients_extra_kwargs/test_clients_post_extra_kwargs.py @@ -8,6 +8,7 @@ from jina.excepts import PortAlreadyUsed from jina.helper import is_port_free from jina.serve.runtimes.gateway.grpc import GRPCGatewayRuntime as _GRPCGatewayRuntime +from jina.serve.runtimes.helper import _get_grpc_server_options from tests import random_docs @@ -52,13 +53,9 @@ async def async_setup(self): self.server = grpc.aio.server( interceptors=(AuthInterceptor('access_key'),), - options=[ - ('grpc.max_send_message_length', -1), - ('grpc.max_receive_message_length', -1), - ], + options=_get_grpc_server_options(self.args.grpc_server_options), ) - await self._async_setup_server() monkeypatch.setattr( diff --git a/tests/integration/gateway_clients/test_long_flow_keepalive.py b/tests/integration/gateway_clients/test_long_flow_keepalive.py new file mode 100644 index 
0000000000000..3b7ba0af4c4a1 --- /dev/null +++ b/tests/integration/gateway_clients/test_long_flow_keepalive.py @@ -0,0 +1,29 @@ +import time + +import pytest +from docarray import DocumentArray + +from jina import Executor, Flow, requests + + +@pytest.fixture() +def slow_executor() -> Executor: + class MySlowExec(Executor): + @requests + def slow(self, docs, **kwargs): + time.sleep(30) + for doc_ in docs: + doc_.text = 'process' + + return MySlowExec + + +@pytest.mark.slow +def test_long_flow_keep_alive(slow_executor): + # it tests that the connection to a flow that take a lot of time to process will not be killed by the keepalive feature + + with Flow().add(uses=slow_executor) as f: + docs = f.search(inputs=DocumentArray.empty(10)) + + for doc_ in docs: + assert doc_.text == 'process' diff --git a/tests/unit/test_gateway.py b/tests/unit/test_gateway.py index 68db05b30444e..39da60ecfa26c 100644 --- a/tests/unit/test_gateway.py +++ b/tests/unit/test_gateway.py @@ -63,3 +63,10 @@ def _request(status_codes, durations, index): # requests. rate = failed / success assert rate < 0.1 + + +def test_grpc_custom_otpions(): + + f = Flow(grpc_server_options={'grpc.max_send_message_length': -1}) + with f: + pass