Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initializing the repeat_latched option #17

Merged
merged 2 commits into from Jul 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
79 changes: 76 additions & 3 deletions test/test_rosbag/test/latched_pub.py
Expand Up @@ -32,16 +32,37 @@
# POSSIBILITY OF SUCH DAMAGE.

import unittest
import rosbag
import roslaunch
import rospy
import rostest
import os
import sys
from std_msgs.msg import *

def generate_bags(topic, rosbag_args, player_name):
pub = rospy.Publisher(topic, String, latch=True)

pub.publish(String("hello"))

# Wait for some time before starting the bag recording
rospy.sleep(rospy.Duration.from_sec(5.0))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect this is only part of the original test because it's waiting for the external rosbag record node to start up.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's also testing latching, no? We want to confirm latching is working, so we

  • Publish
  • Wait a little while
  • Start recording


# Kick off a rosbag record
node = roslaunch.core.Node(package="rosbag", node_type="record", name=player_name, args=rosbag_args)
launch = roslaunch.scriptapi.ROSLaunch()
launch.start()
process = launch.launch(node)

# Give the record time to split
rospy.sleep(rospy.Duration.from_sec(6.0))

# Shut down the roslaunch instance to stop bagging
launch.stop()

class LatchedPub(unittest.TestCase):

def test_latched_pub(self):
rospy.init_node('latched_pub')

# Wait a while before publishing
rospy.sleep(rospy.Duration.from_sec(5.0))

Expand All @@ -50,7 +71,59 @@ def test_latched_pub(self):
pub.publish(String("hello"))

rospy.sleep(rospy.Duration.from_sec(5.0))


def test_latched_split(self):
# Check that the topic is in the first bag file only (we do not use --repeat-latched)
topic = "chatter2"
bag_prefix = "test_latched_pub_delayed_sub"
rosbag_args = "--split --duration=2s {} -O /tmp/{}".format(topic, bag_prefix)
generate_bags(topic, rosbag_args, "play_test_latched_split")

bag_count = 0

for file in os.listdir("/tmp/"):
if file.startswith(bag_prefix) and file.endswith(".bag"):
bag_file_name = os.path.join("/tmp", file)
bag_file = rosbag.Bag(bag_file_name, 'r')
bag_count += 1

messages_in_bag = 0

for _, msg, t in bag_file.read_messages(topics=[topic]):
self.assertEqual(1, bag_count) # Should only enter this loop for the first bag file
self.assertEqual("hello", msg.data)
messages_in_bag += 1
self.assertEqual(1, messages_in_bag) # We should only have one instance of this message

self.assertGreater(bag_count, 1)

def test_latched_repeat_split(self):
# Check that the topic is in *each* of the bag files
topic = "chatter3"
bag_prefix = "test_latched_pub_delayed_repeat_sub"
rosbag_args = "--split --repeat-latched --duration=2s {} -O /tmp/{}".format(topic, bag_prefix)
generate_bags(topic, rosbag_args, "play_test_latched_repeat_split")

bag_count = 0
previous_time = rospy.Time()

for file in os.listdir("/tmp/"):
if file.startswith(bag_prefix) and file.endswith(".bag"):
bag_file_name = os.path.join("/tmp", file)
bag_file = rosbag.Bag(bag_file_name, 'r')
bag_count += 1

messages_in_bag = 0

for _, msg, t in bag_file.read_messages(topics=[topic]):
self.assertNotEqual(previous_time, t) # Timestamp gets updated when we re-record latched data
self.assertEqual("hello", msg.data)
messages_in_bag += 1
self.assertEqual(1, messages_in_bag) # We should only have one instance of this message
previous_time = t

self.assertGreater(bag_count, 1)

if __name__ == '__main__':
rospy.init_node('latched_pub')
rostest.rosrun('test_rosbag', 'latched_pub', LatchedPub, sys.argv)
1 change: 1 addition & 0 deletions tools/rosbag/src/recorder.cpp
Expand Up @@ -97,6 +97,7 @@ RecorderOptions::RecorderOptions() :
snapshot(false),
verbose(false),
publish(false),
repeat_latched(false),
compression(compression::Uncompressed),
prefix(""),
name(""),
Expand Down