Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Term signal #2236

Merged
merged 2 commits into from
Apr 10, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Bugfixes
- [#2225](https://github.com/influxdb/influxdb/pull/2225): Make keywords completely case insensitive
- [#2228](https://github.com/influxdb/influxdb/pull/2228): Accept keyword default unquoted in ALTER RETENTION POLICY statement
- [#2236](https://github.com/influxdb/influxdb/pull/2236): Immediate term changes, fix stale write issue, net/http/pprof

## v0.9.0-rc22 [2015-04-09]

Expand Down
4 changes: 4 additions & 0 deletions cmd/influxd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ type Config struct {
WriteInterval Duration `toml:"write-interval"`
} `toml:"monitoring"`

Debugging struct {
PprofEnabled bool `toml:"pprof-enabled"`
} `toml:"debugging"`

ContinuousQuery struct {
// when continuous queries are run we'll automatically recompute previous intervals
// in case lagged data came in. Set to zero if you never have lagged data. We do
Expand Down
16 changes: 16 additions & 0 deletions cmd/influxd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"log"
"math/rand"
"net/http"
"net/http/pprof"
"net/url"
"strings"

Expand All @@ -29,6 +30,21 @@ func NewHandler() *Handler {

// ServeHTTP responds to HTTP request to the handler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Debug routes.
if h.Config.Debugging.PprofEnabled && strings.HasPrefix(r.URL.Path, "/debug/pprof") {
switch r.URL.Path {
case "/debug/pprof/cmdline":
pprof.Cmdline(w, r)
case "/debug/pprof/profile":
pprof.Profile(w, r)
case "/debug/pprof/symbol":
pprof.Symbol(w, r)
default:
pprof.Index(w, r)
}
return
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can now pull pprof profiles if you run:

$ go tool pprof http://localhost:8086/debug/pprof/profile

More info on the net/http/pprof page. Make sure you change their :6060 ports to :8086 though.

// FIXME: This is very brittle. Refactor to have common path prefix
if strings.HasPrefix(r.URL.Path, "/raft") {
h.serveRaft(w, r)
Expand Down
8 changes: 4 additions & 4 deletions messaging/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,12 +495,9 @@ func (b *Broker) applySetTopicMaxIndex(m *Message) {
if t := b.topics[topicID]; t != nil {
t.mu.Lock()
defer t.mu.Unlock()

// Track the highest replicated index per data node URL
t.indexByURL[u] = index

if t.index < index {
t.index = index
}
}
}

Expand Down Expand Up @@ -815,6 +812,9 @@ func (t *Topic) WriteMessage(m *Message) error {
return fmt.Errorf("write segment: %s", err)
}

// Update index.
t.index = m.Index

return nil
}

Expand Down
3 changes: 0 additions & 3 deletions messaging/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,6 @@ func TestBroker_Apply_SetMaxTopicIndex(t *testing.T) {
t.Fatalf("apply error: %s", err)
}

if topic := b.Topic(20); topic.Index() != 5 {
t.Fatalf("unexpected topic index: %d", topic.Index())
}
if topic := b.Topic(20); topic.IndexForURL(*testDataURL) != 5 {
t.Fatalf("unexpected topic url index: %d", topic.IndexForURL(*testDataURL))
}
Expand Down
111 changes: 44 additions & 67 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ type Log struct {
// Incoming heartbeats and term changes go to these channels
// and are picked up by the current state.
heartbeats chan heartbeat
terms chan uint64
terms chan struct{}

// Close notification and wait.
wg sync.WaitGroup
Expand Down Expand Up @@ -201,7 +201,7 @@ func NewLog() *Log {
Transport: &HTTPTransport{},
Rand: rand.NewSource(time.Now().UnixNano()).Int63,
heartbeats: make(chan heartbeat, 10),
terms: make(chan uint64, 10),
terms: make(chan struct{}, 1),
Logger: log.New(os.Stderr, "[raft] ", log.LstdFlags),
}
l.updateLogPrefix()
Expand Down Expand Up @@ -538,10 +538,20 @@ func (l *Log) setTerm(term uint64) error {
}

// mustSetTerm sets the current term and clears the vote. Panic on error.
func (l *Log) mustSetTerm(term uint64) {
func (l *Log) mustSetTermIfHigher(term uint64) {
if term <= l.term {
return
}

if err := l.setTerm(term); err != nil {
panic("unable to set term: " + err.Error())
}

// Signal term change.
select {
case l.terms <- struct{}{}:
default:
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Term updates immediately but signals asynchronously.


// readConfig reads the configuration from disk.
Expand Down Expand Up @@ -863,21 +873,12 @@ func (l *Log) followerLoop(closing <-chan struct{}) State {

// Update term, commit index & leader.
l.mu.Lock()
if hb.term > l.term {
l.mustSetTerm(hb.term)
}
l.mustSetTermIfHigher(hb.term)
if hb.commitIndex > l.commitIndex {
l.commitIndex = hb.commitIndex
}
l.leaderID = hb.leaderID
l.mu.Unlock()

case term := <-l.terms:
l.mu.Lock()
if term > l.term {
l.mustSetTerm(term)
}
l.mu.Unlock()
}
}
}
Expand Down Expand Up @@ -960,9 +961,14 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State {

// Increment term and request votes.
l.mu.Lock()
l.mustSetTerm(l.term + 1)
l.mustSetTermIfHigher(l.term + 1)
l.votedFor = l.id
term := l.term

select {
case <-l.terms:
default:
}
l.mu.Unlock()

// Ensure all candidate goroutines complete before transitioning to another state.
Expand All @@ -981,27 +987,13 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State {
return Stopped
case hb := <-l.heartbeats:
l.mu.Lock()
if hb.term >= term {
l.mustSetTerm(hb.term)
l.mustSetTermIfHigher(hb.term)
if hb.term >= l.term {
l.leaderID = hb.leaderID
l.mu.Unlock()
return Follower
}
l.mu.Unlock()
case newTerm := <-l.terms:
// Ignore if it's not after this current term.
if newTerm <= term {
continue
}

// Check against the current term since that may have changed.
l.mu.Lock()
if newTerm > l.term {
l.mustSetTerm(newTerm)
l.mu.Unlock()
return Follower
}
l.mu.Unlock()
case <-l.terms:
return Follower
case <-elected:
return Leader
case ch := <-l.Clock.AfterElectionTimeout():
Expand Down Expand Up @@ -1034,12 +1026,11 @@ func (l *Log) elect(term uint64, elected chan struct{}, wg *sync.WaitGroup) {
peerTerm, err := l.Transport.RequestVote(n.URL, term, id, lastLogIndex, lastLogTerm)
l.Logger.Printf("send req vote(term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (term=%d, err=%v)", term, id, lastLogIndex, lastLogTerm, peerTerm, err)

// If an error occured then send the peer's term.
// If an error occured then update term.
if err != nil {
select {
case l.terms <- peerTerm:
default:
}
l.mu.Lock()
l.mustSetTermIfHigher(peerTerm)
l.mu.Unlock()
return
}
votes <- struct{}{}
Expand Down Expand Up @@ -1078,6 +1069,11 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State {
// Retrieve leader's term.
l.mu.Lock()
term := l.term

select {
case <-l.terms:
default:
}
l.mu.Unlock()

// Read log from leader in a separate goroutine.
Expand All @@ -1092,25 +1088,16 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State {
case <-closing: // wait for state change.
return Stopped

case newTerm := <-l.terms: // step down on higher term
if newTerm > term {
l.mu.Lock()
l.mustSetTerm(newTerm)
l.truncateTo(l.commitIndex)
l.mu.Unlock()
return Follower
}
continue
case <-l.terms: // step down on higher term
l.mu.Lock()
l.truncateTo(l.commitIndex)
l.mu.Unlock()
return Follower

case hb := <-l.heartbeats: // step down on higher term
if hb.term > term {
l.mu.Lock()
l.mustSetTerm(hb.term)
l.truncateTo(l.commitIndex)
l.mu.Unlock()
return Follower
}
continue
case hb := <-l.heartbeats: // update term, if necessary
l.mu.Lock()
l.mustSetTermIfHigher(hb.term)
l.mu.Unlock()

case commitIndex, ok := <-committed:
// Quorum not reached, try again.
Expand Down Expand Up @@ -1614,14 +1601,7 @@ func (l *Log) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (
}

// Notify term change.
l.term = term
l.votedFor = 0
if term > l.term {
select {
case l.terms <- term:
default:
}
}
l.mustSetTermIfHigher(term)

// Reject request if log is out of date.
if lastLogTerm < l.lastLogTerm {
Expand Down Expand Up @@ -1675,10 +1655,7 @@ func (l *Log) initWriter(w io.Writer, id, term, index uint64) (*logWriter, error
if l.state != Leader {
return nil, ErrNotLeader
} else if term > l.term {
select {
case l.terms <- term:
default:
}
l.mustSetTermIfHigher(term)
return nil, ErrNotLeader
}

Expand Down
1 change: 1 addition & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
for _, p := range points {
measurement, series := db.MeasurementAndSeries(p.Name, p.Tags)
if series == nil {
s.Logger.Printf("series not found: name=%s, tags=%#v", p.Name, p.Tags)
return ErrSeriesNotFound
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additional logging when a series is not found.


Expand Down