diff --git a/jina/flow/builder.py b/jina/flow/builder.py index b255320b457a0..b816abed6d59d 100644 --- a/jina/flow/builder.py +++ b/jina/flow/builder.py @@ -155,7 +155,7 @@ def _optimize_two_connections(flow: 'Flow', start_node_name: str, end_node_name: if flow.args.optimize_level > FlowOptimizeLevel.IGNORE_GATEWAY and end_node.is_head_router: flow.logger.info( f'Node {end_node_name} connects to tail of {start_node_name}') - end_node.connect_to_tail_of(start_node) + end_node.optimize_connect_to_tail_of(start_node) elif end_node.role == PodRoleType.GATEWAY: # TODO: this part of the code is never executed given the current optimization level. Never tested. if flow.args.optimize_level > FlowOptimizeLevel.IGNORE_GATEWAY and \ @@ -164,16 +164,16 @@ def _optimize_two_connections(flow: 'Flow', start_node_name: str, end_node_name: # as gateway can not block & reduce message flow.logger.info( f'Node {start_node_name} connects to head of {end_node_name}') - start_node.connect_to_head_of(end_node) + start_node.optimize_connect_to_head_of(end_node) else: if end_node.is_head_router and not start_node.is_tail_router: flow.logger.info( f'Node {end_node_name} connects to tail of {start_node_name}') - end_node.connect_to_tail_of(start_node) + end_node.optimize_connect_to_tail_of(start_node) elif start_node.is_tail_router and start_node.tail_args.num_part <= 1: flow.logger.info( f'Node {start_node_name} connects to head of {end_node_name}') - start_node.connect_to_head_of(end_node) + start_node.optimize_connect_to_head_of(end_node) if op_flow.args.optimize_level > FlowOptimizeLevel.NONE: return _traverse_graph(op_flow, outgoing_map, _optimize_two_connections) diff --git a/jina/peapods/peas/__init__.py b/jina/peapods/peas/__init__.py index ddfe0d0f5df93..d402fd389e958 100644 --- a/jina/peapods/peas/__init__.py +++ b/jina/peapods/peas/__init__.py @@ -125,7 +125,11 @@ def start(self): return self def close(self) -> None: - # wait 1s for the process/thread to end naturally, in this case no "cancel" is required this is required for + """ Close the Pea + + This method makes sure that the `Process/thread` is properly finished and its resources properly released + """ + # wait 0.1s for the process/thread to end naturally, in this case no "cancel" is required this is required for # the is case where in subprocess, runtime.setup() fails and _finally() is not yet executed, BUT close() in the # main process is calling runtime.cancel(), which is completely unnecessary as runtime.run_forever() is not # started yet. diff --git a/jina/peapods/pods/__init__.py b/jina/peapods/pods/__init__.py index db960cde72b37..165b946e32173 100644 --- a/jina/peapods/pods/__init__.py +++ b/jina/peapods/pods/__init__.py @@ -43,6 +43,7 @@ def __init__(self, args: Union['argparse.Namespace', Dict], needs: Set[str] = No @property def role(self) -> 'PodRoleType': + """Return the role of this :class:`BasePod`.""" return self.args.pod_role @property @@ -162,7 +163,7 @@ def tail_args(self): @tail_args.setter def tail_args(self, args): - """Get the arguments for the `tail` of this BasePod. """ + """Set the arguments for the `tail` of this BasePod. """ if self.is_tail_router and self.peas_args['tail']: self.peas_args['tail'] = args elif not self.is_tail_router and len(self.peas_args['peas']) == 1: @@ -188,14 +189,12 @@ def __eq__(self, other: 'BasePod'): return self.num_peas == other.num_peas and self.name == other.name def start(self) -> 'BasePod': - """Start to run all Peas in this BasePod. - - Remember to close the BasePod with :meth:`close`. + """Start to run all :class:`BasePea` in this BasePod. - Note that this method has a timeout of ``timeout_ready`` set in CLI, - which is inherited from :class:`jina.peapods.peas.BasePea` + .. note:: + If one of the :class:`BasePea` fails to start, make sure that all of them + are properly closed. """ - # start head and tail try: for _args in self.all_args: self._enter_pea(BasePea(_args)) @@ -233,40 +232,52 @@ def _set_conditional_args(args): else: args.runtime_cls = 'GRPCRuntime' - def connect_to_tail_of(self, pod: 'BasePod'): - """Eliminate the head node by connecting prev_args node directly to peas """ + def optimize_connect_to_tail_of(self, incoming_pod: 'BasePod'): + """Removes the `head` arguments to make sure that the Peas are connected directly to the + `tail` of the incoming pod. + + :param incoming_pod: :class:`BasePod` that connects its tail to this :class:`BasePod` head + """ if self.args.parallel > 1 and self.is_head_router: # keep the port_in and socket_in of prev_args # only reset its output - pod.tail_args = _copy_to_head_args(pod.tail_args, self.args.polling.is_push, as_router=False) + incoming_pod.tail_args = _copy_to_head_args(incoming_pod.tail_args, self.args.polling.is_push, + as_router=False) # update peas to receive from it - self.peas_args['peas'] = _set_peas_args(self.args, pod.tail_args, self.tail_args) + self.peas_args['peas'] = _set_peas_args(self.args, incoming_pod.tail_args, self.tail_args) # remove the head node self.peas_args['head'] = None # head is no longer a router anymore self.is_head_router = False - self.deducted_head = pod.tail_args + self.deducted_head = incoming_pod.tail_args else: raise ValueError('the current pod has no head router, deducting the head is confusing') - def connect_to_head_of(self, pod: 'BasePod'): - """Eliminate the tail node by connecting next_args node directly to peas """ + def optimize_connect_to_head_of(self, outgoing_pod: 'BasePod'): + """Removes the `tail` arguments to make sure that the Peas are connected directly to the + `head` of the outgoing pod. + + :param outgoing_pod: :class:`BasePod` that this :class:`BasePod` tries to send data to + """ if self.args.parallel > 1 and self.is_tail_router: # keep the port_out and socket_out of next_arts # only reset its input - pod.head_args = _copy_to_tail_args(pod.head_args, - as_router=False) + outgoing_pod.head_args = _copy_to_tail_args(outgoing_pod.head_args, + as_router=False) # update peas to receive from it - self.peas_args['peas'] = _set_peas_args(self.args, self.head_args, pod.head_args) + self.peas_args['peas'] = _set_peas_args(self.args, self.head_args, outgoing_pod.head_args) # remove the tail node self.peas_args['tail'] = None # tail is no longer a router anymore self.is_tail_router = False - self.deducted_tail = pod.head_args + self.deducted_tail = outgoing_pod.head_args else: raise ValueError('the current pod has no tail router, deducting the tail is confusing') @property def is_ready(self) -> bool: - """A Pod is ready when all the Peas it contains are ready""" + """Checks if Pod is read. + .. note:: + A Pod is ready when all the Peas it contains are ready + """ return all(p.is_ready.is_set() for p in self.peas)