Skip to content

Commit

Permalink
feat: k8s custom port expose (#3449)
Browse files Browse the repository at this point in the history
* feat: k8s custom port expose

* style: fix overload and cli autocomplete

* feat: k8s custom port expose

* style: fix overload and cli autocomplete

* feat: k8s custom port expose

Co-authored-by: Jina Dev Bot <dev-bot@jina.ai>
  • Loading branch information
florian-hoenicke and jina-bot committed Sep 21, 2021
1 parent 951ffc4 commit b8f2641
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 44 deletions.
6 changes: 5 additions & 1 deletion cli/autocomplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,12 @@
'--pea-id',
'--pea-role',
'--noblock-on-start',
'--k8s-uses-init',
'--k8s-mount-path',
'--k8s-init-container-command',
'--k8s-namespace',
'--k8s-custom-resource-dir',
'--dynamic-routing',
'--connect-to-predecessor',
],
'hub new': ['--help'],
'hub push': [
Expand Down
3 changes: 1 addition & 2 deletions jina/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ def __init__(
compress: Optional[str] = 'NONE',
compress_min_bytes: Optional[int] = 1024,
compress_min_ratio: Optional[float] = 1.1,
connect_to_predecessor: Optional[bool] = False,
cors: Optional[bool] = False,
ctrl_with_ipc: Optional[bool] = True,
daemon: Optional[bool] = False,
Expand Down Expand Up @@ -175,7 +174,6 @@ def __init__(
it depends on the settings of `--compress-min-bytes` and `compress-min-ratio`
:param compress_min_bytes: The original message size must be larger than this number to trigger the compress algorithm, -1 means disable compression.
:param compress_min_ratio: The compression ratio (uncompressed_size/compressed_size) must be higher than this number to trigger the compress algorithm.
:param connect_to_predecessor: The head Pea of this Pod will connect to the TailPea of the predecessor Pod.
:param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.
:param ctrl_with_ipc: If set, use ipc protocol for control socket
:param daemon: The Pea attempts to terminate all of its Runtime child processes/threads on existing. setting it to true basically tell the Pea do not wait on the Runtime when closing
Expand Down Expand Up @@ -432,6 +430,7 @@ def _add_gateway(self, needs, **kwargs):
args = ArgNamespace.kwargs2namespace(kwargs, set_gateway_parser())

args.k8s_namespace = self.args.name
args.connect_to_predecessor = False
self._pod_nodes[GATEWAY_NAME] = PodFactory.build_pod(
args, needs, self.args.infrastructure
)
Expand Down
12 changes: 3 additions & 9 deletions jina/parsers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from jina.parsers.client import mixin_comm_protocol_parser
from .helper import _SHOW_ALL_ARGS
from .peapods.pod import mixin_k8s_pod_parser


def set_pea_parser(parser=None):
Expand Down Expand Up @@ -50,7 +49,7 @@ def set_pod_parser(parser=None):

set_pea_parser(parser)

from .peapods.pod import mixin_base_pod_parser
from .peapods.pod import mixin_base_pod_parser, mixin_k8s_pod_parser

mixin_base_pod_parser(parser)
mixin_k8s_pod_parser(parser)
Expand Down Expand Up @@ -78,6 +77,7 @@ def set_gateway_parser(parser=None):
mixin_http_gateway_parser,
mixin_compressor_parser,
)
from .peapods.pod import mixin_base_pod_parser, mixin_k8s_pod_parser
from .peapods.pea import mixin_pea_parser

mixin_base_ppr_parser(parser)
Expand All @@ -89,6 +89,7 @@ def set_gateway_parser(parser=None):
mixin_comm_protocol_parser(parser)
mixin_gateway_parser(parser)
mixin_pea_parser(parser)
mixin_k8s_pod_parser(parser)

from ..enums import SocketType, PodRoleType

Expand All @@ -110,13 +111,6 @@ def set_gateway_parser(parser=None):
else argparse.SUPPRESS,
)

parser.add_argument(
'--connect-to-predecessor',
action='store_true',
default=False,
help='The head Pea of this Pod will connect to the TailPea of the predecessor Pod.',
)

return parser


Expand Down
2 changes: 1 addition & 1 deletion jina/peapods/pods/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def _deploy_gateway(self):
logger=JinaLogger(f'deploy_{self.name}'),
replicas=1,
pull_policy='Always',
init_container=None,
port_expose=self.args.port_expose,
)

