Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Compression - 5] Add Zstd file compression #220

Merged
merged 25 commits into from
Dec 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions rosbag2_compression/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,45 @@ find_package(ament_cmake REQUIRED)
find_package(rcpputils REQUIRED)
find_package(rcutils REQUIRED)
find_package(rosbag2_storage REQUIRED)
find_package(zstd_vendor REQUIRED)

add_library(${PROJECT_NAME}_zstd
SHARED
src/rosbag2_compression/zstd_compressor.cpp)
target_include_directories(${PROJECT_NAME}_zstd
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
${zstd_vendor_INCLUDE_DIRS})
target_link_libraries(${PROJECT_NAME}_zstd ${zstd_vendor_LIBRARIES})
ament_target_dependencies(${PROJECT_NAME}_zstd
rcpputils
rcutils
rosbag2_storage)
# Causes the visibility macros to use dllexport rather than dllimport.
# This is appropriate when building the dll but not consuming it.
target_compile_definitions(${PROJECT_NAME}_zstd
PRIVATE
ROSBAG2_COMPRESSION_BUILDING_DLL)

install(
DIRECTORY include/
DESTINATION include)

install(
TARGETS ${PROJECT_NAME}_zstd
ARCHIVE DESTINATION lib
LIBRARY DESTINATION lib
RUNTIME DESTINATION bin)

ament_export_include_directories(include)
ament_export_libraries(${PROJECT_NAME}_zstd)
ament_export_dependencies(rosbag2_storage rcutils zstd_vendor zstd)

if(BUILD_TESTING)
find_package(ament_cmake_gmock REQUIRED)
find_package(ament_lint_auto REQUIRED)
find_package(rosbag2_test_common REQUIRED)
ament_lint_auto_find_test_dependencies()

