refine gc strategy of pump (#646) #663

Merged
merged 4 commits into from Jul 13, 2019
13 changes: 7 additions & 6 deletions pump/config.go
@@ -50,12 +50,13 @@ type Config struct {
Socket string `toml:"socket" json:"socket"`
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
EtcdDialTimeout time.Duration
DataDir string `toml:"data-dir" json:"data-dir"`
HeartbeatInterval int `toml:"heartbeat-interval" json:"heartbeat-interval"`
GC int `toml:"gc" json:"gc"`
LogFile string `toml:"log-file" json:"log-file"`
LogRotate string `toml:"log-rotate" json:"log-rotate"`
Security security.Config `toml:"security" json:"security"`
DataDir string `toml:"data-dir" json:"data-dir"`
HeartbeatInterval int `toml:"heartbeat-interval" json:"heartbeat-interval"`
// pump only stores binlog events whose ts >= current time - GC(day)
GC int `toml:"gc" json:"gc"`
LogFile string `toml:"log-file" json:"log-file"`
LogRotate string `toml:"log-rotate" json:"log-rotate"`
Security security.Config `toml:"security" json:"security"`

GenFakeBinlogInterval int `toml:"gen-binlog-interval" json:"gen-binlog-interval"`

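The new comment on `GC` pins down the retention rule: pump keeps only binlog newer than "now minus GC days". As a rough illustration (not part of this PR), here is a minimal sketch of how a day-based `gc` value could be turned into the TSO threshold that the GC loop in `server.go` compares against; the 18-bit logical shift is an assumption taken from TiDB's TSO layout, and the `gcDays` value is hypothetical.

package main

import (
	"fmt"
	"time"
)

// encodeTSO mimics oracle.EncodeTSO: the physical time in milliseconds is
// shifted into the high bits of the TSO (assumption: 18 logical bits).
func encodeTSO(ms int64) int64 {
	return ms << 18
}

func main() {
	gcDays := 7 // hypothetical value of the `gc` config item, in days
	retention := time.Duration(gcDays) * 24 * time.Hour

	// Binlog events whose commit ts falls below this TSO are eligible for GC.
	ms := time.Now().Add(-retention).UnixNano() / int64(time.Millisecond)
	fmt.Println("gc ts:", encodeTSO(ms))
}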
8 changes: 8 additions & 0 deletions pump/metrics.go
@@ -28,6 +28,14 @@ var (
Name: "loss_binlog_count",
Help: "Total loss binlog count",
})

detectedDrainerBinlogPurged = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "binlog",
Subsystem: "pump",
Name: "detected_drainer_binlog_purge_count",
Help: "binlog purge count > 0 means some unread binlog was purged",
}, []string{"id"})
)

var registry = prometheus.NewRegistry()
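For reference, a minimal, self-contained sketch of how the new counter vec would be registered and then bumped once per lagging drainer; the standalone registration and the drainer ID are assumptions for illustration, not code from this PR.

package main

import "github.com/prometheus/client_golang/prometheus"

var detectedDrainerBinlogPurged = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "binlog",
		Subsystem: "pump",
		Name:      "detected_drainer_binlog_purge_count",
		Help:      "binlog purge count > 0 means some unread binlog was purged",
	}, []string{"id"})

func main() {
	// Register the metric, then increment it for a (hypothetical) drainer
	// whose checkpoint has fallen behind the pump GC threshold.
	prometheus.MustRegister(detectedDrainerBinlogPurged)
	detectedDrainerBinlogPurged.WithLabelValues("drainer-0").Inc()
}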
77 changes: 49 additions & 28 deletions pump/server.go
@@ -3,7 +3,6 @@ package pump
import (
"encoding/json"
"fmt"
"math"
"net"
"net/http"
"net/url"
@@ -24,7 +23,7 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tipb/go-binlog"
binlog "github.com/pingcap/tipb/go-binlog"
pb "github.com/pingcap/tipb/go-binlog"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -35,13 +34,13 @@ import (
"google.golang.org/grpc/credentials"
)

var notifyDrainerTimeout = time.Second * 10

// GlobalConfig is global config of pump
var GlobalConfig *globalConfig

const (
pdReconnTimes = 30
var (
notifyDrainerTimeout = time.Second * 10
pdReconnTimes = 30
earlyAlertGC = 20 * time.Hour
detectDrainerCheckpointInterval = 10 * time.Minute
// GlobalConfig is global config of pump
GlobalConfig *globalConfig
)

// Server implements the gRPC interface,
@@ -268,6 +267,11 @@ func (s *Server) PullBinlogs(in *binlog.PullBinlogReq, stream binlog.Pump_PullBi
// don't use pos.Suffix now, use offset like last commitTS
last := in.StartFrom.Offset

gcTS := s.storage.GetGCTS()
if last <= gcTS {
log.Errorf("drainer request a purged binlog (gc ts = %d), request %+v, some binlog events may be loss", gcTS, in)
}

ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
binlogs := s.storage.PullCommitBinlog(ctx, last)
@@ -372,6 +376,9 @@ func (s *Server) Start() error {
s.wg.Add(1)
go s.printServerInfo()

s.wg.Add(1)
go s.detectDrainerCheckpoint()

// register pump with gRPC server and start to serve listeners
binlog.RegisterPumpServer(s.gs, s)

@@ -483,6 +490,29 @@ func (s *Server) genForwardBinlog() {
}
}

func (s *Server) detectDrainerCheckpoint() {
defer s.wg.Done()

ticker := time.NewTicker(detectDrainerCheckpointInterval)
defer ticker.Stop()

for {
select {
case <-s.ctx.Done():
log.Info("detect drainer checkpoint routine exit")
return
case <-ticker.C:
gcTS := s.storage.GetGCTS()
alertGCMS := earlyAlertGC.Nanoseconds() / 1000 / 1000
alertGCTS := gcTS + int64(oracle.EncodeTSO(alertGCMS))

log.Infof("use gc ts %d to detect drainer checkpoint", gcTS)
// detect whether the binlog before drainer's checkpoint had been purged
s.detectDrainerCheckPoints(s.ctx, alertGCTS)
}
}
}

func (s *Server) printServerInfo() {
defer s.wg.Done()

@@ -519,43 +549,34 @@ func (s *Server) gcBinlogFile() {
continue
}

safeTSO, err := s.getSafeGCTSOForDrainers()
if err != nil {
log.Warn("get save gc tso for drainers failed: %+v", err)
continue
}
log.Infof("get safe ts for drainers success, ts: %d", safeTSO)

millisecond := time.Now().Add(-s.gcDuration).UnixNano() / 1000 / 1000
gcTS := int64(oracle.EncodeTSO(millisecond))
if safeTSO < gcTS {
gcTS = safeTSO
}

log.Infof("send gc request to storage, ts: %d", gcTS)
s.storage.GCTS(gcTS)
s.storage.GC(gcTS)
}
}

func (s *Server) getSafeGCTSOForDrainers() (int64, error) {
func (s *Server) detectDrainerCheckPoints(ctx context.Context, gcTS int64) {
pumpNode := s.node.(*pumpNode)

drainers, err := pumpNode.Nodes(s.ctx, "drainers")
drainers, err := pumpNode.Nodes(ctx, "drainers")
if err != nil {
return 0, errors.Trace(err)
log.Error("fail to query status of drainers: %v", err)
return
}

var minTSO int64 = math.MaxInt64
for _, drainer := range drainers {
if drainer.State == node.Offline {
continue
}

if drainer.MaxCommitTS < minTSO {
minTSO = drainer.MaxCommitTS
if drainer.MaxCommitTS < gcTS {
log.Errorf("drainer(%s) checkpoint(max commit ts = %d) is older than pump gc ts(%d), some binlogs are purged", drainer.NodeID, drainer.MaxCommitTS, gcTS)
// will add a test when binlog has failpoint support
detectedDrainerBinlogPurged.WithLabelValues(drainer.NodeID).Inc()
}
}

return minTSO, nil
}

func (s *Server) startMetrics() {
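To make the early-alert window concrete: `detectDrainerCheckpoint` flags a drainer whose checkpoint would be purged within the next `earlyAlertGC` (20 hours), rather than only after its binlog is gone. A small sketch of the threshold arithmetic, assuming the same 18-bit TSO encoding as above and a made-up current GC TSO.

package main

import (
	"fmt"
	"time"
)

const earlyAlertGC = 20 * time.Hour

// encodeTSO: physical milliseconds shifted into the high bits of the TSO
// (assumption: 18 logical bits, as in TiDB's oracle package).
func encodeTSO(ms int64) int64 { return ms << 18 }

func main() {
	var gcTS int64 = 409000000000000000 // hypothetical current GC TSO

	// A drainer is alerted when its max commit ts is below gcTS plus the
	// TSO-encoded 20-hour window, i.e. before its binlog is actually purged.
	alertGCMS := earlyAlertGC.Nanoseconds() / int64(time.Millisecond)
	alertGCTS := gcTS + encodeTSO(alertGCMS)

	drainerMaxCommitTS := gcTS + encodeTSO(alertGCMS/2) // only ~10h of headroom left
	fmt.Println("alert:", drainerMaxCommitTS < alertGCTS) // prints: alert: true
}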
13 changes: 10 additions & 3 deletions pump/storage/storage.go
@@ -49,7 +49,9 @@ type Storage interface {
WriteBinlog(binlog *pb.Binlog) error

// delete <= ts
GCTS(ts int64)
GC(ts int64)

GetGCTS() int64

MaxCommitTS() int64

@@ -533,8 +535,13 @@ func (a *Append) Close() error {
return err
}

// GCTS implement Storage.GCTS
func (a *Append) GCTS(ts int64) {
// GetGCTS implements Storage.GetGCTS
func (a *Append) GetGCTS() int64 {
return atomic.LoadInt64(&a.gcTS)
}

// GC implements Storage.GC
func (a *Append) GC(ts int64) {
lastTS := atomic.LoadInt64(&a.gcTS)
if ts <= lastTS {
log.Infof("ignore gc ts: %d, last gc ts: %d", ts, lastTS)
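The `Append` side of the split interface keeps the last GC point in an atomically updated field, so `GetGCTS` can be read from the server's detection loop without extra locking while `GC` only ever moves the threshold forward. A stripped-down sketch of that pattern (the actual purge work is elided; types and names here are illustrative, not the PR's).

package main

import (
	"fmt"
	"sync/atomic"
)

// gcState illustrates the GetGCTS/GC contract: GC only advances the ts,
// and GetGCTS reads the current value lock-free.
type gcState struct {
	gcTS int64
}

func (s *gcState) GetGCTS() int64 { return atomic.LoadInt64(&s.gcTS) }

func (s *gcState) GC(ts int64) {
	if ts <= atomic.LoadInt64(&s.gcTS) {
		return // ignore stale or duplicate GC requests
	}
	atomic.StoreInt64(&s.gcTS, ts)
	// ... delete binlog with commit ts <= ts from the underlying storage ...
}

func main() {
	s := &gcState{}
	s.GC(100)
	s.GC(50)                 // ignored: older than the last GC ts
	fmt.Println(s.GetGCTS()) // prints 100
}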