Skip to content

Commit

Permalink
quorum: join/leave/update write members into config file
Browse files Browse the repository at this point in the history
In the previous version, members were stored in the BadgerDB.

Read or write from config file after applying this patch.
  • Loading branch information
fanyang89 authored and 1023280072 committed Sep 7, 2023
1 parent 003604a commit 906d0e1
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 60 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/samber/lo v1.38.1 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect
Expand All @@ -80,6 +81,7 @@ require (
go.opencensus.io v0.24.0 // indirect
golang.org/x/arch v0.4.0 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sys v0.11.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ github.com/rs/zerolog v1.30.0 h1:SymVODrcRsaRaSInD9yQtKbtWqwsfoPcRff/oRXLj4c=
github.com/rs/zerolog v1.30.0/go.mod h1:/tk+P47gFdPXq4QYjvCmT5/Gsug2nagsFWBWhAiSi1w=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
Expand Down Expand Up @@ -430,6 +432,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down
8 changes: 5 additions & 3 deletions tcpmon/http_quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ func JoinCluster(q *Quorum) func(c *gin.Context) {
return
}

clusterIPAddr := strings.TrimSpace(string(buf))
member := strings.TrimSpace(string(buf))
members := make(map[string]string)
members[member] = ""

_, err = q.TryJoin([]string{clusterIPAddr})
_, err = q.TryJoin(members)
if err != nil {
c.JSON(http.StatusOK, ErrorJSON(errors.WithStack(err)))
return
Expand All @@ -54,7 +56,7 @@ func JoinCluster(q *Quorum) func(c *gin.Context) {

func LeaveCluster(q *Quorum) func(c *gin.Context) {
return func(c *gin.Context) {
err := q.mlist.Leave(3 * time.Second)
err := q.Leave(3 * time.Second)
c.JSON(http.StatusOK, ErrorJSON(err))
}
}
14 changes: 8 additions & 6 deletions tcpmon/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/cockroachdb/errors"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
)

type Monitor struct {
Expand Down Expand Up @@ -90,12 +91,13 @@ func (m *Monitor) Run(ctx context.Context) error {

m.startHttpServer(m.config.HttpListen)

// initialMembers, err := m.datastore.GetMemberAddressList()
// if err != nil {
// log.Info().Err(err).Msg("Get members from db failed")
// } else if len(initialMembers) != 0 {
// m.quorum.Join(initialMembers)
// }
members := viper.GetStringMapString("members")
if members != nil {
_, err := m.quorum.TryJoin(members)
if err != nil {
log.Warn().Err(err).Msg("Join cluster failed")
}
}

for {
select {
Expand Down
142 changes: 91 additions & 51 deletions tcpmon/quorum.go
Original file line number Diff line number Diff line change
@@ -1,57 +1,40 @@
package tcpmon

import (
"fmt"
"net"
"time"

"github.com/gogo/protobuf/proto"
"github.com/cockroachdb/errors"
"github.com/hashicorp/memberlist"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
"github.com/spf13/viper"
)

type Quorum struct {
mlist *memberlist.Memberlist
ds *DataStore
mlist *memberlist.Memberlist
ds *DataStore
listenAddr IpAddr
}

func NewQuorum(ds *DataStore, mconfig *MonitorConfig) *Quorum {
func NewQuorum(ds *DataStore, monitorConfig *MonitorConfig) *Quorum {
q := &Quorum{
ds: ds,
}

// create memberlist
config := memberlist.DefaultLANConfig()
config.Events = q
config.LogOutput = NewMemberlistLogger()
config.BindPort = mconfig.QuorumPort
config.AdvertisePort = mconfig.QuorumPort
config.BindPort = monitorConfig.QuorumPort
config.AdvertisePort = monitorConfig.QuorumPort

m, err := memberlist.Create(config)
if err != nil {
log.Fatal().Err(err).Msg("create memberlist failed")
}
q.mlist = m

my := m.LocalNode()
log.Info().Str("hostname", my.String()).
Str("address", my.Address()).
Msg("Quorum created")

// update local meta
var memberInfo MemberInfo
ipaddr := ParseIpAddr(mconfig.HttpListen)
ipaddr.Address = my.Addr.String()
memberInfo.HttpListen = fmt.Sprintf("http://%s", ipaddr.String())
buf, err := proto.Marshal(&memberInfo)
if err != nil {
log.Fatal().Err(err).Msg("Marshal member info failed")
}
my.Meta = buf
// err = ds.UpdateMember(my.Address(), buf)
// if err != nil {
// log.Fatal().Err(err).Msg("Update my member info failed")
// }

return q
}

Expand All @@ -62,28 +45,60 @@ func (q *Quorum) Close() {
}
}

func writeConfig() {
err := viper.WriteConfig()
if err != nil {
log.Warn().Err(err).Msg("Write members to config failed")
}
}

func (q *Quorum) configMemberJoin(member string, meta string) {
members := viper.GetStringMapString("members")
_, ok := members[member]
if ok {
log.Warn().Str("member", member).Msg("Already in the quorum")
return
}

members[member] = meta
viper.Set("members", members)
writeConfig()
}

func (q *Quorum) configMemberLeave(member string) {
members := viper.GetStringMapString("members")
delete(members, member)
viper.Set("members", members)
writeConfig()
}

func (q *Quorum) configMemberUpdate(member string, meta string) {
members := viper.GetStringMapString("members")
members[member] = meta
viper.Set("members", members)
writeConfig()
}

func (q *Quorum) NotifyJoin(node *memberlist.Node) {
log.Info().Str("node", node.Address()).Msgf("Member joined")
// err := q.ds.AddMember(node.Address(), node.Meta)
// if err != nil {
// log.Warn().Err(err).Str("node", node.Address()).Msg("Save member failed")
// }
log.Info().Str("node", node.Address()).
Str("meta", string(node.Meta)).
Msgf("Member joined")
q.configMemberJoin(node.Address(), string(node.Meta))

}

func (q *Quorum) NotifyLeave(node *memberlist.Node) {
log.Info().Str("node", node.Address()).Msgf("Member left")
// err := q.ds.DeleteMember(node.Address())
// if err != nil {
// log.Warn().Err(err).Str("node", node.Address()).Msg("Save member failed")
// }
log.Info().Str("node", node.Address()).
Str("meta", string(node.Meta)).
Msgf("Member left quorum")
q.configMemberLeave(node.Address())
}

func (q *Quorum) NotifyUpdate(node *memberlist.Node) {
log.Info().Str("node", node.Address()).Msgf("Update member meta data")
// err := q.ds.UpdateMember(node.Address(), node.Meta)
// if err != nil {
// log.Warn().Err(err).Str("node", node.Address()).Msg("Update member failed")
// }
log.Info().Str("node", node.Address()).
Str("meta", string(node.Meta)).
Msgf("Update member meta data")
q.configMemberUpdate(node.Address(), string(node.Meta))
}

func (q *Quorum) Local() *memberlist.Node {
Expand All @@ -98,27 +113,52 @@ func (q *Quorum) Leave(timeout time.Duration) error {
return q.mlist.Leave(timeout)
}

func (q *Quorum) TryJoin(members []string) (int, error) {
return q.mlist.Join(members)
func (q *Quorum) TryJoin(members map[string]string) (int, error) {
m := lo.Keys(members)
if len(m) > 0 {
return q.mlist.Join(m)
}
return 0, errors.New("members is empty")
}

func (q *Quorum) Join(members []string) {
for i := 0; i < 3; i++ {
func (q *Quorum) Join(members map[string]string, retry int, delay time.Duration) {
if retry == 0 {
retry = 3
}
if delay == 0 {
delay = time.Second
}

for i := 0; i < retry; i++ {
_, err := q.TryJoin(members)
if err != nil {
log.Err(err).Strs("members", members).Msg("join quorum failed")
log.Err(err).Msg("join quorum failed")
} else {
log.Info().Strs("members", members).Msg("join quorum success")
log.Info().Msg("join quorum success")
break
}
time.Sleep(1 * time.Second)
time.Sleep(delay)
}
}

func (q *Quorum) MyIP() net.IP {
return net.ParseIP(GetIpFromAddress(q.mlist.LocalNode().Address()))
}

func (q *Quorum) GetMemberMeta(member string) (map[string]any, error) {
return q.ds.GetMemberMeta(member)
func (q *Quorum) My() *memberlist.Node {
return q.mlist.LocalNode()
}

func (q *Quorum) GetMemberMeta(member string) (string, error) {
members := viper.GetStringMapString("members")
if members == nil {
return "", errors.New("Get members failed")
}

m, ok := members[member]
if !ok {
return "", errors.Newf("Member %s not in the cluster", member)
}

return m, nil
}

0 comments on commit 906d0e1

Please sign in to comment.