Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: support composite gateway for k8s export #5532

Merged
merged 8 commits into from Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions jina/__init__.py
Expand Up @@ -119,6 +119,7 @@ def _warning_on_one_line(message, category, filename, lineno, *args, **kwargs):
__default_http_gateway__ = 'HTTPGateway'
__default_websocket_gateway__ = 'WebSocketGateway'
__default_grpc_gateway__ = 'GRPCGateway'
__default_composite_gateway__ = 'CompositeGateway'
__default_endpoint__ = '/default'
__ready_msg__ = 'ready and listening'
__stop_msg__ = 'terminated'
Expand Down
7 changes: 4 additions & 3 deletions jina/orchestrate/deployments/config/docker_compose.py
Expand Up @@ -4,6 +4,7 @@
from typing import Dict, List, Optional, Tuple, Union

from jina import (
__default_composite_gateway__,
__default_executor__,
__default_grpc_gateway__,
__default_http_gateway__,
Expand Down Expand Up @@ -80,6 +81,7 @@ def get_gateway_config(
__default_http_gateway__,
__default_websocket_gateway__,
__default_grpc_gateway__,
__default_composite_gateway__,
]:
cargs.uses = 'config.yml'

Expand All @@ -92,9 +94,7 @@ def get_gateway_config(

protocol = str(non_defaults.get('protocol', ['grpc'])[0]).lower()

ports = cargs.port + (
[cargs.port_monitoring] if cargs.monitoring else []
)
ports = cargs.port + ([cargs.port_monitoring] if cargs.monitoring else [])

envs = [f'JINA_LOG_LEVEL={os.getenv("JINA_LOG_LEVEL", "INFO")}']
if cargs.env:
Expand Down Expand Up @@ -125,6 +125,7 @@ def _get_image_name(self, uses: Optional[str]):
__default_http_gateway__,
__default_websocket_gateway__,
__default_grpc_gateway__,
__default_composite_gateway__,
]:
image_name = get_image_name(uses)

Expand Down
2 changes: 2 additions & 0 deletions jina/orchestrate/deployments/config/helper.py
Expand Up @@ -5,6 +5,7 @@
from hubble.executor.hubio import HubIO

from jina import (
__default_composite_gateway__,
__default_executor__,
__default_grpc_gateway__,
__default_http_gateway__,
Expand Down Expand Up @@ -132,6 +133,7 @@ def validate_uses(uses: str):
__default_http_gateway__,
__default_websocket_gateway__,
__default_grpc_gateway__,
__default_composite_gateway__,
__default_executor__,
]
or uses.startswith('docker://')
Expand Down
3 changes: 3 additions & 0 deletions jina/orchestrate/deployments/config/k8s.py
Expand Up @@ -3,6 +3,7 @@
from typing import Dict, List, Optional, Tuple, Union

from jina import (
__default_composite_gateway__,
__default_executor__,
__default_grpc_gateway__,
__default_http_gateway__,
Expand Down Expand Up @@ -78,6 +79,7 @@ def get_gateway_yamls(
__default_http_gateway__,
__default_websocket_gateway__,
__default_grpc_gateway__,
__default_composite_gateway__,
]:
cargs.uses = 'config.yml'

Expand Down Expand Up @@ -114,6 +116,7 @@ def _get_image_name(self, uses: Optional[str]):
__default_http_gateway__,
__default_websocket_gateway__,
__default_grpc_gateway__,
__default_composite_gateway__,
]:
image_name = get_image_name(uses)

Expand Down
81 changes: 78 additions & 3 deletions tests/k8s/test_k8s.py
Expand Up @@ -1224,18 +1224,18 @@ async def test_flow_with_custom_gateway(logger, docker_images, tmpdir):
[['multiprotocol-gateway']],
indirect=True,
)
async def test_flow_multiple_protocols_gateway(
async def test_flow_multiple_protocols_custom_gateway(
logger, docker_images, tmpdir, k8s_cluster
):
from kubernetes import client

namespace = 'flow-multiprotocol-gateway'.lower()
api_client = client.ApiClient()
core_client = client.CoreV1Api(api_client=api_client)
app_client = client.AppsV1Api(api_client=api_client)
try:
http_port = random_port()
grpc_port = random_port()

flow = Flow().config_gateway(
uses=f'docker://{docker_images[0]}',
port=[http_port, grpc_port],
Expand Down Expand Up @@ -1276,8 +1276,83 @@ async def test_flow_multiple_protocols_gateway(
import requests

resp = requests.get(f'http://localhost:{http_port}').json()
assert resp['protocol'] == 'http'
assert resp == {'protocol': 'http'}

# test portforwarding the gateway pod and service using grpc
forward_args = [
[gateway_pod_name, grpc_port, grpc_port, namespace],
['service/gateway-1-grpc', grpc_port, grpc_port, namespace],
]
for forward in forward_args:
with shell_portforward(k8s_cluster._cluster.kubectl_path, *forward):
grpc_client = Client(protocol='grpc', port=grpc_port, asyncio=True)
async for _ in grpc_client.post('/', inputs=DocumentArray.empty(5)):
pass
assert AsyncNewLoopRuntime.is_ready(f'localhost:{grpc_port}')
except Exception as exc:
logger.error(f' Exception raised {exc}')
raise exc


@pytest.mark.asyncio
@pytest.mark.timeout(3600)
@pytest.mark.parametrize(
'docker_images',
[['multiprotocol-gateway']],
indirect=True,
)
async def test_flow_multiple_protocols_built_in(
logger, docker_images, tmpdir, k8s_cluster
):
from kubernetes import client

api_client = client.ApiClient()
core_client = client.CoreV1Api(api_client=api_client)
app_client = client.AppsV1Api(api_client=api_client)
try:
http_port = random_port()
grpc_port = random_port()

flow = Flow().config_gateway(
port=[http_port, grpc_port],
protocol=['http', 'grpc'],
)

dump_path = os.path.join(str(tmpdir), 'k8s-flow-multiprotocol-gateway-builtin')
namespace = 'flow-multiprotocol-gateway-builtin'
flow.to_kubernetes_yaml(dump_path, k8s_namespace=namespace)

await create_all_flow_deployments_and_wait_ready(
dump_path,
namespace=namespace,
api_client=api_client,
app_client=app_client,
core_client=core_client,
deployment_replicas_expected={
'gateway': 1,
},
logger=logger,
)

gateway_pod_name = (
core_client.list_namespaced_pod(
namespace=namespace, label_selector='app=gateway'
)
.items[0]
.metadata.name
)

# test portforwarding the gateway pod and service using http
forward_args = [
[gateway_pod_name, http_port, http_port, namespace],
['service/gateway', http_port, http_port, namespace],
]
for forward in forward_args:
with shell_portforward(k8s_cluster._cluster.kubectl_path, *forward):
import requests

resp = requests.get(f'http://localhost:{http_port}').json()
assert resp == {}
# test portforwarding the gateway pod and service using grpc
forward_args = [
[gateway_pod_name, grpc_port, grpc_port, namespace],
Expand Down
Expand Up @@ -367,15 +367,25 @@ def test_worker_services(name: str, shards: str):


@pytest.mark.parametrize('deployments_addresses', [None, {'1': 'executor-head:8081'}])
@pytest.mark.parametrize(
'port,protocol',
[
(['12345'], None),
(['12345'], ['grpc']),
(['12345', '12344'], ['grpc', 'http']),
(['12345', '12344', '12343'], ['grpc', 'http', 'websocket']),
],
)
@pytest.mark.parametrize('custom_gateway', ['jinaai/jina:custom-gateway', None])
def test_docker_compose_gateway(deployments_addresses, custom_gateway):
def test_docker_compose_gateway(deployments_addresses, custom_gateway, port, protocol):
if custom_gateway:
os.environ['JINA_GATEWAY_IMAGE'] = custom_gateway
elif 'JINA_GATEWAY_IMAGE' in os.environ:
del os.environ['JINA_GATEWAY_IMAGE']
args = set_gateway_parser().parse_args(
['--env', 'ENV_VAR:ENV_VALUE', '--port', '32465']
) # envs are
args_list = ['--env', 'ENV_VAR:ENV_VALUE', '--port', *port]
if protocol:
args_list.extend(['--protocol', *protocol])
args = set_gateway_parser().parse_args(args_list) # envs are
# ignored for gateway
deployment_config = DockerComposeConfig(
args, deployments_addresses=deployments_addresses
Expand All @@ -388,12 +398,15 @@ def test_docker_compose_gateway(deployments_addresses, custom_gateway):
else f'jinaai/jina:{deployment_config.worker_services[0].version}-py38-standard'
)
assert gateway_config['entrypoint'] == ['jina']
assert gateway_config['ports'] == [f'{args.port[0]}:{args.port[0]}']
assert gateway_config['expose'] == [args.port[0]]
assert gateway_config['ports'] == [f'{_port}:{_port}' for _port in args.port]
assert gateway_config['expose'] == args.port
args = gateway_config['command']
assert args[0] == 'gateway'
assert '--port' in args
assert args[args.index('--port') + 1] == '32465'

for i, _port in enumerate(port):
assert args[args.index('--port') + i + 1] == _port

assert '--env' not in args
if deployments_addresses is not None:
assert '--deployments-addresses' in args
Expand Down
Expand Up @@ -276,15 +276,25 @@ def assert_config_map_config(


@pytest.mark.parametrize('deployments_addresses', [None, {'1': 'address.svc'}])
@pytest.mark.parametrize(
'port,protocol',
[
(['12345'], None),
(['12345'], ['grpc']),
(['12345', '12344'], ['grpc', 'http']),
(['12345', '12344', '12343'], ['grpc', 'http', 'websocket']),
],
)
@pytest.mark.parametrize('custom_gateway', ['jinaai/jina:custom-gateway', None])
def test_k8s_yaml_gateway(deployments_addresses, custom_gateway):
def test_k8s_yaml_gateway(deployments_addresses, custom_gateway, port, protocol):
if custom_gateway:
os.environ['JINA_GATEWAY_IMAGE'] = custom_gateway
elif 'JINA_GATEWAY_IMAGE' in os.environ:
del os.environ['JINA_GATEWAY_IMAGE']
args = set_gateway_parser().parse_args(
['--env', 'ENV_VAR:ENV_VALUE', '--port', '32465']
) # envs are
args_list = ['--env', 'ENV_VAR:ENV_VALUE', '--port', *port]
if protocol:
args_list.extend(['--protocol', *protocol])
args = set_gateway_parser().parse_args(args_list) # envs are
# ignored for gateway
deployment_config = K8sDeploymentConfig(
args, 'default-namespace', deployments_addresses
Expand All @@ -293,7 +303,7 @@ def test_k8s_yaml_gateway(deployments_addresses, custom_gateway):
assert len(yaml_configs) == 1
name, configs = yaml_configs[0]
assert name == 'gateway'
assert len(configs) == 3 # 3 configs per yaml (configmap, service and deployment)
assert len(configs) == 2 + len(port) # configmap, deployment and 1 service per port
config_map = configs[0]
assert_config_map_config(
config_map,
Expand All @@ -306,24 +316,31 @@ def test_k8s_yaml_gateway(deployments_addresses, custom_gateway):
},
)

service = configs[1]
assert service['kind'] == 'Service'
assert service['metadata'] == {
'name': 'gateway',
'namespace': 'default-namespace',
'labels': {'app': 'gateway'},
}
spec_service = service['spec']
assert spec_service['type'] == 'ClusterIP'
assert len(spec_service['ports']) == 1
port = spec_service['ports'][0]
assert port['name'] == 'port'
assert port['protocol'] == 'TCP'
assert port['port'] == 32465
assert port['targetPort'] == 32465
assert spec_service['selector'] == {'app': 'gateway'}

deployment = configs[2]
for i, (expected_port, service) in enumerate(zip(port, configs[1 : 1 + len(port)])):
assert service['kind'] == 'Service'
service_gateway_name = (
'gateway'
if i == 0
else f'gateway-{i}-{protocol[i] if protocol else "grpc"}'
)
assert service['metadata'] == {
'name': service_gateway_name,
'namespace': 'default-namespace',
'labels': {'app': service_gateway_name},
}
spec_service = service['spec']
assert spec_service['type'] == 'ClusterIP'
assert len(spec_service['ports']) == 1
actual_port = spec_service['ports'][0]
assert actual_port['name'] == 'port'
assert actual_port['protocol'] == 'TCP'
assert actual_port['port'] == int(expected_port)
assert actual_port['targetPort'] == int(expected_port)
assert spec_service['selector'] == {'app': 'gateway'}

assert spec_service['selector'] == {'app': 'gateway'}

deployment = configs[1 + len(port)]
assert deployment['kind'] == 'Deployment'
assert deployment['metadata'] == {
'name': 'gateway',
Expand Down Expand Up @@ -364,7 +381,8 @@ def test_k8s_yaml_gateway(deployments_addresses, custom_gateway):
assert '--k8s-namespace' in args
assert args[args.index('--k8s-namespace') + 1] == 'default-namespace'
assert '--port' in args
assert args[args.index('--port') + 1] == '32465'
for i, _port in enumerate(port):
assert args[args.index('--port') + i + 1] == _port
assert '--env' not in args
if deployments_addresses is not None:
assert '--deployments-addresses' in args
Expand Down