Skip to content

Commit

Permalink
pump/: Refine gc of pump
Browse files Browse the repository at this point in the history
1, Add an API to trigger gc
2, Check L0 files by *GetProperty* instead of Stats
Stats can't get the right L0 files num, see:
https://github.com/syndtr/goleveldb/pull/283/files
3, call vlog.gcTS step by step, help free space quickly
  • Loading branch information
july2993 committed Jun 23, 2019
1 parent 0b0000d commit b3e2cc3
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 22 deletions.
48 changes: 32 additions & 16 deletions pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Server struct {
cancel context.CancelFunc
wg sync.WaitGroup
gcDuration time.Duration
triggerGC chan time.Time
metrics *util.MetricClient
// save the last time we write binlog to Storage
// if long time not write, we can write a fake binlog
Expand Down Expand Up @@ -172,6 +173,7 @@ func NewServer(cfg *Config) (*Server, error) {
gcDuration: time.Duration(cfg.GC) * 24 * time.Hour,
pdCli: pdCli,
cfg: cfg,
triggerGC: make(chan time.Time),
}, nil
}

Expand Down Expand Up @@ -389,6 +391,7 @@ func (s *Server) Start() error {
router.HandleFunc("/state/{nodeID}/{action}", s.ApplyAction).Methods("PUT")
router.HandleFunc("/drainers", s.AllDrainers).Methods("GET")
router.HandleFunc("/debug/binlog/{ts}", s.BinlogByTS).Methods("GET")
router.HandleFunc("/debug/gc/trigger", s.TriggerGC).Methods("GET")
http.Handle("/", router)
prometheus.DefaultGatherer = registry
http.Handle("/metrics", promhttp.Handler())
Expand Down Expand Up @@ -500,26 +503,29 @@ func (s *Server) gcBinlogFile() {
case <-s.ctx.Done():
log.Info("gcBinlogFile exit")
return
case <-s.triggerGC:
log.Info("trigger gc now")
case <-time.After(gcInterval):
if s.gcDuration == 0 {
continue
}
}

safeTSO, err := s.getSafeGCTSOForDrainers(s.ctx)
if err != nil {
log.Warn("get save gc tso for drainers failed", zap.Error(err))
continue
}
log.Info("get safe ts for drainers success", zap.Int64("ts", safeTSO))
if s.gcDuration == 0 {
continue
}

millisecond := time.Now().Add(-s.gcDuration).UnixNano() / 1000 / 1000
gcTS := int64(oracle.EncodeTSO(millisecond))
if safeTSO < gcTS {
gcTS = safeTSO
}
log.Info("send gc request to storage", zap.Int64("ts", gcTS))
s.storage.GCTS(gcTS)
safeTSO, err := s.getSafeGCTSOForDrainers(s.ctx)
if err != nil {
log.Warn("get save gc tso for drainers failed", zap.Error(err))
continue
}
log.Info("get safe ts for drainers success", zap.Int64("ts", safeTSO))

millisecond := time.Now().Add(-s.gcDuration).UnixNano() / 1000 / 1000
gcTS := int64(oracle.EncodeTSO(millisecond))
if safeTSO < gcTS {
gcTS = safeTSO
}
log.Info("send gc request to storage", zap.Int64("ts", gcTS))
s.storage.GCTS(gcTS)
}
}

Expand Down Expand Up @@ -577,6 +583,16 @@ func (s *Server) Status(w http.ResponseWriter, r *http.Request) {
s.PumpStatus().Status(w, r)
}

// TriggerGC asks the gc loop to start a collection round immediately.
// The request is best-effort: if the loop does not pick it up within a
// second, we assume a gc round is already in progress and report that.
func (s *Server) TriggerGC(w http.ResponseWriter, r *http.Request) {
	deadline := time.After(time.Second)
	select {
	case s.triggerGC <- time.Now():
		fmt.Fprintln(w, "trigger gc success")
	case <-deadline:
		fmt.Fprintln(w, "gc is working")
	}
}

// BinlogByTS exposes an API to get binlog by ts
func (s *Server) BinlogByTS(w http.ResponseWriter, r *http.Request) {
tsStr := mux.Vars(r)["ts"]
Expand Down
27 changes: 21 additions & 6 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"path"
"reflect"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -597,7 +598,6 @@ func (a *Append) GCTS(ts int64) {
// so we forward a little bit to make sure we can get the according P binlog
a.doGCTS(ts - int64(oracle.EncodeTSO(maxTxnTimeoutSecond*1000)))
}()

}

func (a *Append) doGCTS(ts int64) {
Expand All @@ -610,15 +610,22 @@ func (a *Append) doGCTS(ts int64) {
}

for {
var stats leveldb.DBStats
err := a.metadata.Stats(&stats)
nStr, err := a.metadata.GetProperty("leveldb.num-files-at-level0")
if err != nil {
log.Error("Stats failed", zap.Error(err))
log.Error("GetProperty failed", zap.Error(err))
time.Sleep(5 * time.Second)
continue
}
if len(stats.LevelTablesCounts) > 0 && stats.LevelTablesCounts[0] >= l0Trigger {
log.Info("wait some time to gc cause too many L0 file", zap.Int("files", stats.LevelTablesCounts[0]))

l0Num, err := strconv.Atoi(nStr)
if err != nil {
log.Error("parse int failed", zap.String("str", nStr), zap.Error(err))
time.Sleep(5 * time.Second)
continue
}

if l0Num >= l0Trigger {
log.Info("wait some time to gc cause too many L0 file", zap.Int("files", l0Num))
time.Sleep(5 * time.Second)
continue
}
Expand All @@ -630,8 +637,12 @@ func (a *Append) doGCTS(ts int64) {
iter := a.metadata.NewIterator(irange, nil)

deleteBatch := 0
var lastKey []byte

for iter.Next() && deleteBatch < 100 {
batch.Delete(iter.Key())
lastKey = iter.Key()

if batch.Len() == 1024 {
err := a.metadata.Write(batch, nil)
if err != nil {
Expand All @@ -654,6 +665,10 @@ func (a *Append) doGCTS(ts int64) {
}
break
}

if len(lastKey) > 0 {
a.vlog.gcTS(decodeTSKey(lastKey))
}
}

a.vlog.gcTS(ts)
Expand Down
1 change: 1 addition & 0 deletions pump/storage/vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ func (vlog *valueLog) gcTS(gcTS int64) {
if err != nil {
log.Error("remove file failed", zap.String("path", logFile.path), zap.Error(err))
}
log.Info("remove file", zap.String("path", logFile.path))
logFile.lock.Unlock()
}
}

0 comments on commit b3e2cc3

Please sign in to comment.