Skip to content

Commit

Permalink
Merge pull request #1250 from LK4D4/inc_hb_unknown
Browse files Browse the repository at this point in the history
dispatcher: increase heartbeat period for unknown nodes
  • Loading branch information
aaronlehmann committed Jul 29, 2016
2 parents 9f1d578 + f4e0641 commit 058bc22
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
4 changes: 2 additions & 2 deletions manager/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestHeartbeatTimeout(t *testing.T) {
time.Sleep(500 * time.Millisecond)

gd.Store.View(func(readTx store.ReadTx) {
storeNodes, err := store.FindNodes(readTx, store.All)
storeNodes, err := store.FindNodes(readTx, store.ByIDPrefix(gd.SecurityConfigs[0].ClientTLSCreds.NodeID()))
assert.NoError(t, err)
assert.NotEmpty(t, storeNodes)
assert.Equal(t, api.NodeStatus_DOWN, storeNodes[0].Status.State)
Expand Down Expand Up @@ -754,6 +754,6 @@ func TestNodesCount(t *testing.T) {
stream.Recv()
}
assert.Equal(t, 6, gd.dispatcherServer.NodeCount())
time.Sleep(500 * time.Millisecond)
time.Sleep(700 * time.Millisecond)
assert.Equal(t, 0, gd.dispatcherServer.NodeCount())
}
29 changes: 16 additions & 13 deletions manager/dispatcher/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,29 @@ func (rn *registeredNode) checkSessionID(sessionID string) error {
}

type nodeStore struct {
periodChooser *periodChooser
gracePeriodMultiplier time.Duration
rateLimitPeriod time.Duration
nodes map[string]*registeredNode
mu sync.RWMutex
periodChooser *periodChooser
gracePeriodMultiplierNormal time.Duration
gracePeriodMultiplierUnknown time.Duration
rateLimitPeriod time.Duration
nodes map[string]*registeredNode
mu sync.RWMutex
}

func newNodeStore(hbPeriod, hbEpsilon time.Duration, graceMultiplier int, rateLimitPeriod time.Duration) *nodeStore {
return &nodeStore{
nodes: make(map[string]*registeredNode),
periodChooser: newPeriodChooser(hbPeriod, hbEpsilon),
gracePeriodMultiplier: time.Duration(graceMultiplier),
rateLimitPeriod: rateLimitPeriod,
nodes: make(map[string]*registeredNode),
periodChooser: newPeriodChooser(hbPeriod, hbEpsilon),
gracePeriodMultiplierNormal: time.Duration(graceMultiplier),
gracePeriodMultiplierUnknown: time.Duration(graceMultiplier) * 2,
rateLimitPeriod: rateLimitPeriod,
}
}

func (s *nodeStore) updatePeriod(hbPeriod, hbEpsilon time.Duration, gracePeriodMultiplier int) {
s.mu.Lock()
s.periodChooser = newPeriodChooser(hbPeriod, hbEpsilon)
s.gracePeriodMultiplier = time.Duration(gracePeriodMultiplier)
s.gracePeriodMultiplierNormal = time.Duration(gracePeriodMultiplier)
s.gracePeriodMultiplierUnknown = s.gracePeriodMultiplierNormal * 2
s.mu.Unlock()
}

Expand All @@ -79,7 +82,7 @@ func (s *nodeStore) AddUnknown(n *api.Node, expireFunc func()) error {
Node: n,
}
s.nodes[n.ID] = rn
rn.Heartbeat = heartbeat.New(s.periodChooser.Choose()*s.gracePeriodMultiplier, expireFunc)
rn.Heartbeat = heartbeat.New(s.periodChooser.Choose()*s.gracePeriodMultiplierUnknown, expireFunc)
return nil
}

Expand Down Expand Up @@ -124,7 +127,7 @@ func (s *nodeStore) Add(n *api.Node, expireFunc func()) *registeredNode {
Disconnect: make(chan struct{}),
}
s.nodes[n.ID] = rn
rn.Heartbeat = heartbeat.New(s.periodChooser.Choose()*s.gracePeriodMultiplier, expireFunc)
rn.Heartbeat = heartbeat.New(s.periodChooser.Choose()*s.gracePeriodMultiplierNormal, expireFunc)
return rn
}

Expand Down Expand Up @@ -154,7 +157,7 @@ func (s *nodeStore) Heartbeat(id, sid string) (time.Duration, error) {
return 0, err
}
period := s.periodChooser.Choose() // base period for node
grace := period * time.Duration(s.gracePeriodMultiplier)
grace := period * time.Duration(s.gracePeriodMultiplierNormal)
rn.mu.Lock()
rn.Heartbeat.Update(grace)
rn.Heartbeat.Beat()
Expand Down

0 comments on commit 058bc22

Please sign in to comment.