Skip to content

Commit

Permalink
simulator: impl snapshot stats in simulator (#6579)
Browse files Browse the repository at this point in the history
close #6580

tiup playground v7.1.0 --host 10.200.14.54 --pd.binpath=./bin/pd-server  --kv.binpath ./bin/pd-simulator --kv=1 --db=0 --kv.config=./tikv.config

tikv.config
case-name="redundant-balance-region"
sim-tick-interval="100ms"
store-io-per-second=100

[coprocessor]
region-split-size = "10GB"
[raftstore]
capacity= "10TB"

Signed-off-by: bufferflies <1045931706@qq.com>
  • Loading branch information
bufferflies committed Jun 21, 2023
1 parent 58d9208 commit d9d9184
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 13 deletions.
2 changes: 1 addition & 1 deletion tools/pd-simulator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
)

var (
pdAddr = flag.String("pd", "", "pd address")
pdAddr = flag.String("pd-endpoints", "", "pd address")
configFile = flag.String("config", "conf/simconfig.toml", "config file")
caseName = flag.String("case", "", "case name")
serverLogLevel = flag.String("serverLog", "info", "pd server log level")
Expand Down
3 changes: 0 additions & 3 deletions tools/pd-simulator/simulator/cases/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package cases
import (
"time"

"github.com/docker/go-units"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/tools/pd-simulator/simulator/info"
Expand Down Expand Up @@ -55,8 +54,6 @@ func newRedundantBalanceRegion() *Case {
ID: IDAllocator.nextID(),
Peers: peers,
Leader: peers[0],
Size: 96 * units.MiB,
Keys: 960000,
})
}

Expand Down
3 changes: 3 additions & 0 deletions tools/pd-simulator/simulator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func (sc *SimConfig) Adjust(meta *toml.MetaData) error {

return sc.ServerConfig.Adjust(meta, false)
}
// speed returns how many SimTickInterval periods fit into one second,
// computed as time.Second divided by the configured interval using integer
// division.
// NOTE(review): a zero SimTickInterval would make this division panic, and an
// interval longer than one second yields 0 — confirm Adjust rules both out.
func (sc *SimConfig) speed() uint64 {
	tickInterval := sc.SimTickInterval.Duration
	ticksPerSecond := time.Second / tickInterval
	return uint64(ticksPerSecond)
}

// PDConfig saves some config which may be changed in PD.
type PDConfig struct {
Expand Down
19 changes: 17 additions & 2 deletions tools/pd-simulator/simulator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Node struct {
limiter *ratelimit.RateLimiter
sizeMutex sync.Mutex
hasExtraUsedSpace bool
snapStats []*pdpb.SnapshotStat
}

// NewNode returns a Node.
Expand Down Expand Up @@ -91,8 +92,8 @@ func NewNode(s *cases.Store, pdAddr string, config *SimConfig) (*Node, error) {
cancel()
return nil, err
}
ratio := int64(time.Second) / config.SimTickInterval.Milliseconds()
speed := config.StoreIOMBPerSecond * units.MiB * ratio
ratio := config.speed()
speed := config.StoreIOMBPerSecond * units.MiB * int64(ratio)
return &Node{
Store: store,
stats: stats,
Expand All @@ -104,6 +105,7 @@ func NewNode(s *cases.Store, pdAddr string, config *SimConfig) (*Node, error) {
limiter: ratelimit.NewRateLimiter(float64(speed), int(speed)),
tick: uint64(rand.Intn(storeHeartBeatPeriod)),
hasExtraUsedSpace: s.HasExtraUsedSpace,
snapStats: make([]*pdpb.SnapshotStat, 0),
}, nil
}

Expand Down Expand Up @@ -191,6 +193,10 @@ func (n *Node) storeHeartBeat() {
return
}
ctx, cancel := context.WithTimeout(n.ctx, pdTimeout)
stats := make([]*pdpb.SnapshotStat, len(n.snapStats))
copy(stats, n.snapStats)
n.snapStats = n.snapStats[:0]
n.stats.SnapshotStats = stats
err := n.client.StoreHeartbeat(ctx, &n.stats.StoreStats)
if err != nil {
simutil.Logger.Info("report heartbeat error",
Expand Down Expand Up @@ -279,3 +285,12 @@ func (n *Node) decUsedSize(size uint64) {
defer n.sizeMutex.Unlock()
n.stats.ToCompactionSize += size
}

// registerSnapStats appends a pdpb.SnapshotStat built from the given generate,
// send and total durations to n.snapStats.
// NOTE(review): no lock is taken around n.snapStats here — confirm that every
// reader and writer of the slice runs on the same goroutine.
func (n *Node) registerSnapStats(generate, send, total uint64) {
	n.snapStats = append(n.snapStats, &pdpb.SnapshotStat{
		GenerateDurationSec: generate,
		SendDurationSec:     send,
		TotalDurationSec:    total,
	})
}
20 changes: 13 additions & 7 deletions tools/pd-simulator/simulator/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,13 +415,14 @@ func (a *addPeer) tick(engine *RaftEngine, region *core.RegionInfo) (newRegion *
pendingPeers := append(region.GetPendingPeers(), a.peer)
return region.Clone(core.WithAddPeer(a.peer), core.WithIncConfVer(), core.WithPendingPeers(pendingPeers)), false
}
speed := engine.storeConfig.speed()
// Step 2: Process Snapshot
if !processSnapshot(sendNode, a.sendingStat) {
if !processSnapshot(sendNode, a.sendingStat, speed) {
return nil, false
}
sendStoreID := fmt.Sprintf("store-%d", sendNode.Id)
snapshotCounter.WithLabelValues(sendStoreID, "send").Inc()
if !processSnapshot(recvNode, a.receivingStat) {
if !processSnapshot(recvNode, a.receivingStat, speed) {
return nil, false
}
recvStoreID := fmt.Sprintf("store-%d", recvNode.Id)
Expand Down Expand Up @@ -492,10 +493,11 @@ func removeDownPeers(region *core.RegionInfo, removePeer *metapb.Peer) core.Regi
}

type snapshotStat struct {
action snapAction
remainSize int64
status snapStatus
start time.Time
action snapAction
remainSize int64
status snapStatus
start time.Time
generateStart time.Time
}

func newSnapshotState(size int64, action snapAction) *snapshotStat {
Expand All @@ -510,7 +512,7 @@ func newSnapshotState(size int64, action snapAction) *snapshotStat {
}
}

func processSnapshot(n *Node, stat *snapshotStat) bool {
func processSnapshot(n *Node, stat *snapshotStat, speed uint64) bool {
if stat.status == finished {
return true
}
Expand All @@ -522,6 +524,7 @@ func processSnapshot(n *Node, stat *snapshotStat) bool {
return false
}
stat.status = running
stat.generateStart = time.Now()
// If the statement is true, it will start to send or Receive the snapshot.
if stat.action == generate {
n.stats.SendingSnapCount++
Expand All @@ -542,6 +545,9 @@ func processSnapshot(n *Node, stat *snapshotStat) bool {
}
if stat.status == running {
stat.status = finished
totalSec := uint64(time.Since(stat.start).Seconds()) * speed
generateSec := uint64(time.Since(stat.generateStart).Seconds()) * speed
n.registerSnapStats(generateSec, 0, totalSec)
if stat.action == generate {
n.stats.SendingSnapCount--
} else {
Expand Down

0 comments on commit d9d9184

Please sign in to comment.