Skip to content

Commit

Permalink
Initializing the repeat_latched option (#17)
Browse files Browse the repository at this point in the history
* Initializing the repeat_latched option and adding test
  • Loading branch information
ayrton04 authored and paulbovbel committed Jan 23, 2023
1 parent 2d25fb4 commit b0ba107
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 3 deletions.
79 changes: 76 additions & 3 deletions test/test_rosbag/test/latched_pub.py
Expand Up @@ -32,16 +32,37 @@
# POSSIBILITY OF SUCH DAMAGE.

import unittest
import rosbag
import roslaunch
import rospy
import rostest
import os
import sys
from std_msgs.msg import *

def generate_bags(topic, rosbag_args, player_name):
pub = rospy.Publisher(topic, String, latch=True)

pub.publish(String("hello"))

# Wait for some time before starting the bag recording
rospy.sleep(rospy.Duration.from_sec(5.0))

# Kick off a rosbag record
node = roslaunch.core.Node(package="rosbag", node_type="record", name=player_name, args=rosbag_args)
launch = roslaunch.scriptapi.ROSLaunch()
launch.start()
process = launch.launch(node)

# Give the record time to split
rospy.sleep(rospy.Duration.from_sec(6.0))

# Shut down the roslaunch instance to stop bagging
launch.stop()

class LatchedPub(unittest.TestCase):

def test_latched_pub(self):
rospy.init_node('latched_pub')

# Wait a while before publishing
rospy.sleep(rospy.Duration.from_sec(5.0))

Expand All @@ -50,7 +71,59 @@ def test_latched_pub(self):
pub.publish(String("hello"))

rospy.sleep(rospy.Duration.from_sec(5.0))


def test_latched_split(self):
# Check that the topic is in the first bag file only (we do not use --repeat-latched)
topic = "chatter2"
bag_prefix = "test_latched_pub_delayed_sub"
rosbag_args = "--split --duration=2s {} -O /tmp/{}".format(topic, bag_prefix)
generate_bags(topic, rosbag_args, "play_test_latched_split")

bag_count = 0

for file in os.listdir("/tmp/"):
if file.startswith(bag_prefix) and file.endswith(".bag"):
bag_file_name = os.path.join("/tmp", file)
bag_file = rosbag.Bag(bag_file_name, 'r')
bag_count += 1

messages_in_bag = 0

for _, msg, t in bag_file.read_messages(topics=[topic]):
self.assertEqual(1, bag_count) # Should only enter this loop for the first bag file
self.assertEqual("hello", msg.data)
messages_in_bag += 1
self.assertEqual(1, messages_in_bag) # We should only have one instance of this message

self.assertGreater(bag_count, 1)

def test_latched_repeat_split(self):
# Check that the topic is in *each* of the bag files
topic = "chatter3"
bag_prefix = "test_latched_pub_delayed_repeat_sub"
rosbag_args = "--split --repeat-latched --duration=2s {} -O /tmp/{}".format(topic, bag_prefix)
generate_bags(topic, rosbag_args, "play_test_latched_repeat_split")

bag_count = 0
previous_time = rospy.Time()

for file in os.listdir("/tmp/"):
if file.startswith(bag_prefix) and file.endswith(".bag"):
bag_file_name = os.path.join("/tmp", file)
bag_file = rosbag.Bag(bag_file_name, 'r')
bag_count += 1

messages_in_bag = 0

for _, msg, t in bag_file.read_messages(topics=[topic]):
self.assertNotEqual(previous_time, t) # Timestamp gets updated when we re-record latched data
self.assertEqual("hello", msg.data)
messages_in_bag += 1
self.assertEqual(1, messages_in_bag) # We should only have one instance of this message
previous_time = t

self.assertGreater(bag_count, 1)

if __name__ == '__main__':
rospy.init_node('latched_pub')
rostest.rosrun('test_rosbag', 'latched_pub', LatchedPub, sys.argv)
1 change: 1 addition & 0 deletions tools/rosbag/src/recorder.cpp
Expand Up @@ -97,6 +97,7 @@ RecorderOptions::RecorderOptions() :
snapshot(false),
verbose(false),
publish(false),
repeat_latched(false),
compression(compression::Uncompressed),
prefix(""),
name(""),
Expand Down

0 comments on commit b0ba107

Please sign in to comment.