Skip to content

Commit

Permalink
Fix cluster-wide restart issue.
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Apr 14, 2015
1 parent 1da5bc3 commit c5bdb5a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
29 changes: 16 additions & 13 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,9 @@ func (cmd *RunCommand) openBroker(brokerURLs []url.URL) {
log.Fatalf("raft: %s", err)
}

index, _ := l.LastLogIndexTerm()
// Checks to see if the raft index is 0. If it's 0, it might be the first
// node in the cluster and must initialize or join
index, _ := l.LastLogIndexTerm()
if index == 0 {
// If we have join URLs, then attempt to join the cluster
if len(brokerURLs) > 0 {
Expand All @@ -524,7 +524,9 @@ func (cmd *RunCommand) openBroker(brokerURLs []url.URL) {
func joinLog(l *raft.Log, brokerURLs []url.URL) {
// Attempts to join each server until successful.
for _, u := range brokerURLs {
if err := l.Join(u); err != nil {
if err := l.Join(u); err == raft.ErrInitialized {
return
} else if err != nil {
log.Printf("join: failed to connect to raft cluster: %s: %s", (&u).String(), err)
} else {
log.Printf("join: connected raft log to %s", (&u).String())
Expand Down Expand Up @@ -567,17 +569,19 @@ func (cmd *RunCommand) openServer(joinURLs []url.URL) *influxdb.Server {
if err := s.Open(cmd.config.Data.Dir, c); err != nil {
log.Fatalf("failed to open data node: %v", err.Error())
}
log.Printf("data node opened at %s", cmd.config.Data.Dir)
log.Printf("data node(%d) opened at %s", s.ID(), cmd.config.Data.Dir)

dataNodeIndex := s.Index()
if dataNodeIndex == 0 {
// Give brokers time to elect a leader if entire cluster is being restarted.
time.Sleep(1 * time.Second)

if s.ID() == 0 && s.Index() == 0 {
if len(joinURLs) > 0 {
joinServer(s, cmd.config.ClusterURL(), joinURLs)
return s
}

if err := s.Initialize(cmd.config.ClusterURL()); err != nil {
log.Fatalf("server initialization error: %s", err)
log.Fatalf("server initialization error(0): %s", err)
}

u := cmd.config.ClusterURL()
Expand All @@ -596,15 +600,14 @@ func joinServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) {

// Create data node on an existing data node.
for _, joinURL := range joinURLs {
if err := s.Join(&u, &joinURL); err != nil {
if err := s.Join(&u, &joinURL); err == influxdb.ErrDataNodeNotFound {
// No data nodes could be found to join. We're the first.
if err == influxdb.ErrDataNodeNotFound {
if err := s.Initialize(u); err != nil {
log.Fatalf("server initialization error: %s", err)
}
log.Printf("initialized data node: %s\n", (&u).String())
return
if err := s.Initialize(u); err != nil {
log.Fatalf("server initialization error(1): %s", err)
}
log.Printf("initialized data node: %s\n", (&u).String())
return
} else if err != nil {
log.Printf("join: failed to connect data node: %s: %s", (&u).String(), err)
} else {
log.Printf("join: connected data node to %s", u)
Expand Down
2 changes: 2 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@ func (s *Server) Join(u *url.URL, joinURL *url.URL) error {
// If we get a service unavailable, the other data nodes may still be booting
// so retry again
if resp.StatusCode == http.StatusServiceUnavailable {
s.Logger.Printf("join unavailable, retrying")
retries += 1
time.Sleep(1 * time.Second)
continue
Expand All @@ -709,6 +710,7 @@ func (s *Server) Join(u *url.URL, joinURL *url.URL) error {
// has given us the address of a known data node to join instead.
if resp.StatusCode == http.StatusTemporaryRedirect {
redirectURL, err := url.Parse(resp.Header.Get("Location"))
s.Logger.Printf("redirect join: %s", redirectURL)

// if we happen to get redirected back to ourselves then we'll never join. This
// may be because the heartbeater could have already fired once, registering our endpoints
Expand Down

0 comments on commit c5bdb5a

Please sign in to comment.