Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start cluster before broker. #2293

Merged
merged 3 commits into from
Apr 15, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Bugfixes
- [#2282](https://github.com/influxdb/influxdb/pull/2282): Use "value" as field name for OpenTSDB input.
- [#2283](https://github.com/influxdb/influxdb/pull/2283): Fix bug when restarting an entire existing cluster.
- [#2293](https://github.com/influxdb/influxdb/pull/2293): Open cluster listener before starting broker.
- [#2287](https://github.com/influxdb/influxdb/pull/2287): Fix data race during SHOW RETENTION POLICIES.

### Features
Expand Down
28 changes: 13 additions & 15 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,27 +254,21 @@ func (cmd *RunCommand) Open(config *Config, join string) *Node {
// Parse join urls from the --join flag.
joinURLs := parseURLs(join)

// Start the broker handler.
h := &Handler{Config: config}
if err := cmd.node.openClusterListener(cmd.config.ClusterAddr(), h); err != nil {
log.Fatalf("Cluster server failed to listen on %s. %s ", cmd.config.ClusterAddr(), err)
}
log.Printf("Cluster server listening on %s", cmd.config.ClusterAddr())

// Open broker & raft log, initialize or join as necessary.
if cmd.config.Broker.Enabled {
cmd.openBroker(joinURLs)
cmd.openBroker(joinURLs, h)
// If we're running as a broker locally, always connect to it since it must
// be ready before we can start the data node.
joinURLs = []url.URL{cmd.node.Broker.URL()}
}

// Start the broker handler.
h := &Handler{
Config: config,
Broker: cmd.node.Broker,
Log: cmd.node.raftLog,
}

err := cmd.node.openClusterListener(cmd.config.ClusterAddr(), h)
if err != nil {
log.Fatalf("Cluster server failed to listen on %s. %s ", cmd.config.ClusterAddr(), err)
}
log.Printf("Cluster server listening on %s", cmd.config.ClusterAddr())

var s *influxdb.Server
// Open server, initialize or join as necessary.
if cmd.config.Data.Enabled {
Expand Down Expand Up @@ -469,7 +463,7 @@ func writePIDFile(path string) {
}

// openBroker creates and initializes a broker.
func (cmd *RunCommand) openBroker(brokerURLs []url.URL) {
func (cmd *RunCommand) openBroker(brokerURLs []url.URL, h *Handler) {
path := cmd.config.BrokerDir()
u := cmd.config.ClusterURL()
raftTracing := cmd.config.Logging.RaftTracing
Expand Down Expand Up @@ -502,6 +496,10 @@ func (cmd *RunCommand) openBroker(brokerURLs []url.URL) {
log.Fatalf("raft: %s", err)
}

// Attach broker and log to handler.
h.Broker = b
h.Log = l

// Checks to see if the raft index is 0. If it's 0, it might be the first
// node in the cluster and must initialize or join
index, _ := l.LastLogIndexTerm()
Expand Down
14 changes: 13 additions & 1 deletion raft/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"io/ioutil"
"os"
"sync"
"testing"
)

Expand Down Expand Up @@ -44,25 +45,36 @@ func CloseLog(l *Log) {

// IndexFSM represents a state machine that only records the last applied index.
type IndexFSM struct {
mu sync.Mutex // guards index
index uint64 // last applied entry index; set by Apply and overwritten by ReadFrom
}

// Apply records entry.Index as the last applied index.
// It always returns nil.
func (fsm *IndexFSM) Apply(entry *LogEntry) error {
// Lock only around the write; there is nothing else to protect.
fsm.mu.Lock()
fsm.index = entry.Index
fsm.mu.Unlock()
return nil
}

// Index returns the highest applied index.
func (fsm *IndexFSM) Index() uint64 { return fsm.index }
func (fsm *IndexFSM) Index() uint64 {
// Read under the same mutex Apply and ReadFrom hold when they write index.
fsm.mu.Lock()
defer fsm.mu.Unlock()
return fsm.index
}

// WriteTo writes a snapshot of the FSM to w.
// The snapshot is the index value encoded in big-endian byte order.
// The returned byte count n is always 0; only err is meaningful.
func (fsm *IndexFSM) WriteTo(w io.Writer) (n int64, err error) {
// Hold the lock for the whole write so index cannot change mid-snapshot.
fsm.mu.Lock()
defer fsm.mu.Unlock()
return 0, binary.Write(w, binary.BigEndian, fsm.index)
}

// ReadFrom reads an FSM snapshot from r.
// It decodes a big-endian value from r directly into the index field.
// The returned byte count n is always 0; only err is meaningful.
func (fsm *IndexFSM) ReadFrom(r io.Reader) (n int64, err error) {
// Hold the lock while index is being overwritten from the snapshot.
fsm.mu.Lock()
defer fsm.mu.Unlock()
return 0, binary.Read(r, binary.BigEndian, &fsm.index)
}

Expand Down
5 changes: 5 additions & 0 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,8 +886,11 @@ func (l *Log) followerLoop(closing <-chan struct{}) State {
close(ch)

// Ignore the timeout if we are snapshotting, or if we haven't yet received
// confirmation of join (the FSM index is still 0).
if l.isSnapshotting() {
continue
} else if l.FSM.Index() == 0 {
continue
}

// TODO: Prevote before becoming candidate.
Expand Down Expand Up @@ -1349,6 +1352,7 @@ func (l *Log) appendToWriters(buf []byte) {

// If an error occurs then remove the writer and close it.
if _, err := w.Write(buf); err != nil {
l.Logger.Printf("append to writers error: %s", err)
l.removeWriter(w)
i--
continue
Expand Down Expand Up @@ -1797,6 +1801,7 @@ func (l *Log) removeWriter(writer *logWriter) {
l.writers[len(l.writers)-1] = nil
l.writers = l.writers[:len(l.writers)-1]
_ = w.Close()
l.Logger.Printf("writer removed: %#v", w)
break
}
}
Expand Down