Skip to content

Commit

Permalink
[jsk_topic_tools] Add Diagnostic function to ConnectionBasedTransport
Browse files Browse the repository at this point in the history
  • Loading branch information
iory committed Feb 22, 2022
1 parent a48ca42 commit c8efdb1
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 72 deletions.
1 change: 0 additions & 1 deletion jsk_topic_tools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ if (CATKIN_ENABLE_TESTING)
jsk_topic_tools_add_rostest(test/test_block.test)
jsk_topic_tools_add_rostest(test/test_connection_based_nodelet.test)
jsk_topic_tools_add_rostest(test/test_connection_based_transport.test)
jsk_topic_tools_add_rostest(test/test_diagnostic_transport.test)
jsk_topic_tools_add_rostest(test/test_stealth_relay.test)
jsk_topic_tools_add_rostest(test/test_synchronized_throttle.test)
jsk_topic_tools_add_rostest(test/test_pose_stamped_publisher.test)
Expand Down
6 changes: 3 additions & 3 deletions jsk_topic_tools/sample/simple_image_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@

from sensor_msgs.msg import Image

from jsk_topic_tools import DiagnosticTransport
from jsk_topic_tools import ConnectionBasedTransport


class SimpleImageTransport(DiagnosticTransport):
class SimpleImageTransport(ConnectionBasedTransport):
def __init__(self):
super(SimpleImageTransport, self).__init__('simple_image_transport')
super(SimpleImageTransport, self).__init__()
self._pub = self.advertise('~output', Image, queue_size=1)

def subscribe(self):
Expand Down
62 changes: 29 additions & 33 deletions jsk_topic_tools/src/jsk_topic_tools/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
from jsk_topic_tools import TimeredDiagnosticUpdater


__all__ = ('ConnectionBasedTransport',
'DiagnosticTransport')
__all__ = ('ConnectionBasedTransport',)


SUBSCRIBED = 0
Expand Down Expand Up @@ -72,6 +71,26 @@ def __init__(self):
self._publishers = []
self._ever_subscribed = False
self._connection_status = NOT_SUBSCRIBED

if rospy.get_param('~enable_vital_check', True):
self.diagnostic_updater = TimeredDiagnosticUpdater(
rospy.Duration(1.0))
self.node_name = rospy.get_name()
self.diagnostic_updater.set_hardware_id(self.node_name)
self.diagnostic_updater.add(self.node_name,
self.update_diagnostic)

self.use_warn = rospy.get_param(
'/diagnostic_nodelet/use_warn',
rospy.get_param('~use_warn', False))
if self.use_warn:
self.diagnostic_error_level = DiagnosticStatus.WARN
else:
self.diagnostic_error_level = DiagnosticStatus.ERROR
self.vital_rate = rospy.get_param('~vital_rate', 1.0)
self.dead_duration = 1.0 / self.vital_rate
self.diagnostic_updater.start()

kwargs = dict(
period=rospy.Duration(5),
callback=self._warn_never_subscribed_cb,
Expand Down Expand Up @@ -102,6 +121,13 @@ def _warn_never_subscribed_cb(self, timer_event):
" child subscribers. Set '~always_subscribe' as True"
' to have it subscribe always.'.format(name=rospy.get_name()))

def _is_running(self):
current_time = rospy.Time.now()
return all([
(current_time.to_sec() - pub.last_published_time.to_sec())
< self.dead_duration
for pub in self._publishers])

@abc.abstractmethod
def subscribe(self):
raise NotImplementedError
Expand Down Expand Up @@ -145,36 +171,6 @@ def advertise(self, *args, **kwargs):
self._publishers.append(pub)
return pub


class DiagnosticTransport(ConnectionBasedTransport):

def __init__(self, name):
self.name = name
super(DiagnosticTransport, self).__init__()
self.diagnostic_updater = TimeredDiagnosticUpdater(
rospy.Duration(1.0))
self.node_name = rospy.get_name()
self.diagnostic_updater.set_hardware_id(self.node_name)
self.diagnostic_updater.add('{}::{}'.format(self.node_name, self.name),
self.update_diagnostic)

self.use_warn = rospy.get_param('/diagnostic_nodelet/use_warn',
rospy.get_param('~use_warn', False))
if self.use_warn:
self.diagnostic_error_level = DiagnosticStatus.WARN
else:
self.diagnostic_error_level = DiagnosticStatus.ERROR
self.vital_rate = rospy.get_param('~vital_rate', 1.0)
self.dead_duration = 1.0 / self.vital_rate
self.diagnostic_updater.start()

