Skip to content

Commit

Permalink
Merge pull request #1267 from stevvooe/increase-observation-weight
Browse files Browse the repository at this point in the history
picker: increase range of peer observation
  • Loading branch information
aaronlehmann committed Jul 29, 2016
2 parents 069614f + bbc4243 commit f271cc7
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 22 deletions.
4 changes: 2 additions & 2 deletions agent/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (n *Node) run(ctx context.Context) (err error) {
if n.config.JoinAddr != "" || n.config.ForceNewCluster {
n.remotes = newPersistentRemotes(filepath.Join(n.config.StateDir, stateFilename))
if n.config.JoinAddr != "" {
n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, 1)
n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, picker.DefaultObservationWeight)
}
}

Expand Down Expand Up @@ -647,7 +647,7 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
go func(ready chan struct{}) {
select {
case <-ready:
n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, 5)
n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
case <-connCtx.Done():
}
}(ready)
Expand Down
7 changes: 6 additions & 1 deletion manager/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/manager/state/watch"
"github.com/docker/swarmkit/picker"
"github.com/docker/swarmkit/protobuf/ptypes"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -138,7 +139,11 @@ func getWeightedPeers(cluster Cluster) []*api.WeightedPeer {
NodeID: m.NodeID,
Addr: m.Addr,
},
Weight: 1,