def _deploy_runtime(self, deployment_args, replicas, deployment_id):
Expand Down
10 changes: 6 additions & 4 deletions jina/peapods/pods/k8slib/kubernetes_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def deploy_service(
pull_policy: str,
init_container: Dict = None,
custom_resource_dir: Optional[str] = None,
port_expose: Optional[int] = None,
) -> str:
"""Deploy service on Kubernetes.
Expand All @@ -42,11 +43,14 @@ def deploy_service(
:param init_container: additional arguments used for the init container
:param custom_resource_dir: Path to a folder containing the kubernetes yml template files.
Defaults to the standard location jina.resources if not specified.
:param port_expose: port which will be exposed by the deployed containers
:return: dns name of the created service
"""

# small hack - we can always assume the ports are the same for all executors since they run on different k8s pods
port_expose = 8080
# we can always assume the ports are the same for all executors since they run on different k8s pods
# port expose can be defined by the user
if not port_expose:
port_expose = 8080
port_in = 8081
port_out = 8082
port_ctrl = 8083
Expand Down Expand Up @@ -127,7 +131,6 @@ def get_cli_params(arguments: Namespace, skip_list: Tuple[str] = ()) -> str:
'port_in',
'port_out',
'port_ctrl',
'port_expose',
'k8s_init_container_command',
'k8s_uses_init',
'k8s_mount_path',
Expand All @@ -148,7 +151,6 @@ def get_cli_params(arguments: Namespace, skip_list: Tuple[str] = ()) -> str:
value = value.replace('\'', '').replace('"', '\\"')
cli_args.append(f'"--{cli_attribute}", "{value}"')

