Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor raft state handling #1615

Merged
merged 7 commits into from
Feb 20, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions raft/clock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package raft

import (
"math/rand"
"time"
)

Expand All @@ -9,10 +10,10 @@ const (
DefaultApplyInterval = 10 * time.Millisecond

// DefaultElectionTimeout is the default time before starting an election.
DefaultElectionTimeout = 500 * time.Millisecond
DefaultElectionTimeout = 1 * time.Second

// DefaultHeartbeatInterval is the default time to wait between heartbeats.
DefaultHeartbeatInterval = 150 * time.Millisecond
DefaultHeartbeatInterval = 100 * time.Millisecond

// DefaultReconnectTimeout is the default time to wait before reconnecting.
DefaultReconnectTimeout = 10 * time.Millisecond
Expand All @@ -39,8 +40,11 @@ func NewClock() *Clock {
// AfterApplyInterval returns a channel, built from the clock's ApplyInterval,
// that fires once that interval has elapsed.
func (c *Clock) AfterApplyInterval() <-chan chan struct{} {
	interval := c.ApplyInterval
	return newClockChan(interval)
}

// AfterElectionTimeout returns a channel that fires after the election timeout.
func (c *Clock) AfterElectionTimeout() <-chan chan struct{} { return newClockChan(c.ElectionTimeout) }
// AfterElectionTimeout returns a channel that fires after a duration that is
// between the election timeout and double the election timeout.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This matches the Raft paper. If all the nodes return the same timeout then they tend to get stuck in an election loop indefinitely where they can't get enough votes.

func (c *Clock) AfterElectionTimeout() <-chan chan struct{} {
	// Pick a duration in [ElectionTimeout, 2*ElectionTimeout) so that peers do
	// not all time out at the same instant and repeatedly split the vote.
	timeout := c.ElectionTimeout
	if timeout > 0 {
		// Int63n covers the full int64 range of time.Duration; converting to
		// int for rand.Intn would overflow on 32-bit platforms. Both functions
		// panic when n <= 0, so the jitter is skipped for non-positive timeouts.
		timeout += time.Duration(rand.Int63n(int64(timeout)))
	}
	return newClockChan(timeout)
}

// AfterHeartbeatInterval returns a channel that fires after the heartbeat interval.
func (c *Clock) AfterHeartbeatInterval() <-chan chan struct{} {
Expand Down
20 changes: 7 additions & 13 deletions raft/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
// Handler represents an HTTP endpoint for Raft to communicate over.
type Handler struct {
Log interface {
AddPeer(u *url.URL) (uint64, *Config, error)
AddPeer(u *url.URL) (id uint64, leaderID uint64, config *Config, err error)
RemovePeer(id uint64) error
Heartbeat(term, commitIndex, leaderID uint64) (currentIndex, currentTerm uint64, err error)
Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64, err error)
WriteEntriesTo(w io.Writer, id, term, index uint64) error
RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error)
RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) error
}
}

Expand Down Expand Up @@ -60,7 +60,7 @@ func (h *Handler) serveJoin(w http.ResponseWriter, r *http.Request) {
}

