Skip to content

Commit

Permalink
leveldb: throttle transaction if there is to many tables in level-0
Browse files Browse the repository at this point in the history
  • Loading branch information
syndtr committed Oct 10, 2016
1 parent 1996ac2 commit 62a36c4
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 6 deletions.
24 changes: 22 additions & 2 deletions leveldb/db_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,14 @@ func (tr *Transaction) Commit() error {
// Committing transaction.
tr.rec.setSeqNum(tr.seq)
tr.db.compCommitLk.Lock()
defer tr.db.compCommitLk.Unlock()
for retry := 0; retry < 3; retry++ {
if err := tr.db.s.commit(&tr.rec); err != nil {
tr.db.logf("transaction@commit error R·%d %q", retry, err)
select {
case <-time.After(time.Second):
case _, _ = <-tr.db.closeC:
tr.db.logf("transaction@commit exiting")
tr.db.compCommitLk.Unlock()
return err
}
} else {
Expand All @@ -219,6 +219,12 @@ func (tr *Transaction) Commit() error {
}
// Trigger table auto-compaction.
tr.db.compTrigger(tr.db.tcompCmdC)
tr.db.compCommitLk.Unlock()

// Additionally, wait compaction when certain threshold reached.
if err := tr.db.waitCompaction(); err != nil {
return err
}
}
return nil
}
Expand All @@ -245,10 +251,19 @@ func (tr *Transaction) Discard() {
tr.lk.Unlock()
}

func (db *DB) waitCompaction() error {
if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() {
return db.compTriggerWait(db.tcompCmdC)
}
return nil
}

// OpenTransaction opens an atomic DB transaction. Only one transaction can be
// opened at a time. Subsequent call to Write and OpenTransaction will be blocked
// until in-flight transaction is committed or discarded.
// The returned transaction handle is goroutine-safe.
// The returned transaction handle is safe for concurrent use.
//
// Transaction is expensive and put restrains on the compaction.
//
// The transaction must be closed once done, either by committing or discarding
// the transaction.
Expand Down Expand Up @@ -278,6 +293,11 @@ func (db *DB) OpenTransaction() (*Transaction, error) {
}
}

// Wait compaction when certain threshold reached.
if err := db.waitCompaction(); err != nil {
return nil, err
}

tr := &Transaction{
db: db,
seq: db.seq,
Expand Down
9 changes: 5 additions & 4 deletions leveldb/db_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {

func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
delayed := false
slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
flush := func() (retry bool) {
v := db.s.version()
defer v.release()
mdb = db.getEffectiveMem()
if mdb == nil {
err = ErrClosed
Expand All @@ -69,14 +69,15 @@ func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
mdb = nil
}
}()
tLen := db.s.tLen(0)
mdbFree = mdb.Free()
switch {
case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed:
case tLen >= slowdownTrigger && !delayed:
delayed = true
time.Sleep(time.Millisecond)
case mdbFree >= n:
return false
case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger():
case tLen >= pauseTrigger:
delayed = true
err = db.compTriggerWait(db.tcompCmdC)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions leveldb/session_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ func (s *session) version() *version {
return s.stVersion
}

func (s *session) tLen(level int) int {
s.vmu.Lock()
defer s.vmu.Unlock()
return s.stVersion.tLen(level)
}

// Set current version to v.
func (s *session) setVersion(v *version) {
s.vmu.Lock()
Expand Down

0 comments on commit 62a36c4

Please sign in to comment.