diff --git a/jsk_topic_tools/CMakeLists.txt b/jsk_topic_tools/CMakeLists.txt index dd0deda71..18a8d30c4 100644 --- a/jsk_topic_tools/CMakeLists.txt +++ b/jsk_topic_tools/CMakeLists.txt @@ -145,7 +145,6 @@ if (CATKIN_ENABLE_TESTING) jsk_topic_tools_add_rostest(test/test_block.test) jsk_topic_tools_add_rostest(test/test_connection_based_nodelet.test) jsk_topic_tools_add_rostest(test/test_connection_based_transport.test) - jsk_topic_tools_add_rostest(test/test_diagnostic_transport.test) jsk_topic_tools_add_rostest(test/test_stealth_relay.test) jsk_topic_tools_add_rostest(test/test_synchronized_throttle.test) jsk_topic_tools_add_rostest(test/test_pose_stamped_publisher.test) diff --git a/jsk_topic_tools/sample/simple_image_transport.py b/jsk_topic_tools/sample/simple_image_transport.py index 6352c6665..04fb39705 100755 --- a/jsk_topic_tools/sample/simple_image_transport.py +++ b/jsk_topic_tools/sample/simple_image_transport.py @@ -5,12 +5,12 @@ from sensor_msgs.msg import Image -from jsk_topic_tools import DiagnosticTransport +from jsk_topic_tools import ConnectionBasedTransport -class SimpleImageTransport(DiagnosticTransport): +class SimpleImageTransport(ConnectionBasedTransport): def __init__(self): - super(SimpleImageTransport, self).__init__('simple_image_transport') + super(SimpleImageTransport, self).__init__() self._pub = self.advertise('~output', Image, queue_size=1) def subscribe(self): diff --git a/jsk_topic_tools/src/jsk_topic_tools/transport.py b/jsk_topic_tools/src/jsk_topic_tools/transport.py index 5a4c21ead..3adf3745e 100644 --- a/jsk_topic_tools/src/jsk_topic_tools/transport.py +++ b/jsk_topic_tools/src/jsk_topic_tools/transport.py @@ -13,8 +13,7 @@ from jsk_topic_tools import TimeredDiagnosticUpdater -__all__ = ('ConnectionBasedTransport', - 'DiagnosticTransport') +__all__ = ('ConnectionBasedTransport',) SUBSCRIBED = 0 @@ -72,6 +71,26 @@ def __init__(self): self._publishers = [] self._ever_subscribed = False self._connection_status = NOT_SUBSCRIBED + + if rospy.get_param('~enable_vital_check', True): + self.diagnostic_updater = TimeredDiagnosticUpdater( + rospy.Duration(1.0)) + self.node_name = rospy.get_name() + self.diagnostic_updater.set_hardware_id(self.node_name) + self.diagnostic_updater.add(self.node_name, + self.update_diagnostic) + + self.use_warn = rospy.get_param( + '/diagnostic_nodelet/use_warn', + rospy.get_param('~use_warn', False)) + if self.use_warn: + self.diagnostic_error_level = DiagnosticStatus.WARN + else: + self.diagnostic_error_level = DiagnosticStatus.ERROR + self.vital_rate = rospy.get_param('~vital_rate', 1.0) + self.dead_duration = 1.0 / self.vital_rate + self.diagnostic_updater.start() + kwargs = dict( period=rospy.Duration(5), callback=self._warn_never_subscribed_cb, @@ -102,6 +121,13 @@ def _warn_never_subscribed_cb(self, timer_event): " child subscribers. Set '~always_subscribe' as True" ' to have it subscribe always.'.format(name=rospy.get_name())) + def _is_running(self): + current_time = rospy.Time.now() + return all([ + (current_time.to_sec() - pub.last_published_time.to_sec()) + < self.dead_duration + for pub in self._publishers]) + @abc.abstractmethod def subscribe(self): raise NotImplementedError @@ -145,36 +171,6 @@ def advertise(self, *args, **kwargs): self._publishers.append(pub) return pub - -class DiagnosticTransport(ConnectionBasedTransport): - - def __init__(self, name): - self.name = name - super(DiagnosticTransport, self).__init__() - self.diagnostic_updater = TimeredDiagnosticUpdater( - rospy.Duration(1.0)) - self.node_name = rospy.get_name() - self.diagnostic_updater.set_hardware_id(self.node_name) - self.diagnostic_updater.add('{}::{}'.format(self.node_name, self.name), - self.update_diagnostic) - - self.use_warn = rospy.get_param('/diagnostic_nodelet/use_warn', - rospy.get_param('~use_warn', False)) - if self.use_warn: - self.diagnostic_error_level = DiagnosticStatus.WARN - else: - self.diagnostic_error_level = DiagnosticStatus.ERROR - self.vital_rate = rospy.get_param('~vital_rate', 1.0) - self.dead_duration = 1.0 / self.vital_rate - self.diagnostic_updater.start() - - def _is_running(self): - current_time = rospy.Time.now() - return all([ - (current_time.to_sec() - pub.last_published_time.to_sec()) - < self.dead_duration - for pub in self._publishers]) - def update_diagnostic(self, stat): if self._connection_status == SUBSCRIBED: if self._is_running(): @@ -185,7 +181,7 @@ def update_diagnostic(self, stat): stat.summary( self.diagnostic_error_level, "{} not running for {} sec".format( - self.name, self.dead_duration)) + self.node_name, self.dead_duration)) else: stat.summary( DiagnosticStatus.OK, diff --git a/jsk_topic_tools/test/test_connection.py b/jsk_topic_tools/test/test_connection.py index dedc143ac..23f4e4415 100755 --- a/jsk_topic_tools/test/test_connection.py +++ b/jsk_topic_tools/test/test_connection.py @@ -70,6 +70,66 @@ def test_subscriber_appears_disappears(self): def _cb_test_subscriber_appears_disappears(self, msg): pass + def _cb_diagnostic(self, msg): + self.diagnostic_msg = msg + + def get_diagnostic_message(self): + self.diagnostic_msg = None + sub_diagnostic = rospy.Subscriber( + '/diagnostics', + DiagnosticArray, self._cb_diagnostic) + rate = rospy.Rate(10) + while not rospy.is_shutdown() and self.diagnostic_msg is None: + rate.sleep() + sub_diagnostic.unregister() + return self.diagnostic_msg + + def _check_diagnostic_error_level(self, target_hardware_id, error_level): + while not rospy.is_shutdown(): + msg = self.get_diagnostic_message() + if len(msg.status) > 0: + for status in msg.status: + if status.hardware_id != target_hardware_id: + continue + if status.level != error_level: + raise ValueError( + 'ERROR Level is strange: expected {}, but {}' + .format(error_level, status.level)) + return True + + def test_no_subscribers_diagnostic(self): + target_hardware_id = rospy.get_param('~hardware_id') + rospy.wait_for_service('/mux/select') + mux_selector = rospy.ServiceProxy('/mux/select', MuxSelect) + + mux_selector('input') + + # no subsrcibers and error level == DiagnosticStatus.OK + self._check_diagnostic_error_level( + target_hardware_id, DiagnosticStatus.OK) + + # start subscribe + topic_type = rospy.get_param('~input_topic_type') + msg_class = roslib.message.get_message_class(topic_type) + # Subscribe topic and bond connection + sub = rospy.Subscriber('~input', msg_class, + self._cb_test_subscriber_appears_disappears) + + # subsrciber exists and error level == DiagnosticStatus.OK + self._check_diagnostic_error_level( + target_hardware_id, DiagnosticStatus.OK) + + mux_selector('input_dummy') + + # subscriber exists and error level == 2 + self._check_diagnostic_error_level( + target_hardware_id, DiagnosticStatus.ERROR) + + # stop subscribe + sub.unregister() + + mux_selector('input') + if __name__ == "__main__": import rostest diff --git a/jsk_topic_tools/test/test_connection_based_transport.test b/jsk_topic_tools/test/test_connection_based_transport.test index cd8037bad..62f24d40f 100644 --- a/jsk_topic_tools/test/test_connection_based_transport.test +++ b/jsk_topic_tools/test/test_connection_based_transport.test @@ -1,21 +1,28 @@ + + + + - + - input_topic_type: sensor_msgs/Image - check_connected_topics: [simple_image_transport/output, input] + check_connected_topics: [simple_image_transport/output, input_image] wait_for_connection: 10 + hardware_id: /simple_image_transport diff --git a/jsk_topic_tools/test/test_diagnostic_transport.test b/jsk_topic_tools/test/test_diagnostic_transport.test deleted file mode 100644 index 62f24d40f..000000000 --- a/jsk_topic_tools/test/test_diagnostic_transport.test +++ /dev/null @@ -1,30 +0,0 @@ - - - - - - - - - - - - - - - input_topic_type: sensor_msgs/Image - check_connected_topics: [simple_image_transport/output, input_image] - wait_for_connection: 10 - hardware_id: /simple_image_transport - - - - -