Skip to content

Commit

Permalink
Delete blob files only after destroy column handle (#6)
Browse files Browse the repository at this point in the history
As Rocksdb described, `DropColumnFamilies()` only records the drop of the column family specified by ColumnFamilyHandle. The actual data is not deleted until the client calls `delete column_family`, namely `DestroyColumnFamilyHandle()`. We can still continue using the column family if we have outstanding ColumnFamilyHandle pointer.	

now there are some problems:

- after dropping column family, the blob files of it may be deleted before column family handle is destroyed.
- When opening DB without some existing column families(not supported yet), the blob file of these will be regarded as obsolete files and deleted. 
- After destroyed column handle(not drop the column, just not need to read/write it anymore), we need to remove inner blob storage and evict related cache.
  • Loading branch information
Connor1996 authored and yiwu-arbug committed May 14, 2019
1 parent b552d16 commit 05a3755
Show file tree
Hide file tree
Showing 13 changed files with 248 additions and 164 deletions.
8 changes: 5 additions & 3 deletions include/titan/db.h
Expand Up @@ -72,24 +72,26 @@ class TitanDB : public StackableDB {
return DropColumnFamilies({handle});
}

virtual Status DropColumnFamilies(
Status DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& handles) override = 0;

Status DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) override = 0;

using StackableDB::Merge;
Status Merge(const WriteOptions&, ColumnFamilyHandle*, const Slice& /*key*/,
const Slice& /*value*/) override {
return Status::NotSupported("TitanDB doesn't support this operation");
}

