Skip to content

Commit

Permalink
feat: expose grpc parameters and add production ready keepalive param…
Browse files Browse the repository at this point in the history
…eters (#5092)

* fix: add production ready keepalive grpc parameters

* feat: add test for long live flow

* feat: add grpc server options to argument

* style: fix overload and cli autocomplete

* feat: update description

* feat: add test for grpc server options

* fix: move parsing from gateway to worker

* style: fix overload and cli autocomplete

* feat: merge default values and new ones for grpc option

* refactor: cleaner dict to list unpacking

* refactor: reduce the time in test to wait

* docs: add example in docstring for grpc server options

* refactor: move the get grpc option from jina helper to runtime helper

* style: fix overload and cli autocomplete

* fix: fix typo

* style: fix overload and cli autocomplete

Co-authored-by: Jina Dev Bot <dev-bot@jina.ai>
  • Loading branch information
samsja and jina-bot committed Sep 1, 2022
1 parent 587f626 commit e794c06
Show file tree
Hide file tree
Showing 14 changed files with 123 additions and 54 deletions.
1 change: 1 addition & 0 deletions docs/fundamentals/flow/executor-args.md
Expand Up @@ -16,6 +16,7 @@
| `host_in` | The host address for binding to, by default it is 0.0.0.0 | `string` | `0.0.0.0` |
| `native` | If set, only native Executors are allowed, and the Executor is always run inside WorkerRuntime. | `boolean` | `False` |
| `output_array_type` | The type of array `tensor` and `embedding` will be serialized to.<br><br>Supports the same types as `docarray.to_protobuf(.., ndarray_type=...)`, which can be found <br>`here <https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf>`.<br>Defaults to retaining whatever type is returned by the Executor. | `string` | `None` |
| `grpc_server_options` | Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} | `object` | `None` |
| `entrypoint` | The entrypoint command overrides the ENTRYPOINT in the Docker image. When not set, the Docker image ENTRYPOINT takes effect. | `string` | `None` |
| `docker_kwargs` | Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker <br>container. <br><br>More details can be found in the Docker SDK docs: https://docker-py.readthedocs.io/en/stable/ | `object` | `None` |
| `volumes` | The path on the host to be mounted inside the container. <br><br>Note, <br>- If separated by `:`, then the first part will be considered as the local host path and the second part is the path in the container system. <br>- If no split provided, then the basename of that directory will be mounted into container's root path, e.g. `--volumes="/user/test/my-workspace"` will be mounted into `/my-workspace` inside the container. <br>- All volumes are mounted with read-write mode. | `array` | `None` |
Expand Down
2 changes: 1 addition & 1 deletion docs/fundamentals/flow/gateway-args.md
Expand Up @@ -16,6 +16,7 @@
| `host_in` | The host address for binding to, by default it is 0.0.0.0 | `string` | `0.0.0.0` |
| `native` | If set, only native Executors are allowed, and the Executor is always run inside WorkerRuntime. | `boolean` | `False` |
| `output_array_type` | The type of array `tensor` and `embedding` will be serialized to.<br><br>Supports the same types as `docarray.to_protobuf(.., ndarray_type=...)`, which can be found <br>`here <https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf>`.<br>Defaults to retaining whatever type is returned by the Executor. | `string` | `None` |
| `grpc_server_options` | Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} | `object` | `None` |
| `prefetch` | Number of requests fetched from the client before feeding into the first Executor. <br> <br> Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default) | `number` | `1000` |
| `title` | The title of this HTTP server. It will be used in automatic docs such as Swagger UI. | `string` | `None` |
| `description` | The description of this HTTP server. It will be used in automatic docs such as Swagger UI. | `string` | `None` |
Expand All @@ -24,7 +25,6 @@
| `no_crud_endpoints` | If set, `/index`, `/search`, `/update`, `/delete` endpoints are removed from HTTP interface.<br><br> Any executor that has `@requests(on=...)` bind with those values will receive data requests. | `boolean` | `False` |
| `expose_endpoints` | A JSON string that represents a map from executor endpoints (`@requests(on=...)`) to HTTP endpoints. | `string` | `None` |
| `uvicorn_kwargs` | Dictionary of kwargs arguments that will be passed to Uvicorn server when starting the server<br><br>More details can be found in Uvicorn docs: https://www.uvicorn.org/settings/ | `object` | `None` |
| `grpc_server_kwargs` | Dictionary of kwargs arguments that will be passed to the grpc server when starting the server # todo update | `object` | `None` |
| `ssl_certfile` | the path to the certificate file | `string` | `None` |
| `ssl_keyfile` | the path to the key file | `string` | `None` |
| `expose_graphql_endpoint` | If set, /graphql endpoint is added to HTTP interface. | `boolean` | `False` |
Expand Down
1 change: 0 additions & 1 deletion jina/helper.py
Expand Up @@ -17,7 +17,6 @@
from collections.abc import MutableMapping
from datetime import datetime
from itertools import islice
from pprint import pprint
from socket import AF_INET, SOCK_STREAM, socket
from types import SimpleNamespace
from typing import (
Expand Down
9 changes: 6 additions & 3 deletions jina/orchestrate/flow/base.py
Expand Up @@ -153,7 +153,7 @@ def __init__(
floating: Optional[bool] = False,
graph_conditions: Optional[str] = '{}',
graph_description: Optional[str] = '{}',
grpc_server_kwargs: Optional[dict] = None,
grpc_server_options: Optional[dict] = None,
host: Optional[str] = '0.0.0.0',
host_in: Optional[str] = '0.0.0.0',
log_config: Optional[str] = None,
Expand Down Expand Up @@ -203,7 +203,7 @@ def __init__(
:param floating: If set, the current Pod/Deployment can not be further chained, and the next `.add()` will chain after the last Pod/Deployment not this current one.
:param graph_conditions: Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents.
:param graph_description: Routing graph for the gateway
:param grpc_server_kwargs: Dictionary of kwargs arguments that will be passed to the grpc server when starting the server # todo update
:param grpc_server_options: Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1}
:param host: The host address of the runtime, by default it is 0.0.0.0.
:param host_in: The host address for binding to, by default it is 0.0.0.0
:param log_config: The YAML config of the logger used in this object.
Expand Down Expand Up @@ -376,7 +376,7 @@ def __init__(
:param floating: If set, the current Pod/Deployment can not be further chained, and the next `.add()` will chain after the last Pod/Deployment not this current one.
:param graph_conditions: Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents.
:param graph_description: Routing graph for the gateway
:param grpc_server_kwargs: Dictionary of kwargs arguments that will be passed to the grpc server when starting the server # todo update
:param grpc_server_options: Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1}
:param host: The host address of the runtime, by default it is 0.0.0.0.
:param host_in: The host address for binding to, by default it is 0.0.0.0
:param log_config: The YAML config of the logger used in this object.
Expand Down Expand Up @@ -793,6 +793,7 @@ def add(
floating: Optional[bool] = False,
force_update: Optional[bool] = False,
gpus: Optional[str] = None,
grpc_server_options: Optional[dict] = None,
host: Optional[str] = '0.0.0.0',
host_in: Optional[str] = '0.0.0.0',
install_requirements: Optional[bool] = False,
Expand Down Expand Up @@ -853,6 +854,7 @@ def add(
- To access specified gpus based on device id, use `--gpus device=[YOUR-GPU-DEVICE-ID]`
- To access specified gpus based on multiple device id, use `--gpus device=[YOUR-GPU-DEVICE-ID1],device=[YOUR-GPU-DEVICE-ID2]`
- To specify more parameters, use `--gpus device=[YOUR-GPU-DEVICE-ID],runtime=nvidia,capabilities=display
:param grpc_server_options: Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1}
:param host: The host address of the runtime, by default it is 0.0.0.0.
:param host_in: The host address for binding to, by default it is 0.0.0.0
:param install_requirements: If set, install `requirements.txt` in the Hub Executor bundle to local
Expand Down Expand Up @@ -999,6 +1001,7 @@ def add(
- To access specified gpus based on device id, use `--gpus device=[YOUR-GPU-DEVICE-ID]`
- To access specified gpus based on multiple device id, use `--gpus device=[YOUR-GPU-DEVICE-ID1],device=[YOUR-GPU-DEVICE-ID2]`
- To specify more parameters, use `--gpus device=[YOUR-GPU-DEVICE-ID],runtime=nvidia,capabilities=display
:param grpc_server_options: Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1}
:param host: The host address of the runtime, by default it is 0.0.0.0.
:param host_in: The host address for binding to, by default it is 0.0.0.0
:param install_requirements: If set, install `requirements.txt` in the Hub Executor bundle to local
Expand Down
10 changes: 0 additions & 10 deletions jina/parsers/orchestrate/runtimes/remote.py
Expand Up @@ -197,16 +197,6 @@ def mixin_http_gateway_parser(parser=None):
''',
)

gp.add_argument(
'--grpc-server-kwargs',
action=KVAppendAction,
metavar='KEY: VALUE',
nargs='*',
help='''
Dictionary of kwargs arguments that will be passed to the grpc server when starting the server # todo update
''',
)

gp.add_argument(
'--ssl-certfile',
type=str,
Expand Down
9 changes: 9 additions & 0 deletions jina/parsers/orchestrate/runtimes/worker.py
Expand Up @@ -103,3 +103,12 @@ def mixin_worker_runtime_parser(parser):
Defaults to retaining whatever type is returned by the Executor.
''',
)

gp.add_argument(
'--grpc-server-options',
action=KVAppendAction,
metavar='KEY: VALUE',
nargs='*',
help="Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1}",
default=None,
)
47 changes: 25 additions & 22 deletions jina/serve/runtimes/gateway/grpc/__init__.py
Expand Up @@ -9,8 +9,9 @@
from jina.excepts import PortAlreadyUsed
from jina.helper import get_full_version, is_port_free
from jina.proto import jina_pb2, jina_pb2_grpc
from jina.serve.runtimes.gateway import GatewayRuntime
from jina.serve.bff import GatewayBFF
from jina.serve.runtimes.gateway import GatewayRuntime
from jina.serve.runtimes.helper import _get_grpc_server_options
from jina.types.request.status import StatusMessage

__all__ = ['GRPCGatewayRuntime']
Expand All @@ -20,9 +21,9 @@ class GRPCGatewayRuntime(GatewayRuntime):
"""Gateway Runtime for gRPC."""

def __init__(
self,
args: argparse.Namespace,
**kwargs,
self,
args: argparse.Namespace,
**kwargs,
):
"""Initialize the runtime
:param args: args from CLI
Expand All @@ -45,10 +46,7 @@ async def async_setup(self):
raise PortAlreadyUsed(f'port:{self.args.port}')

self.server = grpc.aio.server(
options=[
('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1),
]
options=_get_grpc_server_options(self.args.grpc_server_options)
)

await self._async_setup_server()
Expand All @@ -62,19 +60,23 @@ async def _async_setup_server(self):
deployments_addresses = json.loads(self.args.deployments_addresses)
deployments_disable_reduce = json.loads(self.args.deployments_disable_reduce)

self.gateway_bff = GatewayBFF(graph_representation=graph_description,
executor_addresses=deployments_addresses,
graph_conditions=graph_conditions,
deployments_disable_reduce=deployments_disable_reduce,
timeout_send=self.timeout_send,
retries=self.args.retries,
compression=self.args.compression,
runtime_name=self.name,
prefetch=self.args.prefetch,
logger=self.logger,
metrics_registry=self.metrics_registry)

jina_pb2_grpc.add_JinaRPCServicer_to_server(self.gateway_bff._streamer, self.server)
self.gateway_bff = GatewayBFF(
graph_representation=graph_description,
executor_addresses=deployments_addresses,
graph_conditions=graph_conditions,
deployments_disable_reduce=deployments_disable_reduce,
timeout_send=self.timeout_send,
retries=self.args.retries,
compression=self.args.compression,
runtime_name=self.name,
prefetch=self.args.prefetch,
logger=self.logger,
metrics_registry=self.metrics_registry,
)

jina_pb2_grpc.add_JinaRPCServicer_to_server(
self.gateway_bff._streamer, self.server
)
jina_pb2_grpc.add_JinaGatewayDryRunRPCServicer_to_server(self, self.server)
jina_pb2_grpc.add_JinaInfoRPCServicer_to_server(self, self.server)

Expand Down Expand Up @@ -109,7 +111,7 @@ async def _async_setup_server(self):
)
self.server.add_secure_port(bind_addr, server_credentials)
elif (
self.args.ssl_keyfile != self.args.ssl_certfile
self.args.ssl_keyfile != self.args.ssl_certfile
): # if we have only ssl_keyfile and not ssl_certfile or vice versa
raise ValueError(
f"you can't pass a ssl_keyfile without a ssl_certfile and vice versa"
Expand Down Expand Up @@ -144,6 +146,7 @@ async def dry_run(self, empty, context) -> jina_pb2.StatusProto:
:returns: the response request
"""
from docarray import DocumentArray

from jina.clients.request import request_generator
from jina.enums import DataInputType
from jina.serve.executors import __dry_run_endpoint__
Expand Down
6 changes: 2 additions & 4 deletions jina/serve/runtimes/head/__init__.py
Expand Up @@ -18,6 +18,7 @@
from jina.proto import jina_pb2, jina_pb2_grpc
from jina.serve.networking import GrpcConnectionPool
from jina.serve.runtimes.asyncio import AsyncNewLoopRuntime
from jina.serve.runtimes.helper import _get_grpc_server_options
from jina.serve.runtimes.request_handlers.data_request_handler import DataRequestHandler
from jina.types.request.data import DataRequest, Response

Expand Down Expand Up @@ -142,10 +143,7 @@ def _default_polling_dict(self, default_polling):
async def async_setup(self):
"""Wait for the GRPC server to start"""
self._grpc_server = grpc.aio.server(
options=[
('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1),
]
options=_get_grpc_server_options(self.args.grpc_server_options)
)

jina_pb2_grpc.add_JinaSingleDataRequestRPCServicer_to_server(
Expand Down
38 changes: 35 additions & 3 deletions jina/serve/runtimes/helper.py
@@ -1,7 +1,5 @@
import copy
from typing import Dict, Tuple

from jina.serve.runtimes.request_handlers.data_request_handler import DataRequestHandler
from typing import Any, Dict, List, Tuple

_SPECIFIC_EXECUTOR_SEPARATOR = '__'

Expand Down Expand Up @@ -77,3 +75,37 @@ def _parse_specific_params(parameters: Dict, executor_name: str):
parsed_params.update(**specific_parameters)

return parsed_params


_DEFAULT_GRPC_OPTION = {
'grpc.max_send_message_length': -1,
'grpc.max_receive_message_length': -1,
# for the following see this blog post for the choice of default value https://cs.mcgill.ca/~mxia3/2019/02/23/Using-gRPC-in-Production/
'grpc.keepalive_time_ms': 10000,
# send keepalive ping every 10 second, default is 2 hours.
'grpc.keepalive_timeout_ms': 5000,
# keepalive ping time out after 5 seconds, default is 20 seconds
'grpc.keepalive_permit_without_calls': True,
# allow keepalive pings when there's no gRPC calls
'grpc.http2.max_pings_without_data': 0,
# allow unlimited amount of keepalive pings without data
'grpc.http2.min_time_between_pings_ms': 10000,
# allow grpc pings from client every 10 seconds
'grpc.http2.min_ping_interval_without_data_ms': 5000,
# allow grpc pings from client without data every 5 seconds
}


def _get_grpc_server_options(option_from_args: Dict) -> List[Tuple[str, Any]]:
    """Build the gRPC server option list, layering the given options over the defaults.

    :param option_from_args: a dict mapping gRPC option names to values; a falsy value
        (``None`` or an empty dict) selects the default options only
    :return: the gRPC options, i.e. a list of ``(key, value)`` tuples
    """
    if not option_from_args:
        # nothing supplied by the caller: expose the defaults unchanged
        return list(_DEFAULT_GRPC_OPTION.items())

    # work on a copy so the module-level defaults are never mutated
    merged_options = dict(_DEFAULT_GRPC_OPTION)
    merged_options.update(option_from_args)  # caller-supplied values win over defaults

    return list(merged_options.items())
6 changes: 2 additions & 4 deletions jina/serve/runtimes/worker/__init__.py
Expand Up @@ -11,6 +11,7 @@
from jina.importer import ImportExtensions
from jina.proto import jina_pb2, jina_pb2_grpc
from jina.serve.runtimes.asyncio import AsyncNewLoopRuntime
from jina.serve.runtimes.helper import _get_grpc_server_options
from jina.serve.runtimes.request_handlers.data_request_handler import DataRequestHandler
from jina.types.request.data import DataRequest

Expand Down Expand Up @@ -87,10 +88,7 @@ async def _async_setup_grpc_server(self):
"""

self._grpc_server = grpc.aio.server(
options=[
('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1),
]
options=_get_grpc_server_options(self.args.grpc_server_options)
)

jina_pb2_grpc.add_JinaSingleDataRequestRPCServicer_to_server(
Expand Down
5 changes: 4 additions & 1 deletion jina_cli/autocomplete.py
Expand Up @@ -40,6 +40,7 @@
'--host-in',
'--native',
'--output-array-type',
'--grpc-server-options',
'--entrypoint',
'--docker-kwargs',
'--volumes',
Expand Down Expand Up @@ -112,6 +113,7 @@
'--host-in',
'--native',
'--output-array-type',
'--grpc-server-options',
'--prefetch',
'--title',
'--description',
Expand All @@ -120,7 +122,6 @@
'--no-crud-endpoints',
'--expose-endpoints',
'--uvicorn-kwargs',
'--grpc-server-kwargs',
'--ssl-certfile',
'--ssl-keyfile',
'--expose-graphql-endpoint',
Expand Down Expand Up @@ -231,6 +232,7 @@
'--host-in',
'--native',
'--output-array-type',
'--grpc-server-options',
'--entrypoint',
'--docker-kwargs',
'--volumes',
Expand Down Expand Up @@ -283,6 +285,7 @@
'--host-in',
'--native',
'--output-array-type',
'--grpc-server-options',
'--entrypoint',
'--docker-kwargs',
'--volumes',
Expand Down
Expand Up @@ -8,6 +8,7 @@
from jina.excepts import PortAlreadyUsed
from jina.helper import is_port_free
from jina.serve.runtimes.gateway.grpc import GRPCGatewayRuntime as _GRPCGatewayRuntime
from jina.serve.runtimes.helper import _get_grpc_server_options
from tests import random_docs


Expand Down Expand Up @@ -52,13 +53,9 @@ async def async_setup(self):

self.server = grpc.aio.server(
interceptors=(AuthInterceptor('access_key'),),
options=[
('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1),
],
options=_get_grpc_server_options(self.args.grpc_server_options),
)


await self._async_setup_server()

monkeypatch.setattr(
Expand Down
29 changes: 29 additions & 0 deletions tests/integration/gateway_clients/test_long_flow_keepalive.py
@@ -0,0 +1,29 @@
import time

import pytest
from docarray import DocumentArray

from jina import Executor, Flow, requests


@pytest.fixture()
def slow_executor() -> Executor:
    """Provide an Executor class whose endpoint blocks for 30 seconds per request.

    :return: the ``MySlowExec`` class itself (not an instance), ready to be passed as ``uses``
    """

    class MySlowExec(Executor):
        """Executor that sleeps before tagging every Document it receives."""

        @requests
        def slow(self, docs, **kwargs):
            """Sleep for 30 seconds, then set ``text`` of every Document to ``'process'``.

            :param docs: the Documents of the incoming request
            :param kwargs: remaining request arguments, unused here
            """
            time.sleep(30)  # keep the request open while the client waits for the response
            for document in docs:
                document.text = 'process'

    return MySlowExec


@pytest.mark.slow
def test_long_flow_keep_alive(slow_executor):
    """Check that a slow request is not killed by the gRPC keepalive feature.

    The Executor takes a long time to process the request; the connection to the
    Flow must survive that wait and still return the processed Documents.

    :param slow_executor: fixture providing the slow Executor class
    """
    flow = Flow().add(uses=slow_executor)

    with flow as f:
        results = f.search(inputs=DocumentArray.empty(10))

    # every returned Document must carry the marker written by the Executor
    for result in results:
        assert result.text == 'process'
7 changes: 7 additions & 0 deletions tests/unit/test_gateway.py
Expand Up @@ -63,3 +63,10 @@ def _request(status_codes, durations, index):
# requests.
rate = failed / success
assert rate < 0.1


def test_grpc_custom_otpions():
    """Smoke test: a Flow given custom ``grpc_server_options`` starts and stops cleanly."""
    # NOTE(review): the function name misspells "options"; kept as-is so the test id stays stable
    custom_options = {'grpc.max_send_message_length': -1}

    with Flow(grpc_server_options=custom_options):
        pass

0 comments on commit e794c06

Please sign in to comment.