def _is_running(self):
current_time = rospy.Time.now()
return all([
(current_time.to_sec() - pub.last_published_time.to_sec())
< self.dead_duration
for pub in self._publishers])

def update_diagnostic(self, stat):
if self._connection_status == SUBSCRIBED:
if self._is_running():
Expand All @@ -185,7 +181,7 @@ def update_diagnostic(self, stat):
stat.summary(
self.diagnostic_error_level,
"{} not running for {} sec".format(
self.name, self.dead_duration))
self.node_name, self.dead_duration))
else:
stat.summary(
DiagnosticStatus.OK,
Expand Down
60 changes: 60 additions & 0 deletions jsk_topic_tools/test/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,66 @@ def test_subscriber_appears_disappears(self):
def _cb_test_subscriber_appears_disappears(self, msg):
pass

def _cb_diagnostic(self, msg):
self.diagnostic_msg = msg

def get_diagnostic_message(self):
self.diagnostic_msg = None
sub_diagnostic = rospy.Subscriber(
'/diagnostics',
DiagnosticArray, self._cb_diagnostic)
rate = rospy.Rate(10)
while not rospy.is_shutdown() and self.diagnostic_msg is None:
rate.sleep()
sub_diagnostic.unregister()
return self.diagnostic_msg

def _check_diagnostic_error_level(self, target_hardware_id, error_level):
while not rospy.is_shutdown():
msg = self.get_diagnostic_message()
if len(msg.status) > 0:
for status in msg.status:
if status.hardware_id != target_hardware_id:
continue
if status.level != error_level:
raise ValueError(
'ERROR Level is strange: expected {}, but {}'
.format(error_level, status.level))
return True

def test_no_subscribers_diagnostic(self):
target_hardware_id = rospy.get_param('~hardware_id')
rospy.wait_for_service('/mux/select')
mux_selector = rospy.ServiceProxy('/mux/select', MuxSelect)

mux_selector('input')

# no subsrcibers and error level == DiagnosticStatus.OK
self._check_diagnostic_error_level(
target_hardware_id, DiagnosticStatus.OK)

# start subscribe
topic_type = rospy.get_param('~input_topic_type')
msg_class = roslib.message.get_message_class(topic_type)
# Subscribe topic and bond connection
sub = rospy.Subscriber('~input', msg_class,
self._cb_test_subscriber_appears_disappears)

# subsrciber exists and error level == DiagnosticStatus.OK
self._check_diagnostic_error_level(
target_hardware_id, DiagnosticStatus.OK)

mux_selector('input_dummy')

# subscriber exists and error level == 2
self._check_diagnostic_error_level(
target_hardware_id, DiagnosticStatus.ERROR)

# stop subscribe
sub.unregister()

mux_selector('input')


if __name__ == "__main__":
import rostest
Expand Down
17 changes: 12 additions & 5 deletions jsk_topic_tools/test/test_connection_based_transport.test
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
<launch>

<node pkg="rostopic" type="rostopic" name="input"
args="pub /input sensor_msgs/Image 'data: [1, 2, 3]' -r 10">
</node>

<node name="input_mux"
pkg="topic_tools" type="mux"
args="input_image input input_dummy" >
</node>

<node name="simple_image_transport"
pkg="jsk_topic_tools" type="simple_image_transport.py">
<remap from="~input" to="input" />
<remap from="~input" to="input_image" />
</node>

<test test-name="test_connection"
name="test_connection"
pkg="jsk_topic_tools" type="test_connection.py"
<test test-name="test_diagnostic_transport"
name="test_diagnostic_transport"
pkg="jsk_topic_tools" type="test_diagnostic.py"
retry="3">
<rosparam>
input_topic_type: sensor_msgs/Image
check_connected_topics: [simple_image_transport/output, input]
check_connected_topics: [simple_image_transport/output, input_image]
wait_for_connection: 10
hardware_id: /simple_image_transport
</rosparam>
<remap from="~input" to="simple_image_transport/output" />
</test>
Expand Down
30 changes: 0 additions & 30 deletions jsk_topic_tools/test/test_diagnostic_transport.test

This file was deleted.

0 comments on commit c8efdb1

Please sign in to comment.