Skip to content

Commit

Permalink
add statistics for simulator
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx committed Aug 28, 2018
1 parent 47a950e commit dfae1a3
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 20 deletions.
1 change: 1 addition & 0 deletions cmd/simulator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ EXIT:
}

fmt.Printf("%s [%s] total iteration: %d, time cost: %v\n", simResult, confName, driver.TickCount(), time.Since(start))
driver.Print()

if simResult != "OK" {
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/faketikv/cases/add_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func newAddNodes() *Conf {
if regionCount > 390 || regionCount < 360 {
res = false
}

}

simutil.Logger.Infof("leader counts: %v", leaderCounts)
simutil.Logger.Infof("region counts: %v", regionCounts)
return res
Expand Down
3 changes: 2 additions & 1 deletion pkg/faketikv/cases/add_nodes_dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func newAddNodesDynamic() *Conf {
Status: metapb.StoreState_Up,
Capacity: 1 * TB,
Available: 900 * GB,
Version: "2.1.0",
})
}

Expand Down Expand Up @@ -80,8 +81,8 @@ func newAddNodesDynamic() *Conf {
if regionCount > 390 || regionCount < 360 {
res = false
}

}

simutil.Logger.Infof("leader counts: %v", leaderCounts)
simutil.Logger.Infof("region counts: %v", regionCounts)
return res
Expand Down
2 changes: 1 addition & 1 deletion pkg/faketikv/cases/delete_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func newDeleteNodes() *Conf {
if regionCount > 443 || regionCount < 413 {
res = false
}

}

