Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jsk_topic_tools] Add diagnostic transport to ConnectionBasedTransport #1711

Merged
merged 16 commits into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
78 changes: 73 additions & 5 deletions doc/jsk_topic_tools/class/python_connection_based_transport.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
# ConnectionBasedTransport (Python)

**WARNING**

This base-class is being deprecated and replaced by `topic_tools.LazyTransport` in
[topic_tools](http://wiki.ros.org/topic_tools).

## Description

This class is a base-class which can start subscribing topics if published topics are subscribed by the other node.
Expand All @@ -16,6 +11,27 @@ This is abstruct class.

Subscribes topics even if there is no subscribers of advertised topics if `true`.

* `~enable_vital_check` (Bool, default: `true`):

If this value is `true`, `/diagnostics` will be published and the status of whether the topic has been published will be output from this node.

* `~vital_rate` (Double, default: `1.0`):

Rate to determine if the nodelet is in health.
If the rate that the callback functions is below this parameter, error messages are displayed on diagnostics.
This value is valid only if `~enable_vital_check` is `true`.

* `/diagnostic_nodelet/use_warn` or `~use_warn` (Bool, default: `False`):

If this parameter is enabled, diagnostic messages on failure is displayed on `WARN` level instead of `ERROR` level.
`/diagnostic_nodelet/use_warn` affects every nodelets that inherits this class, but it still can be overriden for each nodelet by setting `~use_warn` parameter.

## Publishing Topic

* `/diagnostics` (`diagnostic_msgs.DiagnosticArray`):

Diagnostic messages. Required if `~enable_vital_check:=true`

## How does it behaves?

```bash
Expand Down Expand Up @@ -80,3 +96,55 @@ if __name__ == '__main__':
img_trans = SimpleImageTransport()
rospy.spin()
```

## Checking the node status by diagnostics

You can check diagnostics by setting `~enable_vital_check` to `true`.

```bash
# terminal 1:
$ roslaunch jsk_topic_tools sample_connection_based_transport.launch

# terminal 2:
$ rostopic list
/diagnostics
/diagnostics_agg
/diagnostics_toplevel_state
/input
/input_dummy
/input_image
/mux/selected
/rosout
/rosout_agg
/simple_image_transport/output

# terminal 3:
$ rosrun rqt_robot_monitor rqt_robot_monitor
```

There is no one to subscribe to `/simple_image_transport/output`, so diagnostics is OK.

![](images/diagnostics_no_subscribers.jpg)

```bash
# terminal 4:
rostopic echo /simple_image_transport/output
```

If you subscribe to `/simple_image_transport/output`,
it will change to the diagnostics message `/simple_image_transport is running`.

![](images/diagnostics_is_running.jpg)


```bash
# terminal 5:
rosservice call /mux/select input_dummy
```

When changing input for `simple_image_transport` by mux,
the error message will change to `/simple_image_transport is not running`.

![](images/diagnostics_is_not_running.jpg)

You can check if the node is running correctly like these.
1 change: 1 addition & 0 deletions jsk_topic_tools/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
<build_depend>topic_tools</build_depend>

<!-- list exec_depend in alphabetical order -->
<exec_depend>diagnostic_aggregator</exec_depend>
<exec_depend>diagnostic_msgs</exec_depend>
<exec_depend>diagnostic_updater</exec_depend>
<exec_depend>dynamic_reconfigure</exec_depend>
Expand Down
10 changes: 10 additions & 0 deletions jsk_topic_tools/sample/config/diagnostics_analyzer.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
analyzers:
cameras:
type: diagnostic_aggregator/AnalyzerGroup
path: Cameras
analyzers:
simple_image_transport:
type: diagnostic_aggregator/GenericAnalyzer
path: Image
find_and_remove_prefix: 'simple_image_transport: '
num_items: 1
27 changes: 27 additions & 0 deletions jsk_topic_tools/sample/sample_connection_based_transport.launch
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<launch>

<node pkg="rostopic" type="rostopic" name="input"
args="pub /input sensor_msgs/Image 'data: [1, 2, 3]' -r 10">
</node>

<node name="input_mux"
pkg="topic_tools" type="mux"
args="input_image input input_dummy" >
</node>

<node name="simple_image_transport"
pkg="jsk_topic_tools" type="simple_image_transport.py"
clear_params="true" >
<rosparam>
enable_vital_check: true
</rosparam>
<remap from="~input" to="input_image" />
</node>

<node name="diagnostic_aggregator"
pkg="diagnostic_aggregator" type="aggregator_node"
clear_params="true" >
<rosparam command="load" file="$(find jsk_topic_tools)/sample/config/diagnostics_analyzer.yaml" />
</node>

</launch>
1 change: 1 addition & 0 deletions jsk_topic_tools/sample/simple_image_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class SimpleImageTransport(ConnectionBasedTransport):
def __init__(self):
super(SimpleImageTransport, self).__init__()
self._pub = self.advertise('~output', Image, queue_size=1)
self._dummy_pub = self.advertise('~dummy_output', Image, queue_size=1)

def subscribe(self):
self.sub_img = rospy.Subscriber('~input', Image, self._process)
Expand Down
2 changes: 2 additions & 0 deletions jsk_topic_tools/src/jsk_topic_tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from .timered_diagnostic_updater import TimeredDiagnosticUpdater

from .transport import *
from .log_utils import *
from .name_utils import *
44 changes: 44 additions & 0 deletions jsk_topic_tools/src/jsk_topic_tools/timered_diagnostic_updater.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from distutils.version import LooseVersion

import diagnostic_updater
import pkg_resources
import rospy


class TimeredDiagnosticUpdater(object):

def __init__(self, timer_duration):
self.diagnostic_updater = diagnostic_updater.Updater()

self.timer_kwargs = dict(
period=timer_duration,
callback=self.timer_callback,
oneshot=False,
)
if (LooseVersion(pkg_resources.get_distribution('rospy').version) >=
LooseVersion('1.12.0')):
# on >=kinetic, it raises ROSTimeMovedBackwardsException
# when we use rosbag play --loop.
self.timer_kwargs['reset'] = True
self.timer = None

def start(self):
if self.timer is None:
self.timer = rospy.Timer(**self.timer_kwargs)

def stop(self):
if self.timer is not None:
self.timer.shutdown()
self.timer = None

def set_hardware_id(self, name):
self.diagnostic_updater.setHardwareID(name)

def add(self, name, func):
self.diagnostic_updater.add(name, func)

def update(self):
self.diagnostic_updater.update()

def timer_callback(self, timer_event):
self.update()
67 changes: 65 additions & 2 deletions jsk_topic_tools/src/jsk_topic_tools/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import abc
import argparse
from distutils.version import LooseVersion
import pkg_resources
import sys

from diagnostic_msgs.msg import DiagnosticStatus
import pkg_resources
import rospy

from jsk_topic_tools.name_utils import unresolve_name
from jsk_topic_tools import TimeredDiagnosticUpdater


__all__ = ('ConnectionBasedTransport',)
Expand Down Expand Up @@ -48,6 +50,17 @@ def __call__(cls, *args, **kwargs):
return obj


class _Publisher(rospy.Publisher):

def __init__(self, *args, **kwargs):
super(_Publisher, self).__init__(*args, **kwargs)
self.last_published_time = rospy.Time.from_sec(0)

def publish(self, *args, **kwargs):
super(_Publisher, self).publish(*args, **kwargs)
self.last_published_time = rospy.Time.now()


class ConnectionBasedTransport(rospy.SubscribeListener):

__metaclass__ = MetaConnectionBasedTransport
Expand All @@ -58,6 +71,26 @@ def __init__(self):
self._publishers = []
self._ever_subscribed = False
self._connection_status = NOT_SUBSCRIBED

if rospy.get_param('~enable_vital_check', True):
self.diagnostic_updater = TimeredDiagnosticUpdater(
rospy.Duration(1.0))
self.node_name = rospy.get_name()
self.diagnostic_updater.set_hardware_id(self.node_name)
self.diagnostic_updater.add(self.node_name,
self.update_diagnostic)

self.use_warn = rospy.get_param(
'/diagnostic_nodelet/use_warn',
rospy.get_param('~use_warn', False))
if self.use_warn:
self.diagnostic_error_level = DiagnosticStatus.WARN
else:
self.diagnostic_error_level = DiagnosticStatus.ERROR
self.vital_rate = rospy.get_param('~vital_rate', 1.0)
self.dead_duration = 1.0 / self.vital_rate
self.diagnostic_updater.start()

kwargs = dict(
period=rospy.Duration(5),
callback=self._warn_never_subscribed_cb,
Expand Down Expand Up @@ -88,6 +121,15 @@ def _warn_never_subscribed_cb(self, timer_event):
" child subscribers. Set '~always_subscribe' as True"
' to have it subscribe always.'.format(name=rospy.get_name()))

def _is_running(self):
current_time = rospy.Time.now()
# check subscribed topics are published.
return all([
(current_time.to_sec() - pub.last_published_time.to_sec())
< self.dead_duration
for pub in self._publishers
if pub.get_num_connections() > 0])

@abc.abstractmethod
def subscribe(self):
raise NotImplementedError
Expand Down Expand Up @@ -127,6 +169,27 @@ def advertise(self, *args, **kwargs):
assert kwargs.get('subscriber_listener') is None
kwargs['subscriber_listener'] = self

pub = rospy.Publisher(*args, **kwargs)
pub = _Publisher(*args, **kwargs)
self._publishers.append(pub)
return pub

def update_diagnostic(self, stat):
if self._connection_status == SUBSCRIBED:
if self._is_running():
stat.summary(
DiagnosticStatus.OK,
'{} is running'.format(self.node_name))
else:
stat.summary(
self.diagnostic_error_level,
"{} is not running for {} sec".format(
self.node_name, self.dead_duration))
else:
stat.summary(
DiagnosticStatus.OK,
'{} is not subscribed'.format(self.node_name))
topic_names = ', '.join([pub.name for pub in self._publishers])
stat.add('watched topics', topic_names)
for pub in self._publishers:
stat.add(pub.name, '{} subscribers'.format(
pub.get_num_connections()))
18 changes: 7 additions & 11 deletions jsk_topic_tools/test/test_connection_based_transport.test
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
<launch>
<node pkg="rostopic" type="rostopic" name="input"
args="pub /input sensor_msgs/Image 'data: [1, 2, 3]' -r 10">
</node>

<node name="simple_image_transport"
pkg="jsk_topic_tools" type="simple_image_transport.py">
<remap from="~input" to="input" />
</node>
<include file="$(find jsk_topic_tools)/sample/sample_connection_based_transport.launch" >
</include>

<test test-name="test_connection"
name="test_connection"
pkg="jsk_topic_tools" type="test_connection.py"
<test test-name="test_diagnostic_transport"
name="test_diagnostic_transport"
pkg="jsk_topic_tools" type="test_diagnostic.py"
retry="3">
<rosparam>
input_topic_type: sensor_msgs/Image
check_connected_topics: [simple_image_transport/output, input]
check_connected_topics: [simple_image_transport/output, input_image]
wait_for_connection: 10
hardware_id: /simple_image_transport
</rosparam>
<remap from="~input" to="simple_image_transport/output" />
</test>
Expand Down
Loading