Skip to content

Commit

Permalink
Merge branch 'check-clean-expired' into 'master'
Browse files Browse the repository at this point in the history
fix: avoid waiting for future proposal response twice on chan

See merge request !18
  • Loading branch information
absolute8511 committed Jan 18, 2021
2 parents 2e28494 + dd587a2 commit a7229fd
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 9 deletions.
2 changes: 1 addition & 1 deletion cluster/register_etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func TestRegisterWatchKeepAliveTimeoutInDeadConn(t *testing.T) {
close(stopC)
wg.Wait()
t.Logf("changed: %v , %v", atomic.LoadInt32(&pdLeaderChanged), atomic.LoadInt32(&dataNodeChanged))
assert.True(t, atomic.LoadInt32(&dataNodeChanged) >= 2)
assert.True(t, atomic.LoadInt32(&dataNodeChanged) >= 1)
assert.True(t, atomic.LoadInt32(&dataNodeChanged) <= 4)
assert.True(t, atomic.LoadInt32(&pdLeaderChanged) <= 8)
assert.True(t, atomic.LoadInt32(&pdLeaderChanged) >= 2)
Expand Down
10 changes: 6 additions & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,10 +704,11 @@ func (nd *KVNode) ProposeRawAsyncFromSyncer(buffer []byte, reqList *BatchInterna
err = ErrProposalCanceled
}
nd.w.Trigger(reqList.ReqId, err)
<-wr.WaitC()
rsp = err
case <-wr.WaitC():
// WaitC should be called only once
rsp = wr.GetResult()
}
rsp = wr.GetResult()
cancel()
if err, ok = rsp.(error); ok {
rsp = nil
Expand Down Expand Up @@ -765,6 +766,7 @@ type FutureRsp struct {
rspHandle func(interface{}) (interface{}, error)
}

// note: WaitRsp should not be called more than once on the same wait
func (fr *FutureRsp) WaitRsp() (interface{}, error) {
rsp, err := fr.waitFunc()
if err != nil {
Expand Down Expand Up @@ -805,10 +807,10 @@ func (nd *KVNode) queueRequest(start time.Time, req InternalRaftRequest) (*Futur
err = ErrProposalCanceled
}
nd.w.Trigger(req.Header.ID, err)
<-wrh.wr.WaitC()
rsp = err
case <-wrh.wr.WaitC():
rsp = wrh.wr.GetResult()
}
rsp = wrh.wr.GetResult()
cancel()

defer wrh.release(err == nil)
Expand Down
47 changes: 47 additions & 0 deletions node/node_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package node

import (
"context"
"errors"
"flag"
"os"
"testing"
Expand All @@ -9,6 +11,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/youzan/ZanRedisDB/common"
"github.com/youzan/ZanRedisDB/engine"
"github.com/youzan/ZanRedisDB/pkg/idutil"
"github.com/youzan/ZanRedisDB/pkg/wait"
"github.com/youzan/ZanRedisDB/rockredis"
)

Expand Down Expand Up @@ -52,6 +56,49 @@ func TestWaitReqPools(t *testing.T) {
wr.release(true)
}

// TestProposeWaitMoreThanOnceTrigger reproduces the waitFunc used by the
// propose paths (queueRequest / ProposeRawAsyncFromSyncer) and verifies that
// triggering the same request id again after a timeout does not change the
// outcome of a repeated WaitRsp: both calls should observe the original
// context.DeadlineExceeded error, not the late second trigger value.
func TestProposeWaitMoreThanOnceTrigger(t *testing.T) {
	var reqList BatchInternalRaftRequest
	w := wait.New()
	reqIDGen := idutil.NewGenerator(uint16(1), time.Now())
	reqList.ReqId = reqIDGen.Next()
	// must register before propose
	wr := w.Register(reqList.ReqId)
	ctx, cancel := context.WithTimeout(context.Background(), proposeTimeout)

	var futureRsp FutureRsp
	futureRsp.waitFunc = func() (interface{}, error) {
		var rsp interface{}
		var ok bool
		var err error
		// will always produce a response: either the context times out or a
		// result is delivered through the wait channel
		select {
		case <-ctx.Done():
			err = ctx.Err()
			if err == context.Canceled {
				// proposal canceled can be caused by leader transfer or no leader
				err = ErrProposalCanceled
			}
			// unblock anything waiting on this id with the timeout error
			w.Trigger(reqList.ReqId, err)
			rsp = err
		case <-wr.WaitC():
			// the wait channel fired: read the triggered result exactly once
			// here (WaitC should be consumed only once)
			rsp = wr.GetResult()
		}
		cancel()
		if err, ok = rsp.(error); ok {
			rsp = nil
			//nd.rn.Infof("request return error: %v, %v", req.String(), err.Error())
		} else {
			err = nil
		}
		return rsp, err
	}
	// first wait: nothing ever proposes, so the context deadline fires and
	// the wait should report DeadlineExceeded
	_, err := futureRsp.WaitRsp()
	assert.Equal(t, context.DeadlineExceeded, err)
	// trigger the same id again with a different error; a second WaitRsp must
	// still return the original timeout error, not the late "unexpected" one
	w.Trigger(reqList.ReqId, errors.New("unexpected"))
	_, err = futureRsp.WaitRsp()
	assert.Equal(t, context.DeadlineExceeded, err)
}

func BenchmarkBatchRequestMarshal(b *testing.B) {
br := &BatchInternalRaftRequest{}
br.ReqId = 1
Expand Down
7 changes: 6 additions & 1 deletion node/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ func (rc *raftNode) restartAsStandaloneNode(cfg *raft.Config, snapshot *raftpb.S
// ID-related entry:
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
// - ConfChangeAddLearnerNode, in which case the contained ID will be added into the set.
func getIDsAndGroups(snap *raftpb.Snapshot, ents []raftpb.Entry) ([]uint64, map[uint64]raftpb.Group) {
ids := make(map[uint64]bool)
grps := make(map[uint64]raftpb.Group)
Expand All @@ -528,12 +529,16 @@ func getIDsAndGroups(snap *raftpb.Snapshot, ents []raftpb.Entry) ([]uint64, map[
var cc raftpb.ConfChange
cc.Unmarshal(e.Data)
switch cc.Type {
case raftpb.ConfChangeAddLearnerNode:
// https://github.com/etcd-io/etcd/pull/12288
ids[cc.ReplicaID] = true
grps[cc.NodeGroup.RaftReplicaId] = cc.NodeGroup
case raftpb.ConfChangeAddNode:
ids[cc.ReplicaID] = true
grps[cc.NodeGroup.RaftReplicaId] = cc.NodeGroup
case raftpb.ConfChangeRemoveNode:
delete(ids, cc.ReplicaID)
delete(grps, cc.ReplicaID)
delete(grps, cc.NodeGroup.RaftReplicaId)
case raftpb.ConfChangeUpdateNode:
// do nothing
default:
Expand Down
115 changes: 113 additions & 2 deletions pdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1814,8 +1814,119 @@ func TestClusterDecrReplicaOneByOne(t *testing.T) {
}
}

func TestRestartWithForceAlone(t *testing.T) {
// TODO: test force restart with alone
// TestRestartWithForceAloneWithLearnerAndRemovedNode exercises force-restarting
// a raft leader as a standalone node while the cluster also contains a learner
// node and a replica that was removed earlier. It verifies the namespace can
// shrink its replica set, survive RestartAsStandalone on the leader, and then
// grow back to two replicas with the cluster fully ready again.
func TestRestartWithForceAloneWithLearnerAndRemovedNode(t *testing.T) {
	// test force restart with alone
	// test force start as alone for a normal node and for a learner node,
	// and test force restart as alone when a node has been removed before
	node.EnableForTest()
	ensureClusterReady(t, 3)

	time.Sleep(time.Second)
	ns := "test_force_restart_alone_after_add_learner_remove_node"
	partNum := 1

	pduri := "http://127.0.0.1:" + pdHttpPort
	ensureDataNodesReady(t, pduri, len(gkvList))
	enableAutoBalance(t, pduri, true)
	// create the namespace with 3 replicas on the local cluster
	ensureNamespace(t, pduri, ns, partNum, 3)
	defer ensureDeleteNamespace(t, pduri, ns)

	dnw, leaderNode := waitForLeader(t, ns, 0)
	leader := dnw.s
	assert.NotNil(t, leader)
	leaderNode.Node.OptimizeDB("")

	// run the remote cluster in syncer-only mode for the duration of the test
	node.SetSyncerOnly(true)
	defer node.SetSyncerOnly(false)
	remotePD, remoteSrvs, remoteTmpDir := startRemoteSyncTestCluster(t, 1)
	defer func() {
		// tear down the remote cluster and its temp data dir
		for _, kv := range remoteSrvs {
			kv.s.Stop()
		}
		if remotePD != nil {
			remotePD.Stop()
		}
		if strings.Contains(remoteTmpDir, "rocksdb-test") {
			t.Logf("removing: %v", remoteTmpDir)
			os.RemoveAll(remoteTmpDir)
		}
	}()
	pduri = "http://127.0.0.1:" + pdRemoteHttpPort
	for _, lrnSrv := range remoteSrvs {
		lrnSrv.s.GetCoord().UpdateSyncerWriteOnly(true)
	}
	ensureDataNodesReady(t, pduri, len(remoteSrvs))
	enableAutoBalance(t, pduri, true)
	// mirror the namespace on the remote cluster with a single replica
	ensureNamespace(t, pduri, ns, partNum, 1)
	defer ensureDeleteNamespace(t, pduri, ns)

	// start a learner cluster that will join the namespace as a learner node
	learnerPD, learnerSrvs, tmpDir := startTestClusterForLearner(t, 1)
	defer func() {
		for _, kv := range learnerSrvs {
			kv.s.Stop()
		}
		if learnerPD != nil {
			learnerPD.Stop()
		}
		if strings.Contains(tmpDir, "learner-test") {
			t.Logf("removing: %v", tmpDir)
			os.RemoveAll(tmpDir)
		}
	}()
	time.Sleep(time.Second * 3)

	t.Logf("begin wait first before restart")
	waitRemoteClusterSync(t, ns, leaderNode, learnerSrvs, remoteSrvs)
	oldNs := getNsInfo(t, ns, 0)
	t.Logf("new isr is: %v", oldNs)
	// the learner cluster should be registered as exactly one learner node
	assert.Equal(t, 1, len(oldNs.LearnerNodes))

	// shrink the namespace to a single replica (presumably the second arg is
	// the replica count — the loop below waits for the ISR to drop below 2)
	err := gpdServer.pdCoord.ChangeNamespaceMetaParam(ns, 1, "", 0)
	assert.Nil(t, err)

	time.Sleep(time.Second * 10)
	for {
		time.Sleep(time.Second)
		newNs := getNsInfo(t, ns, 0)
		t.Logf("new isr is: %v", newNs)
		waitForAllFullReady(t, ns, 0)
		// wait until the removed replica actually leaves the ISR
		if len(newNs.GetISR()) < 2 {
			break
		}
	}

	waitEnoughReplica(t, ns, 0)
	waitForAllFullReady(t, ns, 0)
	waitBalancedLeader(t, ns, 0)

	// restart leader as alone
	leader.RestartAsStandalone(common.GetNsDesp(ns, 0))
	time.Sleep(time.Second)

	waitEnoughReplica(t, ns, 0)
	waitForAllFullReady(t, ns, 0)

	// the leader may have moved after the standalone restart; rediscover it
	dnw, leaderNode = waitForLeader(t, ns, 0)
	leader = dnw.s
	assert.NotNil(t, leader)
	leaderNode.Node.OptimizeDB("")
	time.Sleep(time.Second * 3)

	// grow back to two replicas and wait until the ISR reflects it
	err = gpdServer.pdCoord.ChangeNamespaceMetaParam(ns, 2, "", 0)
	assert.Nil(t, err)

	for {
		time.Sleep(time.Second)
		newNs := getNsInfo(t, ns, 0)
		t.Logf("new isr is: %v", newNs)
		waitForAllFullReady(t, ns, 0)
		if len(newNs.GetISR()) == 2 {
			break
		}
	}
	waitEnoughReplica(t, ns, 0)
	waitForAllFullReady(t, ns, 0)
	waitBalancedLeader(t, ns, 0)
}

func TestInstallSnapshotOnFollower(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion server/grpc_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,11 @@ func (s *Server) ApplyRaftReqs(ctx context.Context, reqs *syncerpb.RaftReqs) (*s
if err != nil {
rpcErr.ErrCode = http.StatusInternalServerError
rpcErr.ErrMsg = err.Error()
return &rpcErr, nil
// we just record the error and continue waiting for the other future responses
}
}
// should clean up here to avoid waiting for the responses again in the defer
futureList = futureList[:0]
return &rpcErr, nil
}

Expand Down
4 changes: 4 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ func (s *Server) getOrInitSyncerNormalInit() error {
if s.dataCoord == nil {
return nil
}
if s.conf.LearnerRole == "" {
// a non-syncer node does not need to write state to etcd
return nil
}
// if etcd key is exist, then use the value in etcd. If not, we set it to the initValue and update to etcd
origV, err := s.dataCoord.GetSyncerNormalInit()
if err == nil {
Expand Down

0 comments on commit a7229fd

Please sign in to comment.