From c5bdb5af86ea306b8d09ac7aa4246f0e2a02b043 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 14 Apr 2015 13:43:25 -0600 Subject: [PATCH] Fix cluster-wide restart issue. --- cmd/influxd/run.go | 29 ++++++++++++++++------------- server.go | 2 ++ 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 74476493860..9eff5b1710a 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -499,9 +499,9 @@ func (cmd *RunCommand) openBroker(brokerURLs []url.URL) { log.Fatalf("raft: %s", err) } - index, _ := l.LastLogIndexTerm() // Checks to see if the raft index is 0. If it's 0, it might be the first // node in the cluster and must initialize or join + index, _ := l.LastLogIndexTerm() if index == 0 { // If we have join URLs, then attemp to join the cluster if len(brokerURLs) > 0 { @@ -524,7 +524,9 @@ func (cmd *RunCommand) openBroker(brokerURLs []url.URL) { func joinLog(l *raft.Log, brokerURLs []url.URL) { // Attempts to join each server until successful. for _, u := range brokerURLs { - if err := l.Join(u); err != nil { + if err := l.Join(u); err == raft.ErrInitialized { + return + } else if err != nil { log.Printf("join: failed to connect to raft cluster: %s: %s", (&u).String(), err) } else { log.Printf("join: connected raft log to %s", (&u).String()) @@ -567,17 +569,19 @@ func (cmd *RunCommand) openServer(joinURLs []url.URL) *influxdb.Server { if err := s.Open(cmd.config.Data.Dir, c); err != nil { log.Fatalf("failed to open data node: %v", err.Error()) } - log.Printf("data node opened at %s", cmd.config.Data.Dir) + log.Printf("data node(%d) opened at %s", s.ID(), cmd.config.Data.Dir) - dataNodeIndex := s.Index() - if dataNodeIndex == 0 { + // Give brokers time to elect a leader if entire cluster is being restarted. + time.Sleep(1 * time.Second) + + if s.ID() == 0 && s.Index() == 0 { if len(joinURLs) > 0 { joinServer(s, cmd.config.ClusterURL(), joinURLs) return s } if err := s.Initialize(cmd.config.ClusterURL()); err != nil { - log.Fatalf("server initialization error: %s", err) + log.Fatalf("server initialization error(0): %s", err) } u := cmd.config.ClusterURL() @@ -596,15 +600,14 @@ func joinServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) { // Create data node on an existing data node. for _, joinURL := range joinURLs { - if err := s.Join(&u, &joinURL); err != nil { + if err := s.Join(&u, &joinURL); err == influxdb.ErrDataNodeNotFound { // No data nodes could be found to join. We're the first. - if err == influxdb.ErrDataNodeNotFound { - if err := s.Initialize(u); err != nil { - log.Fatalf("server initialization error: %s", err) - } - log.Printf("initialized data node: %s\n", (&u).String()) - return + if err := s.Initialize(u); err != nil { + log.Fatalf("server initialization error(1): %s", err) } + log.Printf("initialized data node: %s\n", (&u).String()) + return + } else if err != nil { log.Printf("join: failed to connect data node: %s: %s", (&u).String(), err) } else { log.Printf("join: connected data node to %s", u) diff --git a/server.go b/server.go index 9d8d22ee0ba..b7a0e35a0f8 100644 --- a/server.go +++ b/server.go @@ -700,6 +700,7 @@ func (s *Server) Join(u *url.URL, joinURL *url.URL) error { // If we get a service unavailable, the other data nodes may still be booting // so retry again if resp.StatusCode == http.StatusServiceUnavailable { + s.Logger.Printf("join unavailable, retrying") retries += 1 time.Sleep(1 * time.Second) continue @@ -709,6 +710,7 @@ func (s *Server) Join(u *url.URL, joinURL *url.URL) error { // has given us the address of a known data node to join instead. if resp.StatusCode == http.StatusTemporaryRedirect { redirectURL, err := url.Parse(resp.Header.Get("Location")) + s.Logger.Printf("redirect join: %s", redirectURL) // if we happen to get redirected back to ourselves then we'll never join. This // may because the heartbeater could have already fired once, registering our endpoints