// Add peer to the log.
id, config, err := h.Log.AddPeer(u)
id, leaderID, config, err := h.Log.AddPeer(u)
if err != nil {
w.Header().Set("X-Raft-Error", err.Error())
w.WriteHeader(http.StatusInternalServerError)
Expand All @@ -69,6 +69,7 @@ func (h *Handler) serveJoin(w http.ResponseWriter, r *http.Request) {

// Return member's id in the cluster.
w.Header().Set("X-Raft-ID", strconv.FormatUint(id, 10))
w.Header().Set("X-Raft-Leader-ID", strconv.FormatUint(leaderID, 10))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a nice-to-do, or an integral part of this change's improvements?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was a simple change and it cleans up the trace log since it doesn't have to keep trying to reconnect while waiting for a heartbeat.

w.WriteHeader(http.StatusOK)

// Write config to the body.
Expand Down Expand Up @@ -120,11 +121,10 @@ func (h *Handler) serveHeartbeat(w http.ResponseWriter, r *http.Request) {
}

// Execute heartbeat on the log.
currentIndex, currentTerm, err := h.Log.Heartbeat(term, commitIndex, leaderID)
currentIndex, err := h.Log.Heartbeat(term, commitIndex, leaderID)

// Return current term and index.
w.Header().Set("X-Raft-Index", strconv.FormatUint(currentIndex, 10))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it just not needed any longer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes the code a lot cleaner if we only depose a leader through heartbeats and not on every single possible RPC interaction.

w.Header().Set("X-Raft-Term", strconv.FormatUint(currentTerm, 10))

// Write error, if applicable.
if err != nil {
Expand Down Expand Up @@ -201,14 +201,8 @@ func (h *Handler) serveRequestVote(w http.ResponseWriter, r *http.Request) {
return
}

// Execute heartbeat on the log.
currentTerm, err := h.Log.RequestVote(term, candidateID, lastLogIndex, lastLogTerm)

// Return current term and index.
w.Header().Set("X-Raft-Term", strconv.FormatUint(currentTerm, 10))

// Write error, if applicable.
if err != nil {
if err := h.Log.RequestVote(term, candidateID, lastLogIndex, lastLogTerm); err != nil {
w.Header().Set("X-Raft-Error", err.Error())
w.WriteHeader(http.StatusInternalServerError)
return
Expand Down
44 changes: 21 additions & 23 deletions raft/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
// Ensure a node can join a cluster over HTTP.
func TestHandler_HandleJoin(t *testing.T) {
h := NewHandler()
h.AddPeerFunc = func(u *url.URL) (uint64, *raft.Config, error) {
h.AddPeerFunc = func(u *url.URL) (uint64, uint64, *raft.Config, error) {
if u.String() != "http://localhost:1000" {
t.Fatalf("unexpected url: %s", u)
}
return 2, &raft.Config{}, nil
return 2, 3, &raft.Config{}, nil
}
s := httptest.NewServer(h)
defer s.Close()
Expand All @@ -34,14 +34,16 @@ func TestHandler_HandleJoin(t *testing.T) {
t.Fatalf("unexpected raft error: %s", s)
} else if s = resp.Header.Get("X-Raft-ID"); s != "2" {
t.Fatalf("unexpected raft id: %s", s)
} else if s = resp.Header.Get("X-Raft-Leader-ID"); s != "3" {
t.Fatalf("unexpected raft leader id: %s", s)
}
}

// Ensure that joining with an invalid query string will return an error.
func TestHandler_HandleJoin_Error(t *testing.T) {
h := NewHandler()
h.AddPeerFunc = func(u *url.URL) (uint64, *raft.Config, error) {
return 0, nil, raft.ErrClosed
h.AddPeerFunc = func(u *url.URL) (uint64, uint64, *raft.Config, error) {
return 0, 0, nil, raft.ErrClosed
}
s := httptest.NewServer(h)
defer s.Close()
Expand Down Expand Up @@ -123,15 +125,15 @@ func TestHandler_HandleLeave_Error(t *testing.T) {
// Ensure a heartbeat can be sent over HTTP.
func TestHandler_HandleHeartbeat(t *testing.T) {
h := NewHandler()
h.HeartbeatFunc = func(term, commitIndex, leaderID uint64) (currentIndex, currentTerm uint64, err error) {
h.HeartbeatFunc = func(term, commitIndex, leaderID uint64) (currentIndex uint64, err error) {
if term != 1 {
t.Fatalf("unexpected term: %d", term)
} else if commitIndex != 2 {
t.Fatalf("unexpected commit index: %d", commitIndex)
} else if leaderID != 3 {
t.Fatalf("unexpected leader id: %d", leaderID)
}
return 4, 5, nil
return 4, nil
}
s := httptest.NewServer(h)
defer s.Close()
Expand All @@ -147,8 +149,6 @@ func TestHandler_HandleHeartbeat(t *testing.T) {
t.Fatalf("unexpected raft error: %s", s)
} else if s = resp.Header.Get("X-Raft-Index"); s != "4" {
t.Fatalf("unexpected raft index: %s", s)
} else if s = resp.Header.Get("X-Raft-Term"); s != "5" {
t.Fatalf("unexpected raft term: %s", s)
}
}

Expand Down Expand Up @@ -182,8 +182,8 @@ func TestHandler_HandleHeartbeat_Error(t *testing.T) {
// Ensure that sending a heartbeat to a closed log returns an error.
func TestHandler_HandleHeartbeat_ErrClosed(t *testing.T) {
h := NewHandler()
h.HeartbeatFunc = func(term, commitIndex, leaderID uint64) (currentIndex, currentTerm uint64, err error) {
return 0, 0, raft.ErrClosed
h.HeartbeatFunc = func(term, commitIndex, leaderID uint64) (currentIndex uint64, err error) {
return 0, raft.ErrClosed
}
s := httptest.NewServer(h)
defer s.Close()
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestHandler_HandleStream_Error(t *testing.T) {
// Ensure a vote request can be sent over HTTP.
func TestHandler_HandleRequestVote(t *testing.T) {
h := NewHandler()
h.RequestVoteFunc = func(term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error) {
h.RequestVoteFunc = func(term, candidateID, lastLogIndex, lastLogTerm uint64) error {
if term != 1 {
t.Fatalf("unexpected term: %d", term)
} else if candidateID != 2 {
Expand All @@ -281,7 +281,7 @@ func TestHandler_HandleRequestVote(t *testing.T) {
} else if lastLogTerm != 4 {
t.Fatalf("unexpected last log term: %d", lastLogTerm)
}
return 5, nil
return nil
}
s := httptest.NewServer(h)
defer s.Close()
Expand All @@ -295,16 +295,14 @@ func TestHandler_HandleRequestVote(t *testing.T) {
t.Fatalf("unexpected status: %d", resp.StatusCode)
} else if s := resp.Header.Get("X-Raft-Error"); s != "" {
t.Fatalf("unexpected raft error: %s", s)
} else if s = resp.Header.Get("X-Raft-Term"); s != "5" {
t.Fatalf("unexpected raft term: %s", s)
}
}

// Ensure sending invalid parameters in a vote request returns an error.
func TestHandler_HandleRequestVote_Error(t *testing.T) {
h := NewHandler()
h.RequestVoteFunc = func(term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error) {
return 0, raft.ErrStaleTerm
h.RequestVoteFunc = func(term, candidateID, lastLogIndex, lastLogTerm uint64) error {
return raft.ErrStaleTerm
}
s := httptest.NewServer(h)
defer s.Close()
Expand Down Expand Up @@ -366,11 +364,11 @@ func TestHandler_Ping(t *testing.T) {
// Handler represents a test wrapper for the raft.Handler.
type Handler struct {
*raft.Handler
AddPeerFunc func(u *url.URL) (uint64, *raft.Config, error)
AddPeerFunc func(u *url.URL) (uint64, uint64, *raft.Config, error)
RemovePeerFunc func(id uint64) error
HeartbeatFunc func(term, commitIndex, leaderID uint64) (currentIndex, currentTerm uint64, err error)
HeartbeatFunc func(term, commitIndex, leaderID uint64) (currentIndex uint64, err error)
WriteEntriesToFunc func(w io.Writer, id, term, index uint64) error
RequestVoteFunc func(term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error)
RequestVoteFunc func(term, candidateID, lastLogIndex, lastLogTerm uint64) error
}

// NewHandler returns a new instance of Handler.
Expand All @@ -380,17 +378,17 @@ func NewHandler() *Handler {
return h
}

func (h *Handler) AddPeer(u *url.URL) (uint64, *raft.Config, error) { return h.AddPeerFunc(u) }
func (h *Handler) RemovePeer(id uint64) error { return h.RemovePeerFunc(id) }
func (h *Handler) AddPeer(u *url.URL) (uint64, uint64, *raft.Config, error) { return h.AddPeerFunc(u) }
func (h *Handler) RemovePeer(id uint64) error { return h.RemovePeerFunc(id) }

func (h *Handler) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex, currentTerm uint64, err error) {
func (h *Handler) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64, err error) {
return h.HeartbeatFunc(term, commitIndex, leaderID)
}

// WriteEntriesTo forwards the call, with its arguments unchanged, to the
// WriteEntriesToFunc hook set on the test handler.
func (h *Handler) WriteEntriesTo(w io.Writer, id, term, index uint64) error {
	write := h.WriteEntriesToFunc
	return write(w, id, term, index)
}

func (h *Handler) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error) {
func (h *Handler) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) error {
return h.RequestVoteFunc(term, candidateID, lastLogIndex, lastLogTerm)
}
Loading