
Commit

Merge 5e37384 into 5751408
kozlovic committed Jan 16, 2020
2 parents 5751408 + 5e37384 commit e6ae776
Showing 6 changed files with 38 additions and 10 deletions.
8 changes: 8 additions & 0 deletions server/clustering.go
@@ -64,6 +64,14 @@ type ClusteringOptions struct {
Sync bool // Do a file sync after every write to the Raft log and message store.
RaftLogging bool // Enable logging of Raft library (disabled by default since really verbose).

// When a node processes a snapshot (either on startup or when falling behind) and it is
// not in sync with the message store's state, it must reconcile its state with the
// current leader. If it is unable to do so, the node will fail to start or will exit.
// If all nodes are starting at once, there may be no leader to reconcile with; when
// this boolean is set to true, the node will still attempt to reconcile, but will
// proceed anyway if it cannot.
ProceedOnRestoreFailure bool

// These will be set to some sane defaults. Change only if experiencing raft issues.
RaftHeartbeatTimeout time.Duration
RaftElectionTimeout time.Duration
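
For context, here is a minimal sketch (not part of this commit) of turning the new option on when embedding the streaming server in Go. It assumes the server package's public GetDefaultOptions and RunServerWithOpts entry points and leaves out the rest of the clustering setup (stores, NATS options, peer configuration):

package main

import (
    stand "github.com/nats-io/nats-streaming-server/server"
)

func main() {
    opts := stand.GetDefaultOptions()
    opts.Clustering.Clustered = true
    opts.Clustering.NodeID = "a"
    opts.Clustering.Peers = []string{"b", "c"}
    // New in this commit: let the node finish starting even if it cannot
    // reconcile its restored snapshot with a leader (for example when all
    // nodes of the cluster are starting at the same time).
    opts.Clustering.ProceedOnRestoreFailure = true

    s, err := stand.RunServerWithOpts(opts, nil)
    if err != nil {
        panic(err)
    }
    defer s.Shutdown()
}
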
5 changes: 5 additions & 0 deletions server/clustering_test.go
@@ -6406,6 +6406,11 @@ func TestClusteringRestoreSnapshotMsgsBailIfNoLeader(t *testing.T) {
t.Fatalf(e.Error())
}
}

// Now restart it with an option to force start...
s3sOpts.Clustering.ProceedOnRestoreFailure = true
s3 = runServerWithOpts(t, s3sOpts, nil)
s3.Shutdown()
}

func TestClusteringSQLMsgStoreFlushed(t *testing.T) {
6 changes: 6 additions & 0 deletions server/conf.go
@@ -287,6 +287,11 @@ func parseCluster(itf interface{}, opts *Options) error {
return err
}
opts.Clustering.Sync = v.(bool)
case "proceed_on_restore_failure":
if err := checkType(k, reflect.Bool, v); err != nil {
return err
}
opts.Clustering.ProceedOnRestoreFailure = v.(bool)
case "raft_logging":
if err := checkType(k, reflect.Bool, v); err != nil {
return err
@@ -646,6 +651,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
fs.Int64Var(&sopts.Clustering.TrailingLogs, "cluster_trailing_logs", DefaultTrailingLogs, "stan.Clustering.TrailingLogs")
fs.BoolVar(&sopts.Clustering.Sync, "cluster_sync", false, "stan.Clustering.Sync")
fs.BoolVar(&sopts.Clustering.RaftLogging, "cluster_raft_logging", false, "")
fs.BoolVar(&sopts.Clustering.ProceedOnRestoreFailure, "cluster_proceed_on_restore_failure", false, "")
fs.StringVar(&sopts.SQLStoreOpts.Driver, "sql_driver", "", "SQL Driver")
fs.StringVar(&sopts.SQLStoreOpts.Source, "sql_source", "", "SQL Data Source")
defSQLOpts := stores.DefaultSQLStoreOptions()
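
As a rough usage sketch (assumptions: ConfigureOptions keeps the (*Options, *natsd.Options, error) return of the current server package, and the noop callbacks below are hypothetical stand-ins for the real version/help printers), the new setting is exposed both as the cluster_proceed_on_restore_failure command-line flag registered above and as proceed_on_restore_failure in the cluster config block:

package main

import (
    "flag"
    "fmt"

    stand "github.com/nats-io/nats-streaming-server/server"
)

func main() {
    noop := func() {}
    fs := flag.NewFlagSet("nats-streaming-server", flag.ExitOnError)
    // Feed the same arguments the server binary would receive on its command line.
    args := []string{"-clustered", "-cluster_node_id", "a", "-cluster_proceed_on_restore_failure"}
    stanOpts, _, err := stand.ConfigureOptions(fs, args, noop, noop, noop)
    if err != nil {
        panic(err)
    }
    fmt.Println(stanOpts.Clustering.ProceedOnRestoreFailure) // true
}
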
4 changes: 4 additions & 0 deletions server/conf_test.go
@@ -227,6 +227,9 @@ func TestParseConfig(t *testing.T) {
t.Fatalf("Expected peer %q, got %q", peers[i], p)
}
}
if !opts.Clustering.ProceedOnRestoreFailure {
t.Fatalf("Expected ProceedOnRestoreFailure to be true, got false")
}
if opts.Clustering.RaftLogPath != "/path/to/log" {
t.Fatalf("Expected RaftLogPath to be %q, got %q", "/path/to/log", opts.Clustering.RaftLogPath)
}
@@ -472,6 +475,7 @@ func TestParseWrongTypes(t *testing.T) {
expectFailureFor(t, "cluster:{log_snapshots:false}", wrongTypeErr)
expectFailureFor(t, "cluster:{trailing_logs:false}", wrongTypeErr)
expectFailureFor(t, "cluster:{sync:1}", wrongTypeErr)
expectFailureFor(t, "cluster:{proceed_on_restore_failure:123}", wrongTypeErr)
expectFailureFor(t, "cluster:{raft_logging:1}", wrongTypeErr)
expectFailureFor(t, "cluster:{raft_heartbeat_timeout:123}", wrongTypeErr)
expectFailureFor(t, "cluster:{raft_heartbeat_timeout:\"not_a_time\"}", wrongTimeErr)
24 changes: 14 additions & 10 deletions server/snapshot.go
@@ -398,23 +398,27 @@ func (r *raftFSM) restoreChannelsFromSnapshot(serverSnap *spb.RaftSnapshot, inNe
 			// messages and fail, keep trying until a leader is available and
 			// able to serve this node.
 			ok := false
-			for i := 0; i < restoreMsgsAttempts; i++ {
+			for i := 0; !ok && i < restoreMsgsAttempts; i++ {
 				if err := r.restoreMsgsFromSnapshot(c, sc.First, sc.Last, false); err != nil {
-					s.log.Errorf("channel %q - unable to restore messages, can't start until leader is available",
-						sc.Channel)
+					sf, sl, _ := c.store.Msgs.FirstAndLastSequence()
+					s.log.Errorf("channel %q - unable to restore messages (snapshot %v/%v, store %v/%v, cfs %v): %v",
+						sc.Channel, sc.First, sc.Last, sf, sl, atomic.LoadUint64(&c.firstSeq), err)
 					time.Sleep(restoreMsgsSleepBetweenAttempts)
 				} else {
 					ok = true
-					break
 				}
 			}
 			if !ok {
-				err = fmt.Errorf("channel %q - unable to restore messages, aborting", sc.Channel)
-				s.log.Fatalf(err.Error())
-				// In tests, we use a "dummy" logger, so process will not exit
-				// (and we would not want that anyway), so make sure we return
-				// an error.
-				return false, err
+				if s.opts.Clustering.ProceedOnRestoreFailure {
+					s.log.Errorf("channel %q - was not able to restore from the leader, proceeding anyway", sc.Channel)
+				} else {
+					err = fmt.Errorf("channel %q - unable to restore messages, aborting", sc.Channel)
+					s.log.Fatalf(err.Error())
+					// In tests, we use a "dummy" logger, so process will not exit
+					// (and we would not want that anyway), so make sure we return
+					// an error.
+					return false, err
+				}
 			}
 			if !inNewRaftCall {
 				delete(channelsBeforeRestore, sc.Channel)
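
The change above amounts to a bounded retry loop whose failure handling now depends on the new option. Below is a standalone sketch of that pattern with hypothetical names and retry values (the real attempt count and sleep interval are constants in snapshot.go that this hunk does not show):

package main

import (
    "errors"
    "fmt"
    "log"
    "time"
)

// reconcile retries fn up to attempts times, sleeping between tries.
// If every attempt fails, it either tolerates the failure (proceedOnFailure,
// mirroring ProceedOnRestoreFailure) or returns the last error so the caller
// can abort startup.
func reconcile(fn func() error, attempts int, wait time.Duration, proceedOnFailure bool) error {
    var err error
    for i := 0; i < attempts; i++ {
        if err = fn(); err == nil {
            return nil
        }
        time.Sleep(wait)
    }
    if proceedOnFailure {
        log.Printf("restore failed after %d attempts, proceeding anyway: %v", attempts, err)
        return nil
    }
    return fmt.Errorf("restore failed after %d attempts: %v", attempts, err)
}

func main() {
    err := reconcile(func() error { return errors.New("no leader") }, 3, 10*time.Millisecond, true)
    fmt.Println(err) // <nil>: the failure is tolerated because proceedOnFailure is true
}
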
1 change: 1 addition & 0 deletions test/configs/test_parse.conf
@@ -81,6 +81,7 @@ streaming: {
log_snapshots: 1
trailing_logs: 256
sync: true
proceed_on_restore_failure: true
raft_logging: true
raft_heartbeat_timeout: "1s"
raft_election_timeout: "1s"
