Skip to content

Commit

Permalink
storage: Revise storage interface and improve its implementations
Browse files Browse the repository at this point in the history
* This includes some changes related to the storage subsystem throughout the leveldb.
* ReadAt is now replaced with Seek and Read.
* The Rename, Size and Exist methods are removed to simplify storage implementations.
* The Close method is now included in the Storage interface.
* GetFiles now also returns an error.
* Print method renamed to Log.
  • Loading branch information
syndtr committed Oct 2, 2013
1 parent e76b19d commit 0e05891
Show file tree
Hide file tree
Showing 16 changed files with 709 additions and 750 deletions.
4 changes: 1 addition & 3 deletions leveldb/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ var benchDB = filepath.Join(os.TempDir(), fmt.Sprintf("goleveldbbench-%d", os.Ge

type dbBench struct {
b *testing.B
stor *storage.FileStorage
stor storage.Storage
db *DB

o *opt.Options
Expand Down Expand Up @@ -110,8 +110,6 @@ func openDBBench(b *testing.B, noCompress bool) *dbBench {

func (p *dbBench) reopen() {
p.db.Close()
p.stor.Close()

var err error
p.db, err = Open(p.stor, p.o)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions leveldb/corrupt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ func (h *dbCorruptHarness) corrupt(ft storage.FileType, offset, n int) {
t := p.t

var file storage.File
for _, f := range p.stor.GetFiles(ft) {
ff, _ := p.stor.GetFiles(ft)
for _, f := range ff {
if file == nil || f.Num() > file.Num() {
file = f
}
Expand All @@ -77,11 +78,14 @@ func (h *dbCorruptHarness) corrupt(ft storage.FileType, offset, n int) {
if err != nil {
t.Fatal("cannot open file: ", err)
}
x, err := file.Size()
x, err := r.Seek(0, 2)
if err != nil {
t.Fatal("cannot query file size: ", err)
}
m := int(x)
if _, err := r.Seek(0, 0); err != nil {
t.Fatal(err)
}

if offset < 0 {
if -offset > m {
Expand Down
70 changes: 45 additions & 25 deletions leveldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func openDB(s *session) (*DB, error) {
}

// Remove any obsolete files.
db.cleanFiles()
if err := db.cleanFiles(); err != nil {
return nil, err
}

// Don't include compaction error goroutine into wait group.
go db.compactionError()
Expand Down Expand Up @@ -174,13 +176,14 @@ func OpenFile(path string, o *opt.Options) (*DB, error) {
// Also Recover will ignore opt.OFCreateIfMissing and opt.OFErrorIfExist flags.
//
// The DB must be closed after use, by calling Close method.
func Recover(p storage.Storage, o *opt.Options) (*DB, error) {
func Recover(p storage.Storage, o *opt.Options) (db *DB, err error) {
if o.HasFlag(opt.OFStrict) {
return nil, errors.New("leveldb: cannot recovers the DB with strict flag")
err = errors.New("leveldb: cannot recovers the DB with strict flag")
return
}
s, err := newSession(p, o)
if err != nil {
return nil, err
return
}
defer func() {
if err != nil {
Expand All @@ -189,7 +192,11 @@ func Recover(p storage.Storage, o *opt.Options) (*DB, error) {
}()

// get all files
ff := files(s.getFiles(storage.TypeAll))
ff0, err := s.getFiles(storage.TypeAll)
if err != nil {
return
}
ff := files(ff0)
ff.sort()

s.printf("Recover: started, files=%d", len(ff))
Expand All @@ -204,39 +211,47 @@ func Recover(p storage.Storage, o *opt.Options) (*DB, error) {
continue
}

var size uint64
size, err = f.Size()
var r storage.Reader
r, err = f.Open()
if err != nil {
return nil, err
return
}
var size int64
size, err = r.Seek(0, 2)
r.Close()
if err != nil {
return
}

t := newTFile(f, size, nil, nil)
t := newTFile(f, uint64(size), nil, nil)
iter := s.tops.newIterator(t, ro)
// min ikey
if iter.First() {
t.min = iter.Key()
} else if err := iter.Error(); err != nil {
iter.Release()
return nil, err
} else {
err = iter.Error()
iter.Release()
continue
if err != nil {
return
} else {
continue
}
}
// max ikey
if iter.Last() {
t.max = iter.Key()
} else if err := iter.Error(); err != nil {
iter.Release()
return nil, err
} else {
err = iter.Error()
iter.Release()
continue
if err != nil {
return
} else {
continue
}
}
iter.Release()

// add table to level 0
rec.addTableFile(0, t)

nt = t
}

Expand All @@ -261,14 +276,15 @@ func Recover(p storage.Storage, o *opt.Options) (*DB, error) {
s.stFileNum = ff[len(ff)-1].Num() + 1

// create brand new manifest
if err = s.create(); err != nil {
return nil, err
err = s.create()
if err != nil {
return
}
// commit record
if err = s.commit(rec); err != nil {
return nil, err
err = s.commit(rec)
if err != nil {
return
}

return openDB(s)
}

Expand All @@ -278,7 +294,11 @@ func (d *DB) recoverJournal() error {

s.printf("JournalRecovery: started, min=%d", s.stJournalNum)

jfiles := files(s.getFiles(storage.TypeJournal))
ff0, err := s.getFiles(storage.TypeJournal)
if err != nil {
return err
}
jfiles := files(ff0)
jfiles.sort()
rJfiles := make([]storage.File, 0, len(jfiles))
for _, file := range jfiles {
Expand Down
15 changes: 9 additions & 6 deletions leveldb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func tval(seed, n int) []byte {
type dbHarness struct {
t *testing.T

stor *testingStorage
stor *testStorage
db *DB
o *opt.Options
oo opt.OptionsSetter
Expand All @@ -63,7 +63,7 @@ func newDbHarness(t *testing.T) *dbHarness {

func (h *dbHarness) init(t *testing.T, o *opt.Options) {
h.t = t
h.stor = newTestingStorage(t)
h.stor = newTestStorage(t)
h.o = o
h.ro = &opt.ReadOptions{}
h.wo = &opt.WriteOptions{}
Expand All @@ -72,6 +72,7 @@ func (h *dbHarness) init(t *testing.T, o *opt.Options) {
}

func (h *dbHarness) openDB() {
h.t.Log("opening DB")
var err error
h.db, err = Open(h.stor, h.o)
if err != nil {
Expand All @@ -81,10 +82,12 @@ func (h *dbHarness) openDB() {
}

func (h *dbHarness) closeDB() {
h.t.Log("closing DB")
err := h.db.Close()
if err != nil {
h.t.Error("Close: got error: ", err)
}
h.stor.CheckClosed()
runtime.GC()
}

Expand Down Expand Up @@ -1526,23 +1529,23 @@ func TestDb_BloomFilter(t *testing.T) {
h.stor.DelaySync(storage.TypeTable)

// Lookup present keys. Should rarely read from small sstable.
h.stor.SetReadAtCounter(storage.TypeTable)
h.stor.SetReadCounter(storage.TypeTable)
for i := 0; i < n; i++ {
h.getVal(key(i), key(i))
}
cnt := int(h.stor.ReadAtCounter())
cnt := int(h.stor.ReadCounter())
t.Logf("lookup of %d present keys yield %d sstable I/O reads", n, cnt)

if min, max := n, n+2*n/100; cnt < min || cnt > max {
t.Errorf("num of sstable I/O reads of present keys not in range of %d - %d, got %d", min, max, cnt)
}

// Lookup missing keys. Should rarely read from either sstable.
h.stor.ResetReadAtCounter()
h.stor.ResetReadCounter()
for i := 0; i < n; i++ {
h.get(key(i)+".missing", false)
}
cnt = int(h.stor.ReadAtCounter())
cnt = int(h.stor.ReadCounter())
t.Logf("lookup of %d missing keys yield %d sstable I/O reads", n, cnt)
if max := 3 * n / 100; cnt > max {
t.Errorf("num of sstable I/O reads of missing keys was more than %d, got %d", max, cnt)
Expand Down
13 changes: 10 additions & 3 deletions leveldb/db_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (p Sizes) Sum() (n uint64) {
}

// Remove unused files.
func (d *DB) cleanFiles() {
func (d *DB) cleanFiles() error {
s := d.s

v := s.version_NB()
Expand All @@ -50,7 +50,11 @@ func (d *DB) cleanFiles() {
}
}

for _, f := range s.getFiles(storage.TypeAll) {
ff, err := s.getFiles(storage.TypeAll)
if err != nil {
return err
}
for _, f := range ff {
keep := true
switch f.Type() {
case storage.TypeManifest:
Expand All @@ -66,7 +70,10 @@ func (d *DB) cleanFiles() {
}

if !keep {
f.Remove()
if err := f.Remove(); err != nil {
return err
}
}
}
return nil
}
2 changes: 1 addition & 1 deletion leveldb/memdb/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// Package memdb provide in-memory key/value database implementation.
// Package memdb provides in-memory key/value database implementation.
package memdb

import (
Expand Down
3 changes: 2 additions & 1 deletion leveldb/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
)

// session represent a persistent database session.
Expand All @@ -28,7 +29,7 @@ type session struct {
stSeq uint64 // last mem compacted seq; need external synchronization

stor storage.Storage
storLock storage.Locker
storLock util.Releaser
o *iOptions
cmp *iComparer
tops *tOps
Expand Down
6 changes: 3 additions & 3 deletions leveldb/session_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ func (d dropper) Drop(err error) {
}

func (s *session) print(v ...interface{}) {
s.stor.Print(fmt.Sprint(v...))
s.stor.Log(fmt.Sprint(v...))
}

func (s *session) printf(format string, v ...interface{}) {
s.stor.Print(fmt.Sprintf(format, v...))
s.stor.Log(fmt.Sprintf(format, v...))
}

// file utils
Expand All @@ -47,7 +47,7 @@ func (s *session) getTableFile(num uint64) storage.File {
return s.stor.GetFile(num, storage.TypeTable)
}

func (s *session) getFiles(t storage.FileType) []storage.File {
func (s *session) getFiles(t storage.FileType) ([]storage.File, error) {
return s.stor.GetFiles(t)
}

Expand Down
23 changes: 16 additions & 7 deletions leveldb/sorted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type stConstructor_Table struct {
func (p *stConstructor_Table) init(t *testing.T, ho *stHarnessOpt) error {
p.t = t

p.file = newTestingStorage(nil).GetFile(0, storage.TypeTable)
p.file = newTestStorage(t).GetFile(0, storage.TypeTable)
p.w, _ = p.file.Create()

o := &opt.Options{
Expand All @@ -69,14 +69,23 @@ func (p *stConstructor_Table) finish() (size int, err error) {

p.t.Logf("table: contains %d entries and %d blocks", p.tw.EntriesLen(), p.tw.BlocksLen())

tsize := uint64(p.tw.BytesLen())
tsize := int64(p.tw.BytesLen())

fsize, _ := p.file.Size()
r, err := p.file.Open()
if err != nil {
p.t.Fatal(err)
}
fsize, err := r.Seek(0, 2)
if err != nil {
p.t.Fatal(err)
}
if fsize != tsize {
p.t.Errorf("table: calculated size doesn't equal with actual size, calculated=%d actual=%d", tsize, fsize)
}

p.r, _ = p.file.Open()
if _, err := r.Seek(0, 0); err != nil {
p.t.Fatal(err)
}
p.r = r
o := &opt.Options{
BlockRestartInterval: 3,
Filter: filter.NewBloomFilter(10),
Expand Down Expand Up @@ -177,7 +186,7 @@ func (p *stConstructor_MergedMemDB) customTest(h *stHarness) {}
type stConstructor_DB struct {
t *testing.T

stor *testingStorage
stor *testStorage
ro *opt.ReadOptions
wo *opt.WriteOptions
db *DB
Expand All @@ -186,7 +195,7 @@ type stConstructor_DB struct {
func (p *stConstructor_DB) init(t *testing.T, ho *stHarnessOpt) (err error) {
ho.Randomize = true
p.t = t
p.stor = newTestingStorage(nil)
p.stor = newTestStorage(t)
o := &opt.Options{
Flag: opt.OFCreateIfMissing,
WriteBuffer: 2800,
Expand Down

0 comments on commit 0e05891

Please sign in to comment.