Skip to content

Commit

Permalink
picker: increase range of peer observation
Browse files Browse the repository at this point in the history
To allow the weighting algorithms to work properly, the values of
observation weights need to be selected to be greater than 1. Without
this, the observations, positive or negative, will always converge to 1.
We also relax the smoothing factor to allow the observations to react
faster to changes.

Arguably, we could remove the weight parameter from `Observe` and then
make the manager state truly discrete but that is a much larger change.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
  • Loading branch information
stevvooe committed Jul 29, 2016
1 parent 069614f commit bfd10a1
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 22 deletions.
4 changes: 2 additions & 2 deletions agent/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (n *Node) run(ctx context.Context) (err error) {
if n.config.JoinAddr != "" || n.config.ForceNewCluster {
n.remotes = newPersistentRemotes(filepath.Join(n.config.StateDir, stateFilename))
if n.config.JoinAddr != "" {
n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, 1)
n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, picker.DefaultObservationWeight)
}
}

Expand Down Expand Up @@ -647,7 +647,7 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
go func(ready chan struct{}) {
select {
case <-ready:
n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, 5)
n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
case <-connCtx.Done():
}
}(ready)
Expand Down
7 changes: 6 additions & 1 deletion manager/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/manager/state/watch"
"github.com/docker/swarmkit/picker"
"github.com/docker/swarmkit/protobuf/ptypes"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -138,7 +139,11 @@ func getWeightedPeers(cluster Cluster) []*api.WeightedPeer {
NodeID: m.NodeID,
Addr: m.Addr,
},
Weight: 1,

