Skip to content

Commit

Permalink
leveldb: Improvements and some bugfix in compaction code
Browse files Browse the repository at this point in the history
- Split mem and table compaction into two goroutine
- CompactRange now also flush the memdb if it's within range
- Fix (*DB).flush that return invalid memdb free bytes
  • Loading branch information
syndtr committed Jul 5, 2014
1 parent 5b52393 commit de350e4
Show file tree
Hide file tree
Showing 4 changed files with 404 additions and 307 deletions.
131 changes: 44 additions & 87 deletions leveldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,28 @@ type DB struct {
snapsRoot snapshotElement

// Write
writeCh chan *Batch
writeMergedCh chan bool
writeLockCh chan struct{}
writeAckCh chan error
journalCh chan *Batch
journalAckCh chan error
writeC chan *Batch
writeMergedC chan bool
writeLockC chan struct{}
writeAckC chan error
journalC chan *Batch
journalAckC chan error

// Compaction
compCh chan chan<- struct{}
compMemCh chan chan<- struct{}
compMemAckCh chan struct{}
compReqCh chan *cReq
compErrCh chan error
compErrSetCh chan error
compStats [kNumLevels]cStats
tcompCmdC chan cCmd
tcompPauseC chan chan<- struct{}
tcompTriggerC chan struct{}
mcompCmdC chan cCmd
mcompTriggerC chan struct{}
compErrC chan error
compErrSetC chan error
compStats [kNumLevels]cStats

// Close
closeWg sync.WaitGroup
closeCh chan struct{}
closed uint32
closer io.Closer
closeW sync.WaitGroup
closeC chan struct{}
closed uint32
closer io.Closer
}

func openDB(s *session) (*DB, error) {
Expand All @@ -85,24 +86,24 @@ func openDB(s *session) (*DB, error) {
// Initial sequence
seq: s.stSeq,
// Write
writeCh: make(chan *Batch),
writeMergedCh: make(chan bool),
writeLockCh: make(chan struct{}, 1),
writeAckCh: make(chan error),
journalCh: make(chan *Batch),
journalAckCh: make(chan error),
writeC: make(chan *Batch),
writeMergedC: make(chan bool),
writeLockC: make(chan struct{}, 1),
writeAckC: make(chan error),
journalC: make(chan *Batch),
journalAckC: make(chan error),
// Compaction
compCh: make(chan chan<- struct{}, 1),
compMemCh: make(chan chan<- struct{}, 1),
compMemAckCh: make(chan struct{}, 1),
compReqCh: make(chan *cReq),
compErrCh: make(chan error),
compErrSetCh: make(chan error),
tcompCmdC: make(chan cCmd),
tcompPauseC: make(chan chan<- struct{}),
tcompTriggerC: make(chan struct{}, 1),
mcompCmdC: make(chan cCmd),
mcompTriggerC: make(chan struct{}, 1),
compErrC: make(chan error),
compErrSetC: make(chan error),
// Close
closeCh: make(chan struct{}),
closeC: make(chan struct{}),
}
db.initSnapshot()
db.compMemAckCh <- struct{}{}

if err := db.recoverJournal(); err != nil {
return nil, err
Expand All @@ -116,10 +117,10 @@ func openDB(s *session) (*DB, error) {
// Don't include compaction error goroutine into wait group.
go db.compactionError()

db.closeWg.Add(2)
go db.compaction()
go db.writeJournal()
db.wakeCompaction(0)
db.closeW.Add(3)
go db.tCompaction()
go db.mCompaction()
go db.jWriter()

s.logf("db@open done T·%v", time.Since(start))

Expand Down Expand Up @@ -535,8 +536,8 @@ func (d *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err
value, cSched, err := v.get(ikey, ro)
v.release()
if cSched {
// Wake compaction.
d.wakeCompaction(0)
// Trigger table compaction.
d.compTrigger(d.tcompTriggerC)
}
return
}
Expand Down Expand Up @@ -693,46 +694,6 @@ func (d *DB) GetApproximateSizes(ranges []util.Range) (Sizes, error) {
return sizes, nil
}

// CompactRange compacts the underlying DB for the given key range.
// In particular, deleted and overwritten versions are discarded,
// and the data is rearranged to reduce the cost of operations
// needed to access the data. This operation should typically only
// be invoked by users who understand the underlying implementation.
//
// A nil Range.Start is treated as a key before all keys in the DB.
// And a nil Range.Limit is treated as a key after all keys in the DB.
// Therefore if both is nil then it will compact entire DB.
func (d *DB) CompactRange(r util.Range) error {
err := d.ok()
if err != nil {
return err
}

cch := make(chan struct{})
req := &cReq{
level: -1,
min: r.Start,
max: r.Limit,
cch: cch,
}

// Push manual compaction request.
select {
case _, _ = <-d.closeCh:
return ErrClosed
case err := <-d.compErrCh:
return err
case d.compReqCh <- req:
}
// Wait for compaction
select {
case _, _ = <-d.closeCh:
return ErrClosed
case <-cch:
}
return nil
}

// Close closes the DB. This will also releases any outstanding snapshot.
//
// It is not safe to close a DB until all outstanding iterators are released.
Expand All @@ -753,22 +714,22 @@ func (d *DB) Close() error {
// Get compaction error.
var err error
select {
case err = <-d.compErrCh:
case err = <-d.compErrC:
default:
}

close(d.closeCh)
close(d.closeC)

// wait for the WaitGroup
d.closeWg.Wait()
// Wait for the close WaitGroup.
d.closeW.Wait()

// close journal
// Close journal.
if d.journal != nil {
d.journal.Close()
d.journalWriter.Close()
}

// close session
// Close session.
s.close()
s.logf("db@close done T·%v", time.Since(start))
s.release()
Expand All @@ -789,9 +750,5 @@ func (d *DB) Close() error {
d.snapsRoot = snapshotElement{}
d.closer = nil

close(d.writeCh)
close(d.journalCh)
close(d.compReqCh)

return err
}

0 comments on commit de350e4

Please sign in to comment.