From ce874672abfd9a64e889fd8b2006556cf29afa0e Mon Sep 17 00:00:00 2001 From: Alaeddine Abdessalem Date: Fri, 16 Dec 2022 09:27:29 +0100 Subject: [PATCH 1/8] fix: support composite gateway for k8s export --- jina/__init__.py | 1 + jina/orchestrate/deployments/config/docker_compose.py | 7 ++++--- jina/orchestrate/deployments/config/helper.py | 2 ++ jina/orchestrate/deployments/config/k8s.py | 3 +++ 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/jina/__init__.py b/jina/__init__.py index ceb3c42b23603..33ba9658191ef 100644 --- a/jina/__init__.py +++ b/jina/__init__.py @@ -119,6 +119,7 @@ def _warning_on_one_line(message, category, filename, lineno, *args, **kwargs): __default_http_gateway__ = 'HTTPGateway' __default_websocket_gateway__ = 'WebSocketGateway' __default_grpc_gateway__ = 'GRPCGateway' +__default_composite_gateway__ = 'CompositeGateway' __default_endpoint__ = '/default' __ready_msg__ = 'ready and listening' __stop_msg__ = 'terminated' diff --git a/jina/orchestrate/deployments/config/docker_compose.py b/jina/orchestrate/deployments/config/docker_compose.py index 7d70d67c95d4e..ec655b781aa37 100644 --- a/jina/orchestrate/deployments/config/docker_compose.py +++ b/jina/orchestrate/deployments/config/docker_compose.py @@ -4,6 +4,7 @@ from typing import Dict, List, Optional, Tuple, Union from jina import ( + __default_composite_gateway__, __default_executor__, __default_grpc_gateway__, __default_http_gateway__, @@ -80,6 +81,7 @@ def get_gateway_config( __default_http_gateway__, __default_websocket_gateway__, __default_grpc_gateway__, + __default_composite_gateway__, ]: cargs.uses = 'config.yml' @@ -92,9 +94,7 @@ def get_gateway_config( protocol = str(non_defaults.get('protocol', ['grpc'])[0]).lower() - ports = cargs.port + ( - [cargs.port_monitoring] if cargs.monitoring else [] - ) + ports = cargs.port + ([cargs.port_monitoring] if cargs.monitoring else []) envs = [f'JINA_LOG_LEVEL={os.getenv("JINA_LOG_LEVEL", "INFO")}'] if cargs.env: @@ -125,6 +125,7 @@ def _get_image_name(self, uses: Optional[str]): __default_http_gateway__, __default_websocket_gateway__, __default_grpc_gateway__, + __default_composite_gateway__, ]: image_name = get_image_name(uses) diff --git a/jina/orchestrate/deployments/config/helper.py b/jina/orchestrate/deployments/config/helper.py index e99d5bf171c6c..10b5d91bd444d 100644 --- a/jina/orchestrate/deployments/config/helper.py +++ b/jina/orchestrate/deployments/config/helper.py @@ -5,6 +5,7 @@ from hubble.executor.hubio import HubIO from jina import ( + __default_composite_gateway__, __default_executor__, __default_grpc_gateway__, __default_http_gateway__, @@ -132,6 +133,7 @@ def validate_uses(uses: str): __default_http_gateway__, __default_websocket_gateway__, __default_grpc_gateway__, + __default_composite_gateway__, __default_executor__, ] or uses.startswith('docker://') diff --git a/jina/orchestrate/deployments/config/k8s.py b/jina/orchestrate/deployments/config/k8s.py index da5764689f05d..caa816a4f6085 100644 --- a/jina/orchestrate/deployments/config/k8s.py +++ b/jina/orchestrate/deployments/config/k8s.py @@ -3,6 +3,7 @@ from typing import Dict, List, Optional, Tuple, Union from jina import ( + __default_composite_gateway__, __default_executor__, __default_grpc_gateway__, __default_http_gateway__, @@ -78,6 +79,7 @@ def get_gateway_yamls( __default_http_gateway__, __default_websocket_gateway__, __default_grpc_gateway__, + __default_composite_gateway__, ]: cargs.uses = 'config.yml' @@ -114,6 +116,7 @@ def _get_image_name(self, uses: Optional[str]): __default_http_gateway__, __default_websocket_gateway__, __default_grpc_gateway__, + __default_composite_gateway__, ]: image_name = get_image_name(uses) From dc603bf90eb03513e0bd739edd35e013ed8aa430 Mon Sep 17 00:00:00 2001 From: Alaeddine Abdessalem Date: Fri, 16 Dec 2022 10:22:33 +0100 Subject: [PATCH 2/8] test: cover exporting to k8s in tests --- .../config/test_k8s_deployment_config.py | 66 ++++++++++++------- 1 file changed, 42 insertions(+), 24 deletions(-) diff --git a/tests/unit/orchestrate/deployments/config/test_k8s_deployment_config.py b/tests/unit/orchestrate/deployments/config/test_k8s_deployment_config.py index a801b87b3dbbc..6740da78ccde9 100644 --- a/tests/unit/orchestrate/deployments/config/test_k8s_deployment_config.py +++ b/tests/unit/orchestrate/deployments/config/test_k8s_deployment_config.py @@ -276,15 +276,25 @@ def assert_config_map_config( @pytest.mark.parametrize('deployments_addresses', [None, {'1': 'address.svc'}]) +@pytest.mark.parametrize( + 'port,protocol', + [ + (['12345'], None), + (['12345'], ['grpc']), + (['12345', '12344'], ['grpc', 'http']), + (['12345', '12344', '12343'], ['grpc', 'http', 'websocket']), + ], +) @pytest.mark.parametrize('custom_gateway', ['jinaai/jina:custom-gateway', None]) -def test_k8s_yaml_gateway(deployments_addresses, custom_gateway): +def test_k8s_yaml_gateway(deployments_addresses, custom_gateway, port, protocol): if custom_gateway: os.environ['JINA_GATEWAY_IMAGE'] = custom_gateway elif 'JINA_GATEWAY_IMAGE' in os.environ: del os.environ['JINA_GATEWAY_IMAGE'] - args = set_gateway_parser().parse_args( - ['--env', 'ENV_VAR:ENV_VALUE', '--port', '32465'] - ) # envs are + args_list = ['--env', 'ENV_VAR:ENV_VALUE', '--port', *port] + if protocol: + args_list.extend(['--protocol', *protocol]) + args = set_gateway_parser().parse_args(args_list) # envs are # ignored for gateway deployment_config = K8sDeploymentConfig( args, 'default-namespace', deployments_addresses @@ -293,7 +303,7 @@ def test_k8s_yaml_gateway(deployments_addresses, custom_gateway): assert len(yaml_configs) == 1 name, configs = yaml_configs[0] assert name == 'gateway' - assert len(configs) == 3 # 3 configs per yaml (configmap, service and deployment) + assert len(configs) == 2 + len(port) # configmap, deployment and 1 service per port config_map = configs[0] assert_config_map_config( config_map, @@ -306,24 +316,31 @@ def test_k8s_yaml_gateway(deployments_addresses, custom_gateway): }, ) - service = configs[1] - assert service['kind'] == 'Service' - assert service['metadata'] == { - 'name': 'gateway', - 'namespace': 'default-namespace', - 'labels': {'app': 'gateway'}, - } - spec_service = service['spec'] - assert spec_service['type'] == 'ClusterIP' - assert len(spec_service['ports']) == 1 - port = spec_service['ports'][0] - assert port['name'] == 'port' - assert port['protocol'] == 'TCP' - assert port['port'] == 32465 - assert port['targetPort'] == 32465 - assert spec_service['selector'] == {'app': 'gateway'} - - deployment = configs[2] + for i, (expected_port, service) in enumerate(zip(port, configs[1 : 1 + len(port)])): + assert service['kind'] == 'Service' + service_gateway_name = ( + 'gateway' + if i == 0 + else f'gateway-{i}-{protocol[i] if protocol else "grpc"}' + ) + assert service['metadata'] == { + 'name': service_gateway_name, + 'namespace': 'default-namespace', + 'labels': {'app': service_gateway_name}, + } + spec_service = service['spec'] + assert spec_service['type'] == 'ClusterIP' + assert len(spec_service['ports']) == 1 + actual_port = spec_service['ports'][0] + assert actual_port['name'] == 'port' + assert actual_port['protocol'] == 'TCP' + assert actual_port['port'] == int(expected_port) + assert actual_port['targetPort'] == int(expected_port) + assert spec_service['selector'] == {'app': 'gateway'} + + assert spec_service['selector'] == {'app': 'gateway'} + + deployment = configs[1 + len(port)] assert deployment['kind'] == 'Deployment' assert deployment['metadata'] == { 'name': 'gateway', @@ -364,7 +381,8 @@ def test_k8s_yaml_gateway(deployments_addresses, custom_gateway): assert '--k8s-namespace' in args assert args[args.index('--k8s-namespace') + 1] == 'default-namespace' assert '--port' in args - assert args[args.index('--port') + 1] == '32465' + for i, _port in enumerate(port): + assert args[args.index('--port') + i + 1] == _port assert '--env' not in args if deployments_addresses is not None: assert '--deployments-addresses' in args From 52f6463e251bd81b5361189869cb284de2b21f6f Mon Sep 17 00:00:00 2001 From: Alaeddine Abdessalem Date: Fri, 16 Dec 2022 10:30:32 +0100 Subject: [PATCH 3/8] test: cover composite gateway for docker compose --- .../config/test_docker_compose_pod_config.py | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/tests/unit/orchestrate/deployments/config/test_docker_compose_pod_config.py b/tests/unit/orchestrate/deployments/config/test_docker_compose_pod_config.py index 43706e5085215..26cc488528665 100644 --- a/tests/unit/orchestrate/deployments/config/test_docker_compose_pod_config.py +++ b/tests/unit/orchestrate/deployments/config/test_docker_compose_pod_config.py @@ -367,15 +367,25 @@ def test_worker_services(name: str, shards: str): @pytest.mark.parametrize('deployments_addresses', [None, {'1': 'executor-head:8081'}]) +@pytest.mark.parametrize( + 'port,protocol', + [ + (['12345'], None), + (['12345'], ['grpc']), + (['12345', '12344'], ['grpc', 'http']), + (['12345', '12344', '12343'], ['grpc', 'http', 'websocket']), + ], +) @pytest.mark.parametrize('custom_gateway', ['jinaai/jina:custom-gateway', None]) -def test_docker_compose_gateway(deployments_addresses, custom_gateway): +def test_docker_compose_gateway(deployments_addresses, custom_gateway, port, protocol): if custom_gateway: os.environ['JINA_GATEWAY_IMAGE'] = custom_gateway elif 'JINA_GATEWAY_IMAGE' in os.environ: del os.environ['JINA_GATEWAY_IMAGE'] - args = set_gateway_parser().parse_args( - ['--env', 'ENV_VAR:ENV_VALUE', '--port', '32465'] - ) # envs are + args_list = ['--env', 'ENV_VAR:ENV_VALUE', '--port', *port] + if protocol: + args_list.extend(['--protocol', *protocol]) + args = set_gateway_parser().parse_args(args_list) # envs are # ignored for gateway deployment_config = DockerComposeConfig( args, deployments_addresses=deployments_addresses @@ -388,12 +398,15 @@ def test_docker_compose_gateway(deployments_addresses, custom_gateway): else f'jinaai/jina:{deployment_config.worker_services[0].version}-py38-standard' ) assert gateway_config['entrypoint'] == ['jina'] - assert gateway_config['ports'] == [f'{args.port[0]}:{args.port[0]}'] - assert gateway_config['expose'] == [args.port[0]] + assert gateway_config['ports'] == [f'{_port}:{_port}' for _port in args.port] + assert gateway_config['expose'] == args.port args = gateway_config['command'] assert args[0] == 'gateway' assert '--port' in args - assert args[args.index('--port') + 1] == '32465' + + for i, _port in enumerate(port): + assert args[args.index('--port') + i + 1] == _port + assert '--env' not in args if deployments_addresses is not None: assert '--deployments-addresses' in args From 1517368cb8e10fc188603a52c621e86fba2b5350 Mon Sep 17 00:00:00 2001 From: Alaeddine Abdessalem Date: Fri, 16 Dec 2022 11:04:56 +0100 Subject: [PATCH 4/8] test: add k8s test to builtin multiprotocol gateway --- tests/k8s/test_k8s.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/tests/k8s/test_k8s.py b/tests/k8s/test_k8s.py index 5d49d6b10d422..4f0ee927dadc7 100644 --- a/tests/k8s/test_k8s.py +++ b/tests/k8s/test_k8s.py @@ -1224,23 +1224,29 @@ async def test_flow_with_custom_gateway(logger, docker_images, tmpdir): [['multiprotocol-gateway']], indirect=True, ) +@pytest.mark.parametrize('built_in_gateway', [True, False]) async def test_flow_multiple_protocols_gateway( - logger, docker_images, tmpdir, k8s_cluster + logger, docker_images, built_in_gateway, tmpdir, k8s_cluster ): from kubernetes import client - namespace = 'flow-multiprotocol-gateway'.lower() api_client = client.ApiClient() core_client = client.CoreV1Api(api_client=api_client) app_client = client.AppsV1Api(api_client=api_client) try: http_port = random_port() grpc_port = random_port() - flow = Flow().config_gateway( - uses=f'docker://{docker_images[0]}', - port=[http_port, grpc_port], - protocol=['http', 'grpc'], - ) + if built_in_gateway: + flow = Flow().config_gateway( + port=[http_port, grpc_port], + protocol=['http', 'grpc'], + ) + else: + flow = Flow().config_gateway( + uses=f'docker://{docker_images[0]}', + port=[http_port, grpc_port], + protocol=['http', 'grpc'], + ) dump_path = os.path.join(str(tmpdir), 'k8s-flow-multiprotocol-gateway') namespace = 'flow-multiprotocol-gateway' From 774ba86f6642403f3b0138cfbc3c994004994c9c Mon Sep 17 00:00:00 2001 From: Alaeddine Abdessalem Date: Fri, 16 Dec 2022 13:27:07 +0100 Subject: [PATCH 5/8] test: fix test --- tests/k8s/test_k8s.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/k8s/test_k8s.py b/tests/k8s/test_k8s.py index 4f0ee927dadc7..6f060fe0a70e6 100644 --- a/tests/k8s/test_k8s.py +++ b/tests/k8s/test_k8s.py @@ -1282,7 +1282,10 @@ async def test_flow_multiple_protocols_gateway( import requests resp = requests.get(f'http://localhost:{http_port}').json() - assert resp['protocol'] == 'http' + assert resp in [ + {'protocol': 'http'}, + {}, + ] # first one for the custom gateway implementation, the second is for the built-in multiprotocol gateway # test portforwarding the gateway pod and service using grpc forward_args = [ From ee11e91aeb2646d795514c8b5043fc9cca2b273f Mon Sep 17 00:00:00 2001 From: Alaeddine Abdessalem Date: Fri, 16 Dec 2022 15:51:21 +0100 Subject: [PATCH 6/8] test: fix test --- tests/k8s/test_k8s.py | 102 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 84 insertions(+), 18 deletions(-) diff --git a/tests/k8s/test_k8s.py b/tests/k8s/test_k8s.py index 6f060fe0a70e6..daf76e78d2eea 100644 --- a/tests/k8s/test_k8s.py +++ b/tests/k8s/test_k8s.py @@ -1224,9 +1224,8 @@ async def test_flow_with_custom_gateway(logger, docker_images, tmpdir): [['multiprotocol-gateway']], indirect=True, ) -@pytest.mark.parametrize('built_in_gateway', [True, False]) -async def test_flow_multiple_protocols_gateway( - logger, docker_images, built_in_gateway, tmpdir, k8s_cluster +async def test_flow_multiple_protocols_custom_gateway( + logger, docker_images, tmpdir, k8s_cluster ): from kubernetes import client @@ -1236,17 +1235,12 @@ async def test_flow_multiple_protocols_gateway( try: http_port = random_port() grpc_port = random_port() - if built_in_gateway: - flow = Flow().config_gateway( - port=[http_port, grpc_port], - protocol=['http', 'grpc'], - ) - else: - flow = Flow().config_gateway( - uses=f'docker://{docker_images[0]}', - port=[http_port, grpc_port], - protocol=['http', 'grpc'], - ) + + flow = Flow().config_gateway( + uses=f'docker://{docker_images[0]}', + port=[http_port, grpc_port], + protocol=['http', 'grpc'], + ) dump_path = os.path.join(str(tmpdir), 'k8s-flow-multiprotocol-gateway') namespace = 'flow-multiprotocol-gateway' @@ -1282,11 +1276,83 @@ async def test_flow_multiple_protocols_gateway( import requests resp = requests.get(f'http://localhost:{http_port}').json() - assert resp in [ - {'protocol': 'http'}, - {}, - ] # first one for the custom gateway implementation, the second is for the built-in multiprotocol gateway + assert resp == {'protocol': 'http'} + + # test portforwarding the gateway pod and service using grpc + forward_args = [ + [gateway_pod_name, grpc_port, grpc_port, namespace], + ['service/gateway-1-grpc', grpc_port, grpc_port, namespace], + ] + for forward in forward_args: + with shell_portforward(k8s_cluster._cluster.kubectl_path, *forward): + grpc_client = Client(protocol='grpc', port=grpc_port, asyncio=True) + async for _ in grpc_client.post('/', inputs=DocumentArray.empty(5)): + pass + assert AsyncNewLoopRuntime.is_ready(f'localhost:{grpc_port}') + except Exception as exc: + logger.error(f' Exception raised {exc}') + raise exc + +@pytest.mark.asyncio +@pytest.mark.timeout(3600) +@pytest.mark.parametrize( + 'docker_images', + [['multiprotocol-gateway']], + indirect=True, +) +async def test_flow_multiple_protocols_built_in( + logger, docker_images, tmpdir, k8s_cluster +): + from kubernetes import client + + api_client = client.ApiClient() + core_client = client.CoreV1Api(api_client=api_client) + app_client = client.AppsV1Api(api_client=api_client) + try: + http_port = random_port() + grpc_port = random_port() + + flow = Flow().config_gateway( + port=[http_port, grpc_port], + protocol=['http', 'grpc'], + ) + + dump_path = os.path.join(str(tmpdir), 'k8s-flow-multiprotocol-gateway') + namespace = 'flow-multiprotocol-gateway' + flow.to_kubernetes_yaml(dump_path, k8s_namespace=namespace) + + await create_all_flow_deployments_and_wait_ready( + dump_path, + namespace=namespace, + api_client=api_client, + app_client=app_client, + core_client=core_client, + deployment_replicas_expected={ + 'gateway': 1, + }, + logger=logger, + ) + + gateway_pod_name = ( + core_client.list_namespaced_pod( + namespace=namespace, label_selector='app=gateway' + ) + .items[0] + .metadata.name + ) + + # test portforwarding the gateway pod and service using http + forward_args = [ + [gateway_pod_name, http_port, http_port, namespace], + ['service/gateway', http_port, http_port, namespace], + ] + for forward in forward_args: + with shell_portforward(k8s_cluster._cluster.kubectl_path, *forward): + import requests + + resp = requests.get(f'http://localhost:{http_port}').json() + assert resp == {} # test portforwarding the gateway pod and service using grpc forward_args = [ [gateway_pod_name, grpc_port, grpc_port, namespace], From 50c9061722c56d8db531ccc3d5b0c3ffb32ec853 Mon Sep 17 00:00:00 2001 From: Alaeddine Abdessalem Date: Sun, 18 Dec 2022 14:31:59 +0100 Subject: [PATCH 7/8] chore: add debugging --- tests/k8s/test_k8s.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/k8s/test_k8s.py b/tests/k8s/test_k8s.py index daf76e78d2eea..b0e68e5c49e58 100644 --- a/tests/k8s/test_k8s.py +++ b/tests/k8s/test_k8s.py @@ -1318,10 +1318,14 @@ async def test_flow_multiple_protocols_built_in( protocol=['http', 'grpc'], ) - dump_path = os.path.join(str(tmpdir), 'k8s-flow-multiprotocol-gateway') - namespace = 'flow-multiprotocol-gateway' + dump_path = os.path.join(str(tmpdir), 'k8s-flow-multiprotocol-gateway-builtin') + namespace = 'flow-multiprotocol-gateway-builtin' flow.to_kubernetes_yaml(dump_path, k8s_namespace=namespace) + print('printing k8s yaml...') + with open(os.path.join(dump_path, 'gateway/gateway.yml'), 'r') as f: + print(f.read()) + await create_all_flow_deployments_and_wait_ready( dump_path, namespace=namespace, From 4f6b8fc2bf2b1bb393f22f2cdc4e6594687485bd Mon Sep 17 00:00:00 2001 From: AlaeddineAbdessalem Date: Mon, 19 Dec 2022 00:58:56 +0100 Subject: [PATCH 8/8] chore: remove debugging --- tests/k8s/test_k8s.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/k8s/test_k8s.py b/tests/k8s/test_k8s.py index b0e68e5c49e58..0d16ff5fce925 100644 --- a/tests/k8s/test_k8s.py +++ b/tests/k8s/test_k8s.py @@ -1322,10 +1322,6 @@ async def test_flow_multiple_protocols_built_in( namespace = 'flow-multiprotocol-gateway-builtin' flow.to_kubernetes_yaml(dump_path, k8s_namespace=namespace) - print('printing k8s yaml...') - with open(os.path.join(dump_path, 'gateway/gateway.yml'), 'r') as f: - print(f.read()) - await create_all_flow_deployments_and_wait_ready( dump_path, namespace=namespace,