diff --git a/jina/__init__.py b/jina/__init__.py index ceb3c42b23603..33ba9658191ef 100644 --- a/jina/__init__.py +++ b/jina/__init__.py @@ -119,6 +119,7 @@ def _warning_on_one_line(message, category, filename, lineno, *args, **kwargs): __default_http_gateway__ = 'HTTPGateway' __default_websocket_gateway__ = 'WebSocketGateway' __default_grpc_gateway__ = 'GRPCGateway' +__default_composite_gateway__ = 'CompositeGateway' __default_endpoint__ = '/default' __ready_msg__ = 'ready and listening' __stop_msg__ = 'terminated' diff --git a/jina/orchestrate/deployments/config/docker_compose.py b/jina/orchestrate/deployments/config/docker_compose.py index 7d70d67c95d4e..ec655b781aa37 100644 --- a/jina/orchestrate/deployments/config/docker_compose.py +++ b/jina/orchestrate/deployments/config/docker_compose.py @@ -4,6 +4,7 @@ from typing import Dict, List, Optional, Tuple, Union from jina import ( + __default_composite_gateway__, __default_executor__, __default_grpc_gateway__, __default_http_gateway__, @@ -80,6 +81,7 @@ def get_gateway_config( __default_http_gateway__, __default_websocket_gateway__, __default_grpc_gateway__, + __default_composite_gateway__, ]: cargs.uses = 'config.yml' @@ -92,9 +94,7 @@ def get_gateway_config( protocol = str(non_defaults.get('protocol', ['grpc'])[0]).lower() - ports = cargs.port + ( - [cargs.port_monitoring] if cargs.monitoring else [] - ) + ports = cargs.port + ([cargs.port_monitoring] if cargs.monitoring else []) envs = [f'JINA_LOG_LEVEL={os.getenv("JINA_LOG_LEVEL", "INFO")}'] if cargs.env: @@ -125,6 +125,7 @@ def _get_image_name(self, uses: Optional[str]): __default_http_gateway__, __default_websocket_gateway__, __default_grpc_gateway__, + __default_composite_gateway__, ]: image_name = get_image_name(uses) diff --git a/jina/orchestrate/deployments/config/helper.py b/jina/orchestrate/deployments/config/helper.py index e99d5bf171c6c..10b5d91bd444d 100644 --- a/jina/orchestrate/deployments/config/helper.py +++ b/jina/orchestrate/deployments/config/helper.py @@ -5,6 +5,7 @@ from hubble.executor.hubio import HubIO from jina import ( + __default_composite_gateway__, __default_executor__, __default_grpc_gateway__, __default_http_gateway__, @@ -132,6 +133,7 @@ def validate_uses(uses: str): __default_http_gateway__, __default_websocket_gateway__, __default_grpc_gateway__, + __default_composite_gateway__, __default_executor__, ] or uses.startswith('docker://') diff --git a/jina/orchestrate/deployments/config/k8s.py b/jina/orchestrate/deployments/config/k8s.py index da5764689f05d..caa816a4f6085 100644 --- a/jina/orchestrate/deployments/config/k8s.py +++ b/jina/orchestrate/deployments/config/k8s.py @@ -3,6 +3,7 @@ from typing import Dict, List, Optional, Tuple, Union from jina import ( + __default_composite_gateway__, __default_executor__, __default_grpc_gateway__, __default_http_gateway__, @@ -78,6 +79,7 @@ def get_gateway_yamls( __default_http_gateway__, __default_websocket_gateway__, __default_grpc_gateway__, + __default_composite_gateway__, ]: cargs.uses = 'config.yml' @@ -114,6 +116,7 @@ def _get_image_name(self, uses: Optional[str]): __default_http_gateway__, __default_websocket_gateway__, __default_grpc_gateway__, + __default_composite_gateway__, ]: image_name = get_image_name(uses) diff --git a/tests/k8s/test_k8s.py b/tests/k8s/test_k8s.py index 5d49d6b10d422..0d16ff5fce925 100644 --- a/tests/k8s/test_k8s.py +++ b/tests/k8s/test_k8s.py @@ -1224,18 +1224,18 @@ async def test_flow_with_custom_gateway(logger, docker_images, tmpdir): [['multiprotocol-gateway']], indirect=True, ) -async def test_flow_multiple_protocols_gateway( +async def test_flow_multiple_protocols_custom_gateway( logger, docker_images, tmpdir, k8s_cluster ): from kubernetes import client - namespace = 'flow-multiprotocol-gateway'.lower() api_client = client.ApiClient() core_client = client.CoreV1Api(api_client=api_client) app_client = client.AppsV1Api(api_client=api_client) try: http_port = random_port() grpc_port = random_port() + flow = Flow().config_gateway( uses=f'docker://{docker_images[0]}', port=[http_port, grpc_port], @@ -1276,8 +1276,83 @@ async def test_flow_multiple_protocols_gateway( import requests resp = requests.get(f'http://localhost:{http_port}').json() - assert resp['protocol'] == 'http' + assert resp == {'protocol': 'http'} + + # test portforwarding the gateway pod and service using grpc + forward_args = [ + [gateway_pod_name, grpc_port, grpc_port, namespace], + ['service/gateway-1-grpc', grpc_port, grpc_port, namespace], + ] + for forward in forward_args: + with shell_portforward(k8s_cluster._cluster.kubectl_path, *forward): + grpc_client = Client(protocol='grpc', port=grpc_port, asyncio=True) + async for _ in grpc_client.post('/', inputs=DocumentArray.empty(5)): + pass + assert AsyncNewLoopRuntime.is_ready(f'localhost:{grpc_port}') + except Exception as exc: + logger.error(f' Exception raised {exc}') + raise exc + + +@pytest.mark.asyncio +@pytest.mark.timeout(3600) +@pytest.mark.parametrize( + 'docker_images', + [['multiprotocol-gateway']], + indirect=True, +) +async def test_flow_multiple_protocols_built_in( + logger, docker_images, tmpdir, k8s_cluster +): + from kubernetes import client + + api_client = client.ApiClient() + core_client = client.CoreV1Api(api_client=api_client) + app_client = client.AppsV1Api(api_client=api_client) + try: + http_port = random_port() + grpc_port = random_port() + + flow = Flow().config_gateway( + port=[http_port, grpc_port], + protocol=['http', 'grpc'], + ) + + dump_path = os.path.join(str(tmpdir), 'k8s-flow-multiprotocol-gateway-builtin') + namespace = 'flow-multiprotocol-gateway-builtin' + flow.to_kubernetes_yaml(dump_path, k8s_namespace=namespace) + + await create_all_flow_deployments_and_wait_ready( + dump_path, + namespace=namespace, + api_client=api_client, + app_client=app_client, + core_client=core_client, + deployment_replicas_expected={ + 'gateway': 1, + }, + logger=logger, + ) + gateway_pod_name = ( + core_client.list_namespaced_pod( + namespace=namespace, label_selector='app=gateway' + ) + .items[0] + .metadata.name + ) + + # test portforwarding the gateway pod and service using http + forward_args = [ + [gateway_pod_name, http_port, http_port, namespace], + ['service/gateway', http_port, http_port, namespace], + ] + for forward in forward_args: + with shell_portforward(k8s_cluster._cluster.kubectl_path, *forward): + import requests + + resp = requests.get(f'http://localhost:{http_port}').json() + assert resp == {} # test portforwarding the gateway pod and service using grpc forward_args = [ [gateway_pod_name, grpc_port, grpc_port, namespace], diff --git a/tests/unit/orchestrate/deployments/config/test_docker_compose_pod_config.py b/tests/unit/orchestrate/deployments/config/test_docker_compose_pod_config.py index 43706e5085215..26cc488528665 100644 --- a/tests/unit/orchestrate/deployments/config/test_docker_compose_pod_config.py +++ b/tests/unit/orchestrate/deployments/config/test_docker_compose_pod_config.py @@ -367,15 +367,25 @@ def test_worker_services(name: str, shards: str): @pytest.mark.parametrize('deployments_addresses', [None, {'1': 'executor-head:8081'}]) +@pytest.mark.parametrize( + 'port,protocol', + [ + (['12345'], None), + (['12345'], ['grpc']), + (['12345', '12344'], ['grpc', 'http']), + (['12345', '12344', '12343'], ['grpc', 'http', 'websocket']), + ], +) @pytest.mark.parametrize('custom_gateway', ['jinaai/jina:custom-gateway', None]) -def test_docker_compose_gateway(deployments_addresses, custom_gateway): +def test_docker_compose_gateway(deployments_addresses, custom_gateway, port, protocol): if custom_gateway: os.environ['JINA_GATEWAY_IMAGE'] = custom_gateway elif 'JINA_GATEWAY_IMAGE' in os.environ: del os.environ['JINA_GATEWAY_IMAGE'] - args = set_gateway_parser().parse_args( - ['--env', 'ENV_VAR:ENV_VALUE', '--port', '32465'] - ) # envs are + args_list = ['--env', 'ENV_VAR:ENV_VALUE', '--port', *port] + if protocol: + args_list.extend(['--protocol', *protocol]) + args = set_gateway_parser().parse_args(args_list) # envs are # ignored for gateway deployment_config = DockerComposeConfig( args, deployments_addresses=deployments_addresses @@ -388,12 +398,15 @@ def test_docker_compose_gateway(deployments_addresses, custom_gateway): else f'jinaai/jina:{deployment_config.worker_services[0].version}-py38-standard' ) assert gateway_config['entrypoint'] == ['jina'] - assert gateway_config['ports'] == [f'{args.port[0]}:{args.port[0]}'] - assert gateway_config['expose'] == [args.port[0]] + assert gateway_config['ports'] == [f'{_port}:{_port}' for _port in args.port] + assert gateway_config['expose'] == args.port args = gateway_config['command'] assert args[0] == 'gateway' assert '--port' in args - assert args[args.index('--port') + 1] == '32465' + + for i, _port in enumerate(port): + assert args[args.index('--port') + i + 1] == _port + assert '--env' not in args if deployments_addresses is not None: assert '--deployments-addresses' in args diff --git a/tests/unit/orchestrate/deployments/config/test_k8s_deployment_config.py b/tests/unit/orchestrate/deployments/config/test_k8s_deployment_config.py index a801b87b3dbbc..6740da78ccde9 100644 --- a/tests/unit/orchestrate/deployments/config/test_k8s_deployment_config.py +++ b/tests/unit/orchestrate/deployments/config/test_k8s_deployment_config.py @@ -276,15 +276,25 @@ def assert_config_map_config( @pytest.mark.parametrize('deployments_addresses', [None, {'1': 'address.svc'}]) +@pytest.mark.parametrize( + 'port,protocol', + [ + (['12345'], None), + (['12345'], ['grpc']), + (['12345', '12344'], ['grpc', 'http']), + (['12345', '12344', '12343'], ['grpc', 'http', 'websocket']), + ], +) @pytest.mark.parametrize('custom_gateway', ['jinaai/jina:custom-gateway', None]) -def test_k8s_yaml_gateway(deployments_addresses, custom_gateway): +def test_k8s_yaml_gateway(deployments_addresses, custom_gateway, port, protocol): if custom_gateway: os.environ['JINA_GATEWAY_IMAGE'] = custom_gateway elif 'JINA_GATEWAY_IMAGE' in os.environ: del os.environ['JINA_GATEWAY_IMAGE'] - args = set_gateway_parser().parse_args( - ['--env', 'ENV_VAR:ENV_VALUE', '--port', '32465'] - ) # envs are + args_list = ['--env', 'ENV_VAR:ENV_VALUE', '--port', *port] + if protocol: + args_list.extend(['--protocol', *protocol]) + args = set_gateway_parser().parse_args(args_list) # envs are # ignored for gateway deployment_config = K8sDeploymentConfig( args, 'default-namespace', deployments_addresses @@ -293,7 +303,7 @@ def test_k8s_yaml_gateway(deployments_addresses, custom_gateway): assert len(yaml_configs) == 1 name, configs = yaml_configs[0] assert name == 'gateway' - assert len(configs) == 3 # 3 configs per yaml (configmap, service and deployment) + assert len(configs) == 2 + len(port) # configmap, deployment and 1 service per port config_map = configs[0] assert_config_map_config( config_map, @@ -306,24 +316,31 @@ def test_k8s_yaml_gateway(deployments_addresses, custom_gateway): }, ) - service = configs[1] - assert service['kind'] == 'Service' - assert service['metadata'] == { - 'name': 'gateway', - 'namespace': 'default-namespace', - 'labels': {'app': 'gateway'}, - } - spec_service = service['spec'] - assert spec_service['type'] == 'ClusterIP' - assert len(spec_service['ports']) == 1 - port = spec_service['ports'][0] - assert port['name'] == 'port' - assert port['protocol'] == 'TCP' - assert port['port'] == 32465 - assert port['targetPort'] == 32465 - assert spec_service['selector'] == {'app': 'gateway'} - - deployment = configs[2] + for i, (expected_port, service) in enumerate(zip(port, configs[1 : 1 + len(port)])): + assert service['kind'] == 'Service' + service_gateway_name = ( + 'gateway' + if i == 0 + else f'gateway-{i}-{protocol[i] if protocol else "grpc"}' + ) + assert service['metadata'] == { + 'name': service_gateway_name, + 'namespace': 'default-namespace', + 'labels': {'app': service_gateway_name}, + } + spec_service = service['spec'] + assert spec_service['type'] == 'ClusterIP' + assert len(spec_service['ports']) == 1 + actual_port = spec_service['ports'][0] + assert actual_port['name'] == 'port' + assert actual_port['protocol'] == 'TCP' + assert actual_port['port'] == int(expected_port) + assert actual_port['targetPort'] == int(expected_port) + assert spec_service['selector'] == {'app': 'gateway'} + + assert spec_service['selector'] == {'app': 'gateway'} + + deployment = configs[1 + len(port)] assert deployment['kind'] == 'Deployment' assert deployment['metadata'] == { 'name': 'gateway', @@ -364,7 +381,8 @@ def test_k8s_yaml_gateway(deployments_addresses, custom_gateway): assert '--k8s-namespace' in args assert args[args.index('--k8s-namespace') + 1] == 'default-namespace' assert '--port' in args - assert args[args.index('--port') + 1] == '32465' + for i, _port in enumerate(port): + assert args[args.index('--port') + i + 1] == _port assert '--env' not in args if deployments_addresses is not None: assert '--deployments-addresses' in args