Skip to content

Commit

Permalink
picker: increase range of peer observation
Browse files Browse the repository at this point in the history
To allow the weighting algorithms to work properly, the values of
observation weights need to be selected to be greater than 1. Without
this, the observations, positive or negative, will always converge to 1.
We also relax the smoothing factor to allow the observations to react
faster to changes.

Arguably, we could remove the weight parameter from `Observe` and then
make the manager state truly discrete but that is a much larger change.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
  • Loading branch information
stevvooe committed Jul 28, 2016
1 parent 069614f commit ee5fbe1
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 21 deletions.
4 changes: 2 additions & 2 deletions agent/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (n *Node) run(ctx context.Context) (err error) {
if n.config.JoinAddr != "" || n.config.ForceNewCluster {
n.remotes = newPersistentRemotes(filepath.Join(n.config.StateDir, stateFilename))
if n.config.JoinAddr != "" {
n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, 1)
n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, picker.DefaultObservationWeight)
}
}

Expand Down Expand Up @@ -647,7 +647,7 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
go func(ready chan struct{}) {
select {
case <-ready:
n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, 5)
n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
case <-connCtx.Done():
}
}(ready)
Expand Down
7 changes: 6 additions & 1 deletion manager/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/manager/state/watch"
"github.com/docker/swarmkit/picker"
"github.com/docker/swarmkit/protobuf/ptypes"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -138,7 +139,11 @@ func getWeightedPeers(cluster Cluster) []*api.WeightedPeer {
NodeID: m.NodeID,
Addr: m.Addr,
},
Weight: 1,

