Skip to content

Commit

Permalink
Add correct timing behaviour for rosbag play (ros2#32)
Browse files Browse the repository at this point in the history
* ros2GH-69 Read storage content in a separate thread

For now the publishing starts only after the reading is completly
done. This should change aufter ros2GH-68 is done and a thread-safe
queue can be used instead of std::queue.

* ros2GH-71 Add integration test for timing behavior

* ros2GH-68 Introduce vendor package for shared queue

- Download and install headers from moodycamel readerwriterqueue
- Download and install headers from moodycamel concurrentqueue
- Use readerwriterqueue in code to load and publish concurrently

* ros2GH-71 Retain time difference of messages when playing a bag file

- The main (play) thread sleeps until the time for publishing the
  message is reached.
- Using std::chrono time_point and duration for type-safe time
  arithmetic instead of rcutils time types.

* ros2GH-71 Improve stability of read test

- Subscribers need to maintain a longer history if the messages are
  not consumed fast enough.

* ros2GH-71 Fix Classloader instance lifetime

The Classloader instance needs to outlive all objects created by it.

* ros2GH-71 Extract playing code into a class of its own

Reason: record and play have almost no common code but do the exact
opposite with the storage and rclcpp.

* ros2GH-70 Do not link explicitly against std_msgs

- only required in tests
- this decreases the amount of packages needed for a clean build without tests

* ros2GH-70 Fix error message of storage

* ros2GH-70 Fix pluginlib/storage issue for recording

* ros2GH-71 Cleanup: variable naming

* ros2GH-70 Load storage continuously instead of as fast as possible

- Only load if queue contains less than 1000 messages
- Wait a millisecond before loading again once the queue is long enough

* ros2GH-70 Add options struct to allow specification of queue size

* ros2GH-72 Wait for messages to fill up

* ros2GH-74 Rename integration tests to play/record tests

* ros2GH-74 Use test_msgs in integration tests

- gets rid of string_msgs dependency

* ros2GH-70 Rename is_not_ready to is_pending, use bulk reading to queue

* ros2GH-70 Harmonize storage_loading_future variable

* ros2GH-88 Read messages in order of their timestamps

- Currently, we write sequentially in order of arrival time so
  reading in id order is fine
- This may change at a later time and should not change the reading
  behaviour, i.e. we need to read in order of timestamps

* Fix compiler error on Mac

* ros2GH-8 Fix: use correct ros message type in test

* ros2GH-8 Cleanup: minor code style fixes

* ros2GH-8 Refactor future usage in player

Make the future a class member of player to avoid having to hand it
into several functions which is difficult with a move-only type.

* ros2GH-8 Cleanup: remove verbose logging for every stored message

* ros2GH-8 Refactor rosbag2 interface

Add an explicit overload for record without a topic_names argument to
record all topics.

* fix: call vector.reserve instead of default initalization

* fix record demo
  • Loading branch information
Martin-Idel-SI authored and Karsten1987 committed Sep 20, 2018
1 parent cf84366 commit bc822f1
Show file tree
Hide file tree
Showing 26 changed files with 701 additions and 246 deletions.
56 changes: 34 additions & 22 deletions rosbag2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,29 @@ find_package(rclcpp REQUIRED)
find_package(rcutils REQUIRED)
find_package(rosbag2_storage REQUIRED)
find_package(rosidl_generator_cpp REQUIRED)
find_package(std_msgs REQUIRED)
find_package(shared_queues_vendor REQUIRED)

add_library(
librosbag2
SHARED
src/rosbag2/rosbag2.cpp
src/rosbag2/player.cpp
src/rosbag2/typesupport_helpers.cpp
src/rosbag2/generic_publisher.cpp
src/rosbag2/generic_subscription.cpp
src/rosbag2/rosbag2_node.cpp)

ament_target_dependencies(
librosbag2
ament_target_dependencies(librosbag2
ament_index_cpp
Poco
rcl
rclcpp
rcutils
rosbag2_storage
rosidl_generator_c
std_msgs
shared_queues_vendor
)

target_include_directories(librosbag2
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
Expand Down Expand Up @@ -78,37 +79,48 @@ if(BUILD_TESTING)
find_package(test_msgs REQUIRED)
ament_lint_auto_find_test_dependencies()

ament_add_gmock(rosbag2_write_integration_test
test/rosbag2/rosbag2_write_integration_test.cpp
ament_add_gmock(rosbag2_record_integration_test
test/rosbag2/rosbag2_record_integration_test.cpp
test/rosbag2/test_memory_management.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
if(TARGET rosbag2_write_integration_test)
target_link_libraries(rosbag2_write_integration_test librosbag2
if(TARGET rosbag2_record_integration_test)
target_link_libraries(rosbag2_record_integration_test librosbag2
${test_msgs_LIBRARIES}
${rosbag2_storage_default_plugins_LIBRARIES})
target_include_directories(rosbag2_write_integration_test PRIVATE
target_include_directories(rosbag2_record_integration_test PRIVATE
${test_msgs_INCLUDE_DIRS}
${rosbag2_storage_default_plugins_INCLUDE_DIRS})
endif()

ament_add_gmock(rosbag2_write_all_integration_test
test/rosbag2/rosbag2_write_all_integration_test.cpp
ament_add_gmock(rosbag2_record_all_integration_test
test/rosbag2/rosbag2_record_all_integration_test.cpp
test/rosbag2/test_memory_management.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
if(TARGET rosbag2_write_all_integration_test)
target_link_libraries(rosbag2_write_all_integration_test librosbag2
if(TARGET rosbag2_record_all_integration_test)
target_link_libraries(rosbag2_record_all_integration_test librosbag2
${test_msgs_LIBRARIES}
${rosbag2_storage_default_plugins_LIBRARIES})
target_include_directories(rosbag2_write_all_integration_test PRIVATE
target_include_directories(rosbag2_record_all_integration_test PRIVATE
${test_msgs_INCLUDE_DIRS}
${rosbag2_storage_default_plugins_INCLUDE_DIRS})
endif()

ament_add_gmock(rosbag2_read_integration_test
test/rosbag2/rosbag2_read_integration_test.cpp
ament_add_gmock(rosbag2_play_integration_test
test/rosbag2/rosbag2_play_integration_test.cpp
test/rosbag2/test_memory_management.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
if(TARGET rosbag2_read_integration_test)
target_link_libraries(rosbag2_read_integration_test librosbag2)
ament_target_dependencies(rosbag2_read_integration_test
test_msgs
)
if(TARGET rosbag2_play_integration_test)
target_link_libraries(rosbag2_play_integration_test librosbag2)
ament_target_dependencies(rosbag2_play_integration_test test_msgs)
endif()

ament_add_gmock(rosbag2_play_timing_integration_test
test/rosbag2/rosbag2_play_timing_integration_test.cpp
test/rosbag2/test_memory_management.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
if(TARGET rosbag2_play_timing_integration_test)
target_link_libraries(rosbag2_play_timing_integration_test librosbag2)
ament_target_dependencies(rosbag2_play_timing_integration_test test_msgs)
endif()

ament_add_gmock(rosbag2_typesupport_helpers_test
Expand Down Expand Up @@ -142,7 +154,7 @@ if(BUILD_TESTING)
ament_index_cpp
Poco
rclcpp
std_msgs
test_msgs
)
endif()
endif()
Expand Down
27 changes: 18 additions & 9 deletions rosbag2/include/rosbag2/rosbag2.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@

#include "rclcpp/rclcpp.hpp"

#include "rosbag2_storage/storage_factory.hpp"
#include "rosbag2_storage/storage_interfaces/read_only_interface.hpp"
#include "rosbag2_storage/storage_interfaces/read_write_interface.hpp"
#include "rosbag2/rosbag2_play_options.hpp"
#include "rosbag2/visibility_control.hpp"

namespace rosbag2
Expand All @@ -33,43 +35,50 @@ namespace rosbag2
class GenericPublisher;
class GenericSubscription;
class Rosbag2Node;
class Player;

class Rosbag2
{
public:
ROSBAG2_PUBLIC
Rosbag2();

/**
* Records topics to a bagfile. Subscription happens at startup time, hence the topics must
* exist when "record" is called.
*
* \param file_name Name of the bagfile to write
* \param topic_names Vector of topics to subscribe to. Topics must exist at startup time. If
* the vector is empty, all topics will be subscribed.
* \param after_write_action This function will be executed after each write to the database
* where the input parameter is the topic name of the topic written Currently needed for testing.
* Might be removed later.
*/
ROSBAG2_PUBLIC
void record(const std::string & file_name, const std::vector<std::string> & topic_names);

/**
* Records all available topics to a bagfile. Subscription happens at startup time, hence only
* topics available at startup time are recorded.
*
* \param file_name Name of the bagfile to write
*/
ROSBAG2_PUBLIC
void record(const std::string & file_name);

/**
* Replay all topics in a bagfile.
*
* \param file_name Name of the bagfile to replay
*/
ROSBAG2_PUBLIC
void play(const std::string & file_name);
void play(const std::string & file_name, const Rosbag2PlayOptions & options);

private:
void prepare_publishers(
std::shared_ptr<Rosbag2Node> node,
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadOnlyInterface> storage);

std::shared_ptr<rosbag2::GenericSubscription>
create_subscription(
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadWriteInterface> storage,
std::shared_ptr<Rosbag2Node> & node,
const std::string & topic_name, const std::string & topic_type) const;

rosbag2_storage::StorageFactory factory_;
std::shared_ptr<Rosbag2Node> node_;
std::vector<std::shared_ptr<GenericSubscription>> subscriptions_;
std::map<std::string, std::shared_ptr<GenericPublisher>> publishers_;
};
Expand Down
30 changes: 30 additions & 0 deletions rosbag2/include/rosbag2/rosbag2_play_options.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2018, Bosch Software Innovations GmbH.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2__ROSBAG2_PLAY_OPTIONS_HPP_
#define ROSBAG2__ROSBAG2_PLAY_OPTIONS_HPP_

#include <cstddef>

namespace rosbag2
{
struct Rosbag2PlayOptions
{
public:
size_t read_ahead_queue_size;
};

} // namespace rosbag2

#endif // ROSBAG2__ROSBAG2_PLAY_OPTIONS_HPP_
7 changes: 1 addition & 6 deletions rosbag2/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,19 @@
<buildtool_depend>ament_cmake</buildtool_depend>

<depend>ament_index_cpp</depend>
<depend>libpoco-dev</depend>
<depend>poco_vendor</depend>
<depend>rcl</depend>
<depend>rclcpp</depend>
<depend>rcutils</depend>
<depend>rosbag2_storage</depend>
<depend>rosidl_generator_cpp</depend>
<depend>std_msgs</depend>
<depend>shared_queues_vendor</depend>

<test_depend>ament_cmake_gmock</test_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<test_depend>ament_index_cpp</test_depend>
<test_depend>poco_vendor</test_depend>
<test_depend>rcl</test_depend>
<test_depend>rclcpp</test_depend>
<test_depend>rosbag2_storage_default_plugins</test_depend>
<test_depend>std_msgs</test_depend>
<test_depend>test_msgs</test_depend>

<export>
Expand Down
7 changes: 5 additions & 2 deletions rosbag2/src/rosbag2/demo_play.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@
#include <string>

#include "rclcpp/rclcpp.hpp"

#include "rosbag2/rosbag2_play_options.hpp"
#include "rosbag2/rosbag2.hpp"

int main(int argc, const char ** argv)
{
rclcpp::init(argc, argv);

auto options = rosbag2::Rosbag2PlayOptions();
options.read_ahead_queue_size = 1000;

rosbag2::Rosbag2 rosbag2;
rosbag2.play("test.bag");
rosbag2.play("test.bag", options);

rclcpp::shutdown();

Expand Down
6 changes: 5 additions & 1 deletion rosbag2/src/rosbag2/demo_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ int main(int argc, const char ** argv)
rclcpp::init(argc, argv);

rosbag2::Rosbag2 rosbag2;
rosbag2.record(filename, topics);
if (topics.empty()) {
rosbag2.record(filename);
} else {
rosbag2.record(filename, topics);
}

rclcpp::shutdown();

Expand Down
Loading

0 comments on commit bc822f1

Please sign in to comment.