Skip to content

Commit

Permalink
Fix term signal.
Browse files Browse the repository at this point in the history
This commit changes raft so that term changes are made immediately and
term change signals are made afterward. Previously, election timeouts
were invalidated by incoming term changes which caused an election loop.

Stale term was also fixed and http/pprof was added too.
  • Loading branch information
benbjohnson committed Apr 10, 2015
1 parent c757039 commit eaf4bfc
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Bugfixes
- [#2225](https://github.com/influxdb/influxdb/pull/2225): Make keywords completely case insensitive
- [#2228](https://github.com/influxdb/influxdb/pull/2228): Accept keyword default unquoted in ALTER RETENTION POLICY statement
- [#2236](https://github.com/influxdb/influxdb/pull/2236): Immediate term changes, fix stale write issue, net/http/pprof

## v0.9.0-rc22 [2015-04-09]

Expand Down
4 changes: 4 additions & 0 deletions cmd/influxd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ type Config struct {
WriteInterval Duration `toml:"write-interval"`
} `toml:"monitoring"`

Debugging struct {
PprofEnabled bool `toml:"pprof-enabled"`
} `toml:"debugging"`

ContinuousQuery struct {
// when continuous queries are run we'll automatically recompute previous intervals
// in case lagged data came in. Set to zero if you never have lagged data. We do
Expand Down
16 changes: 16 additions & 0 deletions cmd/influxd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"log"
"math/rand"
"net/http"
"net/http/pprof"
"net/url"
"strings"

Expand All @@ -29,6 +30,21 @@ func NewHandler() *Handler {

// ServeHTTP responds to HTTP request to the handler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Debug routes.
if h.Config.Debugging.PprofEnabled && strings.HasPrefix(r.URL.Path, "/debug/pprof") {
switch r.URL.Path {
case "/debug/pprof/cmdline":
pprof.Cmdline(w, r)
case "/debug/pprof/profile":
pprof.Profile(w, r)
case "/debug/pprof/symbol":
pprof.Symbol(w, r)
default:
pprof.Index(w, r)
}
return
}

// FIXME: This is very brittle. Refactor to have common path prefix
if strings.HasPrefix(r.URL.Path, "/raft") {
h.serveRaft(w, r)
Expand Down
8 changes: 4 additions & 4 deletions messaging/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,12 +495,9 @@ func (b *Broker) applySetTopicMaxIndex(m *Message) {
if t := b.topics[topicID]; t != nil {
t.mu.Lock()
defer t.mu.Unlock()

// Track the highest replicated index per data node URL
t.indexByURL[u] = index

if t.index < index {
t.index = index
}
}
}

Expand Down Expand Up @@ -815,6 +812,9 @@ func (t *Topic) WriteMessage(m *Message) error {
return fmt.Errorf("write segment: %s", err)
}

// Update index.
t.index = m.Index

return nil
}

Expand Down
3 changes: 0 additions & 3 deletions messaging/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,6 @@ func TestBroker_Apply_SetMaxTopicIndex(t *testing.T) {
t.Fatalf("apply error: %s", err)
}

if topic := b.Topic(20); topic.Index() != 5 {
t.Fatalf("unexpected topic index: %d", topic.Index())
}
if topic := b.Topic(20); topic.IndexForURL(*testDataURL) != 5 {
t.Fatalf("unexpected topic url index: %d", topic.IndexForURL(*testDataURL))
}
Expand Down
6 changes: 6 additions & 0 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,12 @@ func (l *Log) mustSetTermIfHigher(term uint64) {
if err := l.setTerm(term); err != nil {
panic("unable to set term: " + err.Error())
}

// Signal term change.
select {
case l.terms <- struct{}{}:
default:
}
}

// readConfig reads the configuration from disk.
Expand Down
1 change: 1 addition & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
for _, p := range points {
measurement, series := db.MeasurementAndSeries(p.Name, p.Tags)
if series == nil {
s.Logger.Printf("series not found: name=%s, tags=%#v", p.Name, p.Tags)
return ErrSeriesNotFound
}

Expand Down

0 comments on commit eaf4bfc

Please sign in to comment.