Skip to content

Commit

Permalink
use uint8 for serialized message
Browse files Browse the repository at this point in the history
  • Loading branch information
Karsten1987 committed Nov 23, 2018
1 parent cb28689 commit 4ac9de9
Show file tree
Hide file tree
Showing 14 changed files with 48 additions and 65 deletions.
6 changes: 3 additions & 3 deletions rosbag2_storage/include/rosbag2_storage/ros_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@

#include <memory>

#include "rcutils/types.h"
#include "rcutils/types/uint8_array.h"

#include "rosbag2_storage/visibility_control.hpp"

namespace rosbag2_storage
{

ROSBAG2_STORAGE_PUBLIC
std::shared_ptr<rcutils_char_array_t>
std::shared_ptr<rcutils_uint8_array_t>
make_serialized_message(const void * data, size_t size);

ROSBAG2_STORAGE_PUBLIC
std::shared_ptr<rcutils_char_array_t>
std::shared_ptr<rcutils_uint8_array_t>
make_empty_serialized_message(size_t size);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
#include <memory>
#include <string>

#include "rcutils/types/char_array.h"
#include "rcutils/types/uint8_array.h"
#include "rcutils/time.h"

namespace rosbag2_storage
{

struct SerializedBagMessage
{
std::shared_ptr<rcutils_char_array_t> serialized_data;
std::shared_ptr<rcutils_uint8_array_t> serialized_data;
rcutils_time_point_value_t time_stamp;
std::string topic_name;
};
Expand Down
16 changes: 8 additions & 8 deletions rosbag2_storage/src/rosbag2_storage/ros_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace rosbag2_storage

static rcutils_allocator_t allocator = rcutils_get_default_allocator();

std::shared_ptr<rcutils_char_array_t>
std::shared_ptr<rcutils_uint8_array_t>
make_serialized_message(const void * data, size_t size)
{
auto serialized_message = make_empty_serialized_message(size);
Expand All @@ -35,20 +35,20 @@ make_serialized_message(const void * data, size_t size)
return serialized_message;
}

std::shared_ptr<rcutils_char_array_t>
std::shared_ptr<rcutils_uint8_array_t>
make_empty_serialized_message(size_t size)
{
auto msg = new rcutils_char_array_t;
*msg = rcutils_get_zero_initialized_char_array();
auto ret = rcutils_char_array_init(msg, size, &allocator);
auto msg = new rcutils_uint8_array_t;
*msg = rcutils_get_zero_initialized_uint8_array();
auto ret = rcutils_uint8_array_init(msg, size, &allocator);
if (ret != RCUTILS_RET_OK) {
throw std::runtime_error("Error allocating resources for serialized message: " +
std::string(rcutils_get_error_string().str));
}

auto serialized_message = std::shared_ptr<rcutils_char_array_t>(msg,
[](rcutils_char_array_t * msg) {
int error = rcutils_char_array_fini(msg);
auto serialized_message = std::shared_ptr<rcutils_uint8_array_t>(msg,
[](rcutils_uint8_array_t * msg) {
int error = rcutils_uint8_array_fini(msg);
delete msg;
if (error != RCUTILS_RET_OK) {
ROSBAG2_STORAGE_LOG_ERROR_STREAM(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStatementWrapper
std::shared_ptr<SqliteStatementWrapper> bind(rcutils_time_point_value_t value);
std::shared_ptr<SqliteStatementWrapper> bind(double value);
std::shared_ptr<SqliteStatementWrapper> bind(const std::string & value);
std::shared_ptr<SqliteStatementWrapper> bind(std::shared_ptr<rcutils_char_array_t> value);
std::shared_ptr<SqliteStatementWrapper> bind(std::shared_ptr<rcutils_uint8_array_t> value);

std::shared_ptr<SqliteStatementWrapper> reset();

Expand All @@ -199,15 +199,15 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStatementWrapper
void obtain_column_value(size_t index, rcutils_time_point_value_t & value) const;
void obtain_column_value(size_t index, double & value) const;
void obtain_column_value(size_t index, std::string & value) const;
void obtain_column_value(size_t index, std::shared_ptr<rcutils_char_array_t> & value) const;
void obtain_column_value(size_t index, std::shared_ptr<rcutils_uint8_array_t> & value) const;

template<typename T>
void check_and_report_bind_error(int return_code, T value);
void check_and_report_bind_error(int return_code);

sqlite3_stmt * statement_;
int last_bound_parameter_index_;
std::vector<std::shared_ptr<rcutils_char_array_t>> written_blobs_cache_;
std::vector<std::shared_ptr<rcutils_uint8_array_t>> written_blobs_cache_;
};

template<typename T1, typename T2, typename ... Params>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
bool is_read_only(const rosbag2_storage::storage_interfaces::IOFlag & io_flag) const;

using ReadQueryResult = SqliteStatementWrapper::QueryResult<
std::shared_ptr<rcutils_char_array_t>, rcutils_time_point_value_t, std::string>;
std::shared_ptr<rcutils_uint8_array_t>, rcutils_time_point_value_t, std::string>;

std::shared_ptr<SqliteWrapper> database_;
std::string database_name_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ std::shared_ptr<SqliteStatementWrapper> SqliteStatementWrapper::bind(const std::
}

std::shared_ptr<SqliteStatementWrapper>
SqliteStatementWrapper::bind(std::shared_ptr<rcutils_char_array_t> value)
SqliteStatementWrapper::bind(std::shared_ptr<rcutils_uint8_array_t> value)
{
written_blobs_cache_.push_back(value);
auto return_code = sqlite3_bind_blob(
Expand Down Expand Up @@ -147,7 +147,7 @@ void SqliteStatementWrapper::obtain_column_value(size_t index, std::string & val
}

void SqliteStatementWrapper::obtain_column_value(
size_t index, std::shared_ptr<rcutils_char_array_t> & value) const
size_t index, std::shared_ptr<rcutils_uint8_array_t> & value) const
{
auto data = sqlite3_column_blob(statement_, static_cast<int>(index));
auto size = static_cast<size_t>(sqlite3_column_bytes(statement_, static_cast<int>(index)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void SqliteStorage::prepare_for_reading()
"FROM messages JOIN topics ON messages.topic_id = topics.id "
"ORDER BY messages.timestamp;");
message_result_ = read_statement_->execute_query<
std::shared_ptr<rcutils_char_array_t>, rcutils_time_point_value_t, std::string>();
std::shared_ptr<rcutils_uint8_array_t>, rcutils_time_point_value_t, std::string>();
current_message_row_ = message_result_.begin();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,22 @@ class StorageTestFixture : public TemporaryDirectoryFixture
allocator_ = rcutils_get_default_allocator();
}

std::shared_ptr<rcutils_char_array_t> make_serialized_message(std::string message)
std::shared_ptr<rcutils_uint8_array_t> make_serialized_message(std::string message)
{
int message_size = get_buffer_capacity(message);
message_size++; // need to account for terminating null character
assert(message_size > 0);

auto msg = new rcutils_char_array_t;
*msg = rcutils_get_zero_initialized_char_array();
auto ret = rcutils_char_array_init(msg, message_size, &allocator_);
auto msg = new rcutils_uint8_array_t;
*msg = rcutils_get_zero_initialized_uint8_array();
auto ret = rcutils_uint8_array_init(msg, message_size, &allocator_);
if (ret != RCUTILS_RET_OK) {
throw std::runtime_error("Error allocating resources " + std::to_string(ret));
}

auto serialized_data = std::shared_ptr<rcutils_char_array_t>(msg,
[](rcutils_char_array_t * msg) {
int error = rcutils_char_array_fini(msg);
auto serialized_data = std::shared_ptr<rcutils_uint8_array_t>(msg,
[](rcutils_uint8_array_t * msg) {
int error = rcutils_uint8_array_fini(msg);
delete msg;
if (error != RCUTILS_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
Expand All @@ -73,12 +73,12 @@ class StorageTestFixture : public TemporaryDirectoryFixture
return serialized_data;
}

std::string deserialize_message(std::shared_ptr<rcutils_char_array_t> serialized_message)
std::string deserialize_message(std::shared_ptr<rcutils_uint8_array_t> serialized_message)
{
char * copied = new char[serialized_message->buffer_length];
uint8_t * copied = new uint8_t[serialized_message->buffer_length];
auto string_length = serialized_message->buffer_length - 8;
memcpy(copied, &serialized_message->buffer[8], string_length);
std::string message_content(copied);
std::string message_content(reinterpret_cast<char *>(copied));
// cppcheck-suppress mismatchAllocDealloc ; complains about "copied" but used new[] and delete[]
delete[] copied;
return message_content;
Expand Down Expand Up @@ -129,11 +129,11 @@ class StorageTestFixture : public TemporaryDirectoryFixture
}

int write_data_to_serialized_string_message(
char * buffer, size_t buffer_capacity, const std::string & message)
uint8_t * buffer, size_t buffer_capacity, const std::string & message)
{
// This function also writes the final null charachter, which is absent in the CDR format.
// Here this behaviour is ok, because we only test test writing and reading from/to sqlite.
return rcutils_snprintf(buffer,
return rcutils_snprintf(reinterpret_cast<char *>(buffer),
buffer_capacity,
"%c%c%c%c%c%c%c%c%s",
0x00, 0x01, 0x00, 0x00, 0x0f, 0x00, 0x00, 0x00,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ TEST_F(SqliteWrapperTestFixture, ros_specific_types_are_supported_for_reading_an
db_.prepare_statement("CREATE TABLE test (timestamp INTEGER, data BLOB);")->execute_and_reset();
rcutils_time_point_value_t time = 1099511627783;
auto msg_content = "message";
std::shared_ptr<rcutils_char_array_t> message = make_serialized_message(msg_content);
std::shared_ptr<rcutils_uint8_array_t> message = make_serialized_message(msg_content);

db_.prepare_statement("INSERT INTO test (timestamp, data) VALUES (?, ?);")
->bind(time, message)->execute_and_reset();
auto row_iter = db_.prepare_statement("SELECT timestamp, data FROM test")
->execute_query<rcutils_time_point_value_t, std::shared_ptr<rcutils_char_array_t>>().begin();
->execute_query<rcutils_time_point_value_t, std::shared_ptr<rcutils_uint8_array_t>>().begin();

ASSERT_THAT(std::get<0>(*row_iter), Eq(time));
ASSERT_THAT(deserialize_message(std::get<1>(*row_iter)), StrEq(msg_content));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,27 +64,9 @@ class MemoryManagement
return message;
}

std::shared_ptr<rcutils_char_array_t> make_initialized_message()
std::shared_ptr<rmw_serialized_message_t> make_initialized_message()
{
auto msg = new rcutils_char_array_t;
*msg = rcutils_get_zero_initialized_char_array();
auto ret = rcutils_char_array_init(msg, 0, &rcutils_allocator_);
if (ret != RCUTILS_RET_OK) {
throw std::runtime_error("Error allocating resources for serialized message: " +
std::string(rcutils_get_error_string().str));
}

auto serialized_message = std::shared_ptr<rcutils_char_array_t>(msg,
[](rcutils_char_array_t * msg) {
int error = rcutils_char_array_fini(msg);
delete msg;
if (error != RCUTILS_RET_OK) {
RCUTILS_LOG_ERROR_NAMED("rosbag2_test_common",
"Leaking memory. Error: %s", rcutils_get_error_string().str);
}
});

return serialized_message;
return get_initialized_serialized_message(0);
}

private:
Expand All @@ -99,16 +81,16 @@ class MemoryManagement
get_initialized_serialized_message(size_t capacity)
{
auto msg = new rmw_serialized_message_t;
*msg = rcutils_get_zero_initialized_char_array();
auto ret = rcutils_char_array_init(msg, capacity, &rcutils_allocator_);
*msg = rmw_get_zero_initialized_serialized_message();
auto ret = rmw_serialized_message_init(msg, capacity, &rcutils_allocator_);
if (ret != RCUTILS_RET_OK) {
throw std::runtime_error("Error allocating resources for serialized message: " +
std::string(rcutils_get_error_string().str));
}

auto serialized_message = std::shared_ptr<rmw_serialized_message_t>(msg,
[](rmw_serialized_message_t * msg) {
int error = rcutils_char_array_fini(msg);
int error = rmw_serialized_message_fini(msg);
delete msg;
if (error != RCUTILS_RET_OK) {
RCUTILS_LOG_ERROR_NAMED("rosbag2_test_common", "Leaking memory. Error: %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class SubscriptionManager
subscriptions_.push_back(
subscriber_node_->create_subscription<MessageT>(
topic_name,
[this, topic_name](std::shared_ptr<rcutils_char_array_t> msg) {
[this, topic_name](std::shared_ptr<rmw_serialized_message_t> msg) {
subscribed_messages_[topic_name].push_back(msg);
}, qos_profile));
}
Expand Down Expand Up @@ -88,7 +88,8 @@ class SubscriptionManager
}

std::vector<rclcpp::SubscriptionBase::SharedPtr> subscriptions_;
std::map<std::string, std::vector<std::shared_ptr<rcutils_char_array_t>>> subscribed_messages_;
std::map<std::string,
std::vector<std::shared_ptr<rmw_serialized_message_t>>> subscribed_messages_;
std::map<std::string, size_t> expected_topics_with_size_;
rclcpp::Node::SharedPtr subscriber_node_;
MemoryManagement memory_management_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ GenericSubscription::GenericSubscription(
std::shared_ptr<rcl_node_t> node_handle,
const rosidl_message_type_support_t & ts,
const std::string & topic_name,
std::function<void(std::shared_ptr<rcutils_char_array_t>)> callback)
std::function<void(std::shared_ptr<rmw_serialized_message_t>)> callback)
: SubscriptionBase(
node_handle,
ts,
Expand All @@ -54,13 +54,13 @@ void GenericSubscription::handle_message(
std::shared_ptr<void> & message, const rmw_message_info_t & message_info)
{
(void) message_info;
auto typed_message = std::static_pointer_cast<rcutils_char_array_t>(message);
auto typed_message = std::static_pointer_cast<rmw_serialized_message_t>(message);
callback_(typed_message);
}

void GenericSubscription::return_message(std::shared_ptr<void> & message)
{
auto typed_message = std::static_pointer_cast<rcutils_char_array_t>(message);
auto typed_message = std::static_pointer_cast<rmw_serialized_message_t>(message);
return_serialized_message(typed_message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class GenericSubscription : public rclcpp::SubscriptionBase
std::shared_ptr<rcl_node_t> node_handle,
const rosidl_message_type_support_t & ts,
const std::string & topic_name,
std::function<void(std::shared_ptr<rcutils_char_array_t>)> callback);
std::function<void(std::shared_ptr<rmw_serialized_message_t>)> callback);

// Same as create_serialized_message() as the subscription is to serialized_messages only
std::shared_ptr<void> create_message() override;
Expand Down Expand Up @@ -77,7 +77,7 @@ class GenericSubscription : public rclcpp::SubscriptionBase

std::shared_ptr<rmw_serialized_message_t> borrow_serialized_message(size_t capacity);
rcutils_allocator_t default_allocator_;
std::function<void(std::shared_ptr<rcutils_char_array_t>)> callback_;
std::function<void(std::shared_ptr<rmw_serialized_message_t>)> callback_;
};

} // namespace rosbag2_transport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class RosBag2NodeFixture : public Test
return messages;
}

std::shared_ptr<rcutils_char_array_t> serialize_string_message(const std::string & message)
std::shared_ptr<rmw_serialized_message_t> serialize_string_message(const std::string & message)
{
auto string_message = std::make_shared<test_msgs::msg::Primitives>();
string_message->string_value = message;
Expand Down

0 comments on commit 4ac9de9

Please sign in to comment.