Skip to content

Commit

Permalink
pump: Clean up obsolete value logs concurrently (#726)
Browse files Browse the repository at this point in the history
* pump: Clean up obsolete value logs concurrently

Deleting value log files before deleting the corresponding rows in
GoLevelDB doesn't affect correctness in this case, because:

1. the specified Ts is stored in LevelDB as gcTS before actually doing GC
2. PullCommitBinlogs uses `gcTS + 1` as the start of the range when iterating
   binlogs, so even if not all of the records in LevelDB have been deleted yet,
   we can still be sure that the garbage-collected rows are not sent to drainers

So we don't have to wait for the potentially slow LevelDB cleanup to
remove unused value log files and reclaim most of the disk space.
  • Loading branch information
suzaku authored and IANTHEREAL committed Oct 8, 2019
1 parent 11b9d1d commit ff6f730
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 17 deletions.
23 changes: 14 additions & 9 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,7 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL
return nil, errors.Trace(err)
}

vlog := new(valueLog)
err = vlog.open(valueDir, options)
vlog, err := newValueLog(valueDir, options)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -677,7 +676,15 @@ func (a *Append) GC(ts int64) {
}

func (a *Append) doGCTS(ts int64) {
log.Info("start gc", zap.Int64("ts", ts))
log.Info("Starting GC", zap.Int64("ts", ts))

var wg sync.WaitGroup
wg.Add(1)
go func() {
a.vlog.gcTS(ts)
log.Info("Finish VLog GC", zap.Int64("ts", ts))
wg.Done()
}()

batch := new(leveldb.Batch)
l0Trigger := defaultStorageKVConfig.CompactionL0Trigger
Expand All @@ -703,13 +710,13 @@ func (a *Append) doGCTS(ts int64) {
nStr, err := a.metadata.GetProperty("leveldb.num-files-at-level0")
if err != nil {
log.Error("get `leveldb.num-files-at-level0` property failed", zap.Error(err))
return
break
}

l0Num, err := strconv.Atoi(nStr)
if err != nil {
log.Error("parse `leveldb.num-files-at-level0` result to int failed", zap.String("str", nStr), zap.Error(err))
return
break
}

if l0Num >= l0Trigger {
Expand Down Expand Up @@ -757,20 +764,18 @@ func (a *Append) doGCTS(ts int64) {
deletedKv.Add(float64(batch.Len()))
batch.Reset()
}
log.Info("Finish KV GC", zap.Int64("ts", ts), zap.Int("delete num", deleteNum))
break
}

if len(lastKey) > 0 {
irange.Start = lastKey
a.vlog.gcTS(decodeTSKey(lastKey))
doneGcTSGauge.Set(float64(oracle.ExtractPhysical(uint64(decodeTSKey(lastKey)))))
}
log.Info("has delete", zap.Int("delete num", deleteNum))
}

a.vlog.gcTS(ts)
wg.Wait()
doneGcTSGauge.Set(float64(oracle.ExtractPhysical(uint64(ts))))
log.Info("finish gc", zap.Int64("ts", ts), zap.Int("delete num", deleteNum))
}

// MaxCommitTS implement Storage.MaxCommitTS
Expand Down
15 changes: 14 additions & 1 deletion pump/storage/vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,20 @@ type valueLog struct {
sync bool
maxFid uint32
filesLock sync.RWMutex
gcLock sync.Mutex
filesMap map[uint32]*logFile

opt *Options
}

// newValueLog constructs a valueLog and opens it in the given directory.
// It returns a wrapped error if opening fails, so callers never receive a
// partially initialized log.
func newValueLog(valueDir string, options *Options) (*valueLog, error) {
	vlog := &valueLog{}
	err := vlog.open(valueDir, options)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return vlog, nil
}

// filePath returns the on-disk path for the log file with the given fid:
// the fid zero-padded to six digits, followed by the package's file
// extension, joined onto the log directory.
func (vlog *valueLog) filePath(fid uint32) string {
	name := fmt.Sprintf("%06d%s", fid, fileExt)
	return filepath.Join(vlog.dirPath, name)
}
Expand Down Expand Up @@ -428,6 +437,8 @@ func (vlog *valueLog) scanRequests(start valuePointer, fn func(*request) error)
// that every single binlog added would be visited
// 2. If GC is running concurrently, logFiles may be closed and deleted, thus breaking the scanning.
func (vlog *valueLog) scan(start valuePointer, fn func(vp valuePointer, record *Record) error) error {
vlog.gcLock.Lock()
defer vlog.gcLock.Unlock()
vlog.filesLock.RLock()
fids := vlog.sortedFids()
var lfs []*logFile
Expand Down Expand Up @@ -455,7 +466,9 @@ func (vlog *valueLog) scan(start valuePointer, fn func(vp valuePointer, record *

// delete data <= gcTS
func (vlog *valueLog) gcTS(gcTS int64) {
log.Info("gc vlog", zap.Int64("ts", gcTS))
log.Info("GC vlog", zap.Int64("ts", gcTS))
vlog.gcLock.Lock()
defer vlog.gcLock.Unlock()

vlog.filesLock.Lock()
var toDeleteFiles []*logFile
Expand Down
25 changes: 18 additions & 7 deletions pump/storage/vlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ func newVlogWithOptions(c *check.C, options *Options) *valueLog {
err = os.Mkdir(dir, 0777)
c.Assert(err, check.IsNil)

vlog := new(valueLog)
err = vlog.open(dir, options)
vlog, err := newValueLog(dir, options)
c.Assert(err, check.IsNil)

return vlog
Expand Down Expand Up @@ -144,8 +143,7 @@ func (vs *VlogSuit) TestCloseAndOpen(c *check.C) {
var err = vlog.close()
c.Assert(err, check.IsNil)

vlog = new(valueLog)
err = vlog.open(dirPath, opt)
vlog, err = newValueLog(dirPath, opt)
c.Assert(err, check.IsNil)

batch = batch[:0]
Expand Down Expand Up @@ -193,10 +191,24 @@ func (vs *VlogSuit) TestGCTS(c *check.C) {

before := len(vlog.filesMap)
c.Logf("before log file num: %d", before)
vlog.gcTS(90)

vlog.gcLock.Lock()

gcDone := make(chan struct{})
go func() {
// The following call should block waiting for the gcLock
vlog.gcTS(90)
close(gcDone)
}()

after := len(vlog.filesMap)
c.Logf("after log file num: %d", after)
c.Assert(after, check.Equals, before, check.Commentf("gc is not prevented"))

vlog.gcLock.Unlock()
<-gcDone
after = len(vlog.filesMap)
c.Logf("after log file num: %d", after)
c.Assert(after, check.Less, before, check.Commentf("no file is deleted"))

// ts 0 has been gc
Expand Down Expand Up @@ -247,8 +259,7 @@ func (vs *VlogSuit) TestNoSpace(c *check.C) {
c.Assert(err, check.IsNil)
}()

vlog := new(valueLog)
err = vlog.open(dir, DefaultOptions())
vlog, err := newValueLog(dir, DefaultOptions())
c.Assert(err, check.IsNil)

// Size of the encoded record should be 1024 + headerLength = 1040
Expand Down

0 comments on commit ff6f730

Please sign in to comment.