chore: synchronize the code to the community (#443)
Signed-off-by: shilinlee <836160610@qq.com>
shilinlee committed Dec 4, 2023
1 parent efaa1de commit 16d6c75
Showing 249 changed files with 19,983 additions and 4,123 deletions.
4 changes: 2 additions & 2 deletions app/ts-meta/meta/assign_event_test.go
@@ -47,8 +47,8 @@ func TestAssignEventStateTransition(t *testing.T) {
if err != nil {
t.Fatal(err)
}
globalService.clusterManager.addClusterMember(1)
defer globalService.clusterManager.removeClusterMember(1)
globalService.clusterManager.handleClusterMember(1, &serf.MemberEvent{Type: serf.EventMemberJoin, Members: nil, EventTime: 1})
defer globalService.clusterManager.handleClusterMember(1, &serf.MemberEvent{Type: serf.EventMemberFailed, Members: nil, EventTime: 2})
// active take over
cmd = GenerateCreateDataNodeCmd("127.0.0.1:8400", "127.0.0.1:8401")
if err = globalService.store.ApplyCmd(cmd); err != nil {
2 changes: 2 additions & 0 deletions app/ts-meta/meta/balance_manager.go
@@ -55,6 +55,7 @@ func (b *BalanceManager) Start() {
}

func (b *BalanceManager) balanceIfNeeded() {
logger.GetLogger().Info("[balancer] 1.0 algo start")
defer b.wg.Done()
for {
if atomic.LoadInt32(&b.stopped) == 1 || config.GetHaPolicy() != config.SharedStorage {
@@ -73,6 +74,7 @@ func (b *BalanceManager) balanceIfNeeded() {
}

func (b *BalanceManager) balanceIfNeededEx() {
logger.GetLogger().Info("[balancer] 1.1 algo start")
defer b.wg.Done()
for {
if atomic.LoadInt32(&b.stopped) == 1 || config.GetHaPolicy() != config.SharedStorage {
32 changes: 16 additions & 16 deletions app/ts-meta/meta/balance_store_test.go
@@ -35,13 +35,13 @@ func TestSelectDbPtsToMove(t *testing.T) {
TakeOverEnabled: true,
},
}
_, n1 := store.data.CreateDataNode("127.0.0.1:8401", "127.0.0.1:8402")
_, n2 := store.data.CreateDataNode("127.0.0.2:8401", "127.0.0.2:8402")
_, n3 := store.data.CreateDataNode("127.0.0.3:8401", "127.0.0.3:8402")
_, n1 := store.data.CreateDataNode("127.0.0.1:8401", "127.0.0.1:8402", "")
_, n2 := store.data.CreateDataNode("127.0.0.2:8401", "127.0.0.2:8402", "")
_, n3 := store.data.CreateDataNode("127.0.0.3:8401", "127.0.0.3:8402", "")
_ = store.data.UpdateNodeStatus(n1, int32(serf.StatusAlive), 1, "127.0.0.1:8011")
_ = store.data.UpdateNodeStatus(n2, int32(serf.StatusAlive), 1, "127.0.0.1:8011")
_ = store.data.UpdateNodeStatus(n3, int32(serf.StatusAlive), 1, "127.0.0.1:8011")
assert.NoError(t, store.data.CreateDatabase("db0", nil, nil, false, 1))
assert.NoError(t, store.data.CreateDatabase("db0", nil, nil, false, 1, nil))

store.data.PtView = map[string]meta.DBPtInfos{
"db0": []meta.PtInfo{meta.PtInfo{PtId: 0, Owner: meta.PtOwner{NodeID: n1}, Status: meta.Online},
@@ -118,13 +118,13 @@ func TestBalanceDBPts1(t *testing.T) {
TakeOverEnabled: true,
},
}
_, n1 := store.data.CreateDataNode("127.0.0.1:8401", "127.0.0.1:8402")
_, n2 := store.data.CreateDataNode("127.0.0.2:8401", "127.0.0.2:8402")
_, n3 := store.data.CreateDataNode("127.0.0.3:8401", "127.0.0.3:8402")
_, n1 := store.data.CreateDataNode("127.0.0.1:8401", "127.0.0.1:8402", "")
_, n2 := store.data.CreateDataNode("127.0.0.2:8401", "127.0.0.2:8402", "")
_, n3 := store.data.CreateDataNode("127.0.0.3:8401", "127.0.0.3:8402", "")
_ = store.data.UpdateNodeStatus(n1, int32(serf.StatusAlive), 1, "127.0.0.1:8011")
_ = store.data.UpdateNodeStatus(n2, int32(serf.StatusAlive), 1, "127.0.0.1:8011")
_ = store.data.UpdateNodeStatus(n3, int32(serf.StatusAlive), 1, "127.0.0.1:8011")
assert.NoError(t, store.data.CreateDatabase("db0", nil, nil, false, 1))
assert.NoError(t, store.data.CreateDatabase("db0", nil, nil, false, 1, nil))

store.data.PtView = map[string]meta.DBPtInfos{
"db0": []meta.PtInfo{meta.PtInfo{PtId: 0, Owner: meta.PtOwner{NodeID: n2}, Status: meta.Online},
@@ -155,13 +155,13 @@ func TestBalanceDBPts2(t *testing.T) {
TakeOverEnabled: true,
},
}
_, n1 := store.data.CreateDataNode("127.0.0.1:8401", "127.0.0.1:8402")
_, n2 := store.data.CreateDataNode("127.0.0.2:8401", "127.0.0.2:8402")
_, n3 := store.data.CreateDataNode("127.0.0.3:8401", "127.0.0.3:8402")
_, n1 := store.data.CreateDataNode("127.0.0.1:8401", "127.0.0.1:8402", "")
_, n2 := store.data.CreateDataNode("127.0.0.2:8401", "127.0.0.2:8402", "")
_, n3 := store.data.CreateDataNode("127.0.0.3:8401", "127.0.0.3:8402", "")
_ = store.data.UpdateNodeStatus(n1, int32(serf.StatusAlive), 1, "127.0.0.1:8011")
_ = store.data.UpdateNodeStatus(n2, int32(serf.StatusAlive), 1, "127.0.0.1:8011")
_ = store.data.UpdateNodeStatus(n3, int32(serf.StatusAlive), 1, "127.0.0.1:8011")
assert.NoError(t, store.data.CreateDatabase("db0", nil, nil, false, 1))
assert.NoError(t, store.data.CreateDatabase("db0", nil, nil, false, 1, nil))

store.data.PtView = map[string]meta.DBPtInfos{
"db0": []meta.PtInfo{meta.PtInfo{PtId: 0, Owner: meta.PtOwner{NodeID: n1}, Status: meta.Online},
@@ -193,13 +193,13 @@ func TestBalanceDBPts3(t *testing.T) {
TakeOverEnabled: true,
},
}
_, n1 := store.data.CreateDataNode("127.0.0.1:8401", "127.0.0.1:8402")
_, n2 := store.data.CreateDataNode("127.0.0.2:8401", "127.0.0.2:8402")
_, n3 := store.data.CreateDataNode("127.0.0.3:8401", "127.0.0.3:8402")
_, n1 := store.data.CreateDataNode("127.0.0.1:8401", "127.0.0.1:8402", "")
_, n2 := store.data.CreateDataNode("127.0.0.2:8401", "127.0.0.2:8402", "")
_, n3 := store.data.CreateDataNode("127.0.0.3:8401", "127.0.0.3:8402", "")
_ = store.data.UpdateNodeStatus(n1, int32(serf.StatusAlive), 1, "127.0.0.1:8011")
_ = store.data.UpdateNodeStatus(n2, int32(serf.StatusAlive), 1, "127.0.0.1:8011")
_ = store.data.UpdateNodeStatus(n3, int32(serf.StatusAlive), 1, "127.0.0.1:8011")
assert.NoError(t, store.data.CreateDatabase("db0", nil, nil, false, 1))
assert.NoError(t, store.data.CreateDatabase("db0", nil, nil, false, 1, nil))

store.data.PtView = map[string]meta.DBPtInfos{
"db0": []meta.PtInfo{meta.PtInfo{PtId: 0, Owner: meta.PtOwner{NodeID: n1}, Status: meta.Online},
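Note: the meta-store test APIs used above changed signatures in this commit — CreateDataNode now takes a third argument (the node role, left empty in these tests) and CreateDatabase takes an additional trailing argument (nil here). A minimal sketch of the updated call pattern, assuming the same store, serf, and assert setup as the tests above:

    // CreateDataNode gains a role parameter; "" selects the default role
    _, n1 := store.data.CreateDataNode("127.0.0.1:8401", "127.0.0.1:8402", "")
    _ = store.data.UpdateNodeStatus(n1, int32(serf.StatusAlive), 1, "127.0.0.1:8011")
    // CreateDatabase gains a trailing argument, passed as nil in these tests
    assert.NoError(t, store.data.CreateDatabase("db0", nil, nil, false, 1, nil))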
7 changes: 7 additions & 0 deletions app/ts-meta/meta/balancer_manager_test.go
@@ -81,3 +81,10 @@ func TestBalanceIfNeeded(t *testing.T) {
assert.Equal(t, 3, nodePtNumMap[3])
assert.Equal(t, 3, nodePtNumMap[4])
}

func TestBalanceIfNeededStart(t *testing.T) {
bm := NewBalanceManager("v1.0")
bm.stopped = 1
bm.wg.Add(1)
bm.balanceIfNeeded()
}
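This new test drives balanceIfNeeded once with the manager already stopped, which exercises the added start-up log line and the early-exit path. A minimal sketch of the same pattern for the v1.1 variant (assuming balanceIfNeededEx is driven the same way, as its matching log line suggests):

    bm := NewBalanceManager("v1.1")
    bm.stopped = 1 // force the loop to return on its first check
    bm.wg.Add(1)   // balanceIfNeededEx calls wg.Done via defer
    bm.balanceIfNeededEx()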
45 changes: 27 additions & 18 deletions app/ts-meta/meta/cluster_manager.go
@@ -270,7 +270,10 @@ func (cm *ClusterManager) processEvent(event serf.Event) {

func (cm *ClusterManager) addEventMap(name string, event *serf.MemberEvent) {
cm.mu.Lock()
cm.eventMap[name] = event
e, ok := cm.eventMap[name]
if !ok || e.EventTime < event.EventTime {
cm.eventMap[name] = event
}
cm.mu.Unlock()
}

@@ -281,17 +284,19 @@ func (cm *ClusterManager) isNodeAlive(id uint64) bool {
return ok
}

// addClusterMember adds ts-store node id to memberIds
func (cm *ClusterManager) addClusterMember(id uint64) {
cm.mu.Lock()
cm.memberIds[id] = struct{}{}
cm.mu.Unlock()
}

func (cm *ClusterManager) removeClusterMember(id uint64) {
// handleClusterMember adds or removes a ts-store node id in memberIds, based on the serf event type
func (cm *ClusterManager) handleClusterMember(id uint64, e *serf.MemberEvent) {
cm.mu.Lock()
delete(cm.memberIds, id)
cm.mu.Unlock()
defer cm.mu.Unlock()
gotEvent, ok := cm.eventMap[strconv.FormatUint(id, 10)]
if ok && e.EventTime < gotEvent.EventTime {
return
}
if e.EventType() == serf.EventMemberJoin {
cm.memberIds[id] = struct{}{}
} else if e.EventType() == serf.EventMemberFailed {
delete(cm.memberIds, id)
}
}

// only choose the pt owner as the dest node in write-available-first policy
@@ -447,18 +452,22 @@ func (cm *ClusterManager) processReplication(dbPt *meta.DbPtInfo) error {
rgId := dbPt.Pti.RGID
rg := &rgs[rgId]
if rg.MasterPtID == dbPt.Pti.PtId {
masterId, peers, success := electRgMaster(rg, ptInfos)
logger.NewLogger(errno.ModuleHA).Info("master failed, elect rg new master", zap.Uint32("oldMaster", rg.MasterPtID),
zap.Uint32("newMaster", masterId), zap.Any("old peers", rg.Peers), zap.Any("new peers", peers), zap.Bool("success", success))
masterId, newPeers, success := electRgMaster(rg, ptInfos)
if success {
return globalService.store.updateReplication(dbPt.Db, rgId, masterId, peers, uint32(meta.SubHealth))
logger.NewLogger(errno.ModuleHA).Info("master failed, elect rg new master success", zap.String("db", dbPt.Db),
zap.Uint32("oldMaster", rg.MasterPtID), zap.Uint32("newMaster", masterId),
zap.Any("old peers", rg.Peers), zap.Any("new peers", newPeers))
return globalService.store.updateReplication(dbPt.Db, rgId, masterId, newPeers, uint32(meta.SubHealth))
}
logger.NewLogger(errno.ModuleHA).Info("master failed, elect rg new master failed", zap.String("db", dbPt.Db),
zap.Uint32("oldMaster", rg.MasterPtID), zap.Any("old peers", rg.Peers))
return nil
}

peers := generatePeers(rg, dbPt)
logger.NewLogger(errno.ModuleHA).Info("slave failed, update peers", zap.Any("old peers", rg.Peers), zap.Any("new peers", peers))
return globalService.store.updateReplication(dbPt.Db, rgId, rg.MasterPtID, peers, uint32(meta.SubHealth))
newPeers := generatePeers(rg, dbPt)
logger.NewLogger(errno.ModuleHA).Info("slave failed, update peers", zap.String("db", dbPt.Db),
zap.Any("old peers", rg.Peers), zap.Any("new peers", newPeers))
return globalService.store.updateReplication(dbPt.Db, rgId, rg.MasterPtID, newPeers, uint32(meta.SubHealth))
}

func (cm *ClusterManager) enableTakeover(enable bool) {
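The separate addClusterMember/removeClusterMember helpers are folded into a single event-driven handleClusterMember, which also ignores events older than the latest event already recorded for that node in eventMap. A minimal sketch of the resulting call pattern, mirroring assign_event_test.go above (the node id and event times are illustrative):

    // a join event registers node 1 as a cluster member
    cm.handleClusterMember(1, &serf.MemberEvent{Type: serf.EventMemberJoin, EventTime: 1})
    // a later failure event removes it again
    cm.handleClusterMember(1, &serf.MemberEvent{Type: serf.EventMemberFailed, EventTime: 2})
    // an event with an older EventTime than the one stored in eventMap for node "1" is dropped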
10 changes: 5 additions & 5 deletions app/ts-meta/meta/cluster_manager_test.go
@@ -313,7 +313,7 @@ func TestClusterManager_PassiveTakeOver(t *testing.T) {
// do not take over when takeoverEnabled is false
globalService.store.data.TakeOverEnabled = false
globalService.clusterManager.enableTakeover(false)
e = *generateMemberEvent(serf.EventMemberFailed, "2", 1, serf.StatusFailed)
e = *generateMemberEvent(serf.EventMemberFailed, "2", 2, serf.StatusFailed)
globalService.clusterManager.eventCh <- e
time.Sleep(time.Second)
assert.Equal(t, serf.StatusAlive, globalService.store.data.DataNode(2).Status)
@@ -468,13 +468,13 @@ func TestClusterManager_PassiveTakeOver_WhenDropDB(t *testing.T) {
BalancerEnabled: true,
},
}
_, n1 := store.data.CreateDataNode("127.0.0.1:8401", "127.0.0.1:8402")
_, n2 := store.data.CreateDataNode("127.0.0.2:8401", "127.0.0.2:8402")
_, n3 := store.data.CreateDataNode("127.0.0.3:8401", "127.0.0.3:8402")
_, n1 := store.data.CreateDataNode("127.0.0.1:8401", "127.0.0.1:8402", "")
_, n2 := store.data.CreateDataNode("127.0.0.2:8401", "127.0.0.2:8402", "")
_, n3 := store.data.CreateDataNode("127.0.0.3:8401", "127.0.0.3:8402", "")
_ = store.data.UpdateNodeStatus(n1, int32(serf.StatusAlive), 1, "127.0.0.1:8011")
_ = store.data.UpdateNodeStatus(n2, int32(serf.StatusAlive), 1, "127.0.0.1:8011")
_ = store.data.UpdateNodeStatus(n3, int32(serf.StatusAlive), 1, "127.0.0.1:8011")
assert.NoError(t, store.data.CreateDatabase("db0", nil, nil, false, 1))
assert.NoError(t, store.data.CreateDatabase("db0", nil, nil, false, 1, nil))
c.store = store

dbPt := &meta.DbPtInfo{
4 changes: 2 additions & 2 deletions app/ts-meta/meta/handler_test.go
@@ -92,11 +92,11 @@ func TestHttpHandler_ServeHTTP(t *testing.T) {
}
defer mms.Close()

err, node1 := mms.GetStore().data.CreateDataNode("127.0.0.1:8400", "127.0.0.1:8401")
err, node1 := mms.GetStore().data.CreateDataNode("127.0.0.1:8400", "127.0.0.1:8401", "")
if err != nil {
t.Fatal(err)
}
err, node2 := mms.GetStore().data.CreateDataNode("127.0.0.2:8400", "127.0.0.2:8401")
err, node2 := mms.GetStore().data.CreateDataNode("127.0.0.2:8400", "127.0.0.2:8401", "")
if err != nil {
t.Fatal(err)
}
21 changes: 21 additions & 0 deletions app/ts-meta/meta/handlers.gen.go

(Generated file; diff not rendered.)

43 changes: 42 additions & 1 deletion app/ts-meta/meta/handlers_process.go
@@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"sync"
"time"

@@ -31,6 +32,7 @@ import (
"github.com/openGemini/openGemini/lib/metaclient"
"github.com/openGemini/openGemini/open_src/influx/meta"
proto2 "github.com/openGemini/openGemini/open_src/influx/meta/proto"
"github.com/pingcap/failpoint"
"go.uber.org/zap"
)

@@ -59,7 +61,8 @@ func (h *CreateNode) Process() (transport.Codec, error) {

httpAddr := h.req.WriteHost
tcpAddr := h.req.QueryHost
b, err := h.store.createDataNode(httpAddr, tcpAddr)
role := h.req.Role
b, err := h.store.createDataNode(httpAddr, tcpAddr, role)
if err != nil {
h.logger.Error("createNode fail", zap.Error(err))
rsp.Err = err.Error()
@@ -413,3 +416,41 @@ func (h *VerifyDataNodeStatus) Process() (transport.Codec, error) {
}
return rsp, nil
}

func (h *SendSysCtrlToMeta) Process() (transport.Codec, error) {
rsp := &message.SendSysCtrlToMetaResponse{}

var inner = func() (bool, string, string, error) {
switchStr, ok := h.req.Param["switchon"]
if !ok {
return false, "", "", fmt.Errorf("missing the required parameter 'switchon' for failpoint")
}
switchon, err := strconv.ParseBool(switchStr)
if err != nil {
return false, "", "", err
}
point, ok := h.req.Param["point"]
if !ok {
return false, "", "", fmt.Errorf("missing the required parameter 'point' for failpoint")
}
if !switchon {
return false, point, "", nil
}
term, ok := h.req.Param["term"]
if !ok {
return false, "", "", fmt.Errorf("missing the required parameter 'term' for failpoint")
}
return switchon, point, term, nil
}

switchon, point, term, err := inner()
if err == nil && !switchon {
err = failpoint.Disable(point)
} else if err == nil {
err = failpoint.Enable(point, term)
}
if err != nil {
rsp.Err = err.Error()
}
return rsp, nil
}
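The new SendSysCtrlToMeta handler toggles pingcap/failpoint failpoints on the meta node: 'switchon' selects enable or disable, 'point' names the failpoint, and 'term' (required only when enabling) is the expression handed to failpoint.Enable. A minimal sketch of the two request shapes, following the parameters used in handlers_process_test.go below:

    // enable a failpoint named "mock_point"
    enable := &message.SendSysCtrlToMetaRequest{
        Mod:   "failpoint",
        Param: map[string]string{"switchon": "true", "point": "mock_point", "term": "return(true)"},
    }
    // disable it again; no "term" is needed
    disable := &message.SendSysCtrlToMetaRequest{
        Mod:   "failpoint",
        Param: map[string]string{"switchon": "false", "point": "mock_point"},
    }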
62 changes: 62 additions & 0 deletions app/ts-meta/meta/handlers_process_test.go
@@ -231,3 +231,65 @@ func TestVerifyDataNodeStatusProcess(t *testing.T) {
_, err = h.Process()
assert.NoError(t, err)
}

func TestSendSysCtrlToMetaProcess(t *testing.T) {
mockStore := NewMockRPCStore()
type TestCase struct {
Param map[string]string
ExpectErr string
}
for _, testcase := range []TestCase{
{
Param: map[string]string{},
ExpectErr: "missing the required parameter 'switchon' for failpoint",
},
{
Param: map[string]string{
"switchon": "not bool value",
},
ExpectErr: `strconv.ParseBool: parsing "not bool value": invalid syntax`,
},
{
Param: map[string]string{
"switchon": "true",
},
ExpectErr: "missing the required parameter 'point' for failpoint",
},
{
Param: map[string]string{
"switchon": "true",
"point": "mock_point",
},
ExpectErr: "missing the required parameter 'term' for failpoint",
},
{
Param: map[string]string{
"switchon": "true",
"point": "mock_point",
"term": "return(true)",
},
ExpectErr: "",
},
{
Param: map[string]string{
"switchon": "false",
"point": "mock_point",
},
ExpectErr: "",
},
} {
msg := message.NewMetaMessage(message.SendSysCtrlToMetaRequestMessage, &message.SendSysCtrlToMetaRequest{
Mod: "failpoint",
Param: testcase.Param,
})
h := New(msg.Type())
h.InitHandler(mockStore, nil, nil)
var err error
if err = h.SetRequestMsg(msg.Data()); err != nil {
t.Fatal(err)
}
res, err := h.Process()
assert.NoError(t, err)
assert.Equal(t, res.(*message.SendSysCtrlToMetaResponse).Err, testcase.ExpectErr)
}
}
(Diffs for the remaining changed files are not shown.)
