Permalink
Browse files

sync with upstream @ 21409451

Check the NEWS file for details of what changed.

git-svn-id: https://leveldb.googlecode.com/svn/trunk@28 62dab493-f737-651d-591e-8d6aee1b9529
  • Loading branch information...
1 parent 3c11133 commit da7990950787257cb312ca562ce5977749afc3e9 dgrogan@chromium.org committed May 21, 2011
View
17 NEWS
@@ -0,0 +1,17 @@
+Release 1.2 2011-05-16
+----------------------
+
+Fixes for larger databases (tested up to one billion 100-byte entries,
+i.e., ~100GB).
+
+(1) Place hard limit on number of level-0 files. This fixes errors
+of the form "too many open files".
+
+(2) Fixed memtable management. Before the fix, a heavy write burst
+could cause unbounded memory usage.
+
+A fix for a logging bug where the reader would incorrectly complain
+about corruption.
+
+Allow public access to WriteBatch contents so that users can easily
+wrap a DB.
View
66 db/db_bench.cc
@@ -24,9 +24,10 @@
// overwrite -- overwrite N values in random key order in async mode
// fillsync -- write N/100 values in random key order in sync mode
// fill100K -- write N/1000 100K values in random order in async mode
-// readseq -- read N values sequentially
-// readreverse -- read N values in reverse order
-// readrandom -- read N values in random order
+// readseq -- read N times sequentially
+// readreverse -- read N times in reverse order
+// readrandom -- read N times in random order
+// readhot -- read N times in random order from 1% section of DB
// crc32c -- repeated crc32c of 4K of data
// Meta operations:
// compact -- Compact the entire DB
@@ -54,6 +55,9 @@ static const char* FLAGS_benchmarks =
// Number of key/values to place in database
static int FLAGS_num = 1000000;
+// Number of read operations to do. If negative, do FLAGS_num reads.
+static int FLAGS_reads = -1;
+
// Size of each value
static int FLAGS_value_size = 100;
@@ -72,6 +76,14 @@ static int FLAGS_write_buffer_size = 0;
// Negative means use default settings.
static int FLAGS_cache_size = -1;
+// Maximum number of files to keep open at the same time (use default if == 0)
+static int FLAGS_open_files = 0;
+
+// If true, do not destroy the existing database. If you set this
+// flag and also specify a benchmark that wants a fresh database, that
+// benchmark will fail.
+static bool FLAGS_use_existing_db = false;
+
namespace leveldb {
// Helper for quickly generating random data.
@@ -126,6 +138,7 @@ class Benchmark {
Cache* cache_;
DB* db_;
int num_;
+ int reads_;
int heap_counter_;
double start_;
double last_op_finish_;
@@ -298,6 +311,7 @@ class Benchmark {
: cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL),
db_(NULL),
num_(FLAGS_num),
+ reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
heap_counter_(0),
bytes_(0),
rand_(301) {
@@ -308,7 +322,9 @@ class Benchmark {
Env::Default()->DeleteFile("/tmp/dbbench/" + files[i]);
}
}
- DestroyDB("/tmp/dbbench", Options());
+ if (!FLAGS_use_existing_db) {
+ DestroyDB("/tmp/dbbench", Options());
+ }
}
~Benchmark() {
@@ -355,11 +371,13 @@ class Benchmark {
ReadReverse();
} else if (name == Slice("readrandom")) {
ReadRandom();
+ } else if (name == Slice("readhot")) {
+ ReadHot();
} else if (name == Slice("readrandomsmall")) {
- int n = num_;
- num_ /= 1000;
+ int n = reads_;
+ reads_ /= 1000;
ReadRandom();
- num_ = n;
+ reads_ = n;
} else if (name == Slice("compact")) {
Compact();
} else if (name == Slice("crc32c")) {
@@ -449,7 +467,7 @@ class Benchmark {
void Open() {
assert(db_ == NULL);
Options options;
- options.create_if_missing = true;
+ options.create_if_missing = !FLAGS_use_existing_db;
options.block_cache = cache_;
options.write_buffer_size = FLAGS_write_buffer_size;
Status s = DB::Open(options, "/tmp/dbbench", &db_);
@@ -462,6 +480,10 @@ class Benchmark {
void Write(const WriteOptions& options, Order order, DBState state,
int num_entries, int value_size, int entries_per_batch) {
if (state == FRESH) {
+ if (FLAGS_use_existing_db) {
+ message_ = "skipping (--use_existing_db is true)";
+ return;
+ }
delete db_;
db_ = NULL;
DestroyDB("/tmp/dbbench", Options());
@@ -499,7 +521,7 @@ class Benchmark {
void ReadSequential() {
Iterator* iter = db_->NewIterator(ReadOptions());
int i = 0;
- for (iter->SeekToFirst(); i < num_ && iter->Valid(); iter->Next()) {
+ for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
bytes_ += iter->key().size() + iter->value().size();
FinishedSingleOp();
++i;
@@ -510,7 +532,7 @@ class Benchmark {
void ReadReverse() {
Iterator* iter = db_->NewIterator(ReadOptions());
int i = 0;
- for (iter->SeekToLast(); i < num_ && iter->Valid(); iter->Prev()) {
+ for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
bytes_ += iter->key().size() + iter->value().size();
FinishedSingleOp();
++i;
@@ -521,7 +543,7 @@ class Benchmark {
void ReadRandom() {
ReadOptions options;
std::string value;
- for (int i = 0; i < num_; i++) {
+ for (int i = 0; i < reads_; i++) {
char key[100];
const int k = rand_.Next() % FLAGS_num;
snprintf(key, sizeof(key), "%016d", k);
@@ -530,6 +552,19 @@ class Benchmark {
}
}
+ void ReadHot() {
+ ReadOptions options;
+ std::string value;
+ const int range = (FLAGS_num + 99) / 100;
+ for (int i = 0; i < reads_; i++) {
+ char key[100];
+ const int k = rand_.Next() % range;
+ snprintf(key, sizeof(key), "%016d", k);
+ db_->Get(options, key, &value);
+ FinishedSingleOp();
+ }
+ }
+
void Compact() {
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_CompactMemTable();
@@ -582,6 +617,8 @@ class Benchmark {
int main(int argc, char** argv) {
FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
+ FLAGS_open_files = leveldb::Options().max_open_files;
+
for (int i = 1; i < argc; i++) {
double d;
int n;
@@ -593,14 +630,21 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_histogram = n;
+ } else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
+ (n == 0 || n == 1)) {
+ FLAGS_use_existing_db = n;
} else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
FLAGS_num = n;
+ } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
+ FLAGS_reads = n;
} else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) {
FLAGS_value_size = n;
} else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
FLAGS_write_buffer_size = n;
} else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) {
FLAGS_cache_size = n;
+ } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) {
+ FLAGS_open_files = n;
} else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1);
View
64 db/db_impl.cc
@@ -126,6 +126,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
log_(NULL),
bg_compaction_scheduled_(false),
compacting_(false) {
+ mem_->Ref();
has_imm_.Release_Store(NULL);
// Reserve ten files or so for other uses and give the rest to TableCache.
@@ -152,8 +153,8 @@ DBImpl::~DBImpl() {
}
delete versions_;
- delete mem_;
- delete imm_;
+ if (mem_ != NULL) mem_->Unref();
+ if (imm_ != NULL) imm_->Unref();
delete log_;
delete logfile_;
delete table_cache_;
@@ -344,7 +345,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
// paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
- log::Reader reader(file, &reporter, true/*checksum*/);
+ log::Reader reader(file, &reporter, true/*checksum*/,
+ 0/*initial_offset*/);
Log(env_, options_.info_log, "Recovering log #%llu",
(unsigned long long) log_number);
@@ -364,6 +366,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
if (mem == NULL) {
mem = new MemTable(internal_comparator_);
+ mem->Ref();
}
status = WriteBatchInternal::InsertInto(&batch, mem);
MaybeIgnoreError(&status);
@@ -384,7 +387,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
// file-systems cause the DB::Open() to fail.
break;
}
- delete mem;
+ mem->Unref();
mem = NULL;
}
}
@@ -395,7 +398,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
// file-systems cause the DB::Open() to fail.
}
- delete mem;
+ if (mem != NULL) mem->Unref();
delete file;
return status;
}
@@ -443,11 +446,12 @@ Status DBImpl::CompactMemTable() {
// Replace immutable memtable with the generated Table
if (s.ok()) {
edit.SetPrevLogNumber(0);
- s = versions_->LogAndApply(&edit, imm_);
+ s = versions_->LogAndApply(&edit);
}
if (s.ok()) {
// Commit to the new state
+ imm_->Unref();
imm_ = NULL;
has_imm_.Release_Store(NULL);
DeleteObsoleteFiles();
@@ -556,7 +560,7 @@ void DBImpl::BackgroundCompaction() {
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
- status = versions_->LogAndApply(c->edit(), NULL);
+ status = versions_->LogAndApply(c->edit());
Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s\n",
static_cast<unsigned long long>(f->number),
c->level() + 1,
@@ -697,7 +701,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
}
compact->outputs.clear();
- Status s = versions_->LogAndApply(compact->compaction->edit(), NULL);
+ Status s = versions_->LogAndApply(compact->compaction->edit());
if (s.ok()) {
compact->compaction->ReleaseInputs();
DeleteObsoleteFiles();
@@ -754,9 +758,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
Slice key = input->key();
- InternalKey tmp_internal_key;
- tmp_internal_key.DecodeFrom(key);
- if (compact->compaction->ShouldStopBefore(tmp_internal_key) &&
+ if (compact->compaction->ShouldStopBefore(key) &&
compact->builder != NULL) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
@@ -867,6 +869,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
compacting_ = false;
compacting_cv_.SignalAll();
+ VersionSet::LevelSummaryStorage tmp;
+ Log(env_, options_.info_log,
+ "compacted to: %s", versions_->LevelSummary(&tmp));
return status;
}
@@ -925,10 +930,11 @@ Status DBImpl::Get(const ReadOptions& options,
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
- SequenceNumber sequence =
- (options.snapshot ? options.snapshot->number_ : latest_snapshot);
- return NewDBIterator(&dbname_, env_,
- user_comparator(), internal_iter, sequence);
+ return NewDBIterator(
+ &dbname_, env_, user_comparator(), internal_iter,
+ (options.snapshot != NULL
+ ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
+ : latest_snapshot));
}
void DBImpl::Unref(void* arg1, void* arg2) {
@@ -945,7 +951,7 @@ const Snapshot* DBImpl::GetSnapshot() {
void DBImpl::ReleaseSnapshot(const Snapshot* s) {
MutexLock l(&mutex_);
- snapshots_.Delete(s);
+ snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
}
// Convenience methods
@@ -985,12 +991,26 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
+ bool allow_delay = !force;
Status s;
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
+ } else if (
+ allow_delay &&
+ versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
+ // We are getting close to hitting a hard limit on the number of
+ // L0 files. Rather than delaying a single write by several
+ // seconds when we hit the hard limit, start delaying each
+ // individual write by 1ms to reduce latency variance. Also,
+ // this delay hands over some CPU to the compaction thread in
+ // case it is sharing the same core as the writer.
+ mutex_.Unlock();
+ env_->SleepForMicroseconds(1000);
+ allow_delay = false; // Do not delay a single write more than once
+ mutex_.Lock();
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable
@@ -999,6 +1019,9 @@ Status DBImpl::MakeRoomForWrite(bool force) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
compacting_cv_.Wait();
+ } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
+ // There are too many level-0 files.
+ compacting_cv_.Wait();
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
@@ -1011,7 +1034,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
VersionEdit edit;
edit.SetPrevLogNumber(versions_->LogNumber());
edit.SetLogNumber(new_log_number);
- s = versions_->LogAndApply(&edit, NULL);
+ s = versions_->LogAndApply(&edit);
if (!s.ok()) {
delete lfile;
env_->DeleteFile(LogFileName(dbname_, new_log_number));
@@ -1024,6 +1047,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
imm_ = mem_;
has_imm_.Release_Store(imm_);
mem_ = new MemTable(internal_comparator_);
+ mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
}
@@ -1141,10 +1165,11 @@ Status DB::Open(const Options& options, const std::string& dbname,
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->log_ = new log::Writer(lfile);
- s = impl->versions_->LogAndApply(&edit, NULL);
+ s = impl->versions_->LogAndApply(&edit);
}
if (s.ok()) {
impl->DeleteObsoleteFiles();
+ impl->MaybeScheduleCompaction();
}
}
impl->mutex_.Unlock();
@@ -1156,6 +1181,9 @@ Status DB::Open(const Options& options, const std::string& dbname,
return s;
}
+Snapshot::~Snapshot() {
+}
+
Status DestroyDB(const std::string& dbname, const Options& options) {
Env* env = options.env;
std::vector<std::string> filenames;
View
109 db/db_test.cc
@@ -3,7 +3,6 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "leveldb/db.h"
-
#include "db/db_impl.h"
#include "db/filename.h"
#include "db/version_set.h"
@@ -802,8 +801,17 @@ TEST(DBTest, DBOpen_Options) {
db = NULL;
}
+namespace {
+typedef std::map<std::string, std::string> KVMap;
+}
+
class ModelDB: public DB {
public:
+ class ModelSnapshot : public Snapshot {
+ public:
+ KVMap map_;
+ };
+
explicit ModelDB(const Options& options): options_(options) { }
~ModelDB() { }
virtual Status Put(const WriteOptions& o, const Slice& k, const Slice& v) {
@@ -824,35 +832,34 @@ class ModelDB: public DB {
return new ModelIter(saved, true);
} else {
const KVMap* snapshot_state =
- reinterpret_cast<const KVMap*>(options.snapshot->number_);
+ &(reinterpret_cast<const ModelSnapshot*>(options.snapshot)->map_);
return new ModelIter(snapshot_state, false);
}
}
virtual const Snapshot* GetSnapshot() {
- KVMap* saved = new KVMap;
- *saved = map_;
- return snapshots_.New(
- reinterpret_cast<SequenceNumber>(saved));
+ ModelSnapshot* snapshot = new ModelSnapshot;
+ snapshot->map_ = map_;
+ return snapshot;
}
virtual void ReleaseSnapshot(const Snapshot* snapshot) {
- const KVMap* saved = reinterpret_cast<const KVMap*>(snapshot->number_);
- delete saved;
- snapshots_.Delete(snapshot);
+ delete reinterpret_cast<const ModelSnapshot*>(snapshot);
}
virtual Status Write(const WriteOptions& options, WriteBatch* batch) {
assert(options.post_write_snapshot == NULL); // Not supported
- for (WriteBatchInternal::Iterator it(*batch); !it.Done(); it.Next()) {
- switch (it.op()) {
- case kTypeValue:
- map_[it.key().ToString()] = it.value().ToString();
- break;
- case kTypeDeletion:
- map_.erase(it.key().ToString());
- break;
+ class Handler : public WriteBatch::Handler {
+ public:
+ KVMap* map_;
+ virtual void Put(const Slice& key, const Slice& value) {
+ (*map_)[key.ToString()] = value.ToString();
}
- }
- return Status::OK();
+ virtual void Delete(const Slice& key) {
+ map_->erase(key.ToString());
+ }
+ };
+ Handler handler;
+ handler.map_ = &map_;
+ return batch->Iterate(&handler);
}
virtual bool GetProperty(const Slice& property, std::string* value) {
@@ -864,7 +871,6 @@ class ModelDB: public DB {
}
}
private:
- typedef std::map<std::string, std::string> KVMap;
class ModelIter: public Iterator {
public:
ModelIter(const KVMap* map, bool owned)
@@ -897,7 +903,6 @@ class ModelDB: public DB {
};
const Options options_;
KVMap map_;
- SnapshotList snapshots_;
};
static std::string RandomKey(Random* rnd) {
@@ -1023,8 +1028,70 @@ TEST(DBTest, Randomized) {
if (db_snap != NULL) db_->ReleaseSnapshot(db_snap);
}
+std::string MakeKey(unsigned int num) {
+ char buf[30];
+ snprintf(buf, sizeof(buf), "%016u", num);
+ return std::string(buf);
+}
+
+void BM_LogAndApply(int iters, int num_base_files) {
+ std::string dbname = test::TmpDir() + "/leveldb_test_benchmark";
+ DestroyDB(dbname, Options());
+
+ DB* db = NULL;
+ Options opts;
+ opts.create_if_missing = true;
+ Status s = DB::Open(opts, dbname, &db);
+ ASSERT_OK(s);
+ ASSERT_TRUE(db != NULL);
+
+ delete db;
+ db = NULL;
+
+ Env* env = Env::Default();
+
+ InternalKeyComparator cmp(BytewiseComparator());
+ Options options;
+ VersionSet vset(dbname, &options, NULL, &cmp);
+ ASSERT_OK(vset.Recover());
+ VersionEdit vbase;
+ uint64_t fnum = 1;
+ for (int i = 0; i < num_base_files; i++) {
+ InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
+ InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
+ vbase.AddFile(2, fnum++, 1 /* file size */, start, limit);
+ }
+ ASSERT_OK(vset.LogAndApply(&vbase));
+
+ uint64_t start_micros = env->NowMicros();
+
+ for (int i = 0; i < iters; i++) {
+ VersionEdit vedit;
+ vedit.DeleteFile(2, fnum);
+ InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
+ InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
+ vedit.AddFile(2, fnum++, 1 /* file size */, start, limit);
+ vset.LogAndApply(&vedit);
+ }
+ uint64_t stop_micros = env->NowMicros();
+ unsigned int us = stop_micros - start_micros;
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%d", num_base_files);
+ fprintf(stderr,
+ "BM_LogAndApply/%-6s %8d iters : %9u us (%7.0f us / iter)\n",
+ buf, iters, us, ((float)us) / iters);
+}
+
}
int main(int argc, char** argv) {
+ if (argc > 1 && std::string(argv[1]) == "--benchmark") {
+ leveldb::BM_LogAndApply(1000, 1);
+ leveldb::BM_LogAndApply(1000, 100);
+ leveldb::BM_LogAndApply(1000, 10000);
+ leveldb::BM_LogAndApply(100, 100000);
+ return 0;
+ }
+
return leveldb::test::RunAllTests();
}
View
10 db/dbformat.h
@@ -19,6 +19,16 @@ namespace leveldb {
// parameters set via options.
namespace config {
static const int kNumLevels = 7;
+
+// Level-0 compaction is started when we hit this many files.
+static const int kL0_CompactionTrigger = 4;
+
+// Soft limit on number of level-0 files. We slow down writes at this point.
+static const int kL0_SlowdownWritesTrigger = 8;
+
+// Maximum number of level-0 files. We stop writes at this point.
+static const int kL0_StopWritesTrigger = 12;
+
}
class InternalKey;
View
116 db/log_reader.cc
@@ -4,7 +4,6 @@
#include "db/log_reader.h"
-#include <stdint.h>
#include "leveldb/env.h"
#include "util/coding.h"
#include "util/crc32c.h"
@@ -15,78 +14,138 @@ namespace log {
Reader::Reporter::~Reporter() {
}
-Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum)
+Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
+ uint64_t initial_offset)
: file_(file),
reporter_(reporter),
checksum_(checksum),
backing_store_(new char[kBlockSize]),
buffer_(),
- eof_(false) {
+ eof_(false),
+ last_record_offset_(0),
+ end_of_buffer_offset_(0),
+ initial_offset_(initial_offset) {
}
Reader::~Reader() {
delete[] backing_store_;
}
+bool Reader::SkipToInitialBlock() {
+ size_t offset_in_block = initial_offset_ % kBlockSize;
+ uint64_t block_start_location = initial_offset_ - offset_in_block;
+
+ // Don't search a block if we'd be in the trailer
+ if (offset_in_block > kBlockSize - 6) {
+ offset_in_block = 0;
+ block_start_location += kBlockSize;
+ }
+
+ end_of_buffer_offset_ = block_start_location;
+
+ // Skip to start of first block that can contain the initial record
+ if (block_start_location > 0) {
+ Status skip_status = file_->Skip(block_start_location);
+ if (!skip_status.ok()) {
+ ReportDrop(block_start_location, skip_status);
+ return false;
+ }
+ }
+
+ return true;
+}
+
bool Reader::ReadRecord(Slice* record, std::string* scratch) {
+ if (last_record_offset_ < initial_offset_) {
+ if (!SkipToInitialBlock()) {
+ return false;
+ }
+ }
+
scratch->clear();
record->clear();
bool in_fragmented_record = false;
+ // Record offset of the logical record that we're reading
+ // 0 is a dummy value to make compilers happy
+ uint64_t prospective_record_offset = 0;
Slice fragment;
while (true) {
+ uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
switch (ReadPhysicalRecord(&fragment)) {
case kFullType:
if (in_fragmented_record) {
- ReportDrop(scratch->size(), "partial record without end");
+ // Handle bug in earlier versions of log::Writer where
+ // it could emit an empty kFirstType record at the tail end
+ // of a block followed by a kFullType or kFirstType record
+ // at the beginning of the next block.
+ if (scratch->empty()) {
+ in_fragmented_record = false;
+ } else {
+ ReportCorruption(scratch->size(), "partial record without end(1)");
+ }
}
+ prospective_record_offset = physical_record_offset;
scratch->clear();
*record = fragment;
+ last_record_offset_ = prospective_record_offset;
return true;
case kFirstType:
if (in_fragmented_record) {
- ReportDrop(scratch->size(), "partial record without end");
+ // Handle bug in earlier versions of log::Writer where
+ // it could emit an empty kFirstType record at the tail end
+ // of a block followed by a kFullType or kFirstType record
+ // at the beginning of the next block.
+ if (scratch->empty()) {
+ in_fragmented_record = false;
+ } else {
+ ReportCorruption(scratch->size(), "partial record without end(2)");
+ }
}
+ prospective_record_offset = physical_record_offset;
scratch->assign(fragment.data(), fragment.size());
in_fragmented_record = true;
break;
case kMiddleType:
if (!in_fragmented_record) {
- ReportDrop(fragment.size(), "missing start of fragmented record");
+ ReportCorruption(fragment.size(),
+ "missing start of fragmented record(1)");
} else {
scratch->append(fragment.data(), fragment.size());
}
break;
case kLastType:
if (!in_fragmented_record) {
- ReportDrop(fragment.size(), "missing start of fragmented record");
+ ReportCorruption(fragment.size(),
+ "missing start of fragmented record(2)");
} else {
scratch->append(fragment.data(), fragment.size());
*record = Slice(*scratch);
+ last_record_offset_ = prospective_record_offset;
return true;
}
break;
case kEof:
if (in_fragmented_record) {
- ReportDrop(scratch->size(), "partial record without end");
+ ReportCorruption(scratch->size(), "partial record without end(3)");
scratch->clear();
}
return false;
case kBadRecord:
if (in_fragmented_record) {
- ReportDrop(scratch->size(), "error in middle of record");
+ ReportCorruption(scratch->size(), "error in middle of record");
in_fragmented_record = false;
scratch->clear();
}
break;
default:
- ReportDrop(
+ ReportCorruption(
(fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
"unknown record type");
in_fragmented_record = false;
@@ -97,9 +156,18 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) {
return false;
}
-void Reader::ReportDrop(size_t bytes, const char* reason) {
- if (reporter_ != NULL) {
- reporter_->Corruption(bytes, Status::Corruption(reason));
+uint64_t Reader::LastRecordOffset() {
+ return last_record_offset_;
+}
+
+void Reader::ReportCorruption(size_t bytes, const char* reason) {
+ ReportDrop(bytes, Status::Corruption(reason));
+}
+
+void Reader::ReportDrop(size_t bytes, const Status& reason) {
+ if (reporter_ != NULL &&
+ end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) {
+ reporter_->Corruption(bytes, reason);
}
}
@@ -110,11 +178,10 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
+ end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
- if (reporter_ != NULL) {
- reporter_->Corruption(kBlockSize, status);
- }
buffer_.clear();
+ ReportDrop(kBlockSize, status);
eof_ = true;
return kEof;
} else if (buffer_.size() < kBlockSize) {
@@ -125,8 +192,9 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
// End of file
return kEof;
} else {
- ReportDrop(buffer_.size(), "truncated record at end of file");
+ size_t drop_size = buffer_.size();
buffer_.clear();
+ ReportCorruption(drop_size, "truncated record at end of file");
return kEof;
}
}
@@ -138,8 +206,9 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
const unsigned int type = header[6];
const uint32_t length = a | (b << 8);
if (kHeaderSize + length > buffer_.size()) {
- ReportDrop(buffer_.size(), "bad record length");
+ size_t drop_size = buffer_.size();
buffer_.clear();
+ ReportCorruption(drop_size, "bad record length");
return kBadRecord;
}
@@ -160,13 +229,22 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
// been corrupted and if we trust it, we could find some
// fragment of a real log record that just happens to look
// like a valid log record.
- ReportDrop(buffer_.size(), "checksum mismatch");
+ size_t drop_size = buffer_.size();
buffer_.clear();
+ ReportCorruption(drop_size, "checksum mismatch");
return kBadRecord;
}
}
buffer_.remove_prefix(kHeaderSize + length);
+
+ // Skip physical record that started before initial_offset_
+ if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
+ initial_offset_) {
+ result->clear();
+ return kBadRecord;
+ }
+
*result = Slice(header + kHeaderSize, length);
return type;
}
View
37 db/log_reader.h
@@ -5,6 +5,8 @@
#ifndef STORAGE_LEVELDB_DB_LOG_READER_H_
#define STORAGE_LEVELDB_DB_LOG_READER_H_
+#include <stdint.h>
+
#include "db/log_format.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
@@ -35,7 +37,11 @@ class Reader {
// live while this Reader is in use.
//
// If "checksum" is true, verify checksums if available.
- Reader(SequentialFile* file, Reporter* reporter, bool checksum);
+ //
+ // The Reader will start reading at the first record located at physical
+ // position >= initial_offset within the file.
+ Reader(SequentialFile* file, Reporter* reporter, bool checksum,
+ uint64_t initial_offset);
~Reader();
@@ -46,6 +52,11 @@ class Reader {
// reader or the next mutation to *scratch.
bool ReadRecord(Slice* record, std::string* scratch);
+ // Returns the physical offset of the last record returned by ReadRecord.
+ //
+ // Undefined before the first call to ReadRecord.
+ uint64_t LastRecordOffset();
+
private:
SequentialFile* const file_;
Reporter* const reporter_;
@@ -54,15 +65,37 @@ class Reader {
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
+ // Offset of the last record returned by ReadRecord.
+ uint64_t last_record_offset_;
+ // Offset of the first location past the end of buffer_.
+ uint64_t end_of_buffer_offset_;
+
+ // Offset at which to start looking for the first record to return
+ uint64_t const initial_offset_;
+
// Extend record types with the following special values
enum {
kEof = kMaxRecordType + 1,
+ // Returned whenever we find an invalid physical record.
+ // Currently there are three situations in which this happens:
+ // * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
+ // * The record is a 0-length record (No drop is reported)
+ // * The record is below constructor's initial_offset (No drop is reported)
kBadRecord = kMaxRecordType + 2
};
+ // Skips all blocks that are completely before "initial_offset_".
+ //
+ // Returns true on success. Handles reporting.
+ bool SkipToInitialBlock();
+
// Return type, or one of the preceding special values
unsigned int ReadPhysicalRecord(Slice* result);
- void ReportDrop(size_t bytes, const char* reason);
+
+ // Reports dropped bytes to the reporter.
+ // buffer_ must be updated to remove the dropped bytes prior to invocation.
+ void ReportCorruption(size_t bytes, const char* reason);
+ void ReportDrop(size_t bytes, const Status& reason);
// No copying allowed
Reader(const Reader&);
View
143 db/log_test.cc
@@ -60,7 +60,6 @@ class LogTest {
virtual Status Read(size_t n, Slice* result, char* scratch) {
ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
- ASSERT_EQ(kBlockSize, n);
if (force_error_) {
force_error_ = false;
@@ -76,6 +75,17 @@ class LogTest {
contents_.remove_prefix(n);
return Status::OK();
}
+
+ virtual Status Skip(size_t n) {
+ if (n > contents_.size()) {
+ contents_.clear();
+      return Status::NotFound("in-memory file skipped past end");
+ }
+
+ contents_.remove_prefix(n);
+
+ return Status::OK();
+ }
};
class ReportCollector : public Reader::Reporter {
@@ -97,10 +107,15 @@ class LogTest {
Writer writer_;
Reader reader_;
+ // Record metadata for testing initial offset functionality
+ static size_t initial_offset_record_sizes_[];
+ static uint64_t initial_offset_last_record_offsets_[];
+
public:
LogTest() : reading_(false),
writer_(&dest_),
- reader_(&source_, &report_, true/*checksum*/) {
+ reader_(&source_, &report_, true/*checksum*/,
+ 0/*initial_offset*/) {
}
void Write(const std::string& msg) {
@@ -153,6 +168,10 @@ class LogTest {
return report_.dropped_bytes_;
}
+ std::string ReportMessage() const {
+ return report_.message_;
+ }
+
// Returns OK iff recorded error message contains "msg"
std::string MatchError(const std::string& msg) const {
if (report_.message_.find(msg) == std::string::npos) {
@@ -161,8 +180,61 @@ class LogTest {
return "OK";
}
}
+
+ void WriteInitialOffsetLog() {
+ for (int i = 0; i < 4; i++) {
+ std::string record(initial_offset_record_sizes_[i],
+ static_cast<char>('a' + i));
+ Write(record);
+ }
+ }
+
+ void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
+ WriteInitialOffsetLog();
+ reading_ = true;
+ source_.contents_ = Slice(dest_.contents_);
+ Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
+ WrittenBytes() + offset_past_end);
+ Slice record;
+ std::string scratch;
+ ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
+ delete offset_reader;
+ }
+
+ void CheckInitialOffsetRecord(uint64_t initial_offset,
+ int expected_record_offset) {
+ WriteInitialOffsetLog();
+ reading_ = true;
+ source_.contents_ = Slice(dest_.contents_);
+ Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
+ initial_offset);
+ Slice record;
+ std::string scratch;
+ ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
+ ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
+ record.size());
+ ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
+ offset_reader->LastRecordOffset());
+ ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
+ delete offset_reader;
+ }
+
};
+size_t LogTest::initial_offset_record_sizes_[] =
+ {10000, // Two sizable records in first block
+ 10000,
+ 2 * log::kBlockSize - 1000, // Span three blocks
+ 1};
+
+uint64_t LogTest::initial_offset_last_record_offsets_[] =
+ {0,
+ kHeaderSize + 10000,
+ 2 * (kHeaderSize + 10000),
+ 2 * (kHeaderSize + 10000) +
+ (2 * log::kBlockSize - 1000) + 3 * kHeaderSize};
+
+
TEST(LogTest, Empty) {
ASSERT_EQ("EOF", Read());
}
@@ -213,6 +285,19 @@ TEST(LogTest, MarginalTrailer) {
ASSERT_EQ("EOF", Read());
}
+TEST(LogTest, MarginalTrailer2) {
+ // Make a trailer that is exactly the same length as an empty record.
+ const int n = kBlockSize - 2*kHeaderSize;
+ Write(BigString("foo", n));
+ ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
+ Write("bar");
+ ASSERT_EQ(BigString("foo", n), Read());
+ ASSERT_EQ("bar", Read());
+ ASSERT_EQ("EOF", Read());
+ ASSERT_EQ(0, DroppedBytes());
+ ASSERT_EQ("", ReportMessage());
+}
+
TEST(LogTest, ShortTrailer) {
const int n = kBlockSize - 2*kHeaderSize + 4;
Write(BigString("foo", n));
@@ -353,6 +438,60 @@ TEST(LogTest, ErrorJoinsRecords) {
ASSERT_GE(dropped, 2*kBlockSize);
}
+TEST(LogTest, ReadStart) {
+ CheckInitialOffsetRecord(0, 0);
+}
+
+TEST(LogTest, ReadSecondOneOff) {
+ CheckInitialOffsetRecord(1, 1);
+}
+
+TEST(LogTest, ReadSecondTenThousand) {
+ CheckInitialOffsetRecord(10000, 1);
+}
+
+TEST(LogTest, ReadSecondStart) {
+ CheckInitialOffsetRecord(10007, 1);
+}
+
+TEST(LogTest, ReadThirdOneOff) {
+ CheckInitialOffsetRecord(10008, 2);
+}
+
+TEST(LogTest, ReadThirdStart) {
+ CheckInitialOffsetRecord(20014, 2);
+}
+
+TEST(LogTest, ReadFourthOneOff) {
+ CheckInitialOffsetRecord(20015, 3);
+}
+
+TEST(LogTest, ReadFourthFirstBlockTrailer) {
+ CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
+}
+
+TEST(LogTest, ReadFourthMiddleBlock) {
+ CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
+}
+
+TEST(LogTest, ReadFourthLastBlock) {
+ CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
+}
+
+TEST(LogTest, ReadFourthStart) {
+ CheckInitialOffsetRecord(
+ 2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
+ 3);
+}
+
+TEST(LogTest, ReadEnd) {
+ CheckOffsetPastEndReturnsNoRecords(0);
+}
+
+TEST(LogTest, ReadPastEnd) {
+ CheckOffsetPastEndReturnsNoRecords(5);
+}
+
}
}
View
3 db/log_writer.cc
@@ -32,6 +32,7 @@ Status Writer::AddRecord(const Slice& slice) {
// is empty, we still want to iterate once to emit a single
// zero-length record
Status s;
+ bool begin = true;
do {
const int leftover = kBlockSize - block_offset_;
assert(leftover >= 0);
@@ -52,7 +53,6 @@ Status Writer::AddRecord(const Slice& slice) {
const size_t fragment_length = (left < avail) ? left : avail;
RecordType type;
- const bool begin = (ptr == slice.data());
const bool end = (left == fragment_length);
if (begin && end) {
type = kFullType;
@@ -67,6 +67,7 @@ Status Writer::AddRecord(const Slice& slice) {
s = EmitPhysicalRecord(type, ptr, fragment_length);
ptr += fragment_length;
left -= fragment_length;
+ begin = false;
} while (s.ok() && left > 0);
return s;
}
View
14 db/memtable.cc
@@ -20,10 +20,12 @@ static Slice GetLengthPrefixedSlice(const char* data) {
MemTable::MemTable(const InternalKeyComparator& cmp)
: comparator_(cmp),
+ refs_(0),
table_(comparator_, &arena_) {
}
MemTable::~MemTable() {
+ assert(refs_ == 0);
}
size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); }
@@ -48,10 +50,15 @@ static const char* EncodeKey(std::string* scratch, const Slice& target) {
class MemTableIterator: public Iterator {
public:
- explicit MemTableIterator(MemTable::Table* table) {
+ explicit MemTableIterator(MemTable* mem, MemTable::Table* table) {
+ mem_ = mem;
iter_ = new MemTable::Table::Iterator(table);
+ mem->Ref();
+ }
+ virtual ~MemTableIterator() {
+ delete iter_;
+ mem_->Unref();
}
- virtual ~MemTableIterator() { delete iter_; }
virtual bool Valid() const { return iter_->Valid(); }
virtual void Seek(const Slice& k) { iter_->Seek(EncodeKey(&tmp_, k)); }
@@ -68,6 +75,7 @@ class MemTableIterator: public Iterator {
virtual Status status() const { return Status::OK(); }
private:
+ MemTable* mem_;
MemTable::Table::Iterator* iter_;
std::string tmp_; // For passing to EncodeKey
@@ -77,7 +85,7 @@ class MemTableIterator: public Iterator {
};
Iterator* MemTable::NewIterator() {
- return new MemTableIterator(&table_);
+ return new MemTableIterator(this, &table_);
}
void MemTable::Add(SequenceNumber s, ValueType type,
View
18 db/memtable.h
@@ -19,8 +19,21 @@ class MemTableIterator;
class MemTable {
public:
+ // MemTables are reference counted. The initial reference count
+ // is zero and the caller must call Ref() at least once.
explicit MemTable(const InternalKeyComparator& comparator);
- ~MemTable();
+
+ // Increase reference count.
+ void Ref() { ++refs_; }
+
+ // Drop reference count. Delete if no more references exist.
+ void Unref() {
+ --refs_;
+ assert(refs_ >= 0);
+ if (refs_ <= 0) {
+ delete this;
+ }
+ }
// Returns an estimate of the number of bytes of data in use by this
// data structure.
@@ -45,6 +58,8 @@ class MemTable {
const Slice& value);
private:
+ ~MemTable(); // Private since only Unref() should be used to delete it
+
struct KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
@@ -56,6 +71,7 @@ class MemTable {
typedef SkipList<const char*, KeyComparator> Table;
KeyComparator comparator_;
+ int refs_;
Arena arena_;
Table table_;
View
12 db/repair.cc
@@ -183,13 +183,15 @@ class Repairer {
// corruptions cause entire commits to be skipped instead of
// propagating bad information (like overly large sequence
// numbers).
- log::Reader reader(lfile, &reporter, false/*do not checksum*/);
+ log::Reader reader(lfile, &reporter, false/*do not checksum*/,
+ 0/*initial_offset*/);
// Read all the records and add to a memtable
std::string scratch;
Slice record;
WriteBatch batch;
- MemTable mem(icmp_);
+ MemTable* mem = new MemTable(icmp_);
+ mem->Ref();
int counter = 0;
while (reader.ReadRecord(&record, &scratch)) {
if (record.size() < 12) {
@@ -198,7 +200,7 @@ class Repairer {
continue;
}
WriteBatchInternal::SetContents(&batch, record);
- status = WriteBatchInternal::InsertInto(&batch, &mem);
+ status = WriteBatchInternal::InsertInto(&batch, mem);
if (status.ok()) {
counter += WriteBatchInternal::Count(&batch);
} else {
@@ -215,10 +217,12 @@ class Repairer {
VersionEdit skipped;
FileMetaData meta;
meta.number = next_file_number_++;
- Iterator* iter = mem.NewIterator();
+ Iterator* iter = mem->NewIterator();
status = BuildTable(dbname_, env_, options_, table_cache_, iter,
&meta, &skipped);
delete iter;
+ mem->Unref();
+ mem = NULL;
if (status.ok()) {
if (meta.file_size > 0) {
table_numbers_.push_back(meta.number);
View
22 db/snapshot.h
@@ -12,17 +12,17 @@ namespace leveldb {
class SnapshotList;
// Snapshots are kept in a doubly-linked list in the DB.
-// Each Snapshot corresponds to a particular sequence number.
-class Snapshot {
+// Each SnapshotImpl corresponds to a particular sequence number.
+class SnapshotImpl : public Snapshot {
public:
SequenceNumber number_; // const after creation
private:
friend class SnapshotList;
- // Snapshot is kept in a doubly-linked circular list
- Snapshot* prev_;
- Snapshot* next_;
+ // SnapshotImpl is kept in a doubly-linked circular list
+ SnapshotImpl* prev_;
+ SnapshotImpl* next_;
SnapshotList* list_; // just for sanity checks
};
@@ -35,11 +35,11 @@ class SnapshotList {
}
bool empty() const { return list_.next_ == &list_; }
- Snapshot* oldest() const { assert(!empty()); return list_.next_; }
- Snapshot* newest() const { assert(!empty()); return list_.prev_; }
+ SnapshotImpl* oldest() const { assert(!empty()); return list_.next_; }
+ SnapshotImpl* newest() const { assert(!empty()); return list_.prev_; }
- const Snapshot* New(SequenceNumber seq) {
- Snapshot* s = new Snapshot;
+ const SnapshotImpl* New(SequenceNumber seq) {
+ SnapshotImpl* s = new SnapshotImpl;
s->number_ = seq;
s->list_ = this;
s->next_ = &list_;
@@ -49,7 +49,7 @@ class SnapshotList {
return s;
}
- void Delete(const Snapshot* s) {
+ void Delete(const SnapshotImpl* s) {
assert(s->list_ == this);
s->prev_->next_ = s->next_;
s->next_->prev_ = s->prev_;
@@ -58,7 +58,7 @@ class SnapshotList {
private:
// Dummy head of doubly-linked list of snapshots
- Snapshot list_;
+ SnapshotImpl list_;
};
}
View
309 db/version_set.cc
@@ -57,17 +57,22 @@ std::string IntSetToString(const std::set<uint64_t>& s) {
Version::~Version() {
assert(refs_ == 0);
+
+ // Remove from linked list
+ prev_->next_ = next_;
+ next_->prev_ = prev_;
+
+ // Drop references to files
for (int level = 0; level < config::kNumLevels; level++) {
for (size_t i = 0; i < files_[level].size(); i++) {
FileMetaData* f = files_[level][i];
- assert(f->refs >= 0);
+ assert(f->refs > 0);
f->refs--;
if (f->refs <= 0) {
delete f;
}
}
}
- delete cleanup_mem_;
}
// An internal iterator. For a given version/level pair, yields
@@ -77,9 +82,9 @@ Version::~Version() {
// encoded using EncodeFixed64.
class Version::LevelFileNumIterator : public Iterator {
public:
- LevelFileNumIterator(const Version* version,
+ LevelFileNumIterator(const InternalKeyComparator& icmp,
const std::vector<FileMetaData*>* flist)
- : icmp_(version->vset_->icmp_.user_comparator()),
+ : icmp_(icmp),
flist_(flist),
index_(flist->size()) { // Marks as invalid
}
@@ -157,7 +162,7 @@ static Iterator* GetFileIterator(void* arg,
Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
int level) const {
return NewTwoLevelIterator(
- new LevelFileNumIterator(this, &files_[level]),
+ new LevelFileNumIterator(vset_->icmp_, &files_[level]),
&GetFileIterator, vset_->table_cache_, options);
}
@@ -185,11 +190,11 @@ void Version::Ref() {
}
void Version::Unref() {
+ assert(this != &vset_->dummy_versions_);
assert(refs_ >= 1);
--refs_;
if (refs_ == 0) {
- vset_->MaybeDeleteOldVersions();
- // TODO: try to delete obsolete files
+ delete this;
}
}
@@ -222,37 +227,58 @@ std::string Version::DebugString() const {
// Versions that contain full copies of the intermediate state.
class VersionSet::Builder {
private:
- typedef std::map<uint64_t, FileMetaData*> FileMap;
+ // Helper to sort by v->files_[file_number].smallest
+ struct BySmallestKey {
+ const InternalKeyComparator* internal_comparator;
+
+ bool operator()(FileMetaData* f1, FileMetaData* f2) const {
+ int r = internal_comparator->Compare(f1->smallest, f2->smallest);
+ if (r != 0) {
+ return (r < 0);
+ } else {
+ // Break ties by file number
+ return (f1->number < f2->number);
+ }
+ }
+ };
+
+ typedef std::set<FileMetaData*, BySmallestKey> FileSet;
+ struct LevelState {
+ std::set<uint64_t> deleted_files;
+ FileSet* added_files;
+ };
+
VersionSet* vset_;
- FileMap files_[config::kNumLevels];
+ Version* base_;
+ LevelState levels_[config::kNumLevels];
public:
// Initialize a builder with the files from *base and other info from *vset
Builder(VersionSet* vset, Version* base)
- : vset_(vset) {
+ : vset_(vset),
+ base_(base) {
+ base_->Ref();
+ BySmallestKey cmp;
+ cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
- const std::vector<FileMetaData*>& files = base->files_[level];
- for (size_t i = 0; i < files.size(); i++) {
- FileMetaData* f = files[i];
- f->refs++;
- files_[level].insert(std::make_pair(f->number, f));
- }
+ levels_[level].added_files = new FileSet(cmp);
}
}
~Builder() {
for (int level = 0; level < config::kNumLevels; level++) {
- const FileMap& fmap = files_[level];
- for (FileMap::const_iterator iter = fmap.begin();
- iter != fmap.end();
- ++iter) {
- FileMetaData* f = iter->second;
+ std::vector<FileMetaData*> to_unref(levels_[level].added_files->begin(),
+ levels_[level].added_files->end());
+ delete levels_[level].added_files;
+ for (int i = 0; i < to_unref.size(); i++) {
+ FileMetaData* f = to_unref[i];
f->refs--;
if (f->refs <= 0) {
delete f;
}
}
}
+ base_->Unref();
}
// Apply all of the edits in *edit to the current state.
@@ -271,39 +297,74 @@ class VersionSet::Builder {
++iter) {
const int level = iter->first;
const uint64_t number = iter->second;
- FileMap::iterator fiter = files_[level].find(number);
- assert(fiter != files_[level].end()); // Sanity check for debug mode
- if (fiter != files_[level].end()) {
- FileMetaData* f = fiter->second;
- f->refs--;
- if (f->refs <= 0) {
- delete f;
- }
- files_[level].erase(fiter);
- }
+ levels_[level].deleted_files.insert(number);
}
// Add new files
for (size_t i = 0; i < edit->new_files_.size(); i++) {
const int level = edit->new_files_[i].first;
FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
f->refs = 1;
- assert(files_[level].count(f->number) == 0);
- files_[level].insert(std::make_pair(f->number, f));
+ levels_[level].deleted_files.erase(f->number);
+ levels_[level].added_files->insert(f);
}
}
// Save the current state in *v.
void SaveTo(Version* v) {
+ BySmallestKey cmp;
+ cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
- const FileMap& fmap = files_[level];
- for (FileMap::const_iterator iter = fmap.begin();
- iter != fmap.end();
- ++iter) {
- FileMetaData* f = iter->second;
- f->refs++;
- v->files_[level].push_back(f);
+ // Merge the set of added files with the set of pre-existing files.
+ // Drop any deleted files. Store the result in *v.
+ const std::vector<FileMetaData*>& base_files = base_->files_[level];
+ std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
+ std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
+ const FileSet* added = levels_[level].added_files;
+ v->files_[level].reserve(base_files.size() + added->size());
+ for (FileSet::const_iterator added_iter = added->begin();
+ added_iter != added->end();
+ ++added_iter) {
+ // Add all smaller files listed in base_
+ for (std::vector<FileMetaData*>::const_iterator bpos
+ = std::upper_bound(base_iter, base_end, *added_iter, cmp);
+ base_iter != bpos;
+ ++base_iter) {
+ MaybeAddFile(v, level, *base_iter);
+ }
+
+ MaybeAddFile(v, level, *added_iter);
+ }
+
+ // Add remaining base files
+ for (; base_iter != base_end; ++base_iter) {
+ MaybeAddFile(v, level, *base_iter);
}
+
+#ifndef NDEBUG
+ // Make sure there is no overlap in levels > 0
+ if (level > 0) {
+ for (int i = 1; i < v->files_[level].size(); i++) {
+ const InternalKey& prev_end = v->files_[level][i-1]->largest;
+ const InternalKey& this_begin = v->files_[level][i]->smallest;
+ if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
+ fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
+ EscapeString(prev_end.Encode()).c_str(),
+ EscapeString(this_begin.Encode()).c_str());
+ abort();
+ }
+ }
+ }
+#endif
+ }
+ }
+
+ void MaybeAddFile(Version* v, int level, FileMetaData* f) {
+ if (levels_[level].deleted_files.count(f->number) > 0) {
+ // File is deleted: do nothing
+ } else {
+ f->refs++;
+ v->files_[level].push_back(f);
}
}
};
@@ -324,22 +385,36 @@ VersionSet::VersionSet(const std::string& dbname,
prev_log_number_(0),
descriptor_file_(NULL),
descriptor_log_(NULL),
- current_(new Version(this)),
- oldest_(current_) {
+ dummy_versions_(this),
+ current_(NULL) {
+ AppendVersion(new Version(this));
}
VersionSet::~VersionSet() {
- for (Version* v = oldest_; v != NULL; ) {
- Version* next = v->next_;
- assert(v->refs_ == 0);
- delete v;
- v = next;
- }
+ current_->Unref();
+ assert(dummy_versions_.next_ == &dummy_versions_); // List must be empty
delete descriptor_log_;
delete descriptor_file_;
}
-Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) {
+void VersionSet::AppendVersion(Version* v) {
+ // Make "v" current
+ assert(v->refs_ == 0);
+ assert(v != current_);
+ if (current_ != NULL) {
+ current_->Unref();
+ }
+ current_ = v;
+ v->Ref();
+
+ // Append to linked list
+ v->prev_ = dummy_versions_.prev_;
+ v->next_ = &dummy_versions_;
+ v->prev_->next_ = v;
+ v->next_->prev_ = v;
+}
+
+Status VersionSet::LogAndApply(VersionEdit* edit) {
if (edit->has_log_number_) {
assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ < next_file_number_);
@@ -360,22 +435,20 @@ Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) {
builder.Apply(edit);
builder.SaveTo(v);
}
-
- std::string new_manifest_file;
- Status s = Finalize(v);
+ Finalize(v);
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
- if (s.ok()) {
- if (descriptor_log_ == NULL) {
- assert(descriptor_file_ == NULL);
- new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
- edit->SetNextFile(next_file_number_);
- s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
- if (s.ok()) {
- descriptor_log_ = new log::Writer(descriptor_file_);
- s = WriteSnapshot(descriptor_log_);
- }
+ std::string new_manifest_file;
+ Status s;
+ if (descriptor_log_ == NULL) {
+ assert(descriptor_file_ == NULL);
+ new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
+ edit->SetNextFile(next_file_number_);
+ s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
+ if (s.ok()) {
+ descriptor_log_ = new log::Writer(descriptor_file_);
+ s = WriteSnapshot(descriptor_log_);
}
}
@@ -397,12 +470,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) {
// Install the new version
if (s.ok()) {
- assert(current_->next_ == NULL);
- assert(current_->cleanup_mem_ == NULL);
- current_->cleanup_mem_ = cleanup_mem;
- v->next_ = NULL;
- current_->next_ = v;
- current_ = v;
+ AppendVersion(v);
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
} else {
@@ -458,7 +526,7 @@ Status VersionSet::Recover() {
{
LogReporter reporter;
reporter.status = &s;
- log::Reader reader(file, &reporter, true/*checksum*/);
+ log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@@ -518,20 +586,14 @@ Status VersionSet::Recover() {
if (s.ok()) {
Version* v = new Version(this);
builder.SaveTo(v);
- s = Finalize(v);
- if (!s.ok()) {
- delete v;
- } else {
- // Install recovered version
- v->next_ = NULL;
- current_->next_ = v;
- current_ = v;
- manifest_file_number_ = next_file;
- next_file_number_ = next_file + 1;
- last_sequence_ = last_sequence;
- log_number_ = log_number;
- prev_log_number_ = prev_log_number;
- }
+ // Install recovered version
+ Finalize(v);
+ AppendVersion(v);
+ manifest_file_number_ = next_file;
+ next_file_number_ = next_file + 1;
+ last_sequence_ = last_sequence;
+ log_number_ = log_number;
+ prev_log_number_ = prev_log_number;
}
return s;
@@ -545,15 +607,12 @@ static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
return sum;
}
-Status VersionSet::Finalize(Version* v) {
+void VersionSet::Finalize(Version* v) {
// Precomputed best level for next compaction
int best_level = -1;
double best_score = -1;
- Status s;
- for (int level = 0; s.ok() && level < config::kNumLevels-1; level++) {
- s = SortLevel(v, level);
-
+ for (int level = 0; level < config::kNumLevels-1; level++) {
double score;
if (level == 0) {
// We treat level-0 specially by bounding the number of files
@@ -567,7 +626,8 @@ Status VersionSet::Finalize(Version* v) {
// file size is small (perhaps because of a small write-buffer
// setting, or very high compression ratios, or lots of
// overwrites/deletions).
- score = v->files_[level].size() / 4.0;
+ score = v->files_[level].size() /
+ static_cast<double>(config::kL0_CompactionTrigger);
} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
@@ -582,7 +642,6 @@ Status VersionSet::Finalize(Version* v) {
v->compaction_level_ = best_level;
v->compaction_score_ = best_score;
- return s;
}
Status VersionSet::WriteSnapshot(log::Writer* log) {
@@ -615,44 +674,27 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
return log->AddRecord(record);
}
-// Helper to sort by tables_[file_number].smallest
-struct VersionSet::BySmallestKey {
- const InternalKeyComparator* internal_comparator;
-
- bool operator()(FileMetaData* f1, FileMetaData* f2) const {
- return internal_comparator->Compare(f1->smallest, f2->smallest) < 0;
- }
-};
-
-Status VersionSet::SortLevel(Version* v, uint64_t level) {
- Status result;
- BySmallestKey cmp;
- cmp.internal_comparator = &icmp_;
- std::sort(v->files_[level].begin(), v->files_[level].end(), cmp);
-
- if (result.ok() && level > 0) {
- // There should be no overlap
- for (size_t i = 1; i < v->files_[level].size(); i++) {
- const InternalKey& prev_end = v->files_[level][i-1]->largest;
- const InternalKey& this_begin = v->files_[level][i]->smallest;
- if (icmp_.Compare(prev_end, this_begin) >= 0) {
- result = Status::Corruption(
- "overlapping ranges in same level",
- (EscapeString(prev_end.Encode()) + " vs. " +
- EscapeString(this_begin.Encode())));
- break;
- }
- }
- }
- return result;
-}
-
int VersionSet::NumLevelFiles(int level) const {
assert(level >= 0);
assert(level < config::kNumLevels);
return current_->files_[level].size();
}
+const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
+ // Update code if kNumLevels changes
+ assert(config::kNumLevels == 7);
+ snprintf(scratch->buffer, sizeof(scratch->buffer),
+ "files[ %d %d %d %d %d %d %d ]",
+ int(current_->files_[0].size()),
+ int(current_->files_[1].size()),
+ int(current_->files_[2].size()),
+ int(current_->files_[3].size()),
+ int(current_->files_[4].size()),
+ int(current_->files_[5].size()),
+ int(current_->files_[6].size()));
+ return scratch->buffer;
+}
+
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
uint64_t result = 0;
for (int level = 0; level < config::kNumLevels; level++) {
@@ -685,19 +727,10 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
return result;
}
-void VersionSet::MaybeDeleteOldVersions() {
- // Note: it is important to delete versions in order since a newer
- // version with zero refs may be holding a pointer to a memtable
- // that is used by somebody who has a ref on an older version.
- while (oldest_ != current_ && oldest_->refs_ == 0) {
- Version* next = oldest_->next_;
- delete oldest_;
- oldest_ = next;
- }
-}
-
void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
- for (Version* v = oldest_; v != NULL; v = v->next_) {
+ for (Version* v = dummy_versions_.next_;
+ v != &dummy_versions_;
+ v = v->next_) {
for (int level = 0; level < config::kNumLevels; level++) {
const std::vector<FileMetaData*>& files = v->files_[level];
for (size_t i = 0; i < files.size(); i++) {
@@ -809,8 +842,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
} else {
// Create concatenating iterator for the files from this level
list[num++] = NewTwoLevelIterator(
- new Version::LevelFileNumIterator(
- c->input_version_, &c->inputs_[which]),
+ new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
&GetFileIterator, table_cache_, options);
}
}
@@ -996,11 +1028,12 @@ bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
return true;
}
-bool Compaction::ShouldStopBefore(const InternalKey& key) {
+bool Compaction::ShouldStopBefore(const Slice& internal_key) {
// Scan to find earliest grandparent file that contains key.
const InternalKeyComparator* icmp = &input_version_->vset_->icmp_;
while (grandparent_index_ < grandparents_.size() &&
- icmp->Compare(key, grandparents_[grandparent_index_]->largest) > 0) {
+ icmp->Compare(internal_key,
+ grandparents_[grandparent_index_]->largest.Encode()) > 0) {
if (seen_key_) {
overlapped_bytes_ += grandparents_[grandparent_index_]->file_size;
}
View
38 db/version_set.h
@@ -59,8 +59,8 @@ class Version {
VersionSet* vset_; // VersionSet to which this Version belongs
Version* next_; // Next version in linked list
+ Version* prev_; // Previous version in linked list
int refs_; // Number of live refs to this version
- MemTable* cleanup_mem_; // NULL, or table to delete when version dropped
// List of files per level
std::vector<FileMetaData*> files_[config::kNumLevels];
@@ -72,8 +72,7 @@ class Version {
int compaction_level_;
explicit Version(VersionSet* vset)
- : vset_(vset), next_(NULL), refs_(0),
- cleanup_mem_(NULL),
+ : vset_(vset), next_(this), prev_(this), refs_(0),
compaction_score_(-1),
compaction_level_(-1) {
}
@@ -95,10 +94,8 @@ class VersionSet {
// Apply *edit to the current version to form a new descriptor that
// is both saved to persistent state and installed as the new
- // current version. Iff Apply() returns OK, arrange to delete
- // cleanup_mem (if cleanup_mem != NULL) when it is no longer needed
- // by older versions.
- Status LogAndApply(VersionEdit* edit, MemTable* cleanup_mem);
+ // current version.
+ Status LogAndApply(VersionEdit* edit);
// Recover the last saved descriptor from persistent storage.
Status Recover();
@@ -171,19 +168,20 @@ class VersionSet {
// "key" as of version "v".
uint64_t ApproximateOffsetOf(Version* v, const InternalKey& key);
+ // Return a human-readable short (single-line) summary of the number
+ // of files per level. Uses *scratch as backing store.
+ struct LevelSummaryStorage {
+ char buffer[100];
+ };
+ const char* LevelSummary(LevelSummaryStorage* scratch) const;
+
private:
class Builder;
friend class Compaction;
friend class Version;
- Status Finalize(Version* v);
-
- // Delete any old versions that are no longer needed.
- void MaybeDeleteOldVersions();
-
- struct BySmallestKey;
- Status SortLevel(Version* v, uint64_t level);
+ void Finalize(Version* v);
void GetOverlappingInputs(
int level,
@@ -202,6 +200,8 @@ class VersionSet {
void SetupOtherInputs(Compaction* c);
+ void AppendVersion(Version* v);
+
Env* const env_;
const std::string dbname_;
const Options* const options_;
@@ -216,10 +216,8 @@ class VersionSet {
// Opened lazily
WritableFile* descriptor_file_;
log::Writer* descriptor_log_;
-
- // Versions are kept in a singly linked list that is never empty
- Version* current_; // Pointer to the last (newest) list entry
- Version* oldest_; // Pointer to the first (oldest) list entry
+ Version dummy_versions_; // Head of circular doubly-linked list of versions.
+ Version* current_; // == dummy_versions_.prev_
// Per-level key at which the next compaction at that level should start.
// Either an empty string, or a valid InternalKey.
@@ -265,8 +263,8 @@ class Compaction {
bool IsBaseLevelForKey(const Slice& user_key);
// Returns true iff we should stop building the current output
- // before processing "key".
- bool ShouldStopBefore(const InternalKey& key);
+ // before processing "internal_key".
+ bool ShouldStopBefore(const Slice& internal_key);
// Release the input version for the compaction, once the compaction
// is successful.
View
136 db/write_batch.cc
@@ -29,11 +29,53 @@ WriteBatch::WriteBatch() {
WriteBatch::~WriteBatch() { }
+WriteBatch::Handler::~Handler() { }
+
void WriteBatch::Clear() {
rep_.clear();
rep_.resize(12);
}
+Status WriteBatch::Iterate(Handler* handler) const {
+ Slice input(rep_);
+ if (input.size() < 12) {
+ return Status::Corruption("malformed WriteBatch (too small)");
+ }
+
+ input.remove_prefix(12);
+ Slice key, value;
+ int found = 0;
+ while (!input.empty()) {
+ found++;
+ char tag = input[0];
+ input.remove_prefix(1);
+ switch (tag) {
+ case kTypeValue:
+ if (GetLengthPrefixedSlice(&input, &key) &&
+ GetLengthPrefixedSlice(&input, &value)) {
+ handler->Put(key, value);
+ } else {
+ return Status::Corruption("bad WriteBatch Put");
+ }
+ break;
+ case kTypeDeletion:
+ if (GetLengthPrefixedSlice(&input, &key)) {
+ handler->Delete(key);
+ } else {
+ return Status::Corruption("bad WriteBatch Delete");
+ }
+ break;
+ default:
+ return Status::Corruption("unknown WriteBatch tag");
+ }
+ }
+ if (found != WriteBatchInternal::Count(this)) {
+ return Status::Corruption("WriteBatch has wrong count");
+ } else {
+ return Status::OK();
+ }
+}
+
int WriteBatchInternal::Count(const WriteBatch* b) {
return DecodeFixed32(b->rep_.data() + 8);
}
@@ -63,86 +105,34 @@ void WriteBatch::Delete(const Slice& key) {
PutLengthPrefixedSlice(&rep_, key);
}
-Status WriteBatchInternal::InsertInto(const WriteBatch* b,
- MemTable* memtable) {
- const int count = WriteBatchInternal::Count(b);
- int found = 0;
- Iterator it(*b);
- for (; !it.Done(); it.Next()) {
- switch (it.op()) {
- case kTypeDeletion:
- memtable->Add(it.sequence_number(), kTypeDeletion, it.key(), Slice());
- break;
- case kTypeValue:
- memtable->Add(it.sequence_number(), kTypeValue, it.key(), it.value());
- break;
- }
- found++;
+namespace {
+class MemTableInserter : public WriteBatch::Handler {
+ public:
+ SequenceNumber sequence_;
+ MemTable* mem_;
+
+ virtual void Put(const Slice& key, const Slice& value) {
+ mem_->Add(sequence_, kTypeValue, key, value);
+ sequence_++;
}
- if (!it.status().ok()) {
- return it.status();
- } else if (found != count) {
- return Status::Corruption("wrong count in WriteBatch");
+ virtual void Delete(const Slice& key) {
+ mem_->Add(sequence_, kTypeDeletion, key, Slice());
+ sequence_++;
}
- return Status::OK();
+};
+}
+
+Status WriteBatchInternal::InsertInto(const WriteBatch* b,
+ MemTable* memtable) {
+ MemTableInserter inserter;
+ inserter.sequence_ = WriteBatchInternal::Sequence(b);
+ inserter.mem_ = memtable;
+ return b->Iterate(&inserter);
}
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
assert(contents.size() >= 12);
b->rep_.assign(contents.data(), contents.size());
}
-WriteBatchInternal::Iterator::Iterator(const WriteBatch& batch)
- : input_(WriteBatchInternal::Contents(&batch)),
- done_(false) {
- if (input_.size() < 12) {
- done_ = true;
- } else {
- seq_ = WriteBatchInternal::Sequence(&batch),
- input_.remove_prefix(12);
- GetNextEntry();
- }
-}
-
-void WriteBatchInternal::Iterator::Next() {
- assert(!done_);
- seq_++;
- GetNextEntry();
-}
-
-void WriteBatchInternal::Iterator::GetNextEntry() {
- if (input_.empty()) {