Skip to content

Commit

Permalink
Pass parsed user key to prefix extractor in V2 compaction
Browse files Browse the repository at this point in the history
Previously, the prefix extractor was being supplied with the RocksDB
key instead of a parsed user key. This makes correct interpretation
by calling application fragile or impossible.
  • Loading branch information
Spencer Kimball committed Aug 12, 2014
1 parent 54153ab commit 3fcf7b2
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 9 deletions.
30 changes: 28 additions & 2 deletions db/c_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,29 @@ static void CompactionFilterV2Filter(
}
}

// Custom prefix extractor for compaction filter V2 which extracts first 3 characters.
static void CFV2PrefixExtractorDestroy(void* arg) { }
static char* CFV2PrefixExtractorTransform(void* arg, const char* key, size_t length, size_t* dst_length) {
// Verify keys are maximum length 4; this verifies fix for a
// prior bug which was passing the RocksDB-encoded key with
// logical timestamp suffix instead of parsed user key.
if (length > 4) {
fprintf(stderr, "%s:%d: %s: key %s is not user key\n", __FILE__, __LINE__, phase, key);
abort();
}
*dst_length = length < 3 ? length : 3;
return (char*)key;
}
static unsigned char CFV2PrefixExtractorInDomain(void* state, const char* key, size_t length) {
return 1;
}
static unsigned char CFV2PrefixExtractorInRange(void* state, const char* key, size_t length) {
return 1;
}
static const char* CFV2PrefixExtractorName(void* state) {
return "TestCFV2PrefixExtractor";
}

// Custom compaction filter factory V2.
static void CompactionFilterFactoryV2Destroy(void* arg) {
rocksdb_slicetransform_destroy((rocksdb_slicetransform_t*)arg);
Expand Down Expand Up @@ -585,9 +608,12 @@ int main(int argc, char** argv) {
{
rocksdb_compactionfilterfactoryv2_t* factory;
rocksdb_slicetransform_t* prefix_extractor;
prefix_extractor = rocksdb_slicetransform_create_fixed_prefix(3);
prefix_extractor = rocksdb_slicetransform_create(
NULL, CFV2PrefixExtractorDestroy, CFV2PrefixExtractorTransform,
CFV2PrefixExtractorInDomain, CFV2PrefixExtractorInRange,
CFV2PrefixExtractorName);
factory = rocksdb_compactionfilterfactoryv2_create(
prefix_extractor, prefix_extractor, CompactionFilterFactoryV2Destroy,
NULL, prefix_extractor, CompactionFilterFactoryV2Destroy,
CompactionFilterFactoryV2Create, CompactionFilterFactoryV2Name);
// Create new database
rocksdb_close(db);
Expand Down
14 changes: 7 additions & 7 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3000,19 +3000,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
Slice key = backup_input->key();
Slice value = backup_input->value();

const SliceTransform* transformer =
cfd->options()->compaction_filter_factory_v2->GetPrefixExtractor();
const auto key_prefix = transformer->Transform(key);
if (!prefix_initialized) {
compact->cur_prefix_ = key_prefix.ToString();
prefix_initialized = true;
}
if (!ParseInternalKey(key, &ikey)) {
// log error
Log(options_.info_log, "[%s] Failed to parse key: %s",
cfd->GetName().c_str(), key.ToString().c_str());
continue;
} else {
const SliceTransform* transformer =
cfd->options()->compaction_filter_factory_v2->GetPrefixExtractor();
const auto key_prefix = transformer->Transform(ikey.user_key);
if (!prefix_initialized) {
compact->cur_prefix_ = key_prefix.ToString();
prefix_initialized = true;
}
// If the prefix remains the same, keep buffering
if (key_prefix.compare(Slice(compact->cur_prefix_)) == 0) {
// Apply the compaction filter V2 to all the kv pairs sharing
Expand Down

0 comments on commit 3fcf7b2

Please sign in to comment.