Skip to content

Commit

Permalink
fix: fix the collision in the same flow
Browse files Browse the repository at this point in the history
  • Loading branch information
nan-wang committed Aug 2, 2020
1 parent 2bcbcec commit c56ed42
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
20 changes: 17 additions & 3 deletions jina/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from .. import JINA_GLOBAL
from ..enums import FlowBuildLevel, FlowOptimizeLevel
from ..excepts import FlowTopologyError, FlowMissingPodError, FlowBuildLevelError
from ..helper import yaml, expand_env_var, get_non_defaults_args, deprecated_alias
from ..helper import yaml, expand_env_var, get_non_defaults_args, deprecated_alias, random_port
from ..logging import get_logger
from ..logging.sse import start_sse_logger
from ..peapods.pod import SocketType, FlowPod, GatewayFlowPod
Expand Down Expand Up @@ -171,6 +171,7 @@ def __init__(self, args: 'argparse.Namespace' = None, **kwargs):
self._last_changed_pod = ['gateway'] #: default first pod is gateway, will add when build()

self._update_args(args, **kwargs)
self._ports_in_use = []

def _update_args(self, args, **kwargs):
from ..main.parser import set_flow_parser
Expand Down Expand Up @@ -398,9 +399,8 @@ def add(self,

kwargs.update(op_flow._common_kwargs)
kwargs['name'] = pod_name
# self._check_port_collision(kwargs)
op_flow._ports_in_use += self._check_port_collision(kwargs)
op_flow._pod_nodes[pod_name] = FlowPod(kwargs=kwargs, needs=needs)

op_flow.set_last_pod(pod_name, False)

return op_flow
Expand Down Expand Up @@ -835,3 +835,17 @@ def use_grpc_gateway(self):
def use_rest_gateway(self):
"""Change to use REST gateway for IO """
self._common_kwargs['rest_api'] = True

def _check_port_collision(self, kwargs):
"""Check if the Pods' ports collide"""
for _port_name in ('port_in', 'port_out', 'port_ctrl', 'port_expose'):
_port = kwargs.get(_port_name)
while _port in self._ports_in_use or _port is None:
_new = random_port()
if _port is not None:
self.logger.critical(
f'{_port_name} collision detected. set from {_port} to {_new}')
_port = _new
kwargs[_port_name] = _port
self._ports_in_use.append(_port)
return self._ports_in_use
11 changes: 11 additions & 0 deletions tests/unit/flow/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,17 @@ def input_fn():
with flow:
flow.index(input_fn=input_fn, output_fn=validate)

def test_flow_with_collision(self):
collision_port = 55555

flow = (Flow()
.add(name='p1', uses='_pass', port_out=collision_port)
.add(name='p2', uses='_pass', port_out=collision_port))

self.assertNotEqual(
flow._pod_nodes['p1']._args.port_out,
flow._pod_nodes['p2']._args.port_out)


if __name__ == '__main__':
unittest.main()

0 comments on commit c56ed42

Please sign in to comment.