cli_args.append('"--port-expose", "8080"')
cli_args.append('"--port-in", "8081"')
cli_args.append('"--port-out", "8082"')
cli_args.append('"--port-ctrl", "8083"')
Expand Down
20 changes: 14 additions & 6 deletions tests/k8s/test_k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
@pytest.fixture()
def k8s_flow_with_needs(test_executor_image: str, executor_merger_image: str) -> Flow:
flow = (
Flow(name='test-flow', port_expose=8080, infrastructure='K8S', protocol='http')
Flow(name='test-flow', port_expose=9090, infrastructure='K8S', protocol='http')
.add(
name='segmenter',
uses=test_executor_image,
Expand Down Expand Up @@ -57,22 +57,27 @@ def pull_images(images, cluster, logger):
logger.debug(f'Done loading docker image into kind cluster...')


def run_test(images, cluster, flow, logger, expected_running_pods, endpoint):
def run_test(
images, cluster, flow, logger, expected_running_pods, endpoint, port_expose
):
pull_images(images, cluster, logger)
start_flow(expected_running_pods, cluster, flow, logger)
resp = send_dummy_request(endpoint, cluster, flow, logger)
resp = send_dummy_request(endpoint, cluster, flow, logger, port_expose=port_expose)
return resp


def send_dummy_request(endpoint, k8s_cluster_namespaced, k8s_flow_with_needs, logger):
def send_dummy_request(
endpoint, k8s_cluster_namespaced, k8s_flow_with_needs, logger, port_expose
):
logger.debug(f'Starting port-forwarding to gateway service...')
with k8s_cluster_namespaced.port_forward(
'service/gateway', 8080, 8080, k8s_flow_with_needs.args.name
'service/gateway', port_expose, port_expose, k8s_flow_with_needs.args.name
) as _:
logger.debug(f'Port-forward running...')

resp = requests.post(
f'http://localhost:8080/{endpoint}', json={'data': [{} for _ in range(10)]}
f'http://localhost:{port_expose}/{endpoint}',
json={'data': [{} for _ in range(10)]},
)
return resp

Expand Down Expand Up @@ -149,6 +154,7 @@ def test_flow_with_needs(
logger,
expected_running_pods=7,
endpoint='index',
port_expose=9090,
)

expected_traversed_executors = {
Expand Down Expand Up @@ -181,6 +187,7 @@ def test_flow_with_init(
logger,
expected_running_pods=2,
endpoint='search',
port_expose=8080,
)

assert resp.status_code == HTTPStatus.OK
Expand All @@ -206,6 +213,7 @@ def test_flow_with_sharding(
logger,
expected_running_pods=9,
endpoint='index',
port_expose=8080,
)

expected_traversed_executors = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def test_deploy_service(init_container: Dict, custom_resource: str, monkeypatch)
)
def test_get_cli_params(namespace: Dict, skip_attr: Tuple, expected_string: str):
base_string = (
', "--host", "0.0.0.0", "--port-expose", "8080", "--port-in",'
', "--host", "0.0.0.0", "--port-in",'
' "8081", "--port-out", "8082", "--port-ctrl", "8083"'
)
namespace = Namespace(**namespace)
Expand Down
52 changes: 32 additions & 20 deletions tests/unit/peapods/pods/test_k8s_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import jina
from jina.helper import Namespace
from jina.parsers import set_pod_parser
from jina.parsers import set_pod_parser, set_gateway_parser
from jina.peapods.pods.k8s import K8sPod
from jina.peapods.pods.k8slib import kubernetes_tools, kubernetes_deployment
from jina.peapods.pods.k8slib.kubernetes_deployment import dictionary_to_cli_param
Expand Down Expand Up @@ -157,21 +157,32 @@ def get_k8s_pod(
needs: Optional[Set[str]] = None,
uses_before=None,
uses_after=None,
port_expose=None,
):
if parallel is None:
parallel = '1'
if replicas is None:
replicas = '1'
parameter_list = [
'--name',
pod_name,
'--k8s-namespace',
namespace,
'--parallel',
parallel,
'--replicas',
replicas,
]

parameter_list = ['--name', pod_name, '--k8s-namespace', namespace]
if parallel:
parameter_list.extend(
[
'--parallel',
str(parallel),
]
)
if replicas:
parameter_list.extend(
[
'--replicas',
str(replicas),
]
)

if port_expose:
parameter_list.extend(
[
'--port-expose',
str(port_expose),
]
)
if uses_before:
parameter_list.extend(
[
Expand All @@ -181,22 +192,23 @@ def get_k8s_pod(
)
if uses_after:
parameter_list.extend(['--uses-after', uses_after])
args = set_pod_parser().parse_args(parameter_list)
parser = set_gateway_parser() if pod_name == 'gateway' else set_pod_parser()
args = parser.parse_args(parameter_list)
pod = K8sPod(args, needs)
return pod


def test_start_creates_namespace():
ns = 'test'
pod = get_k8s_pod('gateway', ns)
pod._deploy_gateway = Mock()
pod = get_k8s_pod('gateway', ns, port_expose=8085)
kubernetes_deployment.deploy_service = Mock()
kubernetes_tools.create = Mock()

pod.start()

kubernetes_tools.create.assert_called_once()
assert kubernetes_tools.create.call_args[0][0] == 'namespace'
assert kubernetes_tools.create.call_args[0][1] == {'name': ns}
assert kubernetes_deployment.deploy_service.call_args[0][0] == 'gateway'
assert kubernetes_deployment.deploy_service.call_args[1]['port_expose'] == 8085


def test_start_deploys_gateway():
Expand Down

0 comments on commit b8f2641

Please sign in to comment.