diff --git a/server/clustering.go b/server/clustering.go
index 016bb8cf..612d56e7 100644
--- a/server/clustering.go
+++ b/server/clustering.go
@@ -64,6 +64,14 @@ type ClusteringOptions struct {
 	Sync        bool // Do a file sync after every write to the Raft log and message store.
 	RaftLogging bool // Enable logging of Raft library (disabled by default since really verbose).
 
+	// When a node processes a snapshot (either on startup or when falling behind) and it is
+	// not in phase with the message store's state, it is required to reconcile its state
+	// with the current leader. If it is unable to do so, the node will fail to start or exit.
+	// If all nodes are starting and there is no way to have a leader at this point,
+	// setting this boolean to true lets the node attempt to reconcile but still
+	// proceed if it cannot.
+	ProceedOnRestoreFailure bool
+
 	// These will be set to some sane defaults. Change only if experiencing raft issues.
 	RaftHeartbeatTimeout time.Duration
 	RaftElectionTimeout  time.Duration
diff --git a/server/clustering_test.go b/server/clustering_test.go
index 1c8ae865..f5fe9671 100644
--- a/server/clustering_test.go
+++ b/server/clustering_test.go
@@ -6406,6 +6406,11 @@ func TestClusteringRestoreSnapshotMsgsBailIfNoLeader(t *testing.T) {
 			t.Fatalf(e.Error())
 		}
 	}
+
+	// Now restart it with an option to force start...
+	s3sOpts.Clustering.ProceedOnRestoreFailure = true
+	s3 = runServerWithOpts(t, s3sOpts, nil)
+	s3.Shutdown()
 }
 
 func TestClusteringSQLMsgStoreFlushed(t *testing.T) {
diff --git a/server/conf.go b/server/conf.go
index e9cef251..0a9f2fa3 100644
--- a/server/conf.go
+++ b/server/conf.go
@@ -287,6 +287,11 @@ func parseCluster(itf interface{}, opts *Options) error {
 				return err
 			}
 			opts.Clustering.Sync = v.(bool)
+		case "proceed_on_restore_failure":
+			if err := checkType(k, reflect.Bool, v); err != nil {
+				return err
+			}
+			opts.Clustering.ProceedOnRestoreFailure = v.(bool)
 		case "raft_logging":
 			if err := checkType(k, reflect.Bool, v); err != nil {
 				return err
@@ -646,6 +651,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
 	fs.Int64Var(&sopts.Clustering.TrailingLogs, "cluster_trailing_logs", DefaultTrailingLogs, "stan.Clustering.TrailingLogs")
 	fs.BoolVar(&sopts.Clustering.Sync, "cluster_sync", false, "stan.Clustering.Sync")
 	fs.BoolVar(&sopts.Clustering.RaftLogging, "cluster_raft_logging", false, "")
+	fs.BoolVar(&sopts.Clustering.ProceedOnRestoreFailure, "cluster_proceed_on_restore_failure", false, "")
 	fs.StringVar(&sopts.SQLStoreOpts.Driver, "sql_driver", "", "SQL Driver")
 	fs.StringVar(&sopts.SQLStoreOpts.Source, "sql_source", "", "SQL Data Source")
 	defSQLOpts := stores.DefaultSQLStoreOptions()
diff --git a/server/conf_test.go b/server/conf_test.go
index 8a1b451b..14fc26aa 100644
--- a/server/conf_test.go
+++ b/server/conf_test.go
@@ -227,6 +227,9 @@ func TestParseConfig(t *testing.T) {
 			t.Fatalf("Expected peer %q, got %q", peers[i], p)
 		}
 	}
+	if !opts.Clustering.ProceedOnRestoreFailure {
+		t.Fatalf("Expected ProceedOnRestoreFailure to be true, got false")
+	}
 	if opts.Clustering.RaftLogPath != "/path/to/log" {
 		t.Fatalf("Expected RaftLogPath to be %q, got %q", "/path/to/log", opts.Clustering.RaftLogPath)
 	}
@@ -472,6 +475,7 @@ func TestParseWrongTypes(t *testing.T) {
 	expectFailureFor(t, "cluster:{log_snapshots:false}", wrongTypeErr)
 	expectFailureFor(t, "cluster:{trailing_logs:false}", wrongTypeErr)
 	expectFailureFor(t, "cluster:{sync:1}", wrongTypeErr)
+	expectFailureFor(t, "cluster:{proceed_on_restore_failure:123}", wrongTypeErr)
 	expectFailureFor(t, "cluster:{raft_logging:1}", wrongTypeErr)
 	expectFailureFor(t, "cluster:{raft_heartbeat_timeout:123}", wrongTypeErr)
 	expectFailureFor(t, "cluster:{raft_heartbeat_timeout:\"not_a_time\"}", wrongTimeErr)
diff --git a/server/snapshot.go b/server/snapshot.go
index b3398ad1..cf086b26 100644
--- a/server/snapshot.go
+++ b/server/snapshot.go
@@ -398,23 +398,27 @@ func (r *raftFSM) restoreChannelsFromSnapshot(serverSnap *spb.RaftSnapshot, inNe
 		// messages and fail, keep trying until a leader is avail and
 		// able to serve this node.
 		ok := false
-		for i := 0; i < restoreMsgsAttempts; i++ {
+		for i := 0; !ok && i < restoreMsgsAttempts; i++ {
 			if err := r.restoreMsgsFromSnapshot(c, sc.First, sc.Last, false); err != nil {
-				s.log.Errorf("channel %q - unable to restore messages, can't start until leader is available",
-					sc.Channel)
+				sf, sl, _ := c.store.Msgs.FirstAndLastSequence()
+				s.log.Errorf("channel %q - unable to restore messages (snapshot %v/%v, store %v/%v, cfs %v): %v",
+					sc.Channel, sc.First, sc.Last, sf, sl, atomic.LoadUint64(&c.firstSeq), err)
 				time.Sleep(restoreMsgsSleepBetweenAttempts)
 			} else {
 				ok = true
-				break
 			}
 		}
 		if !ok {
-			err = fmt.Errorf("channel %q - unable to restore messages, aborting", sc.Channel)
-			s.log.Fatalf(err.Error())
-			// In tests, we use a "dummy" logger, so process will not exit
-			// (and we would not want that anyway), so make sure we return
-			// an error.
-			return false, err
+			if s.opts.Clustering.ProceedOnRestoreFailure {
+				s.log.Errorf("channel %q - was not able to restore messages from the leader, proceeding anyway", sc.Channel)
+			} else {
+				err = fmt.Errorf("channel %q - unable to restore messages, aborting", sc.Channel)
+				s.log.Fatalf(err.Error())
+				// In tests, we use a "dummy" logger, so process will not exit
+				// (and we would not want that anyway), so make sure we return
+				// an error.
+				return false, err
+			}
 		}
 		if !inNewRaftCall {
 			delete(channelsBeforeRestore, sc.Channel)
diff --git a/test/configs/test_parse.conf b/test/configs/test_parse.conf
index 7e94a1d0..66b86b56 100644
--- a/test/configs/test_parse.conf
+++ b/test/configs/test_parse.conf
@@ -81,6 +81,7 @@ streaming: {
     log_snapshots: 1
     trailing_logs: 256
     sync: true
+    proceed_on_restore_failure: true
    raft_logging: true
    raft_heartbeat_timeout: "1s"
    raft_election_timeout: "1s"
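
Note: a minimal sketch of how the new option might be enabled in a server
configuration file, assuming the "streaming"/"cluster" nesting used by
test/configs/test_parse.conf (the exact layout of a deployment's config file
may differ):

    streaming: {
      cluster: {
        # Attempt to reconcile restored messages with the leader, but start
        # anyway if no leader can be reached (e.g. all nodes restarting at once).
        proceed_on_restore_failure: true
      }
    }

The equivalent command-line flag registered in server/conf.go is
-cluster_proceed_on_restore_failure; like the option itself, it defaults to false,
preserving the previous behavior of refusing to start when reconciliation with a
leader is impossible.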