// TODO(stevvooe): Calculate weight of manager selection based on
// cluster-level observations, such as number of connections and
// load.
Weight: picker.DefaultObservationWeight,
})
}
return mgrs
Expand Down
24 changes: 14 additions & 10 deletions picker/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import (

var errRemotesUnavailable = fmt.Errorf("no remote hosts provided")

// DefaultObservationWeight provides a weight to use for positive observations
// that will balance well under repeated observations. Negative observations
// made by this package (for example on connection failure) are recorded by
// negating it, i.e. -DefaultObservationWeight.
const DefaultObservationWeight = 10

// Remotes keeps track of remote addresses by weight, informed by
// observations.
type Remotes interface {
Expand Down Expand Up @@ -49,7 +53,7 @@ func NewRemotes(peers ...api.Peer) Remotes {
}

for _, peer := range peers {
mwr.Observe(peer, 1)
mwr.Observe(peer, DefaultObservationWeight)
}

return mwr
Expand Down Expand Up @@ -165,7 +169,7 @@ const (
// See
// https://en.wikipedia.org/wiki/Exponential_smoothing#Basic_exponential_smoothing
// for details.
remoteWeightSmoothingFactor = 0.7
remoteWeightSmoothingFactor = 0.5
remoteWeightMax = 1 << 8
)

Expand Down Expand Up @@ -228,7 +232,7 @@ func (p *Picker) Init(cc *grpc.ClientConn) error {
peer := p.peer
p.mu.Unlock()

p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
c, err := grpc.NewConn(cc)
if err != nil {
return err
Expand All @@ -248,7 +252,7 @@ func (p *Picker) Pick(ctx context.Context) (transport.ClientTransport, error) {
p.mu.Unlock()
transport, err := p.conn.Wait(ctx)
if err != nil {
p.r.ObserveIfExists(peer, -1)
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
}

return transport, err
Expand All @@ -261,7 +265,7 @@ func (p *Picker) PickAddr() (string, error) {
peer := p.peer
p.mu.Unlock()

p.r.ObserveIfExists(peer, -1) // downweight the current addr
p.r.ObserveIfExists(peer, -DefaultObservationWeight) // downweight the current addr

var err error
peer, err = p.r.Select()
Expand Down Expand Up @@ -299,15 +303,15 @@ func (p *Picker) WaitForStateChange(ctx context.Context, sourceState grpc.Connec
// TODO(stevvooe): This is questionable, but we'll see how it works.
switch state {
case grpc.Idle:
p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
case grpc.Connecting:
p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
case grpc.Ready:
p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
case grpc.TransientFailure:
p.r.ObserveIfExists(peer, -1)
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
case grpc.Shutdown:
p.r.ObserveIfExists(peer, -1)
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
}

return state, err
Expand Down
79 changes: 70 additions & 9 deletions picker/picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestRemotesExclude(t *testing.T) {
// value.
func TestRemotesConvergence(t *testing.T) {
remotes := NewRemotes()
remotes.Observe(api.Peer{Addr: "one"}, 1)
remotes.Observe(api.Peer{Addr: "one"}, DefaultObservationWeight)

// zero weighted against 1
if float64(remotes.Weights()[api.Peer{Addr: "one"}]) < remoteWeightSmoothingFactor {
Expand All @@ -115,7 +115,7 @@ func TestRemotesConvergence(t *testing.T) {

// crank it up
for i := 0; i < 10; i++ {
remotes.Observe(api.Peer{Addr: "one"}, 1)
remotes.Observe(api.Peer{Addr: "one"}, DefaultObservationWeight)
}

if float64(remotes.Weights()[api.Peer{Addr: "one"}]) < remoteWeightSmoothingFactor {
Expand All @@ -127,7 +127,7 @@ func TestRemotesConvergence(t *testing.T) {
}

// provided a poor review
remotes.Observe(api.Peer{Addr: "one"}, -1)
remotes.Observe(api.Peer{Addr: "one"}, -DefaultObservationWeight)

if remotes.Weights()[api.Peer{Addr: "one"}] > 0 {
t.Fatalf("should be below zero: %v", remotes.Weights()[api.Peer{Addr: "one"}])
Expand All @@ -149,7 +149,7 @@ func TestRemotesZeroWeights(t *testing.T) {
}

seen := map[api.Peer]struct{}{}
for i := 0; i < 25; i++ {
for i := 0; i < 1000; i++ {
peer, err := remotes.Select()
if err != nil {
t.Fatalf("unexpected error from Select: %v", err)
Expand All @@ -165,7 +165,7 @@ func TestRemotesZeroWeights(t *testing.T) {
}

// Pump up number 3!
remotes.Observe(api.Peer{Addr: "three"}, 10)
remotes.Observe(api.Peer{Addr: "three"}, DefaultObservationWeight)

count := map[api.Peer]int{}
for i := 0; i < 100; i++ {
Expand All @@ -178,7 +178,7 @@ func TestRemotesZeroWeights(t *testing.T) {
count[peer]++

// keep observing three
remotes.Observe(api.Peer{Addr: "three"}, 10)
remotes.Observe(api.Peer{Addr: "three"}, DefaultObservationWeight)
}

// here, we ensure that three is at least three times more likely to be
Expand Down Expand Up @@ -238,10 +238,10 @@ func TestRemotesDownweight(t *testing.T) {
}

for _, p := range peers {
remotes.Observe(p, 1)
remotes.Observe(p, DefaultObservationWeight)
}

remotes.Observe(peers[0], -1)
remotes.Observe(peers[0], -DefaultObservationWeight)

samples := 100000
choosen := 0
Expand All @@ -262,6 +262,67 @@ func TestRemotesDownweight(t *testing.T) {
}
}

// TestRemotesPractical ensures that under a single poor observation, such as
// an error, the likelihood of selecting the node dramatically decreases.
//
// The test runs in two phases. First, every selected peer is rewarded with a
// positive observation and the selection counts are checked to be roughly
// even across all peers. Then one peer receives a single negative observation
// and the selection counts are checked again: the penalized peer must be
// (almost) never selected, and the remaining peers must share the selections
// roughly evenly.
func TestRemotesPractical(t *testing.T) {
	peers := []api.Peer{{Addr: "one"}, {Addr: "two"}, {Addr: "three"}}
	remotes := NewRemotes(peers...)
	seen := map[api.Peer]int{}
	selections := 1000
	tolerance := 0.20 // allow 20% delta to reduce test failure probability

	// set a baseline, where selections should be even
	for i := 0; i < selections; i++ {
		peer, err := remotes.Select()
		if err != nil {
			t.Fatalf("error selecting peer: %v", err)
		}

		remotes.Observe(peer, DefaultObservationWeight)
		seen[peer]++
	}

	expected, delta := selections/len(peers), int(tolerance*float64(selections))
	low, high := expected-delta, expected+delta
	// Range over peers rather than seen so that a peer that was never
	// selected (and therefore has no entry in seen) is still checked.
	for _, peer := range peers {
		count := seen[peer]
		if count < low || count > high {
			t.Fatalf("weighted selection not balanced: %v selected %v/%v, expected range %v, %v", peer, count, selections, low, high)
		}
	}

	// one bad observation should mark the node as bad
	remotes.Observe(peers[0], -DefaultObservationWeight)

	seen = map[api.Peer]int{} // reset
	for i := 0; i < selections; i++ {
		peer, err := remotes.Select()
		if err != nil {
			t.Fatalf("error selecting peer: %v", err)
		}

		seen[peer]++
	}

	tolerance = 0.10 // switch to 10% tolerance for two peers
	// same check as above, with only 2 peers, the bad peer should be unseen
	expected, delta = selections/(len(peers)-1), int(tolerance*float64(selections))
	low, high = expected-delta, expected+delta
	for _, peer := range peers {
		count := seen[peer]
		if peer == peers[0] {
			// we have an *extremely* low probability of selecting this node
			// (like 0.005%) once. Selecting this more than a few times will
			// fail the test.
			if count > 3 {
				t.Fatalf("downweighted peer should not be selected, selected %v times", count)
			}

			// The downweighted peer is expected to fall far outside the
			// balanced range, so it must not reach the balance check below.
			continue
		}

		if count < low || count > high {
			t.Fatalf("weighted selection not balanced: %v selected %v/%v, expected range %v, %v", peer, count, selections, low, high)
		}
	}
}

var peers = []api.Peer{
{Addr: "one"}, {Addr: "two"}, {Addr: "three"},
{Addr: "four"}, {Addr: "five"}, {Addr: "six"},
Expand Down Expand Up @@ -320,6 +381,6 @@ func benchmarkRemotesObserve(b *testing.B, peers ...api.Peer) {
remotes := NewRemotes(peers...)

for i := 0; i < b.N; i++ {
remotes.Observe(peers[i%len(peers)], 1.0)
remotes.Observe(peers[i%len(peers)], DefaultObservationWeight)
}
}

0 comments on commit f271cc7

Please sign in to comment.