make bootstrap return on successful notify
Signed-off-by: Loic Reyreaud <loic@weaviate.io>
reyreaud-l committed May 8, 2024
1 parent 18e100e commit 0e7e616
Showing 4 changed files with 77 additions and 22 deletions.
cluster/cluster.go (4 changes: 3 additions & 1 deletion)
@@ -72,7 +72,9 @@ func (c *Service) Open(ctx context.Context, db store.Indexer) error {
c.client,
c.config.NodeID,
c.raftAddr,
- c.config.AddrResolver)
+ c.config.AddrResolver,
+ c.Service.Ready,
+ )

bCtx, bCancel := context.WithTimeout(ctx, c.config.BootstrapTimeout)
defer bCancel()
cluster/store/bootstrap.go (42 changes: 38 additions & 4 deletions)
@@ -32,6 +32,7 @@ type joiner interface {
type Bootstrapper struct {
joiner joiner
addrResolver addressResolver
+ isStoreReady func() bool

localRaftAddr string
localNodeID string
@@ -41,14 +42,15 @@ type Bootstrapper struct {
}

// NewBootstrapper constructs a new bootstrapper
- func NewBootstrapper(joiner joiner, raftID, raftAddr string, r addressResolver) *Bootstrapper {
+ func NewBootstrapper(joiner joiner, raftID, raftAddr string, r addressResolver, isStoreReady func() bool) *Bootstrapper {
return &Bootstrapper{
joiner: joiner,
addrResolver: r,
retryPeriod: time.Second,
jitter: time.Second,
localNodeID: raftID,
localRaftAddr: raftAddr,
+ isStoreReady: isStoreReady,
}
}

@@ -65,16 +67,43 @@ func (b *Bootstrapper) Do(ctx context.Context, serverPortMap map[string]int, lg
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
+ if b.isStoreReady() {
+ lg.WithField("action", "bootstrap").Info("node reporting ready, node has probably recovered cluster from raft config. Exiting bootstrap process")
+ return nil
+ }
+
+ // If we have found no other server, there is nobody to contact
+ if len(servers) <= 0 {
+ continue
+ }
+
// try to join an existing cluster
- if leader, err := b.join(ctx, servers, voter); err == nil {
- lg.WithField("leader", leader).Info("successfully joined cluster")
+ if leader, err := b.join(ctx, servers, voter); err != nil {
+ lg.WithFields(logrus.Fields{
+ "servers": servers,
+ "action": "bootstrap",
+ "voter": voter,
+ }).WithError(err).Warning("failed to join cluster, will notify next if voter")
+ } else {
+ lg.WithFields(logrus.Fields{
+ "action": "bootstrap",
+ "leader": leader,
+ }).Info("successfully joined cluster")
return nil
}

if voter {
// notify other servers about readiness of this node to be joined
if err := b.notify(ctx, servers); err != nil {
lg.WithField("servers", servers).WithError(err).Error("notify all peers")
lg.WithFields(logrus.Fields{
"action": "bootstrap",
"servers": servers,
}).WithError(err).Error("notify all peers")
} else {
lg.WithFields(logrus.Fields{
"action": "bootstrap",
"servers": servers,
}).Info("notified peers this node is ready to join as voter")
}
}
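Taken together, the hunk above gives Bootstrapper.Do the following shape. The sketch below is a minimal, self-contained illustration rather than the actual Weaviate code: the package name and the isStoreReady, join, and notify callbacks are stand-ins for the store readiness callback (the c.Service.Ready wired through cluster/cluster.go above), b.join, and b.notify.

package bootstrapsketch

import (
	"context"
	"time"
)

// bootstrapLoop sketches the behaviour of Bootstrapper.Do after this change:
// exit as soon as the local store reports ready, otherwise keep trying to
// join an existing cluster, and (for voters) notify peers between attempts.
func bootstrapLoop(
	ctx context.Context,
	servers []string,
	voter bool,
	isStoreReady func() bool, // stand-in for the new readiness callback
	join func(ctx context.Context, servers []string) (leader string, err error),
	notify func(ctx context.Context, servers []string) error,
) error {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if isStoreReady() {
				// The node already recovered its membership, for example from
				// an existing raft configuration, so bootstrapping can stop.
				return nil
			}
			if len(servers) == 0 {
				continue // nobody to contact yet
			}
			if _, err := join(ctx, servers); err == nil {
				return nil // joined an existing cluster
			}
			if voter {
				// Best effort: advertise this node so an eventual leader can
				// include it when bootstrapping; failures are only logged.
				_ = notify(ctx, servers)
			}
		}
	}
}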

@@ -86,6 +115,11 @@ func (b *Bootstrapper) Do(ctx context.Context, serverPortMap map[string]int, lg
func (b *Bootstrapper) join(ctx context.Context, servers []string, voter bool) (leader string, err error) {
var resp *cmd.JoinPeerResponse
req := &cmd.JoinPeerRequest{Id: b.localNodeID, Address: b.localRaftAddr, Voter: voter}
+ // For each server, try to join.
+ // If we have no error then we have a leader
+ // If we have an error check for err == NOT_FOUND and leader != "" -> we contacted a non-leader node part of the
+ // cluster, let's join the leader.
+ // If no server allows us to join a cluster, return an error
for _, addr := range servers {
resp, err = b.joiner.Join(ctx, addr, req)
if err == nil {
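The comment block in the last hunk describes the join strategy in prose. A compact sketch of the same redirect-to-leader idea follows; errNotLeader, joinOne, and joinResponse are illustrative stand-ins for the NOT_FOUND status, joiner.Join, and cmd.JoinPeerResponse, not the real types.

package bootstrapsketch

import (
	"context"
	"errors"
)

// errNotLeader stands in for the NOT_FOUND status a non-leader peer returns.
var errNotLeader = errors.New("not the leader")

// joinResponse mirrors the relevant part of a join reply: a rejecting peer
// may still report who the current leader is.
type joinResponse struct {
	Leader string
}

// joinCluster tries each known server in turn; when a contacted node is not
// the leader but knows the leader's address, it retries against that leader
// directly. If no server lets this node in, the last error is returned.
func joinCluster(
	ctx context.Context,
	servers []string,
	joinOne func(ctx context.Context, addr string) (joinResponse, error),
) (leader string, err error) {
	for _, addr := range servers {
		resp, joinErr := joinOne(ctx, addr)
		if joinErr == nil {
			return addr, nil
		}
		err = joinErr
		if errors.Is(joinErr, errNotLeader) && resp.Leader != "" {
			if _, redirectErr := joinOne(ctx, resp.Leader); redirectErr == nil {
				return resp.Leader, nil
			}
		}
	}
	return "", err
}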
cluster/store/bootstrap_test.go (42 changes: 29 additions & 13 deletions)
@@ -32,6 +32,7 @@ func TestBootStrapper(t *testing.T) {
voter bool
servers map[string]int
doBefore func(*MockJoiner)
+ isReady func() bool
success bool
}{
{
@@ -41,6 +42,7 @@
doBefore: func(m *MockJoiner) {
m.On("Join", anything, anything, anything).Return(&cmd.JoinPeerResponse{}, nil)
},
+ isReady: func() bool { return false },
success: false,
},
{
@@ -50,6 +52,7 @@
doBefore: func(m *MockJoiner) {
m.On("Join", anything, anything, anything).Return(&cmd.JoinPeerResponse{}, nil)
},
+ isReady: func() bool { return false },
success: true,
},
{
@@ -63,6 +66,7 @@
m.On("Notify", anything, "S1:1", anything).Return(&cmd.NotifyPeerResponse{}, nil)
m.On("Notify", anything, "S2:2", anything).Return(&cmd.NotifyPeerResponse{}, errAny)
},
+ isReady: func() bool { return false },
success: false,
},
{
@@ -75,23 +79,35 @@
m.On("Join", anything, "S2:2", anything).Return(&cmd.JoinPeerResponse{Leader: "S3"}, err)
m.On("Join", anything, "S3", anything).Return(&cmd.JoinPeerResponse{}, nil)
},
+ isReady: func() bool { return false },
success: true,
},
+ {
+ name: "exit early on cluster ready",
+ voter: true,
+ servers: servers,
+ doBefore: func(m *MockJoiner) {},
+ isReady: func() bool { return true },
+ success: true,
+ },
}
for _, test := range tests {
m := &MockJoiner{}
b := NewBootstrapper(m, "RID", "ADDR", &MockAddressResolver{func(id string) string { return id }})
b.retryPeriod = time.Millisecond
b.jitter = time.Millisecond
test.doBefore(m)
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*100)
err := b.Do(ctx, test.servers, NewMockLogger(t).Logger, test.voter, make(chan struct{}))
cancel()
if test.success && err != nil {
t.Errorf("%s: %v", test.name, err)
} else if !test.success && err == nil {
t.Errorf("%s: test must fail", test.name)
}
test := test
t.Run(test.name, func(t *testing.T) {
m := &MockJoiner{}
b := NewBootstrapper(m, "RID", "ADDR", &MockAddressResolver{func(id string) string { return id }}, test.isReady)
b.retryPeriod = time.Millisecond
b.jitter = time.Millisecond
test.doBefore(m)
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*100)
err := b.Do(ctx, test.servers, NewMockLogger(t).Logger, test.voter, make(chan struct{}))
cancel()
if test.success && err != nil {
t.Errorf("%s: %v", test.name, err)
} else if !test.success && err == nil {
t.Errorf("%s: test must fail", test.name)
}
})
}
}

cluster/store/store.go (11 changes: 7 additions & 4 deletions)
@@ -767,7 +767,6 @@ func (st *Store) Remove(id string) error {
// Notify signals this Store that a node is ready for bootstrapping at the specified address.
// Bootstrapping will be initiated once the number of known nodes reaches the expected level,
// which includes this node.

func (st *Store) Notify(id, addr string) (err error) {
if !st.open.Load() {
return ErrNotOpen
@@ -783,9 +782,10 @@ func (st *Store) Notify(id, addr string) (err error) {
st.candidates[id] = addr
if len(st.candidates) < st.bootstrapExpect {
st.log.WithFields(logrus.Fields{
"action": "bootstrap",
"expect": st.bootstrapExpect,
"got": st.candidates,
}).Debug("number of candidates")
}).Debug("number of candidates lower than bootstrap expect param, stopping notify")
return nil
}
candidates := make([]raft.Server, 0, len(st.candidates))
@@ -800,11 +800,14 @@ func (st *Store) Notify(id, addr string) (err error) {
i++
}

st.log.WithField("candidates", candidates).Info("starting cluster bootstrapping")
st.log.WithFields(logrus.Fields{
"action": "bootstrap",
"candidates": candidates,
}).Info("starting cluster bootstrapping")

fut := st.raft.BootstrapCluster(raft.Configuration{Servers: candidates})
if err := fut.Error(); err != nil {
- st.log.WithError(err).Error("bootstrapping cluster")
st.log.WithField("action", "bootstrap").WithError(err).Error("could not bootstrapping cluster")
if !errors.Is(err, raft.ErrCantBootstrap) {
return err
}
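For context on the store.go side, the sketch below condenses the candidate-gathering logic that Notify implements: announcements accumulate until the expected cluster size is reached, and only then is raft bootstrapping attempted. The type and field names are illustrative stand-ins, not the actual Store.

package bootstrapsketch

import "sync"

// notifyTracker sketches the gating in Store.Notify.
type notifyTracker struct {
	mu              sync.Mutex
	bootstrapExpect int               // number of voters required before bootstrapping
	candidates      map[string]string // node ID -> raft address
	bootstrap       func(candidates map[string]string) error
}

func (t *notifyTracker) Notify(id, addr string) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.candidates[id] = addr
	if len(t.candidates) < t.bootstrapExpect {
		// Fewer known nodes than bootstrap_expect: record the peer and wait
		// for more notifications before doing anything.
		return nil
	}
	// Threshold reached: hand the full candidate set to raft bootstrapping.
	return t.bootstrap(t.candidates)
}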
