Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter topic by type #1577

Merged
merged 6 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 8 additions & 4 deletions ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def add_arguments(self, parser, cli_name): # noqa: D102
# Topic filter arguments
parser.add_argument(
'topics', nargs='*', default=None, help='List of topics to record.')
parser.add_argument(
'--topic_types', nargs='+', default=[], help='List of topic types to record.')
ahcorde marked this conversation as resolved.
Show resolved Hide resolved
parser.add_argument(
'-a', '--all', action='store_true',
help='Record all topics and services (Exclude hidden topic).')
Expand Down Expand Up @@ -189,10 +191,11 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'Has no effect if no compression mode is chosen. Default: %(default)s.')

def _check_necessary_argument(self, args):
# At least one options out of --all, --all-topics, --all-services, --services, --topics or
# --regex must be used
# At least one options out of --all, --all-topics, --all-services, --services, --topics,
# --topic_types or --regex must be used
ahcorde marked this conversation as resolved.
Show resolved Hide resolved
if not (args.all or args.all_topics or args.all_services or
args.services or (args.topics and len(args.topics) > 0) or args.regex):
args.services or (args.topics and len(args.topics) > 0) or
(args.topic_types and len(args.topic_types) > 0) or args.regex):
return False
return True

Expand All @@ -212,7 +215,7 @@ def main(self, *, args): # noqa: D102
# Only one option out of --all, --all-topics, topics or --regex can be used
if (args.all and args.all_topics) or \
((args.all or args.all_topics) and args.regex) or \
((args.all or args.all_topics or args.regex) and args.topics):
((args.all or args.all_topics or args.regex) and (args.topics or args.topic_types)):
return print_error('Must specify only one option out of --all, --all-topics, '
'topics or --regex')
ahcorde marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -282,6 +285,7 @@ def main(self, *, args): # noqa: D102
record_options.all_topics = args.all_topics or args.all
record_options.is_discovery_disabled = args.no_discovery
record_options.topics = args.topics
record_options.topic_types = args.topic_types
record_options.rmw_serialization_format = args.serialization_format
record_options.topic_polling_interval = datetime.timedelta(
milliseconds=args.polling_interval)
Expand Down
1 change: 1 addition & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ PYBIND11_MODULE(_transport, m) {
.def_readwrite("all_topics", &RecordOptions::all_topics)
.def_readwrite("is_discovery_disabled", &RecordOptions::is_discovery_disabled)
.def_readwrite("topics", &RecordOptions::topics)
.def_readwrite("topic_types", &RecordOptions::topic_types)
.def_readwrite("rmw_serialization_format", &RecordOptions::rmw_serialization_format)
.def_readwrite("topic_polling_interval", &RecordOptions::topic_polling_interval)
.def_readwrite("regex", &RecordOptions::regex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_services_only
storage_options.storage_id = storage_id;
storage_options.uri = bag_path_str;
rosbag2_transport::RecordOptions record_options =
{false, true, false, {}, {}, {"/rosout"}, {}, "cdr", 100ms};
{false, true, false, {}, {}, {}, {"/rosout"}, {}, "cdr", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer), storage_options, record_options);
recorder->record();
Expand Down Expand Up @@ -265,7 +265,7 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_topics_and_se
storage_options.storage_id = storage_id;
storage_options.uri = bag_path_str;
rosbag2_transport::RecordOptions record_options =
{true, true, false, {}, {}, {"/rosout"}, {}, "cdr", 100ms};
{true, true, false, {}, {}, {}, {"/rosout"}, {}, "cdr", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer), storage_options, record_options);
recorder->record();
Expand Down
12 changes: 12 additions & 0 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,18 @@ if(BUILD_TESTING)
endfunction()
call_for_each_rmw_implementation(test_record_services_for_rmw_implementation)

ament_add_gmock_executable(test_record_topic_types test/rosbag2_transport/test_record_topic_types.cpp)
target_link_libraries(test_record_topic_types rosbag2_transport ${test_msgs_TARGETS} rosbag2_test_common::rosbag2_test_common)
ahcorde marked this conversation as resolved.
Show resolved Hide resolved
function(test_record_services_for_rmw_implementation)
ahcorde marked this conversation as resolved.
Show resolved Hide resolved
set(rmw_implementation_env_var RMW_IMPLEMENTATION=${rmw_implementation})

