Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[humble] Use rw_lock to protect mcap metadata lists. (backport #1561) #1567

Merged
merged 2 commits into from
Feb 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 33 additions & 12 deletions rosbag2_storage_mcap/src/mcap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "rcpputils/env.hpp"
#include "rcpputils/thread_safety_annotations.hpp"
#include "rcutils/logging_macros.h"
#include "rosbag2_storage/metadata_io.hpp"
#include "rosbag2_storage/ros_helper.hpp"
Expand Down Expand Up @@ -42,6 +43,7 @@
#include <algorithm>
#include <filesystem>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -217,6 +219,7 @@ class MCAPStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa
#endif

private:
void write_lock_free(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg);
void open_impl(const std::string & uri, const std::string & preset_profile,
rosbag2_storage::storage_interfaces::IOFlag io_flag,
const std::string & storage_config_uri);
Expand All @@ -234,9 +237,13 @@ class MCAPStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa
std::shared_ptr<rosbag2_storage::SerializedBagMessage> next_;

rosbag2_storage::BagMetadata metadata_{};
std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_;
std::unordered_map<std::string, mcap::SchemaId> schema_ids_; // datatype -> schema_id
std::unordered_map<std::string, mcap::ChannelId> channel_ids_; // topic -> channel_id
std::mutex mcap_storage_mutex_;
std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_
RCPPUTILS_TSA_GUARDED_BY(mcap_storage_mutex_);
std::unordered_map<std::string, mcap::SchemaId> schema_ids_
RCPPUTILS_TSA_GUARDED_BY(mcap_storage_mutex_); // datatype -> schema_id
std::unordered_map<std::string, mcap::ChannelId> channel_ids_
RCPPUTILS_TSA_GUARDED_BY(mcap_storage_mutex_); // topic -> channel_id
rosbag2_storage::StorageFilter storage_filter_{};
mcap::ReadMessageOptions::ReadOrder read_order_ =
mcap::ReadMessageOptions::ReadOrder::LogTimeOrder;
Expand Down Expand Up @@ -657,6 +664,21 @@ uint64_t MCAPStorage::get_minimum_split_file_size() const

/** BaseWriteInterface **/
void MCAPStorage::write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg)
{
std::lock_guard<std::mutex> lock(mcap_storage_mutex_);
write_lock_free(msg);
}

void MCAPStorage::write(
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & msgs)
{
std::lock_guard<std::mutex> lock(mcap_storage_mutex_);
for (const auto & msg : msgs) {
write_lock_free(msg);
}
}

void MCAPStorage::write_lock_free(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg)
{
const auto topic_it = topics_.find(msg->topic_name);
if (topic_it == topics_.end()) {
Expand Down Expand Up @@ -697,16 +719,9 @@ void MCAPStorage::write(std::shared_ptr<const rosbag2_storage::SerializedBagMess
metadata_.duration = std::max(metadata_.duration, message_time - metadata_.starting_time);
}

void MCAPStorage::write(
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & msgs)
{
for (const auto & msg : msgs) {
write(msg);
}
}

void MCAPStorage::create_topic(const rosbag2_storage::TopicMetadata & topic)
{
std::lock_guard<std::mutex> lock(mcap_storage_mutex_);
auto topic_info = rosbag2_storage::TopicInformation{topic, 0};
const auto topic_it = topics_.find(topic.name);
if (topic_it == topics_.end()) {
Expand Down Expand Up @@ -768,7 +783,13 @@ void MCAPStorage::create_topic(const rosbag2_storage::TopicMetadata & topic)

void MCAPStorage::remove_topic(const rosbag2_storage::TopicMetadata & topic)
{
topics_.erase(topic.name);
std::lock_guard<std::mutex> lock(mcap_storage_mutex_);
const auto topic_it = topics_.find(topic.name);
if (topic_it != topics_.end()) {
const auto & datatype = topic_it->second.topic_metadata.type;
schema_ids_.erase(datatype);
topics_.erase(topic.name);
}
}

#ifdef ROSBAG2_STORAGE_MCAP_HAS_UPDATE_METADATA
Expand Down
Loading