Skip to content

Commit

Permalink
Update topic monitor for get_topic_names_and_types changes (#149)
Browse files Browse the repository at this point in the history
* Topics returned by get_topic_names_and_types have leading forward slashes now

* get_topic_names_and_types returns a list of types now

* Fix a bug in the reception of best effort messages

* Kinder failing without matplotlib

* Add missing __init__.py files

* Print warning when topics which appropriate names are ignored because of types

* No issues with empty waitsets anymore

(There's always a signal handler)

* Add linters

* Install the whole package rather than using py_modules

* Don't publish on topic with the same name token

The different types cause connext to get mixed up, see ros2/rmw_connext#234

* Flake8 import order wants different things on different machines?

* Add launch as exec_depend
  • Loading branch information
dhood committed Jun 29, 2017
1 parent 2a60907 commit 1208b81
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 23 deletions.
4 changes: 4 additions & 0 deletions topic_monitor/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@
<build_depend>ament_python</build_depend>
<build_depend>rclpy</build_depend>

<exec_depend>launch</exec_depend>
<exec_depend>rclpy</exec_depend>
<exec_depend>ros2run</exec_depend>
<exec_depend>std_msgs</exec_depend>

<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>

<export>
<build_type>ament_python</build_type>
</export>
Expand Down
11 changes: 3 additions & 8 deletions topic_monitor/setup.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from ament_python.data_files import get_data_files
from ament_python.script_dir import install_scripts_to_libexec
from setuptools import find_packages
from setuptools import setup

package_name = 'topic_monitor'
Expand All @@ -9,14 +10,7 @@
setup(
name='topic_monitor',
version='0.0.0',
packages=[],
py_modules=[
'topic_monitor.launch_files.launch_depth_demo',
'topic_monitor.launch_files.launch_fragmentation_demo',
'topic_monitor.launch_files.launch_reliability_demo',
'topic_monitor.scripts.data_publisher',
'topic_monitor.scripts.topic_monitor',
],
packages=find_packages(exclude=['test']),
data_files=data_files,
install_requires=[
'launch',
Expand All @@ -31,6 +25,7 @@
],
description='Package containing tools for monitoring ROS 2 topics.',
license='Apache License, Version 2.0',
test_suite='test',
entry_points={
'console_scripts': [
'data_publisher = topic_monitor.scripts.data_publisher:main',
Expand Down
20 changes: 20 additions & 0 deletions topic_monitor/test/test_flake8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ament_flake8.main import main


def test_flake8():
rc = main(argv=[])
assert rc == 0, 'Found errors'
20 changes: 20 additions & 0 deletions topic_monitor/test/test_pep257.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ament_pep257.main import main


def test_pep257():
rc = main(argv=[])
assert rc == 0, 'Found code style errors / warnings'
Empty file.
Empty file.
Empty file.
1 change: 1 addition & 0 deletions topic_monitor/topic_monitor/scripts/data_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,5 +122,6 @@ def publish_msg(val):
sleep(0.1)
raise


if __name__ == '__main__':
main()
1 change: 1 addition & 0 deletions topic_monitor/topic_monitor/scripts/ros1/data_pub_ros1.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,6 @@ def publish_msg(val):
publish_msg(-1)
raise


if __name__ == '__main__':
main()
56 changes: 41 additions & 15 deletions topic_monitor/topic_monitor/scripts/topic_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import time

import rclpy
from rclpy.qos import QoSReliabilityPolicy
from rclpy.qos import qos_profile_default
from rclpy.qos import QoSReliabilityPolicy

from std_msgs.msg import Float32, Header

Expand Down Expand Up @@ -55,7 +55,7 @@ def get_data_from_msg(self, msg):
data = msg.frame_id
idx = data.find('_')
data = data[:idx] if idx != -1 else data
return int(data)
return int(data) if data else 0

def topic_data_callback(self, msg):
received_value = self.get_data_from_msg(msg)
Expand Down Expand Up @@ -109,7 +109,7 @@ class TopicMonitor:
"""Monitor of a set of topics that match a specified topic name pattern."""

def __init__(self, window_size):
self.data_topic_pattern = re.compile("((?P<data_name>\w*)_data_?(?P<reliability>\w*))")
self.data_topic_pattern = re.compile("(/(?P<data_name>\w*)_data_?(?P<reliability>\w*))")
self.monitored_topics = {}
self.monitored_topics_lock = Lock()
self.publishers = {}
Expand All @@ -129,6 +129,7 @@ def add_monitored_topic(
topic_name,
monitored_topic.topic_data_callback,
qos_profile=qos_profile)
assert sub # prevent unused warning

# Create a timer for maintaining the expected value received on the topic
expected_value_timer = node.create_timer(
Expand All @@ -142,8 +143,15 @@ def add_monitored_topic(
allowed_latency_timer.cancel()

# Create a publisher for the reception rate of the topic
reception_rate_topic_name = self.reception_rate_topic_name + topic_name

# TODO(dhood): remove this workaround
# once https://github.com/ros2/rmw_connext/issues/234 is resolved
reception_rate_topic_name += "_"

print('Publishing reception rate on topic: %s' % reception_rate_topic_name)
reception_rate_publisher = node.create_publisher(
Float32, self.reception_rate_topic_name + '/' + topic_name)
Float32, reception_rate_topic_name)

with self.monitored_topics_lock:
monitored_topic.expected_value_timer = expected_value_timer
Expand Down Expand Up @@ -301,37 +309,52 @@ def stop(self):

def run_topic_listening(node, topic_monitor, options):
"""Subscribe to relevant topics and manage the data received from susbcriptions."""
already_ignored_topics = set()
while rclpy.ok():
# Check if there is a new topic online
# TODO(dhood): use graph events rather than polling
topic_names_and_types = node.get_topic_names_and_types()

for topic_name, type_name in topic_names_and_types:
if not topic_monitor.is_supported_type(type_name):
continue

for topic_name, type_names in topic_names_and_types:
# Infer the appropriate QoS profile from the topic name
topic_info = topic_monitor.get_topic_info(topic_name)
if topic_info is None:
# The topic is not for being monitored
continue

if len(type_names) != 1:
if topic_name not in already_ignored_topics:
print(
"Warning: ignoring topic '%s', which has more than one type: [%s]"
% (topic_name, ', '.join(type_names)))
already_ignored_topics.add(topic_name)
continue

type_name = type_names[0]
if not topic_monitor.is_supported_type(type_name):
if topic_name not in already_ignored_topics:
print(
"Warning: ignoring topic '%s' because its message type (%s)"
"is not suported."
% (topic_name, type_name))
already_ignored_topics.add(topic_name)
continue

is_new_topic = topic_name and topic_name not in topic_monitor.monitored_topics
if is_new_topic:
# Register new topic with the monitor
qos_profile = qos_profile_default
qos_profile.depth = QOS_DEPTH
if topic_info['reliability'] == 'best_effort':
qos_profile.reliability = \
QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_RELIABLE
QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT
topic_monitor.add_monitored_topic(
Header, topic_name, node, qos_profile,
options.expected_period, options.allowed_latency, options.stale_time)

if topic_monitor.monitored_topics:
# Wait for messages with a timeout, otherwise this thread will block any other threads
# until a message is received
rclpy.spin_once(node, timeout_sec=0.05)
# Wait for messages with a timeout, otherwise this thread will block any other threads
# until a message is received
rclpy.spin_once(node, timeout_sec=0.05)


def main():
Expand Down Expand Up @@ -362,8 +385,11 @@ def main():

args = parser.parse_args()
if args.show_display:
global plt
import matplotlib.pyplot as plt
try:
global plt
import matplotlib.pyplot as plt
except:
raise RuntimeError('The --display option requires matplotlib to be installed')

topic_monitor = TopicMonitor(args.window_size)

Expand Down

0 comments on commit 1208b81

Please sign in to comment.