From 4f4bfef49d37a52b25b57dc80635e14a25f501ea Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 20 Aug 2019 11:01:31 +0800 Subject: [PATCH] Avoid repeated called of NewIterator which might be slow (#718) --- pump/storage/metrics.go | 18 ++++++++++++++++++ pump/storage/storage.go | 21 ++++++++++++--------- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/pump/storage/metrics.go b/pump/storage/metrics.go index ed03ca87c..97db9439f 100644 --- a/pump/storage/metrics.go +++ b/pump/storage/metrics.go @@ -26,6 +26,22 @@ var ( Help: "gc ts of storage", }) + doneGcTSGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "binlog", + Subsystem: "pump_storage", + Name: "done_gc_ts", + Help: "the metadata and vlog after this gc ts has been collected", + }) + + deletedKv = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "binlog", + Subsystem: "pump_storage", + Name: "deleted_kv_total", + Help: "deleted kv number", + }) + storageSizeGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "binlog", @@ -97,6 +113,8 @@ var ( // InitMetircs register the metrics to registry func InitMetircs(registry *prometheus.Registry) { registry.MustRegister(gcTSGauge) + registry.MustRegister(doneGcTSGauge) + registry.MustRegister(deletedKv) registry.MustRegister(maxCommitTSGauge) registry.MustRegister(tikvQueryCount) registry.MustRegister(errorCount) diff --git a/pump/storage/storage.go b/pump/storage/storage.go index 9360b89ed..fab8d5053 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -660,6 +660,12 @@ func (a *Append) doGCTS(ts int64) { deleteNum := 0 + irange := &util.Range{ + Start: encodeTSKey(0), + Limit: encodeTSKey(ts + 1), + } + iter := a.metadata.NewIterator(irange, nil) + for { nStr, err := a.metadata.GetProperty("leveldb.num-files-at-level0") if err != nil { @@ -679,12 +685,6 @@ func (a *Append) doGCTS(ts int64) { continue } - irange := &util.Range{ - Start: encodeTSKey(0), - Limit: encodeTSKey(ts + 1), - } - iter := a.metadata.NewIterator(irange, nil) - deleteBatch := 0 var lastKey []byte @@ -698,19 +698,19 @@ func (a *Append) doGCTS(ts int64) { if err != nil { log.Error("write batch failed", zap.Error(err)) } + deletedKv.Add(float64(batch.Len())) batch.Reset() deleteBatch++ } } - iter.Release() - if deleteBatch < 100 { if batch.Len() > 0 { err := a.metadata.Write(batch, nil) if err != nil { log.Error("write batch failed", zap.Error(err)) } + deletedKv.Add(float64(batch.Len())) batch.Reset() } break @@ -718,12 +718,15 @@ func (a *Append) doGCTS(ts int64) { if len(lastKey) > 0 { a.vlog.gcTS(decodeTSKey(lastKey)) + doneGcTSGauge.Set(float64(oracle.ExtractPhysical(uint64(decodeTSKey(lastKey))))) } - log.Info("has delete", zap.Int("delete num", deleteNum)) } + iter.Release() + a.vlog.gcTS(ts) + doneGcTSGauge.Set(float64(oracle.ExtractPhysical(uint64(ts)))) log.Info("finish gc", zap.Int64("ts", ts), zap.Int("delete num", deleteNum)) }