add conf change test and fix bugs #75

Merged · 2 commits · Feb 25, 2020
kv/test_raftstore/cluster.go (73 additions, 41 deletions)
@@ -14,7 +14,6 @@ import (
     "github.com/Connor1996/badger"
     "github.com/ngaut/log"
     "github.com/pingcap-incubator/tinykv/kv/config"
-    "github.com/pingcap-incubator/tinykv/kv/pd"
     "github.com/pingcap-incubator/tinykv/kv/tikv/dbreader"
     "github.com/pingcap-incubator/tinykv/kv/tikv/raftstore"
     "github.com/pingcap-incubator/tinykv/kv/util/engine_util"
@@ -32,7 +31,7 @@ type Simulator interface {
 }

 type Cluster struct {
-    pdClient pd.Client
+    pdClient *MockPDClient
     count int
     engines map[uint64]*engine_util.Engines
     snapPaths map[uint64]string
@@ -41,7 +40,7 @@ type Cluster struct {
     cfg *config.Config
 }

-func NewCluster(count int, pdClient pd.Client, simulator Simulator, cfg *config.Config) *Cluster {
+func NewCluster(count int, pdClient *MockPDClient, simulator Simulator, cfg *config.Config) *Cluster {
     return &Cluster{
         count: count,
         pdClient: pdClient,
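
Note: narrowing pdClient from the pd.Client interface to the concrete *MockPDClient is what lets the cluster reach the test-only scheduling API in pd.go below (AddPeer, RemovePeer, TransferLeader, and the new getRandomRegion), which the production interface does not expose.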
@@ -100,7 +99,7 @@ func (c *Cluster) Start() {

     for storeID, engine := range c.engines {
         peer := NewPeer(storeID, storeID)
-        firstRegion.Peers = append(firstRegion.Peers, &peer)
+        firstRegion.Peers = append(firstRegion.Peers, peer)
         err := raftstore.BootstrapStore(engine, clusterID, storeID)
         if err != nil {
             panic(err)
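
Note: the dropped address-of tracks a NewPeer change not visible in this diff — it now returns *metapb.Peer directly; the new AllocPeer below returns NewPeer's result as a *metapb.Peer, which confirms the new signature.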
@@ -149,6 +148,34 @@ func (c *Cluster) Shutdown() {
     }
 }

+func (c *Cluster) AddFilter(filter Filter) {
+    c.simulator.AddFilter(filter)
+}
+
+func (c *Cluster) ClearFilters() {
+    c.simulator.ClearFilters()
+}
+
+func (c *Cluster) StopServer(storeID uint64) {
+    c.simulator.StopStore(storeID)
+}
+
+func (c *Cluster) StartServer(storeID uint64) {
+    engine := c.engines[storeID]
+    err := c.simulator.RunStore(c.cfg, engine, context.TODO())
+    if err != nil {
+        panic(err)
+    }
+}
+
+func (c *Cluster) AllocPeer(storeID uint64) *metapb.Peer {
+    id, err := c.pdClient.AllocID(context.TODO())
+    if err != nil {
+        panic(err)
+    }
+    return NewPeer(storeID, id)
+}
+
 func (c *Cluster) Request(key []byte, reqs []*raft_cmdpb.Request, timeout time.Duration) (*raft_cmdpb.RaftCmdResponse, *badger.Txn) {
     startTime := time.Now()
     for i := 0; i < 10 || time.Now().Sub(startTime) < timeout; i++ {
@@ -161,7 +188,7 @@ func (c *Cluster) Request(key []byte, reqs []*raft_cmdpb.Request, timeout time.Duration) (*raft_cmdpb.RaftCmdResponse, *badger.Txn) {
             SleepMS(100)
             continue
         }
-        if resp.Header.Error != nil && resp.Header.Error.GetEpochNotMatch() != nil {
+        if resp.Header.Error != nil {
             SleepMS(100)
             continue
         }
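
Note: retrying on any header error, rather than only EpochNotMatch, keeps Request usable while a conf change is in flight, when transient leadership and epoch errors are expected rather than fatal.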
@@ -232,31 +259,6 @@ func (c *Cluster) LeaderOfRegion(regionID uint64) *metapb.Peer {
     return nil
 }

-func (c *Cluster) QueryLeader(storeID, regionID uint64, timeout time.Duration) *metapb.Peer {
-    // To get region leader, we don't care real peer id, so use 0 instead.
-    peer := NewPeer(storeID, 0)
-    findLeader := NewStatusRequest(regionID, &peer, NewRegionLeaderCmd())
-    resp, _ := c.CallCommand(findLeader, timeout)
-    if resp == nil {
-        panic(fmt.Sprintf("fail to get leader of region %d on store %d", regionID, storeID))
-    }
-    regionLeader := resp.StatusResponse.RegionLeader
-    if regionLeader != nil && c.ValidLeaderID(regionID, regionLeader.Leader.StoreId) {
-        return regionLeader.Leader
-    }
-    return nil
-}
-
-func (c *Cluster) ValidLeaderID(regionID, leaderID uint64) bool {
-    storeIds := c.GetStoreIdsOfRegion(regionID)
-    for _, storeID := range storeIds {
-        if leaderID == storeID {
-            return true
-        }
-    }
-    return false
-}
-
 func (c *Cluster) GetRegion(key []byte) *metapb.Region {
     for i := 0; i < 100; i++ {
         region, _, _ := c.pdClient.GetRegion(context.TODO(), key)
@@ -270,6 +272,10 @@ func (c *Cluster) GetRegion(key []byte) *metapb.Region {
     panic(fmt.Sprintf("find no region for %s", hex.EncodeToString(key)))
 }

+func (c *Cluster) GetRandomRegion() *metapb.Region {
+    return c.pdClient.getRandomRegion()
+}
+
 func (c *Cluster) GetStoreIdsOfRegion(regionID uint64) []uint64 {
     region, _, err := c.pdClient.GetRegionByID(context.TODO(), regionID)
     if err != nil {
@@ -410,22 +416,48 @@ func (c *Cluster) MustTransferLeader(regionID uint64, leader *metapb.Peer) {
     }
 }

-func (c *Cluster) AddFilter(filter Filter) {
-    c.simulator.AddFilter(filter)
+func (c *Cluster) MustAddPeer(regionID uint64, peer *metapb.Peer) {
+    c.pdClient.AddPeer(regionID, peer)
+    c.MustHavePeer(regionID, peer)
 }

-func (c *Cluster) ClearFilters() {
-    c.simulator.ClearFilters()
+func (c *Cluster) MustRemovePeer(regionID uint64, peer *metapb.Peer) {
+    c.pdClient.RemovePeer(regionID, peer)
+    c.MustNonePeer(regionID, peer)
 }

-func (c *Cluster) StopServer(storeID uint64) {
-    c.simulator.StopStore(storeID)
+func (c *Cluster) MustHavePeer(regionID uint64, peer *metapb.Peer) {
+    for i := 0; i < 500; i++ {
+        region, _, err := c.pdClient.GetRegionByID(context.TODO(), regionID)
+        if err != nil {
+            panic(err)
+        }
+        if region != nil {
+            if p := FindPeer(region, peer.GetStoreId()); p != nil {
+                if p.GetId() == peer.GetId() {
+                    return
+                }
+            }
+        }
+        SleepMS(10)
+    }
 }

-func (c *Cluster) StartServer(storeID uint64) {
-    engine := c.engines[storeID]
-    err := c.simulator.RunStore(c.cfg, engine, context.TODO())
-    if err != nil {
-        panic(err)
+func (c *Cluster) MustNonePeer(regionID uint64, peer *metapb.Peer) {
+    for i := 0; i < 500; i++ {
+        region, _, err := c.pdClient.GetRegionByID(context.TODO(), regionID)
+        if err != nil {
+            panic(err)
+        }
+        if region != nil {
+            if p := FindPeer(region, peer.GetStoreId()); p != nil {
+                if p.GetId() != peer.GetId() {
+                    return
+                }
+            } else {
+                return
+            }
+        }
+        SleepMS(10)
     }
 }
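
Taken together, the new helpers let a conf-change test read declaratively: schedule a membership change through the mock PD, then block until it is observable. A minimal sketch of the intended usage — the store ID is illustrative, and the *Cluster is assumed to have been built with NewCluster and started by the test harness, none of which is shown in this PR:

func confChangeSketch(c *Cluster) {
    // Pick an arbitrary region tracked by the mock PD.
    region := c.GetRandomRegion()
    regionID := region.GetId()

    // Allocate a fresh peer ID on store 4 and schedule the add through
    // MockPDClient; MustAddPeer polls GetRegionByID until the peer
    // appears (or gives up after 500 x 10ms).
    peer := c.AllocPeer(4)
    c.MustAddPeer(regionID, peer)

    // Bounce the store to exercise restart in the middle of membership churn.
    c.StopServer(4)
    c.StartServer(4)

    // Schedule the removal and wait until the peer is gone again.
    c.MustRemovePeer(regionID, peer)
}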
kv/test_raftstore/node.go (1 addition, 1 deletion)
@@ -222,7 +222,7 @@ func (c *NodeSimulator) CallCommandOnStore(storeID uint64, request *raft_cmdpb.RaftCmdRequest, timeout time.Duration) (*raft_cmdpb.RaftCmdResponse, *badger.Txn) {
     cb := message.NewCallback()
     err := router.SendRaftCommand(request, cb)
     if err != nil {
-        panic(err)
+        return nil, nil
     }

     resp := cb.WaitRespWithTimeout(timeout)
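Returning (nil, nil) instead of panicking turns sending to a stopped store into a retryable condition, which tests that call StopServer need. Callers must now treat a nil response as "try again"; Cluster.Request already does, roughly along these lines (a sketch, not code from this PR):

func callWithRetry(c *Cluster, req *raft_cmdpb.RaftCmdRequest) *raft_cmdpb.RaftCmdResponse {
    for i := 0; i < 10; i++ {
        resp, _ := c.CallCommand(req, time.Second)
        if resp == nil { // store stopped or command not routable: back off and retry
            SleepMS(100)
            continue
        }
        return resp
    }
    return nil
}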
kv/test_raftstore/pd.go (40 additions, 31 deletions)
@@ -3,6 +3,7 @@ package test_raftstore
 import (
     "bytes"
     "context"
+    "fmt"
     "sync"

     "github.com/google/btree"
@@ -46,16 +47,16 @@ type Operator struct {
 }

 type OpAddPeer struct {
-    peer metapb.Peer
+    peer *metapb.Peer
     pending bool
 }

 type OpRemovePeer struct {
-    peer metapb.Peer
+    peer *metapb.Peer
 }

 type OpTransferLeader struct {
-    peer metapb.Peer
+    peer *metapb.Peer
 }

 type Store struct {
@@ -89,7 +90,7 @@ type MockPDClient struct {
     bootstrapped bool
 }

-func NewMockPDClient(clusterID uint64) *MockPDClient {
+func NewMockPDClient(clusterID uint64, baseID uint64) *MockPDClient {
     return &MockPDClient{
         clusterID: clusterID,
         meta: metapb.Cluster{
@@ -98,6 +99,7 @@ func NewMockPDClient(clusterID uint64) *MockPDClient {
         stores: make(map[uint64]*Store),
         regionsRange: btree.New(2),
         regionsKey: make(map[uint64][]byte),
+        baseID: baseID,
         operators: make(map[uint64]*Operator),
         leaders: make(map[uint64]*metapb.Peer),
         pendingPeers: make(map[uint64]*metapb.Peer),
@@ -348,9 +350,10 @@ func (m *MockPDClient) handleHeartbeatConfVersion(region *metapb.Region) error {
         // So scheduler and TinyKV can't have same peer count and can only have
         // only one different peer.
         if searchRegionPeerLen > regionPeerLen {
-            if regionPeerLen-searchRegionPeerLen != 1 {
+            if searchRegionPeerLen-regionPeerLen != 1 {
                 panic("should only one conf change")
             }
+            fmt.Println(searchRegion, region)
             if len(GetDiffPeers(searchRegion, region)) != 1 {
                 panic("should only one different peer")
             }
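
A quick check of the corrected guard: inside this branch searchRegionPeerLen > regionPeerLen always holds, so the old expression regionPeerLen - searchRegionPeerLen was negative and could never equal 1 — every legal single-peer difference panicked. With searchRegionPeerLen = 3 and regionPeerLen = 2, the old guard computed 2 - 3 = -1 != 1 and panicked; the fixed order computes 3 - 2 = 1 and passes.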
@@ -426,19 +429,19 @@ func (m *MockPDClient) makeRegionHeartbeatResponse(op *Operator, resp *pdpb.RegionHeartbeatResponse) {
        if !add.pending {
            resp.ChangePeer = &pdpb.ChangePeer{
                ChangeType: eraftpb.ConfChangeType_AddNode,
-                Peer: &add.peer,
+                Peer: add.peer,
            }
        }
    case OperatorTypeRemovePeer:
        remove := op.Data.(OpRemovePeer)
        resp.ChangePeer = &pdpb.ChangePeer{
            ChangeType: eraftpb.ConfChangeType_RemoveNode,
-            Peer: &remove.peer,
+            Peer: remove.peer,
        }
    case OperatorTypeTransferLeader:
        transfer := op.Data.(OpTransferLeader)
        resp.TransferLeader = &pdpb.TransferLeader{
-            Peer: &transfer.peer,
+            Peer: transfer.peer,
        }
    }
 }
@@ -488,34 +491,45 @@ func (m *MockPDClient) removeRegionLocked(region *metapb.Region) {
 }

 // Extra API for tests
-func (m *MockPDClient) AddPeer(regionID uint64, peer metapb.Peer) {
+func (m *MockPDClient) AddPeer(regionID uint64, peer *metapb.Peer) {
     m.scheduleOperator(regionID, &Operator{
         Type: OperatorTypeAddPeer,
-        Data: &OpAddPeer{
+        Data: OpAddPeer{
             peer: peer,
             pending: false,
         },
     })
 }

-func (m *MockPDClient) RemovePeer(regionID uint64, peer metapb.Peer) {
+func (m *MockPDClient) RemovePeer(regionID uint64, peer *metapb.Peer) {
     m.scheduleOperator(regionID, &Operator{
         Type: OperatorTypeRemovePeer,
-        Data: &OpRemovePeer{
+        Data: OpRemovePeer{
             peer: peer,
         },
     })
 }

-func (m *MockPDClient) TransferLeader(regionID uint64, peer metapb.Peer) {
+func (m *MockPDClient) TransferLeader(regionID uint64, peer *metapb.Peer) {
     m.scheduleOperator(regionID, &Operator{
         Type: OperatorTypeTransferLeader,
-        Data: &OpTransferLeader{
+        Data: OpTransferLeader{
             peer: peer,
         },
     })
 }

+func (m *MockPDClient) getRandomRegion() *metapb.Region {
+    m.RLock()
+    defer m.RUnlock()
+
+    for regionID := range m.leaders {
+        region, _, _ := m.getRegionByIDLocked(regionID)
+        return region
+    }
+    return nil
+}
+
 func (m *MockPDClient) scheduleOperator(regionID uint64, op *Operator) {
     m.Lock()
     defer m.Unlock()
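
Note: getRandomRegion leans on Go's randomized map iteration order — ranging over m.leaders and returning on the first key yields an arbitrary region cheaply, with no uniformity guarantee, which is adequate for picking victims in conf-change tests.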
@@ -528,32 +542,27 @@ func MustSamePeers(left *metapb.Region, right *metapb.Region) {
         panic("unmatched peers length")
     }
     for _, p := range left.GetPeers() {
-        found := false
-        for _, p1 := range right.GetPeers() {
-            if p.GetStoreId() == p1.GetStoreId() {
-                found = true
-                break
-            }
-        }
-        if !found {
+        if FindPeer(right, p.GetStoreId()) == nil {
             panic("not found the peer")
         }
     }
 }

 func GetDiffPeers(left *metapb.Region, right *metapb.Region) []*metapb.Peer {
-    peers := make([]*metapb.Peer, 1)
+    peers := make([]*metapb.Peer, 0, 1)
     for _, p := range left.GetPeers() {
-        found := false
-        for _, p1 := range right.GetPeers() {
-            if p.GetStoreId() == p1.GetStoreId() {
-                found = true
-                break
-            }
-        }
-        if !found {
+        if FindPeer(right, p.GetStoreId()) == nil {
             peers = append(peers, p)
         }
     }
     return peers
 }

+func FindPeer(region *metapb.Region, storeID uint64) *metapb.Peer {
+    for _, p := range region.GetPeers() {
+        if p.GetStoreId() == storeID {
+            return p
+        }
+    }
+    return nil
+}
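
The GetDiffPeers allocation change is a real bug fix, not style: make with length 1 seeds the slice with a nil element, so every result carried a spurious leading nil, and the len(...) != 1 check in handleHeartbeatConfVersion failed even for a single legitimate difference. A self-contained illustration (hypothetical peer type, not the metapb one):

package main

import "fmt"

type peer struct{ id uint64 }

func main() {
    old := make([]*peer, 1) // length 1 already: [nil]
    old = append(old, &peer{id: 4})
    fmt.Println(len(old)) // 2 — the spurious nil inflates the count

    fixed := make([]*peer, 0, 1) // length 0, capacity 1
    fixed = append(fixed, &peer{id: 4})
    fmt.Println(len(fixed)) // 1 — what the caller's check expects
}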