metrics: add storeID to metrics back #1506

Merged
2 commits merged on Apr 23, 2019
35 changes: 20 additions & 15 deletions server/coordinator.go
@@ -15,6 +15,7 @@ package server

import (
"context"
"fmt"
"sync"
"time"

@@ -304,45 +305,49 @@ func (c *coordinator) collectHotSpotMetrics() {
status := s.Scheduler.(hasHotStatus).GetHotWriteStatus()
for _, s := range stores {
storeAddress := s.GetAddress()
stat, ok := status.AsPeer[s.GetID()]
storeID := s.GetID()
storeLabel := fmt.Sprintf("%d", storeID)
stat, ok := status.AsPeer[storeID]
if ok {
totalWriteBytes := float64(stat.TotalFlowBytes)
hotWriteRegionCount := float64(stat.RegionsCount)

hotSpotStatusGauge.WithLabelValues(storeAddress, "total_written_bytes_as_peer").Set(totalWriteBytes)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_write_region_as_peer").Set(hotWriteRegionCount)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(totalWriteBytes)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(hotWriteRegionCount)
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, "total_written_bytes_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_write_region_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(0)
}

stat, ok = status.AsLeader[s.GetID()]
stat, ok = status.AsLeader[storeID]
if ok {
totalWriteBytes := float64(stat.TotalFlowBytes)
hotWriteRegionCount := float64(stat.RegionsCount)

hotSpotStatusGauge.WithLabelValues(storeAddress, "total_written_bytes_as_leader").Set(totalWriteBytes)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_write_region_as_leader").Set(hotWriteRegionCount)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(totalWriteBytes)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(hotWriteRegionCount)
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, "total_written_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_write_region_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(0)
}
}

// Collects hot read region metrics.
status = s.Scheduler.(hasHotStatus).GetHotReadStatus()
for _, s := range stores {
storeAddress := s.GetAddress()
stat, ok := status.AsLeader[s.GetID()]
storeID := s.GetID()
storeLabel := fmt.Sprintf("%d", storeID)
stat, ok := status.AsLeader[storeID]
if ok {
totalReadBytes := float64(stat.TotalFlowBytes)
hotReadRegionCount := float64(stat.RegionsCount)

hotSpotStatusGauge.WithLabelValues(storeAddress, "total_read_bytes_as_leader").Set(totalReadBytes)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_read_region_as_leader").Set(hotReadRegionCount)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(totalReadBytes)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(hotReadRegionCount)
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, "total_read_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_read_region_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(0)
}
}

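For reference, the pattern introduced in collectHotSpotMetrics boils down to formatting the numeric store ID as a string once per store and passing it as an additional label value. A minimal, self-contained sketch of that pattern with the Prometheus Go client (the address and ID values below are made up for illustration):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Gauge declared with the extra "store" label, mirroring the new label set
// in server/metrics.go.
var hotSpotStatusGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "pd",
		Subsystem: "hotspot",
		Name:      "status",
		Help:      "Status of the hotspot.",
	}, []string{"address", "store", "type"})

func main() {
	prometheus.MustRegister(hotSpotStatusGauge)

	storeID := uint64(1)
	storeLabel := fmt.Sprintf("%d", storeID) // Prometheus label values must be strings

	// Every WithLabelValues call now has to pass three values, in the declared order.
	hotSpotStatusGauge.WithLabelValues("127.0.0.1:20160", storeLabel, "hot_write_region_as_peer").Set(42)
}
```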
16 changes: 9 additions & 7 deletions server/grpc_service.go
@@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"strconv"
"sync/atomic"
"time"

@@ -349,44 +350,45 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
}

storeID := request.GetLeader().GetStoreId()
storeLabel := strconv.FormatUint(storeID, 10)
store, err := cluster.GetStore(storeID)
if err != nil {
return err
}
storeAddress := store.GetAddress()

regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "recv").Inc()
regionHeartbeatLatency.WithLabelValues(storeAddress).Observe(float64(time.Now().Unix()) - float64(request.GetInterval().GetEndTimestamp()))
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "recv").Inc()
regionHeartbeatLatency.WithLabelValues(storeAddress, storeLabel).Observe(float64(time.Now().Unix()) - float64(request.GetInterval().GetEndTimestamp()))

cluster.RLock()
hbStreams := cluster.coordinator.hbStreams
cluster.RUnlock()

if time.Since(lastBind) > s.cfg.heartbeatStreamBindInterval.Duration {
regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "bind").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "bind").Inc()
hbStreams.bindStream(storeID, server)
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request)
if region.GetID() == 0 {
msg := fmt.Sprintf("invalid request region, %v", request)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress, storeLabel)
continue
}
if region.GetLeader() == nil {
msg := fmt.Sprintf("invalid request leader, %v", request)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress, storeLabel)
continue
}

err = cluster.HandleRegionHeartbeat(region)
if err != nil {
msg := err.Error()
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress, storeLabel)
}

regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "ok").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc()
}
}

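In the heartbeat handler the label is computed once per request with strconv.FormatUint and reused for both the counter and the latency histogram. A rough sketch of the histogram side, assuming an illustrative address and a report that ended two seconds ago:

```go
package main

import (
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Histogram declared with the extra "store" label, as in server/metrics.go.
var regionHeartbeatLatency = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "pd",
		Subsystem: "scheduler",
		Name:      "region_heartbeat_latency_seconds",
		Help:      "Bucketed histogram of latency (s) of receiving heartbeat.",
		Buckets:   prometheus.ExponentialBuckets(1, 2, 12),
	}, []string{"address", "store"})

func main() {
	prometheus.MustRegister(regionHeartbeatLatency)

	storeID := uint64(4)
	// Equivalent to fmt.Sprintf("%d", storeID), without the fmt formatting overhead.
	storeLabel := strconv.FormatUint(storeID, 10)

	reportEnd := time.Now().Add(-2 * time.Second).Unix()
	regionHeartbeatLatency.
		WithLabelValues("127.0.0.1:20160", storeLabel).
		Observe(float64(time.Now().Unix()) - float64(reportEnd))
}
```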
17 changes: 10 additions & 7 deletions server/heartbeat_streams.go
@@ -15,6 +15,7 @@ package server

import (
"context"
"strconv"
"sync"
"time"

@@ -79,6 +80,7 @@ func (s *heartbeatStreams) run() {
s.streams[update.storeID] = update.stream
case msg := <-s.msgCh:
storeID := msg.GetTargetPeer().GetStoreId()
storeLabel := strconv.FormatUint(storeID, 10)
store, err := s.cluster.GetStore(storeID)
if err != nil {
log.Error("fail to get store",
@@ -94,15 +96,15 @@ func (s *heartbeatStreams) run() {
log.Error("send heartbeat message fail",
zap.Uint64("region-id", msg.RegionId), zap.Error(err))
delete(s.streams, storeID)
regionHeartbeatCounter.WithLabelValues(storeAddress, "push", "err").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "push", "err").Inc()
} else {
regionHeartbeatCounter.WithLabelValues(storeAddress, "push", "ok").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "push", "ok").Inc()
}
} else {
log.Debug("heartbeat stream not found, skip send message",
zap.Uint64("region-id", msg.RegionId),
zap.Uint64("store-id", storeID))
regionHeartbeatCounter.WithLabelValues(storeAddress, "push", "skip").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "push", "skip").Inc()
}
case <-keepAliveTicker.C:
for storeID, stream := range s.streams {
@@ -113,14 +115,15 @@ func (s *heartbeatStreams) run() {
continue
}
storeAddress := store.GetAddress()
storeLabel := strconv.FormatUint(storeID, 10)
if err := stream.Send(keepAlive); err != nil {
log.Error("send keepalive message fail",
zap.Uint64("target-store-id", storeID),
zap.Error(err))
delete(s.streams, storeID)
regionHeartbeatCounter.WithLabelValues(storeAddress, "keepalive", "err").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "keepalive", "err").Inc()
} else {
regionHeartbeatCounter.WithLabelValues(storeAddress, "keepalive", "ok").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "keepalive", "ok").Inc()
}
}
case <-s.ctx.Done():
@@ -161,8 +164,8 @@ func (s *heartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHear
}
}

func (s *heartbeatStreams) sendErr(errType pdpb.ErrorType, errMsg string, storeAddress string) {
regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "err").Inc()
func (s *heartbeatStreams) sendErr(errType pdpb.ErrorType, errMsg string, storeAddress string, storeLabel string) {
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "err").Inc()

msg := &pdpb.RegionHeartbeatResponse{
Header: &pdpb.ResponseHeader{
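Because the counter now takes four label values, helpers such as sendErr receive the preformatted store label alongside the address instead of looking the store up again. A simplified sketch of that shape (the helper and values here are stand-ins, not the exact PD code):

```go
package main

import (
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
)

var regionHeartbeatCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "pd",
		Subsystem: "scheduler",
		Name:      "region_heartbeat",
		Help:      "Counter of region heartbeat.",
	}, []string{"address", "store", "type", "status"})

// reportErr mirrors the extended sendErr signature: the caller passes the
// store label it already formatted, so the helper needs no store lookup.
func reportErr(storeAddress, storeLabel string) {
	regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "err").Inc()
}

func main() {
	prometheus.MustRegister(regionHeartbeatCounter)
	reportErr("127.0.0.1:20160", strconv.FormatUint(7, 10))
}
```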
8 changes: 4 additions & 4 deletions server/metrics.go
@@ -95,7 +95,7 @@ var (
Subsystem: "scheduler",
Name: "region_heartbeat",
Help: "Counter of region hearbeat.",
}, []string{"address", "type", "status"})
}, []string{"address", "store", "type", "status"})

regionHeartbeatLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
@@ -104,23 +104,23 @@
Name: "region_heartbeat_latency_seconds",
Help: "Bucketed histogram of latency (s) of receiving heartbeat.",
Buckets: prometheus.ExponentialBuckets(1, 2, 12),
}, []string{"address"})
}, []string{"address", "store"})

storeStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "scheduler",
Name: "store_status",
Help: "Store status for schedule",
}, []string{"namespace", "address", "type"})
}, []string{"namespace", "address", "store", "type"})

hotSpotStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "hotspot",
Name: "status",
Help: "Status of the hotspot.",
}, []string{"address", "type"})
}, []string{"address", "store", "type"})

tsoCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
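These declarations are the contract every call site has to match: WithLabelValues panics at runtime if the number of values differs from the number of declared label names, which is why the label sets and all of their users change together in this PR. Since the store ID maps one-to-one to the store address, the extra label should not meaningfully increase series cardinality. A small sketch of the constraint (label values are illustrative):

```go
package main

import "github.com/prometheus/client_golang/prometheus"

var storeStatusGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "pd",
		Subsystem: "scheduler",
		Name:      "store_status",
		Help:      "Store status for schedule",
	}, []string{"namespace", "address", "store", "type"})

func main() {
	prometheus.MustRegister(storeStatusGauge)

	// OK: four values for four declared label names.
	storeStatusGauge.WithLabelValues("global", "127.0.0.1:20160", "1", "region_count").Set(100)

	// Would panic (inconsistent label cardinality): only three values,
	// which is why the declaration and every caller must change together.
	// storeStatusGauge.WithLabelValues("global", "127.0.0.1:20160", "region_count").Set(100)
}
```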
8 changes: 6 additions & 2 deletions server/schedule/filters.go
@@ -14,6 +14,8 @@
package schedule

import (
"fmt"

"github.com/pingcap/pd/server/cache"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
@@ -33,9 +35,10 @@ type Filter interface {
// FilterSource checks if store can pass all Filters as source store.
func FilterSource(opt Options, store *core.StoreInfo, filters []Filter) bool {
storeAddress := store.GetAddress()
storeID := fmt.Sprintf("%d", store.GetID())
for _, filter := range filters {
if filter.FilterSource(opt, store) {
filterCounter.WithLabelValues("filter-source", storeAddress, filter.Type()).Inc()
filterCounter.WithLabelValues("filter-source", storeAddress, storeID, filter.Type()).Inc()
return true
}
}
@@ -45,9 +48,10 @@ func FilterSource(opt Options, store *core.StoreInfo, filters []Filter) bool {
// FilterTarget checks if store can pass all Filters as target store.
func FilterTarget(opt Options, store *core.StoreInfo, filters []Filter) bool {
storeAddress := store.GetAddress()
storeID := fmt.Sprintf("%d", store.GetID())
for _, filter := range filters {
if filter.FilterTarget(opt, store) {
filterCounter.WithLabelValues("filter-target", storeAddress, filter.Type()).Inc()
filterCounter.WithLabelValues("filter-target", storeAddress, storeID, filter.Type()).Inc()
return true
}
}
2 changes: 1 addition & 1 deletion server/schedule/metrics.go
@@ -47,7 +47,7 @@ var (
Subsystem: "schedule",
Name: "filter",
Help: "Counter of the filter",
}, []string{"action", "address", "type"})
}, []string{"action", "address", "store", "type"})

operatorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
59 changes: 36 additions & 23 deletions server/schedulers/balance_leader.go
@@ -14,6 +14,8 @@
package schedulers

import (
"strconv"

log "github.com/pingcap/log"
"github.com/pingcap/pd/server/cache"
"github.com/pingcap/pd/server/core"
@@ -88,40 +90,45 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*schedule.
return nil
}

log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("max-store", source.GetID()), zap.Uint64("min-store", target.GetID()))
sourceID := source.GetID()
targetID := target.GetID()
log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("max-store", sourceID), zap.Uint64("min-store", targetID))
sourceStoreLabel := strconv.FormatUint(sourceID, 10)
targetStoreLabel := strconv.FormatUint(targetID, 10)
sourceAddress := source.GetAddress()
targetAddress := target.GetAddress()
balanceLeaderCounter.WithLabelValues("high_score", sourceAddress).Inc()
balanceLeaderCounter.WithLabelValues("low_score", targetAddress).Inc()
balanceLeaderCounter.WithLabelValues("high_score", sourceAddress, sourceStoreLabel).Inc()
balanceLeaderCounter.WithLabelValues("low_score", targetAddress, targetStoreLabel).Inc()

opInfluence := l.opController.GetOpInfluence(cluster)
for i := 0; i < balanceLeaderRetryLimit; i++ {
if op := l.transferLeaderOut(source, cluster, opInfluence); op != nil {
balanceLeaderCounter.WithLabelValues("transfer_out", sourceAddress).Inc()
balanceLeaderCounter.WithLabelValues("transfer_out", sourceAddress, sourceStoreLabel).Inc()
return op
}
if op := l.transferLeaderIn(target, cluster, opInfluence); op != nil {
balanceLeaderCounter.WithLabelValues("transfer_in", targetAddress).Inc()
balanceLeaderCounter.WithLabelValues("transfer_in", targetAddress, targetStoreLabel).Inc()
return op
}
}

// If no operator can be created for the selected stores, ignore them for a while.
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("source", source.GetID()), zap.Uint64("target", target.GetID()))
balanceLeaderCounter.WithLabelValues("add_taint", sourceAddress).Inc()
l.taintStores.Put(source.GetID())
balanceLeaderCounter.WithLabelValues("add_taint", targetAddress).Inc()
l.taintStores.Put(target.GetID())
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("source", sourceID), zap.Uint64("target", targetID))
balanceLeaderCounter.WithLabelValues("add_taint", sourceAddress, sourceStoreLabel).Inc()
l.taintStores.Put(sourceID)
balanceLeaderCounter.WithLabelValues("add_taint", targetAddress, targetStoreLabel).Inc()
l.taintStores.Put(targetID)
return nil
}

// transferLeaderOut transfers leader from the source store.
// It randomly selects a health region from the source store, then picks
// the best follower peer and transfers the leader.
func (l *balanceLeaderScheduler) transferLeaderOut(source *core.StoreInfo, cluster schedule.Cluster, opInfluence schedule.OpInfluence) []*schedule.Operator {
region := cluster.RandLeaderRegion(source.GetID(), core.HealthRegion())
sourceID := source.GetID()
region := cluster.RandLeaderRegion(sourceID, core.HealthRegion())
if region == nil {
log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", source.GetID()))
log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", sourceID))
schedulerCounter.WithLabelValues(l.GetName(), "no_leader_region").Inc()
return nil
}
@@ -138,9 +145,10 @@ func (l *balanceLeaderScheduler) transferLeaderOut(source *core.StoreInfo, clust
// It randomly selects a health region from the target store, then picks
// the worst follower peer and transfers the leader.
func (l *balanceLeaderScheduler) transferLeaderIn(target *core.StoreInfo, cluster schedule.Cluster, opInfluence schedule.OpInfluence) []*schedule.Operator {
region := cluster.RandFollowerRegion(target.GetID(), core.HealthRegion())
targetID := target.GetID()
region := cluster.RandFollowerRegion(targetID, core.HealthRegion())
if region == nil {
log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", target.GetID()))
log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", targetID))
schedulerCounter.WithLabelValues(l.GetName(), "no_follower_region").Inc()
return nil
}
@@ -158,28 +166,33 @@ func (l *balanceLeaderScheduler) transferLeaderIn(target *core.StoreInfo, cluste
// no new operator need to be created, otherwise create an operator that transfers
// the leader from the source store to the target store for the region.
func (l *balanceLeaderScheduler) createOperator(region *core.RegionInfo, source, target *core.StoreInfo, cluster schedule.Cluster, opInfluence schedule.OpInfluence) []*schedule.Operator {
if cluster.IsRegionHot(region.GetID()) {
log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", region.GetID()))
regionID := region.GetID()
if cluster.IsRegionHot(regionID) {
log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", regionID))
schedulerCounter.WithLabelValues(l.GetName(), "region_hot").Inc()
return nil
}

sourceID := source.GetID()
targetID := target.GetID()
if !shouldBalance(cluster, source, target, region, core.LeaderKind, opInfluence) {
log.Debug("skip balance region",
zap.String("scheduler", l.GetName()), zap.Uint64("region-id", region.GetID()), zap.Uint64("source-store", source.GetID()), zap.Uint64("target-store", target.GetID()),
zap.String("scheduler", l.GetName()), zap.Uint64("region-id", regionID), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID),
zap.Int64("source-size", source.GetLeaderSize()), zap.Float64("source-score", source.LeaderScore(0)),
zap.Int64("source-influence", opInfluence.GetStoreInfluence(source.GetID()).ResourceSize(core.LeaderKind)),
zap.Int64("source-influence", opInfluence.GetStoreInfluence(sourceID).ResourceSize(core.LeaderKind)),
zap.Int64("target-size", target.GetLeaderSize()), zap.Float64("target-score", target.LeaderScore(0)),
zap.Int64("target-influence", opInfluence.GetStoreInfluence(target.GetID()).ResourceSize(core.LeaderKind)),
zap.Int64("target-influence", opInfluence.GetStoreInfluence(targetID).ResourceSize(core.LeaderKind)),
zap.Int64("average-region-size", cluster.GetAverageRegionSize()))
schedulerCounter.WithLabelValues(l.GetName(), "skip").Inc()
return nil
}

schedulerCounter.WithLabelValues(l.GetName(), "new_operator").Inc()
balanceLeaderCounter.WithLabelValues("move_leader", source.GetAddress()+"-out").Inc()
balanceLeaderCounter.WithLabelValues("move_leader", target.GetAddress()+"-in").Inc()
step := schedule.TransferLeader{FromStore: region.GetLeader().GetStoreId(), ToStore: target.GetID()}
op := schedule.NewOperator("balance-leader", region.GetID(), region.GetRegionEpoch(), schedule.OpBalance|schedule.OpLeader, step)
sourceLabel := strconv.FormatUint(sourceID, 10)
targetLabel := strconv.FormatUint(targetID, 10)
balanceLeaderCounter.WithLabelValues("move_leader", source.GetAddress()+"-out", sourceLabel).Inc()
balanceLeaderCounter.WithLabelValues("move_leader", target.GetAddress()+"-in", targetLabel).Inc()
step := schedule.TransferLeader{FromStore: region.GetLeader().GetStoreId(), ToStore: targetID}
op := schedule.NewOperator("balance-leader", regionID, region.GetRegionEpoch(), schedule.OpBalance|schedule.OpLeader, step)
return []*schedule.Operator{op}
}
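
The scheduler-side counters follow the same pattern, with the store address (suffixed "-out"/"-in" for leader movement) and the numeric store ID carried as separate labels. A condensed sketch of that combination; the counter definition and values below are illustrative, not the exact ones in PD:

```go
package main

import (
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
)

// Assumed label order ("type", "address", "store"), matching how the call
// sites above pass their values.
var balanceLeaderCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "pd",
		Subsystem: "scheduler",
		Name:      "balance_leader",
		Help:      "Counter of balance leader scheduler.",
	}, []string{"type", "address", "store"})

func main() {
	prometheus.MustRegister(balanceLeaderCounter)

	sourceID, targetID := uint64(1), uint64(2)
	sourceLabel := strconv.FormatUint(sourceID, 10)
	targetLabel := strconv.FormatUint(targetID, 10)

	// Address-derived label plus the store ID label, as in createOperator.
	balanceLeaderCounter.WithLabelValues("move_leader", "10.0.1.1:20160-out", sourceLabel).Inc()
	balanceLeaderCounter.WithLabelValues("move_leader", "10.0.1.2:20160-in", targetLabel).Inc()
}
```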