diff --git a/cluster/cluster.go b/cluster/cluster.go index c0037d0fc97..3d6fb2b3078 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -72,7 +72,9 @@ func (c *Service) Open(ctx context.Context, db store.Indexer) error { c.client, c.config.NodeID, c.raftAddr, - c.config.AddrResolver) + c.config.AddrResolver, + c.Service.Ready, + ) bCtx, bCancel := context.WithTimeout(ctx, c.config.BootstrapTimeout) defer bCancel() diff --git a/cluster/store/bootstrap.go b/cluster/store/bootstrap.go index 52943d1dfcb..22ebbff270f 100644 --- a/cluster/store/bootstrap.go +++ b/cluster/store/bootstrap.go @@ -32,6 +32,7 @@ type joiner interface { type Bootstrapper struct { joiner joiner addrResolver addressResolver + isStoreReady func() bool localRaftAddr string localNodeID string @@ -41,7 +42,7 @@ type Bootstrapper struct { } // NewBootstrapper constructs a new bootsrapper -func NewBootstrapper(joiner joiner, raftID, raftAddr string, r addressResolver) *Bootstrapper { +func NewBootstrapper(joiner joiner, raftID, raftAddr string, r addressResolver, isStoreReady func() bool) *Bootstrapper { return &Bootstrapper{ joiner: joiner, addrResolver: r, @@ -49,6 +50,7 @@ func NewBootstrapper(joiner joiner, raftID, raftAddr string, r addressResolver) jitter: time.Second, localNodeID: raftID, localRaftAddr: raftAddr, + isStoreReady: isStoreReady, } } @@ -65,16 +67,43 @@ func (b *Bootstrapper) Do(ctx context.Context, serverPortMap map[string]int, lg case <-ctx.Done(): return ctx.Err() case <-ticker.C: + if b.isStoreReady() { + lg.WithField("action", "bootstrap").Info("node reporting ready, node has probably recovered cluster from raft config. Exiting bootstrap process") + return nil + } + + // If we have found no other server, there is nobody to contact + if len(servers) <= 0 { + continue + } + // try to join an existing cluster - if leader, err := b.join(ctx, servers, voter); err == nil { - lg.WithField("leader", leader).Info("successfully joined cluster") + if leader, err := b.join(ctx, servers, voter); err != nil { + lg.WithFields(logrus.Fields{ + "servers": servers, + "action": "bootstrap", + "voter": voter, + }).WithError(err).Warning("failed to join cluster, will notify next if voter") + } else { + lg.WithFields(logrus.Fields{ + "action": "bootstrap", + "leader": leader, + }).Info("successfully joined cluster") return nil } if voter { // notify other servers about readiness of this node to be joined if err := b.notify(ctx, servers); err != nil { - lg.WithField("servers", servers).WithError(err).Error("notify all peers") + lg.WithFields(logrus.Fields{ + "action": "bootstrap", + "servers": servers, + }).WithError(err).Error("notify all peers") + } else { + lg.WithFields(logrus.Fields{ + "action": "bootstrap", + "servers": servers, + }).Info("notified peers this node is ready to join as voter") } } @@ -86,6 +115,11 @@ func (b *Bootstrapper) Do(ctx context.Context, serverPortMap map[string]int, lg func (b *Bootstrapper) join(ctx context.Context, servers []string, voter bool) (leader string, err error) { var resp *cmd.JoinPeerResponse req := &cmd.JoinPeerRequest{Id: b.localNodeID, Address: b.localRaftAddr, Voter: voter} + // For each server, try to join. + // If we have no error then we have a leader + // If we have an error check for err == NOT_FOUND and leader != "" -> we contacted a non-leader node part of the + // cluster, let's join the leader. 
+ // If no server allows us to join a cluster, return an error for _, addr := range servers { resp, err = b.joiner.Join(ctx, addr, req) if err == nil { diff --git a/cluster/store/bootstrap_test.go b/cluster/store/bootstrap_test.go index 632e12a2805..539c12469a2 100644 --- a/cluster/store/bootstrap_test.go +++ b/cluster/store/bootstrap_test.go @@ -32,6 +32,7 @@ func TestBootStrapper(t *testing.T) { voter bool servers map[string]int doBefore func(*MockJoiner) + isReady func() bool success bool }{ { @@ -41,6 +42,7 @@ func TestBootStrapper(t *testing.T) { doBefore: func(m *MockJoiner) { m.On("Join", anything, anything, anything).Return(&cmd.JoinPeerResponse{}, nil) }, + isReady: func() bool { return false }, success: false, }, { @@ -50,6 +52,7 @@ func TestBootStrapper(t *testing.T) { doBefore: func(m *MockJoiner) { m.On("Join", anything, anything, anything).Return(&cmd.JoinPeerResponse{}, nil) }, + isReady: func() bool { return false }, success: true, }, { @@ -63,6 +66,7 @@ func TestBootStrapper(t *testing.T) { m.On("Notify", anything, "S1:1", anything).Return(&cmd.NotifyPeerResponse{}, nil) m.On("Notify", anything, "S2:2", anything).Return(&cmd.NotifyPeerResponse{}, errAny) }, + isReady: func() bool { return false }, success: false, }, { @@ -75,23 +79,35 @@ func TestBootStrapper(t *testing.T) { m.On("Join", anything, "S2:2", anything).Return(&cmd.JoinPeerResponse{Leader: "S3"}, err) m.On("Join", anything, "S3", anything).Return(&cmd.JoinPeerResponse{}, nil) }, + isReady: func() bool { return false }, success: true, }, + { + name: "exit early on cluster ready", + voter: true, + servers: servers, + doBefore: func(m *MockJoiner) {}, + isReady: func() bool { return true }, + success: true, + }, } for _, test := range tests { - m := &MockJoiner{} - b := NewBootstrapper(m, "RID", "ADDR", &MockAddressResolver{func(id string) string { return id }}) - b.retryPeriod = time.Millisecond - b.jitter = time.Millisecond - test.doBefore(m) - ctx, cancel := context.WithTimeout(ctx, 
time.Millisecond*100) - err := b.Do(ctx, test.servers, NewMockLogger(t).Logger, test.voter, make(chan struct{})) - cancel() - if test.success && err != nil { - t.Errorf("%s: %v", test.name, err) - } else if !test.success && err == nil { - t.Errorf("%s: test must fail", test.name) - } + test := test + t.Run(test.name, func(t *testing.T) { + m := &MockJoiner{} + b := NewBootstrapper(m, "RID", "ADDR", &MockAddressResolver{func(id string) string { return id }}, test.isReady) + b.retryPeriod = time.Millisecond + b.jitter = time.Millisecond + test.doBefore(m) + ctx, cancel := context.WithTimeout(ctx, time.Millisecond*100) + err := b.Do(ctx, test.servers, NewMockLogger(t).Logger, test.voter, make(chan struct{})) + cancel() + if test.success && err != nil { + t.Errorf("%s: %v", test.name, err) + } else if !test.success && err == nil { + t.Errorf("%s: test must fail", test.name) + } + }) } } diff --git a/cluster/store/store.go b/cluster/store/store.go index 2d3dae3496c..18bee976399 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -767,7 +767,6 @@ func (st *Store) Remove(id string) error { // Notify signals this Store that a node is ready for bootstrapping at the specified address. // Bootstrapping will be initiated once the number of known nodes reaches the expected level, // which includes this node. 
- func (st *Store) Notify(id, addr string) (err error) { if !st.open.Load() { return ErrNotOpen @@ -783,9 +782,10 @@ func (st *Store) Notify(id, addr string) (err error) { st.candidates[id] = addr if len(st.candidates) < st.bootstrapExpect { st.log.WithFields(logrus.Fields{ + "action": "bootstrap", "expect": st.bootstrapExpect, "got": st.candidates, - }).Debug("number of candidates") + }).Debug("number of candidates lower than bootstrap expect param, stopping notify") return nil } candidates := make([]raft.Server, 0, len(st.candidates)) @@ -800,11 +800,14 @@ func (st *Store) Notify(id, addr string) (err error) { i++ } - st.log.WithField("candidates", candidates).Info("starting cluster bootstrapping") + st.log.WithFields(logrus.Fields{ + "action": "bootstrap", + "candidates": candidates, + }).Info("starting cluster bootstrapping") fut := st.raft.BootstrapCluster(raft.Configuration{Servers: candidates}) if err := fut.Error(); err != nil { - st.log.WithField("action", "bootstrap").WithError(err).Error("could not bootstrap cluster") if !errors.Is(err, raft.ErrCantBootstrap) { return err }