Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Compressor interface #584

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,7 @@ set(SOURCES
util/comparator.cc
util/compression.cc
util/compression_context_cache.cc
util/compressor.cc
util/concurrent_task_limiter_impl.cc
util/crc32c.cc
util/data_structure.cc
Expand Down Expand Up @@ -1493,6 +1494,7 @@ if(WITH_TESTS)
util/autovector_test.cc
util/bloom_test.cc
util/coding_test.cc
util/compression_test.cc
util/crc32c_test.cc
util/defer_test.cc
util/dynamic_bloom_test.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1284,6 +1284,9 @@ cache_test: $(OBJ_DIR)/cache/cache_test.o $(TEST_LIBRARY) $(LIBRARY)
coding_test: $(OBJ_DIR)/util/coding_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

compression_test: $(OBJ_DIR)/util/compression_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

hash_test: $(OBJ_DIR)/util/hash_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
7 changes: 7 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"util/comparator.cc",
"util/compression.cc",
"util/compression_context_cache.cc",
"util/compressor.cc",
"util/concurrent_task_limiter_impl.cc",
"util/crc32c.cc",
"util/crc32c_arm64.cc",
Expand Down Expand Up @@ -4667,6 +4668,12 @@ cpp_unittest_wrapper(name="compressed_secondary_cache_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="compression_test",
srcs=["util/compression_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="configurable_test",
srcs=["options/configurable_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
29 changes: 14 additions & 15 deletions cache/compressed_secondary_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,16 @@ std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
s = helper->create_cb(Slice(ptr->get(), handle_value_charge),
create_context, allocator, &value, &charge);
} else {
UncompressionContext uncompression_context(cache_options_.compression_type);
UncompressionInfo uncompression_info(uncompression_context,
UncompressionDict::GetEmptyDict(),
cache_options_.compression_type);
auto compressor =
BuiltinCompressor::GetCompressor(cache_options_.compression_type);
UncompressionInfo uncompression_info(UncompressionDict::GetEmptyDict(),
cache_options_.compress_format_version,
allocator);

size_t uncompressed_size{0};
CacheAllocationPtr uncompressed = UncompressData(
uncompression_info, (char*)ptr->get(), handle_value_charge,
&uncompressed_size, cache_options_.compress_format_version, allocator);
CacheAllocationPtr uncompressed = uncompression_info.UncompressData(
compressor.get(), (char*)ptr->get(), handle_value_charge,
&uncompressed_size);

if (!uncompressed) {
cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
Expand Down Expand Up @@ -151,17 +152,15 @@ Status CompressedSecondaryCache::Insert(const Slice& key,
if (cache_options_.compression_type != kNoCompression &&
!cache_options_.do_not_compress_roles.Contains(helper->role)) {
PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes, size);
CompressionOptions compression_opts;
CompressionContext compression_context(cache_options_.compression_type,
compression_opts);
auto compressor =
BuiltinCompressor::GetCompressor(cache_options_.compression_type);
uint64_t sample_for_compression{0};
CompressionInfo compression_info(
compression_opts, compression_context, CompressionDict::GetEmptyDict(),
cache_options_.compression_type, sample_for_compression);
CompressionInfo compression_info(CompressionDict::GetEmptyDict(),
cache_options_.compress_format_version,
sample_for_compression);

bool success =
CompressData(val, compression_info,
cache_options_.compress_format_version, &compressed_val);
compression_info.CompressData(compressor.get(), val, &compressed_val);

if (!success) {
return Status::Corruption("Error compressing value.");
Expand Down
1 change: 1 addition & 0 deletions db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "rocksdb/options.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "util/string_util.h"
#include "util/user_comparator_wrapper.h"

namespace ROCKSDB_NAMESPACE {
Expand Down
26 changes: 11 additions & 15 deletions db/blob/blob_file_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ BlobFileBuilder::BlobFileBuilder(
immutable_options_(immutable_options),
min_blob_size_(mutable_cf_options->min_blob_size),
blob_file_size_(mutable_cf_options->blob_file_size),
blob_compression_type_(mutable_cf_options->blob_compression_type),
blob_compressor_(mutable_cf_options->blob_compressor),
prepopulate_blob_cache_(mutable_cf_options->prepopulate_blob_cache),
file_options_(file_options),
db_id_(std::move(db_id)),
Expand All @@ -91,6 +91,10 @@ BlobFileBuilder::BlobFileBuilder(
assert(blob_file_paths_->empty());
assert(blob_file_additions_);
assert(blob_file_additions_->empty());

if (blob_compressor_ == nullptr) {
blob_compressor_ = BuiltinCompressor::GetCompressor(kNoCompression);
}
}

BlobFileBuilder::~BlobFileBuilder() = default;
Expand Down Expand Up @@ -150,7 +154,7 @@ Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
}

BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(),
blob_compression_type_);
blob_compressor_->GetCompressionType());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking through the code, I am not sure if blob_compressor_ is always set. Do you know if that is true? Should there be an assert somewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blob_compressor_ is initialized in BlobFileBuilder's constructor from mutable_cf_options->blob_compressor. The default constructor of MutableCFOptions sets blob_compressor to nullptr.
I'm not sure whether we can ever run into a situation where blob_compressor_ is left null, but I'll initialize it to NoCompressor in BlobFileBuilder's constructor to be safe. This better aligns with the previous code, which initialized the compression type to kNoCompression by default.


return Status::OK();
}
Expand Down Expand Up @@ -227,7 +231,8 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;

BlobLogHeader header(column_family_id_, blob_compression_type_, has_ttl,
BlobLogHeader header(column_family_id_,
blob_compressor_->GetCompressionType(), has_ttl,
expiration_range);

{
Expand Down Expand Up @@ -255,27 +260,18 @@ Status BlobFileBuilder::CompressBlobIfNeeded(
assert(compressed_blob->empty());
assert(immutable_options_);

if (blob_compression_type_ == kNoCompression) {
if (blob_compressor_->GetCompressionType() == kNoCompression) {
return Status::OK();
}

// TODO: allow user CompressionOptions, including max_compressed_bytes_per_kb
CompressionOptions opts;
CompressionContext context(blob_compression_type_, opts);
constexpr uint64_t sample_for_compression = 0;

CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
blob_compression_type_, sample_for_compression);

constexpr uint32_t compression_format_version = 2;
CompressionInfo info;

bool success = false;

{
StopWatch stop_watch(immutable_options_->clock, immutable_options_->stats,
BLOB_DB_COMPRESSION_MICROS);
success =
CompressData(*blob, info, compression_format_version, compressed_blob);
success = info.CompressData(blob_compressor_.get(), *blob, compressed_blob);
}

if (!success) {
Expand Down
3 changes: 2 additions & 1 deletion db/blob/blob_file_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "rocksdb/env.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/types.h"
#include "util/compressor.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -89,7 +90,7 @@ class BlobFileBuilder {
const ImmutableOptions* immutable_options_;
uint64_t min_blob_size_;
uint64_t blob_file_size_;
CompressionType blob_compression_type_;
std::shared_ptr<Compressor> blob_compressor_;
PrepopulateBlobCache prepopulate_blob_cache_;
const FileOptions* file_options_;
const std::string db_id_;
Expand Down
12 changes: 4 additions & 8 deletions db/blob/blob_file_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,16 +405,12 @@ TEST_F(BlobFileBuilderTest, Compression) {
ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number);
ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1);

CompressionOptions opts;
CompressionContext context(kSnappyCompression, opts);
constexpr uint64_t sample_for_compression = 0;

CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
kSnappyCompression, sample_for_compression);
auto compressor = BuiltinCompressor::GetCompressor(kSnappyCompression);
ASSERT_NE(compressor, nullptr);

std::string compressed_value;
ASSERT_TRUE(Snappy_Compress(info, uncompressed_value.data(),
uncompressed_value.size(), &compressed_value));
ASSERT_OK(compressor->Compress(CompressionInfo(), uncompressed_value,
&compressed_value));

ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(),
BlobLogRecord::kHeaderSize + key_size + compressed_value.size());
Expand Down
45 changes: 21 additions & 24 deletions db/blob/blob_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,11 @@ Status BlobFileReader::Create(

Statistics* const statistics = immutable_options.stats;

CompressionType compression_type = kNoCompression;
std::shared_ptr<Compressor> compressor;

{
const Status s =
ReadHeader(file_reader.get(), read_options, column_family_id,
statistics, &compression_type);
const Status s = ReadHeader(file_reader.get(), read_options,
column_family_id, statistics, &compressor);
if (!s.ok()) {
return s;
}
Expand All @@ -70,7 +69,7 @@ Status BlobFileReader::Create(
}

blob_file_reader->reset(
new BlobFileReader(std::move(file_reader), file_size, compression_type,
new BlobFileReader(std::move(file_reader), file_size, compressor,
immutable_options.clock, statistics));

return Status::OK();
Expand Down Expand Up @@ -140,9 +139,9 @@ Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
const ReadOptions& read_options,
uint32_t column_family_id,
Statistics* statistics,
CompressionType* compression_type) {
std::shared_ptr<Compressor>* compressor) {
assert(file_reader);
assert(compression_type);
assert(compressor);

Slice header_slice;
Buffer buf;
Expand Down Expand Up @@ -184,7 +183,7 @@ Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
return Status::Corruption("Column family ID mismatch");
}

*compression_type = header.compression;
*compressor = BuiltinCompressor::GetCompressor(header.compression);

return Status::OK();
}
Expand Down Expand Up @@ -281,11 +280,11 @@ Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,

BlobFileReader::BlobFileReader(
std::unique_ptr<RandomAccessFileReader>&& file_reader, uint64_t file_size,
CompressionType compression_type, SystemClock* clock,
const std::shared_ptr<Compressor>& compressor, SystemClock* clock,
Statistics* statistics)
: file_reader_(std::move(file_reader)),
file_size_(file_size),
compression_type_(compression_type),
compressor_(compressor),
clock_(clock),
statistics_(statistics) {
assert(file_reader_);
Expand All @@ -295,7 +294,7 @@ BlobFileReader::~BlobFileReader() = default;

Status BlobFileReader::GetBlob(
const ReadOptions& read_options, const Slice& user_key, uint64_t offset,
uint64_t value_size, CompressionType compression_type,
uint64_t value_size, const std::shared_ptr<Compressor>& compressor,
FilePrefetchBuffer* prefetch_buffer, MemoryAllocator* allocator,
std::unique_ptr<BlobContents>* result, uint64_t* bytes_read) const {
assert(result);
Expand All @@ -306,7 +305,7 @@ Status BlobFileReader::GetBlob(
return Status::Corruption("Invalid blob offset");
}

if (compression_type != compression_type_) {
if (compressor->GetCompressionType() != compressor_->GetCompressionType()) {
return Status::Corruption("Compression type mismatch when reading blob");
}

Expand Down Expand Up @@ -374,7 +373,7 @@ Status BlobFileReader::GetBlob(

{
const Status s = UncompressBlobIfNeeded(
value_slice, compression_type, allocator, clock_, statistics_, result);
value_slice, compressor.get(), allocator, clock_, statistics_, result);
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -420,7 +419,8 @@ void BlobFileReader::MultiGetBlob(
*req->status = Status::Corruption("Invalid blob offset");
continue;
}
if (req->compression != compression_type_) {
if (req->compressor->GetCompressionType() !=
compressor_->GetCompressionType()) {
*req->status =
Status::Corruption("Compression type mismatch when reading a blob");
continue;
Expand Down Expand Up @@ -522,7 +522,7 @@ void BlobFileReader::MultiGetBlob(
// Uncompress blob if needed
Slice value_slice(record_slice.data() + adjustments[i], req->len);
*req->status =
UncompressBlobIfNeeded(value_slice, compression_type_, allocator,
UncompressBlobIfNeeded(value_slice, compressor_.get(), allocator,
clock_, statistics_, &blob_reqs[i].second);
if (req->status->ok()) {
total_bytes += record_slice.size();
Expand Down Expand Up @@ -579,31 +579,28 @@ Status BlobFileReader::VerifyBlob(const Slice& record_slice,
}

Status BlobFileReader::UncompressBlobIfNeeded(
const Slice& value_slice, CompressionType compression_type,
const Slice& value_slice, Compressor* compressor,
MemoryAllocator* allocator, SystemClock* clock, Statistics* statistics,
std::unique_ptr<BlobContents>* result) {
assert(compressor);
assert(result);

if (compression_type == kNoCompression) {
if (compressor->GetCompressionType() == kNoCompression) {
BlobContentsCreator::Create(result, nullptr, value_slice, allocator);
return Status::OK();
}

UncompressionContext context(compression_type);
UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
compression_type);
UncompressionInfo info;

size_t uncompressed_size = 0;
constexpr uint32_t compression_format_version = 2;

CacheAllocationPtr output;

{
PERF_TIMER_GUARD(blob_decompress_time);
StopWatch stop_watch(clock, statistics, BLOB_DB_DECOMPRESSION_MICROS);
output = UncompressData(info, value_slice.data(), value_slice.size(),
&uncompressed_size, compression_format_version,
allocator);
output = info.UncompressData(compressor, value_slice.data(),
value_slice.size(), &uncompressed_size);
}

TEST_SYNC_POINT_CALLBACK(
Expand Down