Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: docstrings for pods and peas #1867

Merged
merged 1 commit into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions jina/flow/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def _optimize_two_connections(flow: 'Flow', start_node_name: str, end_node_name:
if flow.args.optimize_level > FlowOptimizeLevel.IGNORE_GATEWAY and end_node.is_head_router:
flow.logger.info(
f'Node {end_node_name} connects to tail of {start_node_name}')
end_node.connect_to_tail_of(start_node)
end_node.optimize_connect_to_tail_of(start_node)
elif end_node.role == PodRoleType.GATEWAY:
# TODO: this part of the code is never executed given the current optimization level. Never tested.
if flow.args.optimize_level > FlowOptimizeLevel.IGNORE_GATEWAY and \
Expand All @@ -164,16 +164,16 @@ def _optimize_two_connections(flow: 'Flow', start_node_name: str, end_node_name:
# as gateway can not block & reduce message
flow.logger.info(
f'Node {start_node_name} connects to head of {end_node_name}')
start_node.connect_to_head_of(end_node)
start_node.optimize_connect_to_head_of(end_node)
else:
if end_node.is_head_router and not start_node.is_tail_router:
flow.logger.info(
f'Node {end_node_name} connects to tail of {start_node_name}')
end_node.connect_to_tail_of(start_node)
end_node.optimize_connect_to_tail_of(start_node)
elif start_node.is_tail_router and start_node.tail_args.num_part <= 1:
flow.logger.info(
f'Node {start_node_name} connects to head of {end_node_name}')
start_node.connect_to_head_of(end_node)
start_node.optimize_connect_to_head_of(end_node)

if op_flow.args.optimize_level > FlowOptimizeLevel.NONE:
return _traverse_graph(op_flow, outgoing_map, _optimize_two_connections)
Expand Down
6 changes: 5 additions & 1 deletion jina/peapods/peas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ def start(self):
return self

def close(self) -> None:
# wait 1s for the process/thread to end naturally, in this case no "cancel" is required this is required for
""" Close the Pea

This method makes sure that the `Process/thread` is properly finished and its resources properly released
"""
# wait 0.1s for the process/thread to end naturally, in this case no "cancel" is required this is required for
# the is case where in subprocess, runtime.setup() fails and _finally() is not yet executed, BUT close() in the
# main process is calling runtime.cancel(), which is completely unnecessary as runtime.run_forever() is not
# started yet.
Expand Down
49 changes: 30 additions & 19 deletions jina/peapods/pods/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(self, args: Union['argparse.Namespace', Dict], needs: Set[str] = No

@property
def role(self) -> 'PodRoleType':
"""Return the role of this :class:`BasePod`."""
return self.args.pod_role

@property
Expand Down Expand Up @@ -162,7 +163,7 @@ def tail_args(self):

@tail_args.setter
def tail_args(self, args):
"""Get the arguments for the `tail` of this BasePod. """
"""Set the arguments for the `tail` of this BasePod. """
if self.is_tail_router and self.peas_args['tail']:
self.peas_args['tail'] = args
elif not self.is_tail_router and len(self.peas_args['peas']) == 1:
Expand All @@ -188,14 +189,12 @@ def __eq__(self, other: 'BasePod'):
return self.num_peas == other.num_peas and self.name == other.name

def start(self) -> 'BasePod':
"""Start to run all Peas in this BasePod.

Remember to close the BasePod with :meth:`close`.
"""Start to run all :class:`BasePea` in this BasePod.

Note that this method has a timeout of ``timeout_ready`` set in CLI,
which is inherited from :class:`jina.peapods.peas.BasePea`
.. note::
If one of the :class:`BasePea` fails to start, make sure that all of them
are properly closed.
"""
# start head and tail
try:
for _args in self.all_args:
self._enter_pea(BasePea(_args))
Expand Down Expand Up @@ -233,40 +232,52 @@ def _set_conditional_args(args):
else:
args.runtime_cls = 'GRPCRuntime'

def connect_to_tail_of(self, pod: 'BasePod'):
"""Eliminate the head node by connecting prev_args node directly to peas """
def optimize_connect_to_tail_of(self, incoming_pod: 'BasePod'):
"""Removes the `head` arguments to make sure that the Peas are connected directly to the
`tail` of the incoming pod.

:param incoming_pod: :class:`BasePod` that connects its tail to this :class:`BasePod` head
"""
if self.args.parallel > 1 and self.is_head_router:
# keep the port_in and socket_in of prev_args
# only reset its output
pod.tail_args = _copy_to_head_args(pod.tail_args, self.args.polling.is_push, as_router=False)
incoming_pod.tail_args = _copy_to_head_args(incoming_pod.tail_args, self.args.polling.is_push,
as_router=False)
# update peas to receive from it
self.peas_args['peas'] = _set_peas_args(self.args, pod.tail_args, self.tail_args)
self.peas_args['peas'] = _set_peas_args(self.args, incoming_pod.tail_args, self.tail_args)
# remove the head node
self.peas_args['head'] = None
# head is no longer a router anymore
self.is_head_router = False
self.deducted_head = pod.tail_args
self.deducted_head = incoming_pod.tail_args
else:
raise ValueError('the current pod has no head router, deducting the head is confusing')

def connect_to_head_of(self, pod: 'BasePod'):
"""Eliminate the tail node by connecting next_args node directly to peas """
def optimize_connect_to_head_of(self, outgoing_pod: 'BasePod'):
"""Removes the `tail` arguments to make sure that the Peas are connected directly to the
`head` of the outgoing pod.

:param outgoing_pod: :class:`BasePod` that this :class:`BasePod` tries to send data to
"""
if self.args.parallel > 1 and self.is_tail_router:
# keep the port_out and socket_out of next_arts
# only reset its input
pod.head_args = _copy_to_tail_args(pod.head_args,
as_router=False)
outgoing_pod.head_args = _copy_to_tail_args(outgoing_pod.head_args,
as_router=False)
# update peas to receive from it
self.peas_args['peas'] = _set_peas_args(self.args, self.head_args, pod.head_args)
self.peas_args['peas'] = _set_peas_args(self.args, self.head_args, outgoing_pod.head_args)
# remove the tail node
self.peas_args['tail'] = None
# tail is no longer a router anymore
self.is_tail_router = False
self.deducted_tail = pod.head_args
self.deducted_tail = outgoing_pod.head_args
else:
raise ValueError('the current pod has no tail router, deducting the tail is confusing')

@property
def is_ready(self) -> bool:
"""A Pod is ready when all the Peas it contains are ready"""
"""Checks if Pod is read.
.. note::
A Pod is ready when all the Peas it contains are ready
"""
return all(p.is_ready.is_set() for p in self.peas)