Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jsk_topic_tools] Add diagnostic transport to ConnectionBasedTransport #1711

Merged
merged 16 commits into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions doc/jsk_topic_tools/class/python_diagnostic_transport.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# DiagnosticTransport (Python)

## Description

This class is a subclass of [`ConnectionBasedTransport`](python_connection_based_transport.html).
Instances of this class can publish diagnostics messages.
This is an abstruct class.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this feature is designed for human operator to observe the health of robot system on rqt_robot_monitor?, please add image of diagnostic viewer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I'll add it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

## Note

Each subclass of this class is required to call `self.vital_checker.poke()` on all callback functions so that this class can check the health of the functionality of the transport.

## Publishing Topic

- `~diagnostics` (`diagnostic_msgs.DiagnosticArray`):

Diagnostic messages

## Parameter
- `~vital_rate` (Double, default: `0.1`):

Rate to determine if the nodelet is in health.
If the rate that the callback functions is below this parameter, error messages are displayed on diagnostics.

- `/diagnostic_nodelet/use_warn` or `~use_warn` (Bool, default: `False`):

If this parameter is enabled, diagnostic messages on failure is displayed on `WARN` level instead of `ERROR` level.
`/diagnostic_nodelet/use_warn` affects every nodelets that inherits this class, but it still can be overriden for each nodelet by setting `~use_warn` parameter.
1 change: 1 addition & 0 deletions jsk_topic_tools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ if (CATKIN_ENABLE_TESTING)
jsk_topic_tools_add_rostest(test/test_block.test)
jsk_topic_tools_add_rostest(test/test_connection_based_nodelet.test)
jsk_topic_tools_add_rostest(test/test_connection_based_transport.test)
jsk_topic_tools_add_rostest(test/test_diagnostic_transport.test)
jsk_topic_tools_add_rostest(test/test_stealth_relay.test)
jsk_topic_tools_add_rostest(test/test_synchronized_throttle.test)
jsk_topic_tools_add_rostest(test/test_pose_stamped_publisher.test)
Expand Down
7 changes: 4 additions & 3 deletions jsk_topic_tools/sample/simple_image_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@

from sensor_msgs.msg import Image

from jsk_topic_tools import ConnectionBasedTransport
from jsk_topic_tools import DiagnosticTransport


class SimpleImageTransport(ConnectionBasedTransport):
class SimpleImageTransport(DiagnosticTransport):
def __init__(self):
super(SimpleImageTransport, self).__init__()
super(SimpleImageTransport, self).__init__('simple_image_transport')
self._pub = self.advertise('~output', Image, queue_size=1)

def subscribe(self):
Expand All @@ -20,6 +20,7 @@ def unsubscribe(self):
self.sub_img.unregister()

def _process(self, img_msg):
self.vital_checker.poke()
self._pub.publish(img_msg)


Expand Down
3 changes: 3 additions & 0 deletions jsk_topic_tools/src/jsk_topic_tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from .timered_diagnostic_updater import TimeredDiagnosticUpdater
from .vital_checker import VitalChecker

from .transport import *
from .log_utils import *
from .name_utils import *
44 changes: 44 additions & 0 deletions jsk_topic_tools/src/jsk_topic_tools/timered_diagnostic_updater.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from distutils.version import LooseVersion

import diagnostic_updater
import pkg_resources
import rospy


class TimeredDiagnosticUpdater(object):

def __init__(self, timer_duration):
self.diagnostic_updater = diagnostic_updater.Updater()

self.timer_kwargs = dict(
period=timer_duration,
callback=self.timer_callback,
oneshot=False,
)
if (LooseVersion(pkg_resources.get_distribution('rospy').version) >=
LooseVersion('1.12.0')):
# on >=kinetic, it raises ROSTimeMovedBackwardsException
# when we use rosbag play --loop.
self.timer_kwargs['reset'] = True
self.timer = None

def start(self):
if self.timer is None:
self.timer = rospy.Timer(**self.timer_kwargs)

def stop(self):
if self.timer is not None:
self.timer.shutdown()
self.timer = None

def set_hardware_id(self, name):
self.diagnostic_updater.setHardwareID(name)

def add(self, name, func):
self.diagnostic_updater.add(name, func)

def update(self):
self.diagnostic_updater.update()

def timer_callback(self, timer_event):
self.update()
52 changes: 50 additions & 2 deletions jsk_topic_tools/src/jsk_topic_tools/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
import abc
import argparse
from distutils.version import LooseVersion
import pkg_resources
import sys

from diagnostic_msgs.msg import DiagnosticStatus
import pkg_resources
import rospy

from jsk_topic_tools.name_utils import unresolve_name
from jsk_topic_tools import TimeredDiagnosticUpdater
from jsk_topic_tools import VitalChecker


__all__ = ('ConnectionBasedTransport',)
__all__ = ('ConnectionBasedTransport',
'DiagnosticTransport')


SUBSCRIBED = 0
Expand Down Expand Up @@ -130,3 +134,47 @@ def advertise(self, *args, **kwargs):
pub = rospy.Publisher(*args, **kwargs)
self._publishers.append(pub)
return pub


class DiagnosticTransport(ConnectionBasedTransport):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of create new class, how about update ConnectionBasedTransport and enable this feature when /use_vital_check_on_connection_based_transport(or other good variable name) is true

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea!
I merge diagnostic function to ConnectionBasedTransport.
c8efdb1


def __init__(self, name):
self.name = name
super(DiagnosticTransport, self).__init__()
self.diagnostic_updater = TimeredDiagnosticUpdater(
rospy.Duration(1.0))
self.node_name = rospy.get_name()
self.diagnostic_updater.set_hardware_id(self.node_name)
self.diagnostic_updater.add('{}::{}'.format(self.node_name, self.name),
self.update_diagnostic)