ament_add_gmock_test(test_record_topic_types
TEST_NAME test_record_topic_types${target_suffix}
ENV ${rmw_implementation_env_var}
)
endfunction()
call_for_each_rmw_implementation(test_record_services_for_rmw_implementation)
ahcorde marked this conversation as resolved.
Show resolved Hide resolved

ament_add_gmock_executable(test_composable_recorder test/rosbag2_transport/test_composable_recorder.cpp)
target_link_libraries(test_composable_recorder
${composition_interfaces_TARGETS}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct RecordOptions
bool all_services = false;
bool is_discovery_disabled = false;
std::vector<std::string> topics;
std::vector<std::string> topic_types;
std::vector<std::string> services; // service event topic
std::vector<std::string> exclude_topics;
std::vector<std::string> exclude_service_events; // service event topic
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/record_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Node convert<rosbag2_transport::RecordOptions>::encode(
node["all_services"] = record_options.all_services;
node["is_discovery_disabled"] = record_options.is_discovery_disabled;
node["topics"] = record_options.topics;
node["topic_types"] = record_options.topic_types;
node["services"] = record_options.services;
node["rmw_serialization_format"] = record_options.rmw_serialization_format;
node["topic_polling_interval"] = record_options.topic_polling_interval;
Expand Down Expand Up @@ -59,6 +60,7 @@ bool convert<rosbag2_transport::RecordOptions>::decode(
optional_assign<bool>(node, "all_services", record_options.all_services);
optional_assign<bool>(node, "is_discovery_disabled", record_options.is_discovery_disabled);
optional_assign<std::vector<std::string>>(node, "topics", record_options.topics);
optional_assign<std::vector<std::string>>(node, "topic_types", record_options.topic_types);
optional_assign<std::vector<std::string>>(node, "services", record_options.services);
optional_assign<std::string>(
node, "rmw_serialization_format", record_options.rmw_serialization_format);
Expand Down
15 changes: 14 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/topic_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ bool topic_in_list(const std::string & topic_name, const std::vector<std::string
return it != topics.end();
}

bool type_in_list(const std::string & type_name, const std::vector<std::string> & topic_types)
ahcorde marked this conversation as resolved.
Show resolved Hide resolved
{
auto it = std::find(topic_types.begin(), topic_types.end(), type_name);
return it != topic_types.end();
}

bool
topic_is_unpublished(
const std::string & topic_name, rclcpp::node_interfaces::NodeGraphInterface & node_graph)
Expand Down Expand Up @@ -140,13 +146,14 @@ bool TopicFilter::take_topic(
if (!is_service_event_topic) {
if (!record_options_.all_topics &&
record_options_.topics.empty() &&
record_options_.topic_types.empty() &&
record_options_.regex.empty() &&
!record_options_.include_hidden_topics)
{
return false;
}

if (!record_options_.all_topics) {
if (!record_options_.all_topics && record_options_.topic_types.empty()) {
// Not in include topic list
if (record_options_.topics.empty() || !topic_in_list(topic_name, record_options_.topics)) {
// Not match include regex
Expand All @@ -167,6 +174,12 @@ bool TopicFilter::take_topic(
return false;
}

if (!record_options_.topic_types.empty() &&
!type_in_list(topic_type, record_options_.topic_types))
{
return false;
}

if (!record_options_.exclude_regex.empty()) {
std::regex exclude_regex(record_options_.exclude_regex);
if (std::regex_search(topic_name, exclude_regex)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ TEST_F(RecordIntegrationTestFixture, test_keyboard_controls)
auto keyboard_handler = std::make_shared<MockKeyboardHandler>();

rosbag2_transport::RecordOptions record_options =
{true, false, false, {}, {}, {}, {}, "rmw_format", 100ms};
{true, false, false, {}, {}, {}, {}, {}, "rmw_format", 100ms};
record_options.start_paused = true;

auto recorder = std::make_shared<Recorder>(
Expand Down
16 changes: 8 additions & 8 deletions rosbag2_transport/test/rosbag2_transport/test_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are
pub_manager.setup_publisher(string_topic, string_message, 2);

rosbag2_transport::RecordOptions record_options =
{false, false, false, {string_topic, array_topic}, {}, {}, {}, "rmw_format", 50ms};
{false, false, false, {string_topic, array_topic}, {}, {}, {}, {}, "rmw_format", 50ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -98,7 +98,7 @@ TEST_F(RecordIntegrationTestFixture, can_record_again_after_stop)
pub_manager.setup_publisher(string_topic, string_message, 2);

rosbag2_transport::RecordOptions record_options =
{false, false, false, {string_topic}, {}, {}, {}, "rmw_format", 50ms};
{false, false, false, {string_topic}, {}, {}, {}, {}, "rmw_format", 50ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -150,7 +150,7 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata)
pub_manager.setup_publisher(topic, string_message, 2);

rosbag2_transport::RecordOptions record_options =
{false, false, false, {topic}, {}, {}, {}, "rmw_format", 100ms};
{false, false, false, {topic}, {}, {}, {}, {}, "rmw_format", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -214,7 +214,7 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data)
pub_manager.setup_publisher(topic, string_message, 2, rclcpp::SensorDataQoS());

rosbag2_transport::RecordOptions record_options =
{false, false, false, {topic}, {}, {}, {}, "rmw_format", 100ms};
{false, false, false, {topic}, {}, {}, {}, {}, "rmw_format", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -257,7 +257,7 @@ TEST_F(RecordIntegrationTestFixture, receives_latched_messages)
pub_manager.run_publishers();

rosbag2_transport::RecordOptions record_options =
{false, false, false, {topic}, {}, {}, {}, "rmw_format", 100ms};
{false, false, false, {topic}, {}, {}, {}, {}, "rmw_format", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -302,7 +302,7 @@ TEST_F(RecordIntegrationTestFixture, mixed_qos_subscribes) {
topic, profile_transient_local);

rosbag2_transport::RecordOptions record_options =
{false, false, false, {topic}, {}, {}, {}, "rmw_format", 100ms};
{false, false, false, {topic}, {}, {}, {}, {}, "rmw_format", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -349,7 +349,7 @@ TEST_F(RecordIntegrationTestFixture, duration_and_noncompatibility_policies_mixe
auto publisher_liveliness = create_pub(profile_liveliness);

rosbag2_transport::RecordOptions record_options =
{false, false, false, {topic}, {}, {}, {}, "rmw_format", 100ms};
{false, false, false, {topic}, {}, {}, {}, {}, "rmw_format", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -387,7 +387,7 @@ TEST_F(RecordIntegrationTestFixture, write_split_callback_is_called)
mock_writer.set_max_messages_per_file(5);

rosbag2_transport::RecordOptions record_options =
{false, false, false, {string_topic}, {}, {}, {}, "rmw_format", 100ms};
{false, false, false, {string_topic}, {}, {}, {}, {}, "rmw_format", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are
pub_manager.setup_publisher(string_topic, string_message, 2);

rosbag2_transport::RecordOptions record_options =
{true, false, false, {}, {}, {"/rosout"}, {}, "rmw_format", 100ms};
{true, false, false, {}, {}, {}, {"/rosout"}, {}, "rmw_format", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -97,7 +97,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_services_a
"test_service_2");

rosbag2_transport::RecordOptions record_options =
{false, true, false, {}, {}, {"/rosout"}, {}, "rmw_format", 100ms};
{false, true, false, {}, {}, {}, {"/rosout"}, {}, "rmw_format", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -139,7 +139,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_topic_and_service_a
pub_manager.setup_publisher(string_topic, string_message, 1);

rosbag2_transport::RecordOptions record_options =
{true, true, false, {}, {}, {"/rosout"}, {}, "rmw_format", 100ms};
{true, true, false, {}, {}, {}, {"/rosout"}, {}, "rmw_format", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_two_topics_ignore_l
pub_manager.setup_publisher(string_topic, string_message, 2);

rosbag2_transport::RecordOptions record_options =
{true, false, false, {}, {}, {}, {}, "rmw_format", 100ms};
{true, false, false, {}, {}, {}, {}, {}, "rmw_format", 100ms};
record_options.ignore_leaf_topics = true;
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ TEST_F(RecordIntegrationTestFixture, record_all_include_unpublished_false_ignore
string_topic, 10, [](test_msgs::msg::Strings::ConstSharedPtr) {});

rosbag2_transport::RecordOptions record_options =
{true, false, false, {}, {}, {}, {}, "rmw_format", 100ms};
{true, false, false, {}, {}, {}, {}, {}, "rmw_format", 100ms};
record_options.include_unpublished_topics = false;
auto recorder = std::make_shared<MockRecorder>(writer_, storage_options_, record_options);
recorder->record();
Expand All @@ -53,7 +53,7 @@ TEST_F(RecordIntegrationTestFixture, record_all_include_unpublished_true_include
string_topic, 10, [](test_msgs::msg::Strings::ConstSharedPtr) {});

rosbag2_transport::RecordOptions record_options =
{true, false, false, {}, {}, {}, {}, "rmw_format", 100ms};
{true, false, false, {}, {}, {}, {}, {}, "rmw_format", 100ms};
record_options.include_unpublished_topics = true;
auto recorder = std::make_shared<MockRecorder>(writer_, storage_options_, record_options);
recorder->record();
Expand All @@ -73,7 +73,7 @@ TEST_F(
string_topic, 10, [](test_msgs::msg::Strings::ConstSharedPtr) {});

rosbag2_transport::RecordOptions record_options =
{true, false, false, {}, {}, {}, {}, "rmw_format", 100ms};
{true, false, false, {}, {}, {}, {}, {}, "rmw_format", 100ms};
record_options.include_unpublished_topics = false;
auto recorder = std::make_shared<MockRecorder>(writer_, storage_options_, record_options);
recorder->record();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ TEST_F(RecordIntegrationTestFixture, record_all_without_discovery_ignores_later_
string_message->string_value = "Hello World";

rosbag2_transport::RecordOptions record_options =
{true, false, true, {}, {}, {}, {}, "rmw_format", 100ms};
{true, false, true, {}, {}, {}, {}, {}, "rmw_format", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ TEST_F(RecordIntegrationTestFixture, record_all_with_sim_time)

rosbag2_transport::RecordOptions record_options =
{
false, false, false, {string_topic, clock_topic}, {}, {}, {}, "rmw_format", 100ms
false, false, false, {string_topic, clock_topic}, {}, {}, {}, {}, "rmw_format", 100ms
};
record_options.use_sim_time = true;
auto recorder = std::make_shared<MockRecorder>(
Expand Down
10 changes: 5 additions & 5 deletions rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ TEST_F(RecordIntegrationTestFixture, regex_topics_recording)
ASSERT_FALSE(std::regex_match(b4, re));

rosbag2_transport::RecordOptions record_options =
{false, false, false, {}, {}, {}, {}, "rmw_format", 10ms};
{false, false, false, {}, {}, {}, {}, {}, "rmw_format", 10ms};
record_options.regex = regex;

// TODO(karsten1987) Refactor this into publication manager
Expand Down Expand Up @@ -133,7 +133,7 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_topic_recording)
ASSERT_TRUE(std::regex_match(e1, exclude));

rosbag2_transport::RecordOptions record_options =
{false, false, false, {}, {}, {}, {}, "rmw_format", 10ms};
{false, false, false, {}, {}, {}, {}, {}, "rmw_format", 10ms};
record_options.regex = regex;
record_options.exclude_regex = topics_regex_to_exclude;

Expand Down Expand Up @@ -209,7 +209,7 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_topic_topic_recording)
ASSERT_TRUE(e1 == topics_exclude);

rosbag2_transport::RecordOptions record_options =
{false, false, false, {}, {}, {}, {}, "rmw_format", 10ms};
{false, false, false, {}, {}, {}, {}, {}, "rmw_format", 10ms};
record_options.regex = regex;
record_options.exclude_topics.emplace_back(topics_exclude);

Expand Down Expand Up @@ -271,7 +271,7 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_service_recording)
std::string b2 = "/namespace_before/not_nice";

rosbag2_transport::RecordOptions record_options =
{false, false, false, {}, {}, {}, {}, "rmw_format", 10ms};
{false, false, false, {}, {}, {}, {}, {}, "rmw_format", 10ms};
record_options.regex = regex;
record_options.exclude_regex = services_regex_to_exclude;

Expand Down Expand Up @@ -344,7 +344,7 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_service_service_recording
std::string b2 = "/namespace_before/not_nice";

rosbag2_transport::RecordOptions record_options =
{false, false, false, {}, {}, {}, {}, "rmw_format", 10ms};
{false, false, false, {}, {}, {}, {}, {}, "rmw_format", 10ms};
record_options.regex = regex;
record_options.exclude_service_events.emplace_back(services_exclude);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class RecordSrvsTest : public RecordIntegrationTestFixture
client_node_ = std::make_shared<rclcpp::Node>("test_record_client");

rosbag2_transport::RecordOptions record_options =
{false, false, false, {test_topic_}, {}, {}, {}, "rmw_format", 100ms};
{false, false, false, {test_topic_}, {}, {}, {}, {}, "rmw_format", 100ms};
storage_options_.snapshot_mode = snapshot_mode_;
storage_options_.max_cache_size = 200;
recorder_ = std::make_shared<rosbag2_transport::Recorder>(
Expand Down