add_library(fake_compressor SHARED
Expand Down Expand Up @@ -66,6 +95,12 @@ if(BUILD_TESTING)
ament_add_gmock(test_decompressor
test/rosbag2_compression/test_decompressor.cpp)
target_link_libraries(test_decompressor fake_decompressor)

ament_add_gmock(test_zstd_compressor
test/rosbag2_compression/test_zstd_compressor.cpp)
target_include_directories(test_zstd_compressor PUBLIC include)
target_link_libraries(test_zstd_compressor ${PROJECT_NAME}_zstd)
ament_target_dependencies(test_zstd_compressor rosbag2_test_common rosbag2_storage)
endif()

ament_package()
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_COMPRESSION__ZSTD_COMPRESSOR_HPP_
#define ROSBAG2_COMPRESSION__ZSTD_COMPRESSOR_HPP_

#include <zstd.h>

#include <fstream>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>

#include "rosbag2_compression/base_compressor_interface.hpp"
#include "rosbag2_compression/visibility_control.hpp"

namespace rosbag2_compression
{

class ROSBAG2_COMPRESSION_PUBLIC ZstdCompressor : public BaseCompressorInterface
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had this discussion on the previous PR for the compression interfaces, but I don't see the benefit of splitting compression and decompression in two interfaces. So in this case, on the compression interface is implemented (AFAICT), and I don't understand how users can actually use this as-is.
They are able to compress their bags now which makes them essentially unusable as no means of decompressing it is available. Do I understand this correctly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the benefit of splitting compression and decompression in two interfaces.

The reasoning for decoupling was quickly discussed here.

Decoupling allows us to perform specific actions for de/compression which can vary based on the library/algorithm we are using.

For example, we want to measure the performance of playback when we're decompressing without changing the implementation of the compression.

I don't understand how users can actually use this as-is.

As is, users can only compress, but the goal is to add decompression as well. We just want to keep PRs small and manageable. We haven't exposed this functionality yet since it's not complete and exposing it via the CLI will be the last step (similar to what was done with splitting).

They are able to compress their bags now which makes them essentially unusable as no means of decompressing it is available.

There is no means of decompression, but that's being added in the next PR.

{
public:
ZstdCompressor() = default;

~ZstdCompressor() = default;

std::string compress_uri(const std::string & uri) override;

void compress_serialized_bag_message(
rosbag2_storage::SerializedBagMessage * bag_message) override;

std::string get_compression_identifier() const override;
};

} // namespace rosbag2_compression

#endif // ROSBAG2_COMPRESSION__ZSTD_COMPRESSOR_HPP_
2 changes: 2 additions & 0 deletions rosbag2_compression/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
<depend>rcpputils</depend>
<depend>rcutils</depend>
<depend>rosbag2_storage</depend>
<depend>zstd_vendor</depend>

<test_depend>ament_cmake_gmock</test_depend>
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<test_depend>rosbag2_test_common</test_depend>

<export>
<build_type>ament_cmake</build_type>
Expand Down
147 changes: 147 additions & 0 deletions rosbag2_compression/src/rosbag2_compression/zstd_compressor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <chrono>
#include <cstdio>
#include <memory>
#include <string>
#include <vector>

#include "rosbag2_compression/zstd_compressor.hpp"

#include "rosbag2_storage/filesystem_helper.hpp"

#include "logging.hpp"

namespace
{
// Increasing the compression level will:
// - Increase the time taken to compress
// - Decrease the size of the compressed data
// Setting to zero uses Zstd's default value of 3.
constexpr const int DEFAULT_ZSTD_COMPRESSION_LEVEL = 1;

constexpr const char COMPRESSION_IDENTIFIER[] = "zstd";

FILE * open_file(const std::string & uri, const std::string & read_mode)
{
FILE * fp{nullptr};
#ifdef _WIN32
fopen_s(&fp, uri.c_str(), read_mode.c_str());
#else
fp = std::fopen(uri.c_str(), read_mode.c_str());
#endif
return fp;
}

std::vector<uint8_t> get_input_buffer(
const std::string & uri,
size_t & decompressed_buffer_length)
{
// Get the file size
decompressed_buffer_length = rosbag2_storage::FilesystemHelper::get_file_size(uri);
// Read in buffer, handling accordingly
auto file_pointer = open_file(uri.c_str(), "rb");
if (file_pointer == nullptr) {
throw std::runtime_error("Error opening file");
}
// Allocate and read in
std::vector<uint8_t> decompressed_buffer;
decompressed_buffer.reserve(decompressed_buffer_length);
fread(decompressed_buffer.data(), sizeof(uint8_t), decompressed_buffer_length, file_pointer);
dirk-thomas marked this conversation as resolved.
Show resolved Hide resolved
if (ferror(file_pointer)) {
fclose(file_pointer);
throw std::runtime_error("Unable to read file");
}
fclose(file_pointer);
return decompressed_buffer;
}

void write_output_buffer(
const uint8_t * output_buffer,
const size_t output_buffer_length,
const std::string & uri)
{
auto file_pointer = open_file(uri.c_str(), "wb");
fwrite(output_buffer, sizeof(uint8_t), output_buffer_length, file_pointer);
if (ferror(file_pointer)) {
fclose(file_pointer);
throw std::runtime_error("Unable to write compressed file");
}
fclose(file_pointer);
}

void throw_on_zstd_error(const size_t compression_result)
{
if (ZSTD_isError(compression_result)) {
std::stringstream error;
error << "ZSTD compression error: " << ZSTD_getErrorName(compression_result);
throw std::runtime_error(error.str());
}
ROSBAG2_COMPRESSION_LOG_DEBUG("ZSTD compressed file.");
}

void print_compression_statistics(
std::chrono::high_resolution_clock::time_point start,
std::chrono::high_resolution_clock::time_point end,
size_t decompressed_size, size_t compressed_size)
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
{
const auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
const auto compression_ratio =
static_cast<double>(decompressed_size) / static_cast<double>(compressed_size);
ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM(
"Compression statistics:\n" <<
"Time: " << duration.count() << " microseconds" <<
"Compression Ratio: " << compression_ratio
);
}
} // namespace

namespace rosbag2_compression
{

std::string ZstdCompressor::compress_uri(const std::string & uri)
{
const auto start = std::chrono::high_resolution_clock::now();
const auto compressed_uri = uri + "." + get_compression_identifier();
size_t decompressed_buffer_length{0};
const auto decompressed_buffer = get_input_buffer(uri, decompressed_buffer_length);
// Allocate based on compression bound and compress
const size_t compressed_buffer_length = ZSTD_compressBound(decompressed_buffer_length);
std::vector<uint8_t> compressed_buffer;
compressed_buffer.reserve(compressed_buffer_length);
// Perform compression and check.
// compression_result is either the actual compressed size or an error code.
const size_t compression_result = ZSTD_compress(
compressed_buffer.data(), compressed_buffer_length,
decompressed_buffer.data(), decompressed_buffer_length, DEFAULT_ZSTD_COMPRESSION_LEVEL);
throw_on_zstd_error(compression_result);
write_output_buffer(compressed_buffer.data(), compression_result, uri);
const auto end = std::chrono::high_resolution_clock::now();
print_compression_statistics(start, end, decompressed_buffer_length, compression_result);
return compressed_uri;
}

void ZstdCompressor::compress_serialized_bag_message(
rosbag2_storage::SerializedBagMessage *)
{
throw std::logic_error("Not implemented");
}

std::string ZstdCompressor::get_compression_identifier() const
{
return COMPRESSION_IDENTIFIER;
}

} // namespace rosbag2_compression
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <fstream>
#include <string>

#include "rclcpp/rclcpp.hpp"
#include "rosbag2_compression/zstd_compressor.hpp"
#include "rosbag2_storage/filesystem_helper.hpp"
#include "rosbag2_test_common/temporary_directory_fixture.hpp"

#include "gmock/gmock.h"

namespace
{
constexpr const char * GARBAGE_STATEMENT = "garbage";
constexpr const int DEFAULT_GARBAGE_FILE_SIZE = 10; // MiB

/**
* Creates a text file of a certain size.
* \param uri File path to write file.
* \param size Size of file in MiB.
*/
void create_garbage_file(const std::string & uri, int size = DEFAULT_GARBAGE_FILE_SIZE)
{
std::ofstream out{uri};
if (!out) {
throw std::runtime_error("Unable to write garbage file.");
}
const auto file_size = size * 1024 * 1024;
const auto num_iterations = file_size / static_cast<int>(strlen(GARBAGE_STATEMENT));

for (int i = 0; i < num_iterations; i++) {
out << GARBAGE_STATEMENT;
}
}
} // namespace

class CompressionHelperFixture : public rosbag2_test_common::TemporaryDirectoryFixture
{
protected:
CompressionHelperFixture() = default;

void SetUp() override
{
rclcpp::init(0, nullptr);
}

void TearDown() override
{
rclcpp::shutdown();
}
};

TEST_F(CompressionHelperFixture, zstd_compress_file_uri)
{
const auto uri = rosbag2_storage::FilesystemHelper::concat({temporary_dir_path_, "file1.txt"});
create_garbage_file(uri);
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved
auto zstd_compressor = rosbag2_compression::ZstdCompressor();
auto compressed_uri = zstd_compressor.compress_uri(uri);
piraka9011 marked this conversation as resolved.
Show resolved Hide resolved

const auto expected_compressed_uri = uri + "." + zstd_compressor.get_compression_identifier();
const auto uncompressed_file_size = rosbag2_storage::FilesystemHelper::get_file_size(uri);
const auto compressed_file_size =
rosbag2_storage::FilesystemHelper::get_file_size(compressed_uri);

EXPECT_EQ(compressed_uri, expected_compressed_uri);
EXPECT_LT(compressed_file_size, uncompressed_file_size);
}
Loading