Skip to content

Commit

Permalink
Merge 31b69ae into b0703ec
Browse files Browse the repository at this point in the history
  • Loading branch information
jwolski2 committed Jan 15, 2016
2 parents b0703ec + 31b69ae commit f5a33b2
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 0 deletions.
131 changes: 131 additions & 0 deletions swim/join_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package swim
import (
"errors"
"fmt"
"math"
"math/rand"
"sync"
"time"
Expand All @@ -33,6 +34,11 @@ import (
"github.com/uber/tchannel-go/json"
)

var noDelay = time.Duration(0)
var errNumAttemptsInvalid = errors.New("numAttempts is required to be greater or equal to 0")
var errRandomizerRequired = errors.New("randomizer is required")
var errSleeperRequired = errors.New("sleeper is required")

const (
// If a node cannot complete a join within defaultMaxJoinDuration
// there is likely something very wrong. The aim is for the join
Expand All @@ -49,8 +55,14 @@ const (
defaultJoinTimeout = time.Second
defaultJoinSize = 3
defaultParallelismFactor = 2
defaultDelayInitial = 100 * time.Millisecond
defaultDelayMax = 60 * time.Second
defaultDelayMin = 0 * time.Second
)

var defaultDelayRandomizer = rand.Intn
var defaultDelaySleeper = time.Sleep

// A joinRequest is used to request a join to a remote node
type joinRequest struct {
App string `json:"app"`
Expand All @@ -69,6 +81,11 @@ type joinOpts struct {
// discoverProvider is the DiscoverProvider that this joinSender will use to
// enumerate bootstrap hosts.
discoverProvider DiscoverProvider

// delayOpts are the parameters used to control the delay applied in between
// each join attempt. If none are provided upon construction of joinOpts,
// they will be filled in by the joinSender constructor.
delayOpts *delayOpts
}

// A joinSender is used to join an existing cluster of nodes defined in a node's
Expand Down Expand Up @@ -104,6 +121,26 @@ type joinSender struct {
roundNonPreferredNodes []string

numTries int

delayOpts *delayOpts

// These are used for bookkeeping purposes while
// tracking the amount of delay applied in between
// each join attempt.

// nextDelayMin tracks the last used upper-bound for a join attempt delay.
// Upon the next join attempt, nextDelayMin will be used as the lower-bound
// for that delay. This acts as a shifting window for the bounds of the
// exponential backoff.
nextDelayMin float64
maxDelayReached bool
}

type delayOpts struct {
initial float64
max float64
randomizer func(int) int
sleeper func(time.Duration)
}

// newJoinSender returns a new JoinSender to join a cluster with
Expand Down Expand Up @@ -140,6 +177,15 @@ func newJoinSender(node *Node, opts *joinOpts) (*joinSender, error) {
js.parallelismFactor = util.SelectInt(opts.parallelismFactor, defaultParallelismFactor)
js.size = util.SelectInt(opts.size, defaultJoinSize)
js.size = util.Min(js.size, len(js.potentialNodes))
js.delayOpts = opts.delayOpts

// Initialize delayOpts to default if none provided.
if js.delayOpts == nil {
js.delayOpts = newDelayOpts()
}

js.nextDelayMin = 0
js.maxDelayReached = false

return js, nil
}
Expand All @@ -162,6 +208,15 @@ func (j *joinSender) parseHosts(hostports []string) {
}
}

func newDelayOpts() *delayOpts {
return &delayOpts{
initial: float64(defaultDelayInitial),
max: float64(defaultDelayMax),
randomizer: defaultDelayRandomizer,
sleeper: defaultDelaySleeper,
}
}

// potential nodes are nodes that can be joined that are not the local node
func (j *joinSender) CollectPotentialNodes(nodesJoined []string) []string {
if nodesJoined == nil {
Expand Down Expand Up @@ -323,6 +378,20 @@ func (j *joinSender) JoinCluster() ([]string, error) {
"numFailed": numFailed,
"startTime": startTime,
}).Debug("join not yet complete")

// Subtract one from numGroups to ensure first argument
// provided to delayAttempt is 0-based. See that method's
// documentation for more information.
lastAttemptNumber := numGroups - 1
_, err := j.delayAttempt(lastAttemptNumber, j.delayOpts.randomizer,
j.delayOpts.sleeper)
if err != nil {
j.node.log.WithFields(log.Fields{
"local": j.node.address,
"err": err.Error(),
"lastAttemptNumber": lastAttemptNumber,
}).Error("ringpop join attempt delay failed")
}
}

j.node.emit(JoinCompleteEvent{
Expand All @@ -334,6 +403,68 @@ func (j *joinSender) JoinCluster() ([]string, error) {
return nodesJoined, nil
}

// delayAttempt delays a join attempt by sleeping for an amount of time. The
// amount of time is computed as an exponential backoff based on the number
// of join attempts that have been made at the time of the function call;
// the number of attempts is 0-based. It returns a time.Duration equal to the
// amount of delay applied and an error if one occurs. If an error occurs, a
// duration equal to 0 will be returned, signifying no delay has been applied.
func (j *joinSender) delayAttempt(numAttempts int, randomizer func(int) int,
sleeper func(time.Duration)) (time.Duration, error) {
if numAttempts < 0 {
return noDelay, errNumAttemptsInvalid
}

if randomizer == nil {
return noDelay, errRandomizerRequired
}

if sleeper == nil {
return noDelay, errSleeperRequired
}

// Compute uncapped exponential delay (exponent is the number of join
// attempts so far). Then, make sure the computed delay is capped at its
// max. Apply a random jitter to the actual sleep duration and finally,
// sleep.
uncappedDelay := j.delayOpts.initial * math.Pow(2, float64(numAttempts))
cappedDelay := math.Min(j.delayOpts.max, uncappedDelay)

// If cappedDelay and nextDelayMin are equal, we have reached the point
// at which the exponential backoff has reached its max; apply no more
// jitter.
var jitteredDelay int
if cappedDelay == j.nextDelayMin {
jitteredDelay = int(cappedDelay)
} else {
jitteredDelay = randomizer(int(cappedDelay-j.nextDelayMin)) + int(j.nextDelayMin)
}

// If this is the first time an uncapped delay reached or exceeded the
// maximum allowable delay, log a message.
if uncappedDelay >= j.delayOpts.max && j.maxDelayReached == false {
j.node.log.WithFields(log.Fields{
"local": j.node.address,
"numAttempts": numAttempts,
"initialDelay": j.delayOpts.initial,
"minDelay": j.nextDelayMin,
"maxDelay": j.delayOpts.max,
"uncappedDelay": uncappedDelay,
"cappedDelay": cappedDelay,
"jitteredDelay": jitteredDelay,
}).Debug("ringpop join attempt delay reached max")
j.maxDelayReached = true
}

// Set lower-bound for next attempt to maximum of current attempt.
j.nextDelayMin = cappedDelay

actualDelay := time.Duration(jitteredDelay)
sleeper(actualDelay)

return actualDelay, nil
}

func (j *joinSender) JoinGroup(nodesJoined []string) ([]string, []string) {
group := j.SelectGroup(nodesJoined)

Expand Down
80 changes: 80 additions & 0 deletions swim/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ package swim
import (
"sort"
"testing"
"time"

"github.com/stretchr/testify/suite"
"github.com/uber/tchannel-go/json"
)

// Null randomizer and sleeper used for join delay tests.
var noRandom = func(n int) int { return n }
var noSleep = func(d time.Duration) {}

type JoinSenderTestSuite struct {
suite.Suite
tnode *testNode
Expand All @@ -43,6 +48,32 @@ func (s *JoinSenderTestSuite) SetupTest() {
s.node = s.tnode.node
}

func (s *JoinSenderTestSuite) newJoinSenderForDelay() *joinSender {
bootstrapHosts := fakeHostPorts(1, 2, 1, 1)
joinSender, err := newJoinSender(s.node, &joinOpts{
discoverProvider: &StaticHostList{bootstrapHosts},
})
s.NoError(err, "joiner created successfully")
return joinSender
}

func (s *JoinSenderTestSuite) setupTestForDelay() (*joinSender, [6]float64) {
joinSender := s.newJoinSenderForDelay()
joinSender.delayOpts.initial = 100
joinSender.delayOpts.max = 1000

expectedDelays := [...]float64{
joinSender.delayOpts.initial,
200,
400,
800,
joinSender.delayOpts.max, // Backoff delay is capped at this point.
joinSender.delayOpts.max,
}

return joinSender, expectedDelays
}

func (s *JoinSenderTestSuite) TearDownTest() {
s.tnode.Destroy()
}
Expand Down Expand Up @@ -199,6 +230,55 @@ func (s *JoinSenderTestSuite) TestJoinSelf() {
}
}

func (s *JoinSenderTestSuite) TestDelayWithRandomness() {
joinSender, expectedDelays := s.setupTestForDelay()

for i, expectedDelay := range expectedDelays {
delay, err := joinSender.delayAttempt(i, defaultDelayRandomizer, noSleep)
s.NoError(err, "join attempt delay no error")

delayF := float64(delay)
if i == 0 {
s.True(delayF >= 0 && delayF < expectedDelay)
} else {
s.True(delayF >= expectedDelays[i-1] && delayF <= expectedDelay)
}
}
}

func (s *JoinSenderTestSuite) TestDelayWithoutRandomness() {
joinSender, expectedDelays := s.setupTestForDelay()

// Exercise joinSender method under test. Verify no error results
// and actual delay applied equals expected delay.
assertDelay := func(numAttempts int, expectedDelay float64) {
delay, err := joinSender.delayAttempt(numAttempts, noRandom, noSleep)
s.NoError(err, "join attempt delay no error")
s.EqualValues(time.Duration(expectedDelay), delay,
"join attempt delay is correct")
}

for i, delay := range expectedDelays {
assertDelay(i, delay)
}
}

func (s *JoinSenderTestSuite) TestDelayPreconditions() {
joinSender := s.newJoinSenderForDelay()

delay, err := joinSender.delayAttempt(-1, noRandom, noSleep)
s.EqualValues(noDelay, delay, "no delay applied")
s.Equal(err, errNumAttemptsInvalid, "num attempts invalid error")

delay, err = joinSender.delayAttempt(0, nil, noSleep)
s.EqualValues(noDelay, delay, "no delay applied")
s.Equal(err, errRandomizerRequired, "randomizer required error")

delay, err = joinSender.delayAttempt(0, noRandom, nil)
s.EqualValues(noDelay, delay, "no delay applied")
s.Equal(err, errSleeperRequired, "sleeper required error")
}

func TestJoinSenderTestSuite(t *testing.T) {
suite.Run(t, new(JoinSenderTestSuite))
}

0 comments on commit f5a33b2

Please sign in to comment.