using rocksdb::StackableDB::SingleDelete;
virtual Status SingleDelete(const WriteOptions& /*wopts*/,
Status SingleDelete(const WriteOptions& /*wopts*/,
ColumnFamilyHandle* /*column_family*/,
const Slice& /*key*/) override {
return Status::NotSupported("Not supported operation in titan db.");
}

using rocksdb::StackableDB::CompactFiles;
virtual Status CompactFiles(
Status CompactFiles(
const CompactionOptions& compact_options,
ColumnFamilyHandle* column_family,
const std::vector<std::string>& input_file_names, const int output_level,
Expand Down
2 changes: 1 addition & 1 deletion src/blob_gc_picker_test.cc
Expand Up @@ -22,7 +22,7 @@ class BlobGCPickerTest : public testing::Test {
const TitanCFOptions& titan_cf_options) {
auto blob_file_cache = std::make_shared<BlobFileCache>(
titan_db_options, titan_cf_options, NewLRUCache(128));
blob_storage_.reset(new BlobStorage(titan_cf_options, blob_file_cache));
blob_storage_.reset(new BlobStorage(titan_db_options, titan_cf_options, blob_file_cache));
basic_blob_gc_picker_.reset(new BasicBlobGCPicker(titan_db_options, titan_cf_options));
}

Expand Down
40 changes: 34 additions & 6 deletions src/blob_storage.cc
Expand Up @@ -47,12 +47,40 @@ void BlobStorage::AddBlobFile(std::shared_ptr<BlobFileMeta>& file) {
files_.emplace(std::make_pair(file->file_number(), file));
}

void BlobStorage::DeleteBlobFile(uint64_t file) {
{
WriteLock wl(&mutex_);
files_.erase(file);
void BlobStorage::MarkFileObsolete(std::shared_ptr<BlobFileMeta> file, SequenceNumber obsolete_sequence) {
WriteLock wl(&mutex_);
obsolete_files_.push_back(std::make_pair(file->file_number(), obsolete_sequence));
file->FileStateTransit(BlobFileMeta::FileEvent::kDelete);
}

void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files, SequenceNumber oldest_sequence) {
WriteLock wl(&mutex_);

for (auto it = obsolete_files_.begin(); it != obsolete_files_.end();) {
auto& file_number = it->first;
auto& obsolete_sequence = it->second;
// We check whether the oldest snapshot is no less than the last sequence
// by the time the blob file become obsolete. If so, the blob file is not
// visible to all existing snapshots.
if (oldest_sequence > obsolete_sequence) {
// remove obsolete files
files_.erase(file_number);
file_cache_->Evict(file_number);

ROCKS_LOG_INFO(db_options_.info_log,
"Obsolete blob file %" PRIu64 " (obsolete at %" PRIu64
") not visible to oldest snapshot %" PRIu64 ", delete it.",
file_number, obsolete_sequence, oldest_sequence);
if (obsolete_files) {
obsolete_files->emplace_back(
BlobFileName(db_options_.dirname, file_number));
}

it = obsolete_files_.erase(it);
continue;
}
++it;
}
file_cache_->Evict(file);
}

void BlobStorage::ComputeGCScore() {
Expand All @@ -69,7 +97,7 @@ void BlobStorage::ComputeGCScore() {
auto& gcs = gc_score_.back();
gcs.file_number = file.first;
if (file.second->file_size() <
titan_cf_options_.merge_small_file_threshold) {
cf_options_.merge_small_file_threshold) {
gcs.score = 1;
} else {
gcs.score = file.second->GetDiscardableRatio();
Expand Down
42 changes: 34 additions & 8 deletions src/blob_storage.h
@@ -1,5 +1,7 @@
#pragma once

#include <inttypes.h>

#include "rocksdb/options.h"
#include "blob_file_cache.h"
#include "blob_format.h"
Expand All @@ -12,15 +14,22 @@ namespace titandb {
// column family. The version must be valid when this storage is used.
class BlobStorage {
public:
BlobStorage(const BlobStorage& bs) : mutex_() {
BlobStorage(const BlobStorage& bs) : destroyed_(false) {
this->files_ = bs.files_;
this->file_cache_ = bs.file_cache_;
this->titan_cf_options_ = bs.titan_cf_options_;
this->db_options_ = bs.db_options_;
this->cf_options_ = bs.cf_options_;
}

BlobStorage(const TitanCFOptions& _options,
BlobStorage(const TitanDBOptions& _db_options, const TitanCFOptions& _cf_options,
std::shared_ptr<BlobFileCache> _file_cache)
: titan_cf_options_(_options), mutex_(), file_cache_(_file_cache) {}
: db_options_(_db_options), cf_options_(_cf_options), file_cache_(_file_cache), destroyed_(false) {}

~BlobStorage() {
for (auto& file: files_) {
file_cache_->Evict(file.second->file_number());
}
}

// Gets the blob record pointed by the blob index. The provided
// buffer is used to store the record data, so the buffer must be
Expand Down Expand Up @@ -48,19 +57,30 @@ class BlobStorage {
WriteLock wl(&mutex_);
for (auto& file : files_) {
file.second->FileStateTransit(BlobFileMeta::FileEvent::kDbRestart);
// file.second->marked_for_gc_ = true;
}
}

void MarkDestroyed() {
WriteLock wl(&mutex_);
destroyed_ = true;
}

bool MaybeRemove() const {
ReadLock rl(&mutex_);
return destroyed_ && obsolete_files_.empty();
}

const std::vector<GCScore> gc_score() { return gc_score_; }

void ComputeGCScore();

const TitanCFOptions& titan_cf_options() { return titan_cf_options_; }
const TitanCFOptions& cf_options() { return cf_options_; }

void AddBlobFile(std::shared_ptr<BlobFileMeta>& file);

void DeleteBlobFile(uint64_t file);
void GetObsoleteFiles(std::vector<std::string>* obsolete_files, SequenceNumber oldest_sequence);

void MarkFileObsolete(std::shared_ptr<BlobFileMeta> file, SequenceNumber obsolete_sequence);

private:
friend class VersionSet;
Expand All @@ -69,7 +89,8 @@ class BlobStorage {
friend class BlobGCJobTest;
friend class BlobFileSizeCollectorTest;

TitanCFOptions titan_cf_options_;
TitanDBOptions db_options_;
TitanCFOptions cf_options_;

// Read Write Mutex, which protects the `files_` structures
mutable port::RWMutex mutex_;
Expand All @@ -79,6 +100,11 @@ class BlobStorage {
std::shared_ptr<BlobFileCache> file_cache_;

std::vector<GCScore> gc_score_;

std::list<std::pair<uint64_t, SequenceNumber>> obsolete_files_;
// It is marked when the column family handle is destroyed, indicating the
// in-memory data structure can be destroyed. Physical files may still be kept.
bool destroyed_;
};

} // namespace titandb
Expand Down
34 changes: 18 additions & 16 deletions src/db_impl.cc
Expand Up @@ -270,15 +270,15 @@ Status TitanDBImpl::CreateColumnFamilies(
base_descs.emplace_back(desc.name, options);
}

MutexLock l(&mutex_);

Status s = db_impl_->CreateColumnFamilies(base_descs, handles);
assert(handles->size() == descs.size());

if (s.ok()) {
std::map<uint32_t, TitanCFOptions> column_families;
for (size_t i = 0; i < descs.size(); i++) {
column_families.emplace((*handles)[i]->GetID(), descs[i].options);
}
MutexLock l(&mutex_);
vset_->AddColumnFamilies(column_families);
}
return s;
Expand All @@ -287,24 +287,26 @@ Status TitanDBImpl::CreateColumnFamilies(
Status TitanDBImpl::DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& handles) {
std::vector<uint32_t> column_families;
std::vector<ColumnFamilyData*> cfds;
for (auto& handle : handles) {
column_families.push_back(handle->GetID());
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(handle)->cfd();
cfds.push_back(cfd);
for (auto& handle: handles) {
column_families.emplace_back(handle->GetID());
}

MutexLock l(&mutex_);

// TODO:
// As rocksdb described, `DropColumnFamilies()` only records the drop of the column family specified by ColumnFamilyHandle.
// The actual data is not deleted until the client calls `delete column_family`, namely `DestroyColumnFamilyHandle()`.
// We can still continue using the column family if we have outstanding ColumnFamilyHandle pointer.
// So we should delete blob files in `DestroyColumnFamilyHandle()` but not here.
Status s = db_impl_->DropColumnFamilies(handles);
if (s.ok()) {
MutexLock l(&mutex_);
SequenceNumber obsolete_sequence = db_impl_->GetLatestSequenceNumber();
vset_->DropColumnFamilies(column_families, obsolete_sequence);
s = vset_->DropColumnFamilies(column_families, obsolete_sequence);
}
return s;
}

Status TitanDBImpl::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {
auto cf_id = column_family->GetID();
Status s = db_impl_->DestroyColumnFamilyHandle(column_family);

if (s.ok()) {
MutexLock l(&mutex_);
// it just changes some marks and doesn't delete blob files physically.
vset_->DestroyColumnFamily(cf_id);
}
return s;
}
Expand Down
2 changes: 2 additions & 0 deletions src/db_impl.h
Expand Up @@ -28,6 +28,8 @@ class TitanDBImpl : public TitanDB {
Status DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& handles) override;

Status DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) override;

using TitanDB::CompactFiles;
Status CompactFiles(
const CompactionOptions& compact_options,
Expand Down
43 changes: 16 additions & 27 deletions src/db_impl_files.cc
Expand Up @@ -5,39 +5,28 @@ namespace titandb {

void TitanDBImpl::PurgeObsoleteFiles() {
Status s;
ObsoleteFiles obsolete_files;
std::vector<std::string> candidate_files;
auto oldest_sequence = GetOldestSnapshotSequence();
{
MutexLock l(&mutex_);
vset_->GetObsoleteFiles(&obsolete_files, oldest_sequence);
vset_->GetObsoleteFiles(&candidate_files, oldest_sequence);
}

{
std::vector<std::string> candidate_files;
for (auto& blob_file : obsolete_files.blob_files) {
candidate_files.emplace_back(
BlobFileName(db_options_.dirname, std::get<0>(blob_file)));
}
for (auto& manifest : obsolete_files.manifests) {
candidate_files.emplace_back(std::move(manifest));
}

// dedup state.inputs so we don't try to delete the same
// file twice
std::sort(candidate_files.begin(), candidate_files.end());
candidate_files.erase(
std::unique(candidate_files.begin(), candidate_files.end()),
candidate_files.end());
// dedup state.inputs so we don't try to delete the same
// file twice
std::sort(candidate_files.begin(), candidate_files.end());
candidate_files.erase(
std::unique(candidate_files.begin(), candidate_files.end()),
candidate_files.end());

for (const auto& candidate_file : candidate_files) {
ROCKS_LOG_INFO(db_options_.info_log, "Titan deleting obsolete file [%s]",
candidate_file.c_str());
s = env_->DeleteFile(candidate_file);
if (!s.ok()) {
fprintf(stderr, "Titan deleting file [%s] failed, status:%s",
candidate_file.c_str(), s.ToString().c_str());
abort();
}
for (const auto& candidate_file : candidate_files) {
ROCKS_LOG_INFO(db_options_.info_log, "Titan deleting obsolete file [%s]",
candidate_file.c_str());
s = env_->DeleteFile(candidate_file);
if (!s.ok()) {
fprintf(stderr, "Titan deleting file [%s] failed, status:%s",
candidate_file.c_str(), s.ToString().c_str());
abort();
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/db_impl_gc.cc
Expand Up @@ -69,9 +69,9 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
uint32_t column_family_id = PopFirstFromGCQueue();

auto bs = vset_->GetBlobStorage(column_family_id).lock().get();
const auto& titan_cf_options = bs->titan_cf_options();
const auto& cf_options = bs->cf_options();
std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options_, titan_cf_options);
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options);
blob_gc = blob_gc_picker->PickBlobGC(bs);

if (blob_gc) {
Expand Down
23 changes: 22 additions & 1 deletion src/titan_db_test.cc
Expand Up @@ -53,6 +53,7 @@ class TitanDBTest : public testing::Test {
}
cf_handles_.clear();
ASSERT_OK(TitanDB::Open(db_options, dbname_, descs, &cf_handles_, &db_));
db_impl_ = reinterpret_cast<TitanDBImpl*>(db_);
}
}

Expand Down Expand Up @@ -120,6 +121,8 @@ class TitanDBTest : public testing::Test {
}

void VerifyDB(const std::map<std::string, std::string>& data, ReadOptions ropts = ReadOptions()) {
db_impl_->PurgeObsoleteFiles();

for (auto& kv : data) {
std::string value;
ASSERT_OK(db_->Get(ropts, kv.first, &value));
Expand Down Expand Up @@ -378,7 +381,7 @@ TEST_F(TitanDBTest, IngestExternalFiles) {
}
}

TEST_F(TitanDBTest, ReadAfterDropCF) {
TEST_F(TitanDBTest, DropColumnFamily) {
Open();
const uint64_t kNumCF = 3;
for(uint64_t i = 1; i <= kNumCF; i++) {
Expand All @@ -392,10 +395,28 @@ TEST_F(TitanDBTest, ReadAfterDropCF) {
VerifyDB(data);
Flush();
VerifyDB(data);

// Destroy column families handle, check whether the data is preserved after a round of GC and restart.
for (auto& handle : cf_handles_) {
db_->DestroyColumnFamilyHandle(handle);
}
cf_handles_.clear();
VerifyDB(data);
Reopen();
VerifyDB(data);

for(auto& handle : cf_handles_) {
// we can't drop default column family
if (handle->GetName() == kDefaultColumnFamilyName) {
continue;
}
ASSERT_OK(db_->DropColumnFamily(handle));
// The data is actually deleted only after destroying all outstanding column family handles,
// so we can still read from the dropped column family.
VerifyDB(data);
}

Close();
}

#ifndef NDEBUG
Expand Down
1 change: 1 addition & 0 deletions src/version_edit.cc
Expand Up @@ -24,6 +24,7 @@ void VersionEdit::EncodeTo(std::string* dst) const {
file->EncodeTo(dst);
}
for (auto& file : deleted_files_) {
// obsolete sequence is a inpersistent field, so no need to encode it.
PutVarint32Varint64(dst, kDeletedBlobFile, file.first);
}
}
Expand Down

0 comments on commit 05a3755

Please sign in to comment.