Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simulator: impl snapshot stats in simulator #6579

Merged
merged 3 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion tools/pd-simulator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
)

var (
pdAddr = flag.String("pd", "", "pd address")
pdAddr = flag.String("pd-endpoints", "", "pd address")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The README.md also needs to be updated to reflect the renamed `pd-endpoints` flag.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

configFile = flag.String("config", "conf/simconfig.toml", "config file")
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
caseName = flag.String("case", "", "case name")
serverLogLevel = flag.String("serverLog", "info", "pd server log level")
Expand Down
3 changes: 0 additions & 3 deletions tools/pd-simulator/simulator/cases/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package cases
import (
"time"

"github.com/docker/go-units"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/tools/pd-simulator/simulator/info"
Expand Down Expand Up @@ -55,8 +54,6 @@ func newRedundantBalanceRegion() *Case {
ID: IDAllocator.nextID(),
Peers: peers,
Leader: peers[0],
Size: 96 * units.MiB,
Keys: 960000,
})
}

Expand Down
3 changes: 3 additions & 0 deletions tools/pd-simulator/simulator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func (sc *SimConfig) Adjust(meta *toml.MetaData) error {

return sc.ServerConfig.Adjust(meta, false)
}
// speed returns how many SimTickInterval periods fit into one second,
// computed as time.Second divided by the configured interval. The division
// is integer division on time.Duration values, so any remainder is dropped.
// NOTE(review): a zero SimTickInterval would make this divide by zero —
// presumably Adjust guarantees a non-zero value; confirm against the caller.
func (sc *SimConfig) speed() uint64 {
	interval := sc.SimTickInterval.Duration
	perSecond := time.Second / interval
	return uint64(perSecond)
}

// PDConfig saves some config which may be changed in PD.
type PDConfig struct {
Expand Down
19 changes: 17 additions & 2 deletions tools/pd-simulator/simulator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Node struct {
limiter *ratelimit.RateLimiter
sizeMutex sync.Mutex
hasExtraUsedSpace bool
snapStats []*pdpb.SnapshotStat
}

// NewNode returns a Node.
Expand Down Expand Up @@ -91,8 +92,8 @@ func NewNode(s *cases.Store, pdAddr string, config *SimConfig) (*Node, error) {
cancel()
return nil, err
}
ratio := int64(time.Second) / config.SimTickInterval.Milliseconds()
speed := config.StoreIOMBPerSecond * units.MiB * ratio
ratio := config.speed()
speed := config.StoreIOMBPerSecond * units.MiB * int64(ratio)
return &Node{
Store: store,
stats: stats,
Expand All @@ -104,6 +105,7 @@ func NewNode(s *cases.Store, pdAddr string, config *SimConfig) (*Node, error) {
limiter: ratelimit.NewRateLimiter(float64(speed), int(speed)),
tick: uint64(rand.Intn(storeHeartBeatPeriod)),
hasExtraUsedSpace: s.HasExtraUsedSpace,
snapStats: make([]*pdpb.SnapshotStat, 0),
}, nil
}

Expand Down Expand Up @@ -191,6 +193,10 @@ func (n *Node) storeHeartBeat() {
return
}
ctx, cancel := context.WithTimeout(n.ctx, pdTimeout)
stats := make([]*pdpb.SnapshotStat, len(n.snapStats))
copy(stats, n.snapStats)
n.snapStats = n.snapStats[:0]
n.stats.SnapshotStats = stats
err := n.client.StoreHeartbeat(ctx, &n.stats.StoreStats)
if err != nil {
simutil.Logger.Info("report heartbeat error",
Expand Down Expand Up @@ -279,3 +285,12 @@ func (n *Node) decUsedSize(size uint64) {
defer n.sizeMutex.Unlock()
n.stats.ToCompactionSize += size
}

// registerSnapStats records one snapshot measurement on the node by
// appending a new pdpb.SnapshotStat to n.snapStats. The generate, send and
// total arguments are stored in the GenerateDurationSec, SendDurationSec and
// TotalDurationSec fields respectively.
func (n *Node) registerSnapStats(generate, send, total uint64) {
	n.snapStats = append(n.snapStats, &pdpb.SnapshotStat{
		GenerateDurationSec: generate,
		SendDurationSec:     send,
		TotalDurationSec:    total,
	})
}
20 changes: 13 additions & 7 deletions tools/pd-simulator/simulator/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,13 +415,14 @@ func (a *addPeer) tick(engine *RaftEngine, region *core.RegionInfo) (newRegion *
pendingPeers := append(region.GetPendingPeers(), a.peer)
return region.Clone(core.WithAddPeer(a.peer), core.WithIncConfVer(), core.WithPendingPeers(pendingPeers)), false
}
speed := engine.storeConfig.speed()
// Step 2: Process Snapshot
if !processSnapshot(sendNode, a.sendingStat) {
if !processSnapshot(sendNode, a.sendingStat, speed) {
return nil, false
}
sendStoreID := fmt.Sprintf("store-%d", sendNode.Id)
snapshotCounter.WithLabelValues(sendStoreID, "send").Inc()
if !processSnapshot(recvNode, a.receivingStat) {
if !processSnapshot(recvNode, a.receivingStat, speed) {
return nil, false
}
recvStoreID := fmt.Sprintf("store-%d", recvNode.Id)
Expand Down Expand Up @@ -492,10 +493,11 @@ func removeDownPeers(region *core.RegionInfo, removePeer *metapb.Peer) core.Regi
}

// snapshotStat holds the per-snapshot bookkeeping that processSnapshot reads
// and updates.
// NOTE(review): the first four fields appear twice below — this looks like
// the removed and added sides of a diff shown together; confirm before use.
type snapshotStat struct {
action snapAction
remainSize int64
status snapStatus
start time.Time
// action is compared against generate in processSnapshot to decide which
// snapshot counter to adjust.
action snapAction
// remainSize — presumably the bytes still to be processed; verify against
// newSnapshotState and processSnapshot.
remainSize int64
// status is checked against running and finished in processSnapshot.
status snapStatus
// start is the reference point for the total duration reported when the
// snapshot finishes.
start time.Time
// generateStart is set to time.Now() when the status switches to running
// and is the reference point for the reported generate duration.
generateStart time.Time
}

func newSnapshotState(size int64, action snapAction) *snapshotStat {
Expand All @@ -510,7 +512,7 @@ func newSnapshotState(size int64, action snapAction) *snapshotStat {
}
}

func processSnapshot(n *Node, stat *snapshotStat) bool {
func processSnapshot(n *Node, stat *snapshotStat, speed uint64) bool {
if stat.status == finished {
return true
}
Expand All @@ -522,6 +524,7 @@ func processSnapshot(n *Node, stat *snapshotStat) bool {
return false
}
stat.status = running
stat.generateStart = time.Now()
// If the statement is true, it will start to send or Receive the snapshot.
if stat.action == generate {
n.stats.SendingSnapCount++
Expand All @@ -542,6 +545,9 @@ func processSnapshot(n *Node, stat *snapshotStat) bool {
}
if stat.status == running {
stat.status = finished
totalSec := uint64(time.Since(stat.start).Seconds()) * speed
generateSec := uint64(time.Since(stat.generateStart).Seconds()) * speed
n.registerSnapStats(generateSec, 0, totalSec)
if stat.action == generate {
n.stats.SendingSnapCount--
} else {
Expand Down