simutil.Logger.Infof("leader counts: %v", leaderCounts)
simutil.Logger.Infof("region counts: %v", regionCounts)
return res
Expand Down
8 changes: 4 additions & 4 deletions pkg/faketikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func NewClient(pdAddr string, tag string) (Client, <-chan *pdpb.RegionHeartbeatR
simutil.Logger.Infof("[%s][pd] create pd client with endpoints %v", tag, pdAddr)
ctx, cancel := context.WithCancel(context.Background())
c := &client{
url: pdAddr,
url: pdAddr,
reportRegionHeartbeatCh: make(chan *core.RegionInfo, 1),
receiveRegionHeartbeatCh: make(chan *pdpb.RegionHeartbeatResponse, 1),
ctx: ctx,
cancel: cancel,
tag: tag,
ctx: ctx,
cancel: cancel,
tag: tag,
}
cc, err := c.createConn()
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/faketikv/drive.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ func (d *Driver) Check() bool {
return d.conf.Checker(d.raftEngine.regionsInfo)
}

// Print prints the statistics of schedulers.
func (d *Driver) Print() {
d.raftEngine.schedulerStats.Print()
}

// Stop stops all nodes.
func (d *Driver) Stop() {
for _, n := range d.clusterInfo.Nodes {
Expand Down
14 changes: 7 additions & 7 deletions pkg/faketikv/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ func NewNode(s *cases.Store, pdAddr string) (*Node, error) {
return nil, err
}
return &Node{
Store: store,
stats: stats,
client: client,
ctx: ctx,
cancel: cancel,
tasks: make(map[uint64]Task),
state: Down,
Store: store,
stats: stats,
client: client,
ctx: ctx,
cancel: cancel,
tasks: make(map[uint64]Task),
state: Down,
receiveRegionHeartbeatCh: receiveRegionHeartbeatCh,
// FIXME: This value should be adjusted to a appropriate one.
ioRate: 40 * 1000 * 1000,
Expand Down
14 changes: 8 additions & 6 deletions pkg/faketikv/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@ import (
// RaftEngine records all raft infomations.
type RaftEngine struct {
sync.RWMutex
regionsInfo *core.RegionsInfo
conn *Conn
regionchange map[uint64][]uint64
regionsInfo *core.RegionsInfo
conn *Conn
regionchange map[uint64][]uint64
schedulerStats *schedulerStatistics
}

// NewRaftEngine creates the initialized raft with the configuration.
func NewRaftEngine(conf *cases.Conf, conn *Conn) (*RaftEngine, error) {
r := &RaftEngine{
regionsInfo: core.NewRegionsInfo(),
conn: conn,
regionchange: make(map[uint64][]uint64),
regionsInfo: core.NewRegionsInfo(),
conn: conn,
regionchange: make(map[uint64][]uint64),
schedulerStats: newSchedulerStatistics(),
}

splitKeys := generateKeys(len(conf.Regions) - 1)
Expand Down
144 changes: 144 additions & 0 deletions pkg/faketikv/statistics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package faketikv

import (
"fmt"
"math"
)

type taskStatistics struct {
addPeer map[uint64]int
removePeer map[uint64]int
addLearner map[uint64]int
promoteLeaner map[uint64]int
transferLeader map[uint64]map[uint64]int
mergeRegion int
}

func newTaskStatistics() *taskStatistics {
return &taskStatistics{
addPeer: make(map[uint64]int),
removePeer: make(map[uint64]int),
addLearner: make(map[uint64]int),
promoteLeaner: make(map[uint64]int),
transferLeader: make(map[uint64]map[uint64]int),
}
}

func (t *taskStatistics) getStatistics() map[string]int {
stats := make(map[string]int)
addpeer := getSum(t.addPeer)
removePeer := getSum(t.removePeer)
addLearner := getSum(t.addLearner)
promoteLeaner := getSum(t.promoteLeaner)

var transferLeader int
for _, to := range t.transferLeader {
for _, v := range to {
transferLeader += v
}
}

stats["Add Peer (task)"] = addpeer
stats["Remove Peer (task)"] = removePeer
stats["Add Learner (task)"] = addLearner
stats["Promote Learner (task)"] = promoteLeaner
stats["Transfer Leader (task)"] = transferLeader
stats["Merge Region (task)"] = t.mergeRegion

return stats
}

type snapshotStatistics struct {
receive map[uint64]int
send map[uint64]int
}

func newSnapshotStatistics() *snapshotStatistics {
return &snapshotStatistics{
receive: make(map[uint64]int),
send: make(map[uint64]int),
}
}

type schedulerStatistics struct {
taskStats *taskStatistics
snapshotStats *snapshotStatistics
}

func newSchedulerStatistics() *schedulerStatistics {
return &schedulerStatistics{
taskStats: newTaskStatistics(),
snapshotStats: newSnapshotStatistics(),
}
}

func (s *snapshotStatistics) getStatistics() map[string]int {
maxSend := getMax(s.send)
maxReceive := getMax(s.receive)
minSend := getMin(s.send)
minReceive := getMin(s.receive)

stats := make(map[string]int)
stats["Send Maximum (snapshot)"] = maxSend
stats["Receive Maximum (snapshot)"] = maxReceive
if minSend != math.MaxInt32 {
stats["Send Minimum (snapshot)"] = minSend
}
if minReceive != math.MaxInt32 {
stats["Receive Minimum (snapshot)"] = minReceive
}

return stats
}

func (s *schedulerStatistics) Print() {
task := s.taskStats.getStatistics()
snap := s.snapshotStats.getStatistics()
for t, count := range task {
fmt.Println(t, count)
}
for s, count := range snap {
fmt.Println(s, count)
}
}

func getMax(m map[uint64]int) int {
var max int
for _, v := range m {
if v > max {
max = v
}
}
return max
}

func getMin(m map[uint64]int) int {
min := math.MaxInt32
for _, v := range m {
if v < min {
min = v
}
}
return min
}

func getSum(m map[uint64]int) int {
var sum int
for _, v := range m {
sum += v
}
return sum
}
17 changes: 17 additions & 0 deletions pkg/faketikv/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func (m *mergeRegion) Step(r *RaftEngine) {

r.SetRegion(targetRegion)
r.recordRegionChange(targetRegion)
r.schedulerStats.taskStats.mergeRegion++
m.finished = true
}

Expand Down Expand Up @@ -179,6 +180,16 @@ func (t *transferLeader) Step(r *RaftEngine) {
t.finished = true
r.SetRegion(region)
r.recordRegionChange(region)
fromPeerID := t.fromPeer.GetId()
toPeerID := t.peer.GetId()
_, ok := r.schedulerStats.taskStats.transferLeader[fromPeerID]
if ok {
r.schedulerStats.taskStats.transferLeader[fromPeerID][toPeerID]++
} else {
m := make(map[uint64]int)
m[toPeerID]++
r.schedulerStats.taskStats.transferLeader[fromPeerID] = m
}
}

func (t *transferLeader) RegionID() uint64 {
Expand Down Expand Up @@ -225,6 +236,7 @@ func (a *addPeer) Step(r *RaftEngine) {
if !processSnapshot(sendNode, a.sendingStat, snapshotSize) {
return
}
r.schedulerStats.snapshotStats.send[sendNode.Id]++

recvNode := r.conn.Nodes[a.peer.GetStoreId()]
if recvNode == nil {
Expand All @@ -235,13 +247,16 @@ func (a *addPeer) Step(r *RaftEngine) {
if !processSnapshot(recvNode, a.receivingStat, snapshotSize) {
return
}
r.schedulerStats.snapshotStats.receive[recvNode.Id]++

a.size -= a.speed
if a.size < 0 {
if region.GetPeer(a.peer.GetId()) == nil {
region.AddPeer(a.peer)
r.schedulerStats.taskStats.addPeer[region.GetId()]++
} else {
region.GetPeer(a.peer.GetId()).IsLearner = false
r.schedulerStats.taskStats.promoteLeaner[region.GetId()]++
}
region.RegionEpoch.ConfVer++
r.SetRegion(region)
Expand Down Expand Up @@ -300,6 +315,7 @@ func (a *removePeer) Step(r *RaftEngine) {
}
r.SetRegion(region)
r.recordRegionChange(region)
r.schedulerStats.taskStats.removePeer[region.GetId()]++
if r.conn.Nodes[storeID] == nil {
a.finished = true
return
Expand Down Expand Up @@ -351,6 +367,7 @@ func (a *addLearner) Step(r *RaftEngine) {
region.RegionEpoch.ConfVer++
r.SetRegion(region)
r.recordRegionChange(region)
r.schedulerStats.taskStats.addLearner[region.GetId()]++
}
a.finished = true
}
Expand Down

0 comments on commit dfae1a3

Please sign in to comment.