Skip to content

Commit

Permalink
refine gc strategy of pump (#646) (#663)
Browse files Browse the repository at this point in the history
  • Loading branch information
IANTHEREAL committed Jul 13, 2019
1 parent d3361c7 commit 150b982
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 37 deletions.
13 changes: 7 additions & 6 deletions pump/config.go
Expand Up @@ -50,12 +50,13 @@ type Config struct {
Socket string `toml:"socket" json:"socket"`
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
EtcdDialTimeout time.Duration
DataDir string `toml:"data-dir" json:"data-dir"`
HeartbeatInterval int `toml:"heartbeat-interval" json:"heartbeat-interval"`
GC int `toml:"gc" json:"gc"`
LogFile string `toml:"log-file" json:"log-file"`
LogRotate string `toml:"log-rotate" json:"log-rotate"`
Security security.Config `toml:"security" json:"security"`
DataDir string `toml:"data-dir" json:"data-dir"`
HeartbeatInterval int `toml:"heartbeat-interval" json:"heartbeat-interval"`
// pump only stores binlog events whose ts >= current time - GC(day)
GC int `toml:"gc" json:"gc"`
LogFile string `toml:"log-file" json:"log-file"`
LogRotate string `toml:"log-rotate" json:"log-rotate"`
Security security.Config `toml:"security" json:"security"`

GenFakeBinlogInterval int `toml:"gen-binlog-interval" json:"gen-binlog-interval"`

Expand Down
8 changes: 8 additions & 0 deletions pump/metrics.go
Expand Up @@ -28,6 +28,14 @@ var (
Name: "loss_binlog_count",
Help: "Total loss binlog count",
})

detectedDrainerBinlogPurged = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "binlog",
Subsystem: "pump",
Name: "detected_drainer_binlog_purge_count",
Help: "binlog purge count > 0 means some unread binlog was purged",
}, []string{"id"})
)

var registry = prometheus.NewRegistry()
Expand Down
77 changes: 49 additions & 28 deletions pump/server.go
Expand Up @@ -3,7 +3,6 @@ package pump
import (
"encoding/json"
"fmt"
"math"
"net"
"net/http"
"net/url"
Expand All @@ -24,7 +23,7 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tipb/go-binlog"
binlog "github.com/pingcap/tipb/go-binlog"
pb "github.com/pingcap/tipb/go-binlog"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand All @@ -35,13 +34,13 @@ import (
"google.golang.org/grpc/credentials"
)

var notifyDrainerTimeout = time.Second * 10

// GlobalConfig is global config of pump
var GlobalConfig *globalConfig

const (
pdReconnTimes = 30
var (
notifyDrainerTimeout = time.Second * 10
pdReconnTimes = 30
earlyAlertGC = 20 * time.Hour
detectDrainerCheckpointInterval = 10 * time.Minute
// GlobalConfig is global config of pump
GlobalConfig *globalConfig
)

// Server implements the gRPC interface,
Expand Down Expand Up @@ -268,6 +267,11 @@ func (s *Server) PullBinlogs(in *binlog.PullBinlogReq, stream binlog.Pump_PullBi
// don't use pos.Suffix now, use offset like last commitTS
last := in.StartFrom.Offset

gcTS := s.storage.GetGCTS()
if last <= gcTS {
log.Errorf("drainer request a purged binlog (gc ts = %d), request %+v, some binlog events may be loss", gcTS, in)
}

ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
binlogs := s.storage.PullCommitBinlog(ctx, last)
Expand Down Expand Up @@ -372,6 +376,9 @@ func (s *Server) Start() error {
s.wg.Add(1)
go s.printServerInfo()

s.wg.Add(1)
go s.detectDrainerCheckpoint()

// register pump with gRPC server and start to serve listeners
binlog.RegisterPumpServer(s.gs, s)

Expand Down Expand Up @@ -483,6 +490,29 @@ func (s *Server) genForwardBinlog() {
}
}

func (s *Server) detectDrainerCheckpoint() {
defer s.wg.Done()

ticker := time.NewTicker(detectDrainerCheckpointInterval)
defer ticker.Stop()

for {
select {
case <-s.ctx.Done():
log.Info("detect drainer checkpoint routine exit")
return
case <-ticker.C:
gcTS := s.storage.GetGCTS()
alertGCMS := earlyAlertGC.Nanoseconds() / 1000 / 1000
alertGCTS := gcTS + int64(oracle.EncodeTSO(alertGCMS))

log.Infof("use gc ts %d to detect drainer checkpoint", gcTS)
// detect whether the binlog before drainer's checkpoint had been purged
s.detectDrainerCheckPoints(s.ctx, alertGCTS)
}
}
}

func (s *Server) printServerInfo() {
defer s.wg.Done()

Expand Down Expand Up @@ -519,43 +549,34 @@ func (s *Server) gcBinlogFile() {
continue
}

safeTSO, err := s.getSafeGCTSOForDrainers()
if err != nil {
log.Warn("get save gc tso for drainers failed: %+v", err)
continue
}
log.Infof("get safe ts for drainers success, ts: %d", safeTSO)

millisecond := time.Now().Add(-s.gcDuration).UnixNano() / 1000 / 1000
gcTS := int64(oracle.EncodeTSO(millisecond))
if safeTSO < gcTS {
gcTS = safeTSO
}

log.Infof("send gc request to storage, ts: %d", gcTS)
s.storage.GCTS(gcTS)
s.storage.GC(gcTS)
}
}

func (s *Server) getSafeGCTSOForDrainers() (int64, error) {
func (s *Server) detectDrainerCheckPoints(ctx context.Context, gcTS int64) {
pumpNode := s.node.(*pumpNode)

drainers, err := pumpNode.Nodes(s.ctx, "drainers")
drainers, err := pumpNode.Nodes(ctx, "drainers")
if err != nil {
return 0, errors.Trace(err)
log.Error("fail to query status of drainers: %v", err)
return
}

var minTSO int64 = math.MaxInt64
for _, drainer := range drainers {
if drainer.State == node.Offline {
continue
}

if drainer.MaxCommitTS < minTSO {
minTSO = drainer.MaxCommitTS
if drainer.MaxCommitTS < gcTS {
log.Errorf("drainer(%s) checkpoint(max commit ts = %d) is older than pump gc ts(%d), some binlogs are purged", drainer.NodeID, drainer.MaxCommitTS, gcTS)
// will add test when binlog have failpoint
detectedDrainerBinlogPurged.WithLabelValues(drainer.NodeID).Inc()
}
}

return minTSO, nil
}

func (s *Server) startMetrics() {
Expand Down
13 changes: 10 additions & 3 deletions pump/storage/storage.go
Expand Up @@ -49,7 +49,9 @@ type Storage interface {
WriteBinlog(binlog *pb.Binlog) error

// delete <= ts
GCTS(ts int64)
GC(ts int64)

GetGCTS() int64

MaxCommitTS() int64

Expand Down Expand Up @@ -533,8 +535,13 @@ func (a *Append) Close() error {
return err
}

// GCTS implement Storage.GCTS
func (a *Append) GCTS(ts int64) {
// GetGCTS implement Storage.GetGCTS
func (a *Append) GetGCTS() int64 {
return atomic.LoadInt64(&a.gcTS)
}

// GC implement Storage.GC
func (a *Append) GC(ts int64) {
lastTS := atomic.LoadInt64(&a.gcTS)
if ts <= lastTS {
log.Infof("ignore gc ts: %d, last gc ts: %d", ts, lastTS)
Expand Down

0 comments on commit 150b982

Please sign in to comment.