self.use_warn = rospy.get_param('/diagnostic_nodelet/use_warn',
rospy.get_param('~use_warn', False))
if self.use_warn:
self.diagnostic_error_level = DiagnosticStatus.WARN
else:
self.diagnostic_error_level = DiagnosticStatus.ERROR
self.vital_rate = rospy.get_param('~vital_rate', 1.0)
self.vital_checker = VitalChecker(1.0 / self.vital_rate)
self.diagnostic_updater.start()

def update_diagnostic(self, stat):
if self._connection_status == SUBSCRIBED:
if self.vital_checker.is_alive():
stat.summary(
DiagnosticStatus.OK,
'{} running'.format(self.node_name))
else:
stat.summary(
self.diagnostic_error_level,
"{} not running for {} sec".format(
self.name, self.vital_checker.dead_sec))
else:
stat.summary(
DiagnosticStatus.OK,
'{} is not subscribed'.format(self.node_name))
topic_names = ', '.join([pub.name for pub in self._publishers])
stat.add('watched topics', topic_names)
for pub in self._publishers:
stat.add(pub.name, '{} subscribers'.format(
pub.get_num_connections()))
27 changes: 27 additions & 0 deletions jsk_topic_tools/src/jsk_topic_tools/vital_checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from threading import Lock

import rospy


class VitalChecker(object):

def __init__(self, dead_sec):
self.dead_sec = dead_sec
self.last_alive_time = rospy.Time.now()
self.lock = Lock()

def poke(self):
with self.lock:
self.last_alive_time = rospy.Time.now()

def last_alive_time_relative(self):
return (rospy.Time.now() - self.last_alive_time).to_sec()

def is_alive(self):
with self.lock:
return self.last_alive_time_relative() < self.dead_sec

def register_stat_info(self, stat, name):
with self.lock:
stat.add(name, '{} sec before'.format(
self.last_alive_time_relative()))
84 changes: 84 additions & 0 deletions jsk_topic_tools/test/test_diagnostic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#!/usr/bin/env python

from diagnostic_msgs.msg import DiagnosticArray
from diagnostic_msgs.msg import DiagnosticStatus
import roslib
import rospy
from topic_tools.srv import MuxSelect

from test_connection import TestConnection


PKG = 'jsk_topic_tools'
NAME = 'test_diagnostic_transport'


class TestDiagnosticTransport(TestConnection):

def __init__(self, *args):
super(TestDiagnosticTransport, self).__init__(*args)

def _cb_diagnostic(self, msg):
self.diagnostic_msg = msg

def get_diagnostic_message(self):
self.diagnostic_msg = None
sub_diagnostic = rospy.Subscriber(
'/diagnostics',
DiagnosticArray, self._cb_diagnostic)
rate = rospy.Rate(10)
while not rospy.is_shutdown() and self.diagnostic_msg is None:
rate.sleep()
sub_diagnostic.unregister()
return self.diagnostic_msg

def _check_diagnostic_error_level(self, target_hardware_id, error_level):
while not rospy.is_shutdown():
msg = self.get_diagnostic_message()
if len(msg.status) > 0:
for status in msg.status:
if status.hardware_id != target_hardware_id:
continue
if status.level != error_level:
raise ValueError(
'ERROR Level is strange: expected {}, but {}'
.format(error_level, status.level))
return True

def test_no_subscribers_diagnostic(self):
target_hardware_id = rospy.get_param('~hardware_id')
rospy.wait_for_service('/mux/select')
mux_selector = rospy.ServiceProxy('/mux/select', MuxSelect)

mux_selector('input')

# no subsrcibers and error level == DiagnosticStatus.OK
self._check_diagnostic_error_level(
target_hardware_id, DiagnosticStatus.OK)

# start subscribe
topic_type = rospy.get_param('~input_topic_type')
msg_class = roslib.message.get_message_class(topic_type)
# Subscribe topic and bond connection
sub = rospy.Subscriber('~input', msg_class,
self._cb_test_subscriber_appears_disappears)

# subsrciber exists and error level == DiagnosticStatus.OK
self._check_diagnostic_error_level(
target_hardware_id, DiagnosticStatus.OK)

mux_selector('input_dummy')

# subscriber exists and error level == 2
self._check_diagnostic_error_level(
target_hardware_id, DiagnosticStatus.ERROR)

# stop subscribe
sub.unregister()

mux_selector('input')


if __name__ == "__main__":
import rostest
rostest.rosrun(PKG, NAME, TestDiagnosticTransport)
30 changes: 30 additions & 0 deletions jsk_topic_tools/test/test_diagnostic_transport.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<launch>

<node pkg="rostopic" type="rostopic" name="input"
args="pub /input sensor_msgs/Image 'data: [1, 2, 3]' -r 10">
</node>

<node name="input_mux"
pkg="topic_tools" type="mux"
args="input_image input input_dummy" >
</node>

<node name="simple_image_transport"
pkg="jsk_topic_tools" type="simple_image_transport.py">
<remap from="~input" to="input_image" />
</node>

<test test-name="test_diagnostic_transport"
name="test_diagnostic_transport"
pkg="jsk_topic_tools" type="test_diagnostic.py"
retry="3">
<rosparam>
input_topic_type: sensor_msgs/Image
check_connected_topics: [simple_image_transport/output, input_image]
wait_for_connection: 10
hardware_id: /simple_image_transport
</rosparam>
<remap from="~input" to="simple_image_transport/output" />
</test>

</launch>