Skip to content

Commit

Permalink
Improve documentation in rclpy.utilities (#1038)
Browse files Browse the repository at this point in the history
Signed-off-by: Shane Loretz <sloretz@osrfoundation.org>

Signed-off-by: Shane Loretz <sloretz@osrfoundation.org>
  • Loading branch information
sloretz committed Nov 14, 2022
1 parent 6acbc3f commit d2901dd
Showing 1 changed file with 36 additions and 15 deletions.
51 changes: 36 additions & 15 deletions rclpy/rclpy/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set

import ament_index_python

Expand All @@ -28,7 +29,7 @@
g_context_lock = threading.Lock()


def get_default_context(*, shutting_down=False):
def get_default_context(*, shutting_down=False) -> Context:
"""Return the global default context singleton."""
global g_context_lock
with g_context_lock:
Expand All @@ -47,7 +48,7 @@ def remove_ros_args(args: Optional[Sequence[str]] = None) -> List[str]:
Return a list of only the non-ROS command line arguments.
:param args: A list of command line arguments to filter from. If None then
``sys.argv`` is used instead.
``sys.argv`` is used instead.
:returns: A list of all command line arguments that are not used by ROS.
"""
# imported locally to avoid loading extensions on module import
Expand All @@ -56,20 +57,38 @@ def remove_ros_args(args: Optional[Sequence[str]] = None) -> List[str]:
args if args is not None else sys.argv)


def ok(*, context=None):
def ok(*, context: Optional[Context] = None) -> bool:
"""
Return ``True`` if the given ``Context`` is initialized and not shut down.
:param context: a ``Context`` to check, else the global default context is
used.
:return: ``True`` if the context is valid.
"""
if context is None:
context = get_default_context()
return context.ok()


def shutdown(*, context=None):
def shutdown(*, context=None) -> None:
"""
Shutdown the given ``Context``.
:param context: a ``Context`` to check, else the global default context is
used.
"""
if context is None:
context = get_default_context(shutting_down=True)
return context.shutdown()
context.shutdown()


def try_shutdown(*, context=None) -> None:
"""
Shutdown the given ``Context`` if not already shutdown.
def try_shutdown(*, context=None):
"""Shutdown rclpy if not already shutdown."""
:param context: a ``Context`` to check, else the global default context is
used.
"""
global g_context_lock
global g_default_context
if context is None:
Expand All @@ -82,24 +101,26 @@ def try_shutdown(*, context=None):
if not g_default_context.ok():
g_default_context = None
else:
return context.try_shutdown()
context.try_shutdown()


def get_rmw_implementation_identifier():
def get_rmw_implementation_identifier() -> str:
"""Return the identifier of the current RMW implementation."""
# imported locally to avoid loading extensions on module import
from rclpy.impl.implementation_singleton import rclpy_implementation
return rclpy_implementation.rclpy_get_rmw_implementation_identifier()


def get_available_rmw_implementations():
def get_available_rmw_implementations() -> Set[str]:
"""
Return the set of all available RMW implementations as registered in the ament index.
The result can be overridden by setting an environment variable named
``RMW_IMPLEMENTATIONS``.
The variable can contain RMW implementation names separated by the platform
specific path separator.
Including an unavailable RMW implementation results in a RuntimeError.
:raises RuntimeError: if the environment variable includes a missing RMW implementation.
"""
available_rmw_implementations = ament_index_python.get_resources(
'rmw_typesupport')
Expand All @@ -114,6 +135,8 @@ def get_available_rmw_implementations():
missing_rmw_implementations = set(rmw_implementations) - \
available_rmw_implementations
if missing_rmw_implementations:
# TODO(sloretz) function name suggets to me it would return available ones even
# if some were missing.
raise RuntimeError(
f'The RMW implementations {missing_rmw_implementations} '
"specified in 'RMW_IMPLEMENTATIONS' are not available (" +
Expand All @@ -125,16 +148,14 @@ def get_available_rmw_implementations():
return available_rmw_implementations


def timeout_sec_to_nsec(timeout_sec):
def timeout_sec_to_nsec(timeout_sec: Optional[float]) -> int:
"""
Convert timeout in seconds to rcl compatible timeout in nanoseconds.
Python tends to use floating point numbers in seconds for timeouts. This utility converts a
python-style timeout to an integer in nanoseconds that can be used by rcl_wait.
python-style timeout to an integer in nanoseconds that can be used by ``rcl_wait``.
:param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if < 1ns
:type timeout_sec: float or None
:rtype: int
:returns: rcl_wait compatible timeout in nanoseconds
"""
if timeout_sec is None or timeout_sec < 0:
Expand Down

0 comments on commit d2901dd

Please sign in to comment.