// TODO(stevvooe): Calculate weight of manager selection based on
// cluster-level observations, such as number of connections and
// load.
Weight: picker.DefaultObservationWeight,
})
}
return mgrs
Expand Down
24 changes: 14 additions & 10 deletions picker/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import (

var errRemotesUnavailable = fmt.Errorf("no remote hosts provided")

// DefaultObservationWeight provides a weight to use for positive observations
// that will balance well under repeated observations.
const DefaultObservationWeight = 10

// Remotes keeps track of remote addresses by weight, informed by
// observations.
type Remotes interface {
Expand Down Expand Up @@ -49,7 +53,7 @@ func NewRemotes(peers ...api.Peer) Remotes {
}

for _, peer := range peers {
mwr.Observe(peer, 1)
mwr.Observe(peer, DefaultObservationWeight)
}

return mwr
Expand Down Expand Up @@ -165,7 +169,7 @@ const (
// See
// https://en.wikipedia.org/wiki/Exponential_smoothing#Basic_exponential_smoothing
// for details.
remoteWeightSmoothingFactor = 0.7
remoteWeightSmoothingFactor = 0.5
remoteWeightMax = 1 << 8
)

Expand Down Expand Up @@ -228,7 +232,7 @@ func (p *Picker) Init(cc *grpc.ClientConn) error {
peer := p.peer
p.mu.Unlock()

p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
c, err := grpc.NewConn(cc)
if err != nil {
return err
Expand All @@ -248,7 +252,7 @@ func (p *Picker) Pick(ctx context.Context) (transport.ClientTransport, error) {
p.mu.Unlock()
transport, err := p.conn.Wait(ctx)
if err != nil {
p.r.ObserveIfExists(peer, -1)
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
}

return transport, err
Expand All @@ -261,7 +265,7 @@ func (p *Picker) PickAddr() (string, error) {
peer := p.peer
p.mu.Unlock()

p.r.ObserveIfExists(peer, -1) // downweight the current addr
p.r.ObserveIfExists(peer, -DefaultObservationWeight) // downweight the current addr

var err error
peer, err = p.r.Select()
Expand Down Expand Up @@ -299,15 +303,15 @@ func (p *Picker) WaitForStateChange(ctx context.Context, sourceState grpc.Connec
// TODO(stevvooe): This is questionable, but we'll see how it works.
switch state {
case grpc.Idle:
p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
case grpc.Connecting:
p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
case grpc.Ready:
p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
case grpc.TransientFailure:
p.r.ObserveIfExists(peer, -1)
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
case grpc.Shutdown:
p.r.ObserveIfExists(peer, -1)
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
}

return state, err
Expand Down
79 changes: 70 additions & 9 deletions picker/picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestRemotesExclude(t *testing.T) {
// value.
func TestRemotesConvergence(t *testing.T) {
remotes := NewRemotes()
remotes.Observe(api.Peer{Addr: "one"}, 1)
remotes.Observe(api.Peer{Addr: "one"}, DefaultObservationWeight)

// zero weighted against 1
if float64(remotes.Weights()[api.Peer{Addr: "one"}]) < remoteWeightSmoothingFactor {
Expand All @@ -115,7 +115,7 @@ func TestRemotesConvergence(t *testing.T) {

// crank it up
for i := 0; i < 10; i++ {
remotes.Observe(api.Peer{Addr: "one"}, 1)
remotes.Observe(api.Peer{Addr: "one"}, DefaultObservationWeight)
}

if float64(remotes.Weights()[api.Peer{Addr: "one"}]) < remoteWeightSmoothingFactor {
Expand All @@ -127,7 +127,7 @@ func TestRemotesConvergence(t *testing.T) {
}

// provided a poor review
remotes.Observe(api.Peer{Addr: "one"}, -1)
remotes.Observe(api.Peer{Addr: "one"}, -DefaultObservationWeight)

if remotes.Weights()[api.Peer{Addr: "one"}] > 0 {
t.Fatalf("should be below zero: %v", remotes.Weights()[api.Peer{Addr: "one"}])
Expand All @@ -149,7 +149,7 @@ func TestRemotesZeroWeights(t *testing.T) {
}

seen := map[api.Peer]struct{}{}
for i := 0; i < 25; i++ {
for i := 0; i < 1000; i++ {
peer, err := remotes.Select()
if err != nil {
t.Fatalf("unexpected error from Select: %v", err)
Expand All @@ -165,7 +165,7 @@ func TestRemotesZeroWeights(t *testing.T) {
}

// Pump up number 3!
remotes.Observe(api.Peer{Addr: "three"}, 10)
remotes.Observe(api.Peer{Addr: "three"}, DefaultObservationWeight)

count := map[api.Peer]int{}
for i := 0; i < 100; i++ {
Expand All @@ -178,7 +178,7 @@ func TestRemotesZeroWeights(t *testing.T) {
count[peer]++

// keep observing three
remotes.Observe(api.Peer{Addr: "three"}, 10)
remotes.Observe(api.Peer{Addr: "three"}, DefaultObservationWeight)
}

// here, we ensure that three is at least three times more likely to be
Expand Down Expand Up @@ -238,10 +238,10 @@ func TestRemotesDownweight(t *testing.T) {
}

for _, p := range peers {
remotes.Observe(p, 1)
remotes.Observe(p, DefaultObservationWeight)
}

remotes.Observe(peers[0], -1)
remotes.Observe(peers[0], -DefaultObservationWeight)

samples := 100000
choosen := 0
Expand All @@ -262,6 +262,67 @@ func TestRemotesDownweight(t *testing.T) {
}
}

// TestRemotesPractical ensures that under a single poor observation, such as
// an error, the likelihood of selecting the node dramatically decreases.
func TestRemotesPractical(t *testing.T) {
	peers := []api.Peer{{Addr: "one"}, {Addr: "two"}, {Addr: "three"}}
	remotes := NewRemotes(peers...)
	seen := map[api.Peer]int{}
	selections := 1000
	tolerance := 0.20 // allow 20% delta to reduce test failure probability

	// set a baseline, where selections should be even
	for i := 0; i < selections; i++ {
		peer, err := remotes.Select()
		if err != nil {
			t.Fatalf("error selecting peer: %v", err)
		}

		remotes.Observe(peer, DefaultObservationWeight)
		seen[peer]++
	}

	// With all peers observed equally, each should fall within
	// [expected-delta, expected+delta] of an even split.
	expected, delta := selections/len(peers), int(tolerance*float64(selections))
	low, high := expected-delta, expected+delta
	for peer, count := range seen {
		if !(count >= low && count <= high) {
			t.Fatalf("weighted selection not balanced: %v selected %v/%v, expected range %v, %v", peer, count, selections, low, high)
		}
	}

	// one bad observation should mark the node as bad
	remotes.Observe(peers[0], -DefaultObservationWeight)

	seen = map[api.Peer]int{} // reset the selection counts
	for i := 0; i < selections; i++ {
		peer, err := remotes.Select()
		if err != nil {
			t.Fatalf("error selecting peer: %v", err)
		}

		seen[peer]++
	}

	tolerance = 0.10 // switch to 10% tolerance for two peers
	// same check as above, with only 2 peers, the bad peer should be unseen
	expected, delta = selections/(len(peers)-1), int(tolerance*float64(selections))
	low, high = expected-delta, expected+delta
	for peer, count := range seen {
		if peer == peers[0] {
			// we have an *extremely* low probability of selecting this node
			// (like 0.005%) once. Selecting this more than a few times will
			// fail the test.
			if count > 3 {
				t.Fatalf("downweighted peer should not be selected, selected %v times", count)
			}

			// A handful of selections of the bad peer is tolerated above;
			// it must not also be held to the balanced-range check below,
			// which a count this small would always fail. Skip to the next
			// peer instead of falling through.
			continue
		}

		if !(count >= low && count <= high) {
			t.Fatalf("weighted selection not balanced: %v selected %v/%v, expected range %v, %v", peer, count, selections, low, high)
		}
	}
}

var peers = []api.Peer{
{Addr: "one"}, {Addr: "two"}, {Addr: "three"},
{Addr: "four"}, {Addr: "five"}, {Addr: "six"},
Expand Down Expand Up @@ -320,6 +381,6 @@ func benchmarkRemotesObserve(b *testing.B, peers ...api.Peer) {
remotes := NewRemotes(peers...)

for i := 0; i < b.N; i++ {
remotes.Observe(peers[i%len(peers)], 1.0)
remotes.Observe(peers[i%len(peers)], DefaultObservationWeight)
}
}

0 comments on commit bfd10a1

Please sign in to comment.