Skip to content

Commit

Permalink
feat: add ports property
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobowitz committed Mar 31, 2022
1 parent 75802dc commit b083160
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 21 deletions.
26 changes: 26 additions & 0 deletions jina/orchestrate/deployments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,32 @@ def port(self):
"""
return self.first_pod_args.port

@property
def ports(self) -> List[int]:
"""Returns a list of ports exposed by this Deployment.
Exposed means these are the ports a Client/Gateway is supposed to communicate with.
For sharded deployments this will be the head_port.
For non sharded deployments it will be all replica ports
.. # noqa: DAR201
"""
if self.head_port:
return [self.head_port]
else:
ports = []
for replica in self.pod_args['pods'][0]:
ports.append(replica.port)
return ports

@property
def dockerized_uses(self) -> bool:
"""Checks if this Deployment uses a dockerized Executor
.. # noqa: DAR201
"""
return self.args.uses.startswith('docker://') or self.args.uses.startswith(
'jinahub+docker://'
)

def _parse_args(
self, args: Namespace
) -> Dict[str, Optional[Union[List[Namespace], Namespace]]]:
Expand Down
22 changes: 5 additions & 17 deletions jina/orchestrate/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def __init__(
cors: Optional[bool] = False,
default_swagger_ui: Optional[bool] = False,
deployments_addresses: Optional[str] = '{}',
deployments_disable_reduce: Optional[str] = '[]',
description: Optional[str] = None,
disable_reduce: Optional[bool] = False,
env: Optional[dict] = None,
Expand Down Expand Up @@ -490,23 +491,10 @@ def _get_deployments_addresses(self) -> Dict[str, List[str]]:
graph_dict[node] = [f'{v.protocol}://{v.host}:{v.head_port}']
else:
# there is no head, add the worker connection information instead
worker_addresses = []
graph_dict[node] = worker_addresses
# iterate over all replica args of the first shard
# we can assume safely here that there is only a single shard, because it's the reason there is no head
for replica in v.pod_args['pods'][0]:
host = replica.host
# check if both Gateway and the Deployment run in docker
if (
host_is_local(host)
and in_docker()
and (
v.args.uses.startswith('docker://')
or v.args.uses.startswith('jinahub+docker://')
)
):
host = __docker_host__
worker_addresses.append(f'{v.protocol}://{host}:{replica.port}')
host = v.host
if host_is_local(host) and in_docker() and v.dockerized_uses:
host = __docker_host__
graph_dict[node] = [f'{v.protocol}://{host}:{port}' for port in v.ports]

return graph_dict

Expand Down
6 changes: 2 additions & 4 deletions tests/integration/deployments/test_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,7 @@ async def test_deployments_replicas(port_generator):
)
deployment.start()

ports = [args.port for args in deployment.pod_args['pods'][0]]
connections = [f'0.0.0.0:{port}' for port in ports]
connections = [f'0.0.0.0:{port}' for port in deployment.ports]
deployments_addresses = f'{{"deployment0": {json.dumps(connections)}}}'
gateway_deployment = _create_gateway_deployment(
graph_description, deployments_addresses, port
Expand Down Expand Up @@ -253,8 +252,7 @@ async def test_deployments_with_replicas_advance_faster(port_generator):
)
deployment.start()

ports = [args.port for args in deployment.pod_args['pods'][0]]
connections = [f'0.0.0.0:{port}' for port in ports]
connections = [f'0.0.0.0:{port}' for port in deployment.ports]
deployments_addresses = f'{{"deployment0": {json.dumps(connections)}}}'

gateway_deployment = _create_gateway_deployment(
Expand Down

0 comments on commit b083160

Please sign in to comment.