// TODO(stevvooe): Calculate weight of manager selection based on
// cluster-level observations, such as number of connections and
// load.
Weight: picker.DefaultObservationWeight,
})
}
return mgrs
Expand Down
24 changes: 14 additions & 10 deletions picker/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import (

var errRemotesUnavailable = fmt.Errorf("no remote hosts provided")

// DefaultObservationWeight provides a weight to use for positive observations
// that will balance well under repeated observations.
const DefaultObservationWeight = 10

// Remotes keeps track of remote addresses by weight, informed by
// observations.
type Remotes interface {
Expand Down Expand Up @@ -49,7 +53,7 @@ func NewRemotes(peers ...api.Peer) Remotes {
}

for _, peer := range peers {
mwr.Observe(peer, 1)
mwr.Observe(peer, DefaultObservationWeight)
}

return mwr
Expand Down Expand Up @@ -165,7 +169,7 @@ const (
// See
// https://en.wikipedia.org/wiki/Exponential_smoothing#Basic_exponential_smoothing
// for details.
remoteWeightSmoothingFactor = 0.7
remoteWeightSmoothingFactor = 0.5
remoteWeightMax = 1 << 8
)

Expand Down Expand Up @@ -228,7 +232,7 @@ func (p *Picker) Init(cc *grpc.ClientConn) error {
peer := p.peer
p.mu.Unlock()

p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
c, err := grpc.NewConn(cc)
if err != nil {
return err
Expand All @@ -248,7 +252,7 @@ func (p *Picker) Pick(ctx context.Context) (transport.ClientTransport, error) {
p.mu.Unlock()
transport, err := p.conn.Wait(ctx)
if err != nil {
p.r.ObserveIfExists(peer, -1)
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
}

return transport, err
Expand All @@ -261,7 +265,7 @@ func (p *Picker) PickAddr() (string, error) {
peer := p.peer
p.mu.Unlock()

p.r.ObserveIfExists(peer, -1) // downweight the current addr
p.r.ObserveIfExists(peer, -DefaultObservationWeight) // downweight the current addr

var err error
peer, err = p.r.Select()
Expand Down Expand Up @@ -299,15 +303,15 @@ func (p *Picker) WaitForStateChange(ctx context.Context, sourceState grpc.Connec
// TODO(stevvooe): This is questionable, but we'll see how it works.
switch state {
case grpc.Idle:
p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
case grpc.Connecting:
p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
case grpc.Ready:
p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
case grpc.TransientFailure:
p.r.ObserveIfExists(peer, -1)
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
case grpc.Shutdown:
p.r.ObserveIfExists(peer, -1)
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
}

return state, err
Expand Down
75 changes: 67 additions & 8 deletions picker/picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestRemotesExclude(t *testing.T) {
// value.
func TestRemotesConvergence(t *testing.T) {
remotes := NewRemotes()
remotes.Observe(api.Peer{Addr: "one"}, 1)
remotes.Observe(api.Peer{Addr: "one"}, DefaultObservationWeight)

// zero weighted against 1
if float64(remotes.Weights()[api.Peer{Addr: "one"}]) < remoteWeightSmoothingFactor {
Expand All @@ -115,7 +115,7 @@ func TestRemotesConvergence(t *testing.T) {

// crank it up
for i := 0; i < 10; i++ {
remotes.Observe(api.Peer{Addr: "one"}, 1)
remotes.Observe(api.Peer{Addr: "one"}, DefaultObservationWeight)
}

if float64(remotes.Weights()[api.Peer{Addr: "one"}]) < remoteWeightSmoothingFactor {
Expand All @@ -127,7 +127,7 @@ func TestRemotesConvergence(t *testing.T) {
}

// provided a poor review
remotes.Observe(api.Peer{Addr: "one"}, -1)
remotes.Observe(api.Peer{Addr: "one"}, -DefaultObservationWeight)

if remotes.Weights()[api.Peer{Addr: "one"}] > 0 {
t.Fatalf("should be below zero: %v", remotes.Weights()[api.Peer{Addr: "one"}])
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestRemotesZeroWeights(t *testing.T) {
}

// Pump up number 3!
remotes.Observe(api.Peer{Addr: "three"}, 10)
remotes.Observe(api.Peer{Addr: "three"}, DefaultObservationWeight)

count := map[api.Peer]int{}
for i := 0; i < 100; i++ {
Expand All @@ -178,7 +178,7 @@ func TestRemotesZeroWeights(t *testing.T) {
count[peer]++

// keep observing three
remotes.Observe(api.Peer{Addr: "three"}, 10)
remotes.Observe(api.Peer{Addr: "three"}, DefaultObservationWeight)
}

// here, we ensure that three is at least three times more likely to be
Expand Down Expand Up @@ -238,10 +238,10 @@ func TestRemotesDownweight(t *testing.T) {
}

for _, p := range peers {
remotes.Observe(p, 1)
remotes.Observe(p, DefaultObservationWeight)
}

remotes.Observe(peers[0], -1)
remotes.Observe(peers[0], -DefaultObservationWeight)

samples := 100000
choosen := 0
Expand All @@ -262,6 +262,65 @@ func TestRemotesDownweight(t *testing.T) {
}
}

// TestRemotesPractical ensures that under a single poor observation, such as
// an error, the likelihood of selecting the node dramatically decreases.
func TestRemotesPractical(t *testing.T) {
	peers := []api.Peer{{Addr: "one"}, {Addr: "two"}, {Addr: "three"}}
	remotes := NewRemotes(peers...)
	seen := map[api.Peer]int{}
	selections := 1000
	tolerance := 0.04 // allow 4% deviation from a perfectly even split

	// set a baseline, where selections should be even
	for i := 0; i < selections; i++ {
		peer, err := remotes.Select()
		if err != nil {
			t.Fatalf("error selecting peer: %v", err)
		}

		seen[peer]++
	}

	expected, delta := selections/len(peers), int(tolerance*float64(selections))
	low, high := expected-delta, expected+delta
	for peer, count := range seen {
		if !(count >= low && count <= high) {
			t.Fatalf("weighted selection not balanced: %v selected %v/%v, expected range %v, %v", peer, count, selections, low, high)
		}
	}

	// one bad observation should mark the node as bad
	remotes.Observe(peers[0], -DefaultObservationWeight)

	seen = map[api.Peer]int{} // reset
	for i := 0; i < selections; i++ {
		peer, err := remotes.Select()
		if err != nil {
			t.Fatalf("error selecting peer: %v", err)
		}

		seen[peer]++
	}

	// same check as above: with only 2 healthy peers, the bad peer should be
	// (nearly) unseen
	expected, delta = selections/(len(peers)-1), int(tolerance*float64(selections))
	low, high = expected-delta, expected+delta
	for peer, count := range seen {
		if peer == peers[0] {
			// we have an *extremely* low probability of selecting this node
			// (like 0.005%) even once. We still allow the delta to keep from
			// being flaky.
			if count > delta {
				t.Fatalf("downweighted peer should not be selected, selected %v times", count)
			}

			// BUGFIX: the balanced-range check below only applies to healthy
			// peers; without this continue, a single (permitted) selection of
			// the downweighted peer would fall through and fail that check.
			continue
		}

		if !(count >= low && count <= high) {
			t.Fatalf("weighted selection not balanced: %v selected %v/%v, expected range %v, %v", peer, count, selections, low, high)
		}
	}
}

var peers = []api.Peer{
{Addr: "one"}, {Addr: "two"}, {Addr: "three"},
{Addr: "four"}, {Addr: "five"}, {Addr: "six"},
Expand Down Expand Up @@ -320,6 +379,6 @@ func benchmarkRemotesObserve(b *testing.B, peers ...api.Peer) {
remotes := NewRemotes(peers...)

for i := 0; i < b.N; i++ {
remotes.Observe(peers[i%len(peers)], 1.0)
remotes.Observe(peers[i%len(peers)], DefaultObservationWeight)
}
}

0 comments on commit ee5fbe1

Please sign in to comment.