-
Notifications
You must be signed in to change notification settings - Fork 235
/
writer.cpp
220 lines (185 loc) · 6.71 KB
/
writer.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
// Copyright 2018, Bosch Software Innovations GmbH.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "rosbag2_cpp/writer.hpp"
#include <algorithm>
#include <chrono>
#include <cstring>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include "rclcpp/logging.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/time.hpp"
#include "rosbag2_cpp/info.hpp"
#include "rosbag2_cpp/writer_interfaces/base_writer_interface.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"
#include "rosbag2_storage/storage_options.hpp"
#include "rosbag2_storage/topic_metadata.hpp"
#include "rosbag2_storage/default_storage_id.hpp"
#include "rmw/rmw.h"
namespace rosbag2_cpp
{
Writer::Writer(std::unique_ptr<rosbag2_cpp::writer_interfaces::BaseWriterInterface> writer_impl)
: writer_impl_(std::move(writer_impl))
{}
Writer::~Writer()
{
writer_impl_.reset();
}
void Writer::open(const std::string & uri)
{
rosbag2_storage::StorageOptions storage_options;
storage_options.uri = uri;
rosbag2_cpp::ConverterOptions converter_options{};
return open(storage_options, converter_options);
}
void Writer::open(
const rosbag2_storage::StorageOptions & storage_options,
const ConverterOptions & converter_options)
{
writer_impl_->open(storage_options, converter_options);
}
void Writer::close()
{
writer_impl_->close();
}
void Writer::create_topic(const rosbag2_storage::TopicMetadata & topic_with_type)
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
writer_impl_->create_topic(topic_with_type);
}
void Writer::create_topic(
const rosbag2_storage::TopicMetadata & topic_with_type,
const rosbag2_storage::MessageDefinition & message_definition)
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
writer_impl_->create_topic(topic_with_type, message_definition);
}
void Writer::remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type)
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
writer_impl_->remove_topic(topic_with_type);
}
bool Writer::take_snapshot()
{
return writer_impl_->take_snapshot();
}
void Writer::split_bagfile()
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
return writer_impl_->split_bagfile();
}
void Writer::write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
writer_impl_->write(message);
}
void Writer::write(
std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message,
const std::string & topic_name,
const std::string & type_name,
const std::string & serialization_format)
{
if (message->topic_name != topic_name) {
auto err = std::string("trying to write a message with mismatching topic information: ");
err += "(" + message->topic_name + ") vs (" + topic_name + ")";
throw std::runtime_error(err);
}
rosbag2_storage::TopicMetadata tm;
tm.name = topic_name;
tm.type = type_name;
tm.serialization_format = serialization_format;
create_topic(tm);
write(message);
}
void Writer::write(
const rclcpp::SerializedMessage & message,
const std::string & topic_name,
const std::string & type_name,
const rclcpp::Time & time)
{
auto serialized_bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
serialized_bag_message->topic_name = topic_name;
serialized_bag_message->receive_time_stamp = time.nanoseconds();
serialized_bag_message->publish_time_stamp = time.nanoseconds();
serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t,
[](rcutils_uint8_array_t * msg) {
auto fini_return = rcutils_uint8_array_fini(msg);
delete msg;
if (fini_return != RCUTILS_RET_OK) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rosbag2_cpp"),
"Failed to destroy serialized message: " << rcutils_get_error_string().str);
}
});
// While using compression mode and cache size isn't 0, another thread deals with this serialized
// message asynchronously.
// In order to keep serialized message valid, have to duplicate message.
rcutils_allocator_t allocator = rcutils_get_default_allocator();
rcutils_ret_t ret = rcutils_uint8_array_init(
serialized_bag_message->serialized_data.get(),
message.get_rcl_serialized_message().buffer_capacity,
&allocator);
if (ret != RCUTILS_RET_OK) {
auto err = std::string("Failed to call rcutils_uint8_array_init(): return ");
err += ret;
throw std::runtime_error(err);
}
std::memcpy(
serialized_bag_message->serialized_data->buffer,
message.get_rcl_serialized_message().buffer,
message.get_rcl_serialized_message().buffer_length);
serialized_bag_message->serialized_data->buffer_length =
message.get_rcl_serialized_message().buffer_length;
return write(
serialized_bag_message, topic_name, type_name, rmw_get_serialization_format());
}
void Writer::write(
std::shared_ptr<const rclcpp::SerializedMessage> message,
const std::string & topic_name,
const std::string & type_name,
const rclcpp::Time & time)
{
write(message, topic_name, type_name, time, time);
}
void Writer::write(
std::shared_ptr<const rclcpp::SerializedMessage> message,
const std::string & topic_name,
const std::string & type_name,
const rclcpp::Time & recv_time,
const rclcpp::Time & send_time)
{
auto serialized_bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
serialized_bag_message->topic_name = topic_name;
serialized_bag_message->receive_time_stamp = recv_time.nanoseconds();
serialized_bag_message->publish_time_stamp = send_time.nanoseconds();
// point to actual data and keep reference to original message to avoid premature releasing
serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t(message->get_rcl_serialized_message()),
[message](rcutils_uint8_array_t * data) {
(void)message;
if (data != nullptr) {
data->buffer = nullptr;
delete data;
}
});
return write(serialized_bag_message, topic_name, type_name, rmw_get_serialization_format());
}
void Writer::add_event_callbacks(bag_events::WriterEventCallbacks & callbacks)
{
writer_impl_->add_event_callbacks(callbacks);
}
} // namespace rosbag2_cpp