Skip to content

Commit

Permalink
docs: docstrings for pods and peas (#1867)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM committed Feb 4, 2021
1 parent 4e218e8 commit 1b8bd6e
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 24 deletions.
8 changes: 4 additions & 4 deletions jina/flow/builder.py
Expand Up @@ -155,7 +155,7 @@ def _optimize_two_connections(flow: 'Flow', start_node_name: str, end_node_name:
if flow.args.optimize_level > FlowOptimizeLevel.IGNORE_GATEWAY and end_node.is_head_router:
flow.logger.info(
f'Node {end_node_name} connects to tail of {start_node_name}')
end_node.connect_to_tail_of(start_node)
end_node.optimize_connect_to_tail_of(start_node)
elif end_node.role == PodRoleType.GATEWAY:
# TODO: this part of the code is never executed given the current optimization level. Never tested.
if flow.args.optimize_level > FlowOptimizeLevel.IGNORE_GATEWAY and \
Expand All @@ -164,16 +164,16 @@ def _optimize_two_connections(flow: 'Flow', start_node_name: str, end_node_name:
# as gateway can not block & reduce message
flow.logger.info(
f'Node {start_node_name} connects to head of {end_node_name}')
start_node.connect_to_head_of(end_node)
start_node.optimize_connect_to_head_of(end_node)
else:
if end_node.is_head_router and not start_node.is_tail_router:
flow.logger.info(
f'Node {end_node_name} connects to tail of {start_node_name}')
end_node.connect_to_tail_of(start_node)
end_node.optimize_connect_to_tail_of(start_node)
elif start_node.is_tail_router and start_node.tail_args.num_part <= 1:
flow.logger.info(
f'Node {start_node_name} connects to head of {end_node_name}')
start_node.connect_to_head_of(end_node)
start_node.optimize_connect_to_head_of(end_node)

if op_flow.args.optimize_level > FlowOptimizeLevel.NONE:
return _traverse_graph(op_flow, outgoing_map, _optimize_two_connections)
Expand Down
6 changes: 5 additions & 1 deletion jina/peapods/peas/__init__.py
Expand Up @@ -125,7 +125,11 @@ def start(self):
return self

def close(self) -> None:
# wait 1s for the process/thread to end naturally, in this case no "cancel" is required this is required for
""" Close the Pea
This method makes sure that the `Process/thread` is properly finished and its resources properly released
"""
# wait 0.1s for the process/thread to end naturally, in this case no "cancel" is required this is required for
# the is case where in subprocess, runtime.setup() fails and _finally() is not yet executed, BUT close() in the
# main process is calling runtime.cancel(), which is completely unnecessary as runtime.run_forever() is not
# started yet.
Expand Down
49 changes: 30 additions & 19 deletions jina/peapods/pods/__init__.py
Expand Up @@ -43,6 +43,7 @@ def __init__(self, args: Union['argparse.Namespace', Dict], needs: Set[str] = No

@property
def role(self) -> 'PodRoleType':
"""Return the role of this :class:`BasePod`."""
return self.args.pod_role

@property
Expand Down Expand Up @@ -162,7 +163,7 @@ def tail_args(self):

@tail_args.setter
def tail_args(self, args):
"""Get the arguments for the `tail` of this BasePod. """
"""Set the arguments for the `tail` of this BasePod. """
if self.is_tail_router and self.peas_args['tail']:
self.peas_args['tail'] = args
elif not self.is_tail_router and len(self.peas_args['peas']) == 1:
Expand All @@ -188,14 +189,12 @@ def __eq__(self, other: 'BasePod'):
return self.num_peas == other.num_peas and self.name == other.name

def start(self) -> 'BasePod':
"""Start to run all Peas in this BasePod.
Remember to close the BasePod with :meth:`close`.
"""Start to run all :class:`BasePea` in this BasePod.
Note that this method has a timeout of ``timeout_ready`` set in CLI,
which is inherited from :class:`jina.peapods.peas.BasePea`
.. note::
If one of the :class:`BasePea` fails to start, make sure that all of them
are properly closed.
"""
# start head and tail
try:
for _args in self.all_args:
self._enter_pea(BasePea(_args))
Expand Down Expand Up @@ -233,40 +232,52 @@ def _set_conditional_args(args):
else:
args.runtime_cls = 'GRPCRuntime'

def connect_to_tail_of(self, pod: 'BasePod'):
"""Eliminate the head node by connecting prev_args node directly to peas """
def optimize_connect_to_tail_of(self, incoming_pod: 'BasePod'):
"""Removes the `head` arguments to make sure that the Peas are connected directly to the
`tail` of the incoming pod.
:param incoming_pod: :class:`BasePod` that connects its tail to this :class:`BasePod` head
"""
if self.args.parallel > 1 and self.is_head_router:
# keep the port_in and socket_in of prev_args
# only reset its output
pod.tail_args = _copy_to_head_args(pod.tail_args, self.args.polling.is_push, as_router=False)
incoming_pod.tail_args = _copy_to_head_args(incoming_pod.tail_args, self.args.polling.is_push,
as_router=False)
# update peas to receive from it
self.peas_args['peas'] = _set_peas_args(self.args, pod.tail_args, self.tail_args)
self.peas_args['peas'] = _set_peas_args(self.args, incoming_pod.tail_args, self.tail_args)
# remove the head node
self.peas_args['head'] = None
# head is no longer a router anymore
self.is_head_router = False
self.deducted_head = pod.tail_args
self.deducted_head = incoming_pod.tail_args
else:
raise ValueError('the current pod has no head router, deducting the head is confusing')

def connect_to_head_of(self, pod: 'BasePod'):
"""Eliminate the tail node by connecting next_args node directly to peas """
def optimize_connect_to_head_of(self, outgoing_pod: 'BasePod'):
"""Removes the `tail` arguments to make sure that the Peas are connected directly to the
`head` of the outgoing pod.
:param outgoing_pod: :class:`BasePod` that this :class:`BasePod` tries to send data to
"""
if self.args.parallel > 1 and self.is_tail_router:
# keep the port_out and socket_out of next_arts
# only reset its input
pod.head_args = _copy_to_tail_args(pod.head_args,
as_router=False)
outgoing_pod.head_args = _copy_to_tail_args(outgoing_pod.head_args,
as_router=False)
# update peas to receive from it
self.peas_args['peas'] = _set_peas_args(self.args, self.head_args, pod.head_args)
self.peas_args['peas'] = _set_peas_args(self.args, self.head_args, outgoing_pod.head_args)
# remove the tail node
self.peas_args['tail'] = None
# tail is no longer a router anymore
self.is_tail_router = False
self.deducted_tail = pod.head_args
self.deducted_tail = outgoing_pod.head_args
else:
raise ValueError('the current pod has no tail router, deducting the tail is confusing')

@property
def is_ready(self) -> bool:
"""A Pod is ready when all the Peas it contains are ready"""
"""Checks if Pod is read.
.. note::
A Pod is ready when all the Peas it contains are ready
"""
return all(p.is_ready.is_set() for p in self.peas)

0 comments on commit 1b8bd6e

Please sign in to comment.