Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

picker: increase range of peer observation #1267

Merged
merged 1 commit into from
Jul 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions agent/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (n *Node) run(ctx context.Context) (err error) {
if n.config.JoinAddr != "" || n.config.ForceNewCluster {
n.remotes = newPersistentRemotes(filepath.Join(n.config.StateDir, stateFilename))
if n.config.JoinAddr != "" {
n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, 1)
n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, picker.DefaultObservationWeight)
}
}

Expand Down Expand Up @@ -647,7 +647,7 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
go func(ready chan struct{}) {
select {
case <-ready:
n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, 5)
n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
case <-connCtx.Done():
}
}(ready)
Expand Down
7 changes: 6 additions & 1 deletion manager/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/manager/state/watch"
"github.com/docker/swarmkit/picker"
"github.com/docker/swarmkit/protobuf/ptypes"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -138,7 +139,11 @@ func getWeightedPeers(cluster Cluster) []*api.WeightedPeer {
NodeID: m.NodeID,
Addr: m.Addr,
},
Weight: 1,

// TODO(stevvooe): Calculate weight of manager selection based on
// cluster-level observations, such as number of connections and
// load.
Weight: picker.DefaultObservationWeight,
})
}
return mgrs
Expand Down
24 changes: 14 additions & 10 deletions picker/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import (

var errRemotesUnavailable = fmt.Errorf("no remote hosts provided")

// DefaultObservationWeight provides a weight to use for positive observations
// that will balance well under repeated observations. It is the standard
// magnitude for Observe/ObserveIfExists calls (negated for failures), chosen
// so that, under the exponential smoothing applied to remote weights, a
// single negative observation can drive a converged peer's weight below zero.
const DefaultObservationWeight = 10

// Remotes keeps track of remote addresses by weight, informed by
// observations.
type Remotes interface {
Expand Down Expand Up @@ -49,7 +53,7 @@ func NewRemotes(peers ...api.Peer) Remotes {
}

for _, peer := range peers {
mwr.Observe(peer, 1)
mwr.Observe(peer, DefaultObservationWeight)
}

return mwr
Expand Down Expand Up @@ -165,7 +169,7 @@ const (
// See
// https://en.wikipedia.org/wiki/Exponential_smoothing#Basic_exponential_smoothing
// for details.
remoteWeightSmoothingFactor = 0.7
remoteWeightSmoothingFactor = 0.5
remoteWeightMax = 1 << 8
)

Expand Down Expand Up @@ -228,7 +232,7 @@ func (p *Picker) Init(cc *grpc.ClientConn) error {
peer := p.peer
p.mu.Unlock()

p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
c, err := grpc.NewConn(cc)
if err != nil {
return err
Expand All @@ -248,7 +252,7 @@ func (p *Picker) Pick(ctx context.Context) (transport.ClientTransport, error) {
p.mu.Unlock()
transport, err := p.conn.Wait(ctx)
if err != nil {
p.r.ObserveIfExists(peer, -1)
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
}

return transport, err
Expand All @@ -261,7 +265,7 @@ func (p *Picker) PickAddr() (string, error) {
peer := p.peer
p.mu.Unlock()

p.r.ObserveIfExists(peer, -1) // downweight the current addr
p.r.ObserveIfExists(peer, -DefaultObservationWeight) // downweight the current addr

var err error
peer, err = p.r.Select()
Expand Down Expand Up @@ -299,15 +303,15 @@ func (p *Picker) WaitForStateChange(ctx context.Context, sourceState grpc.Connec
// TODO(stevvooe): This is questionable, but we'll see how it works.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe remove the TODO now that we are taking a closer look at this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still questionable... and we're probably getting rid of this after the move to grpc.LoadBalancer.

switch state {
case grpc.Idle:
p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
case grpc.Connecting:
p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
case grpc.Ready:
p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
case grpc.TransientFailure:
p.r.ObserveIfExists(peer, -1)
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
case grpc.Shutdown:
p.r.ObserveIfExists(peer, -1)
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
}

return state, err
Expand Down
79 changes: 70 additions & 9 deletions picker/picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestRemotesExclude(t *testing.T) {
// value.
func TestRemotesConvergence(t *testing.T) {
remotes := NewRemotes()
remotes.Observe(api.Peer{Addr: "one"}, 1)
remotes.Observe(api.Peer{Addr: "one"}, DefaultObservationWeight)

// zero weighted against 1
if float64(remotes.Weights()[api.Peer{Addr: "one"}]) < remoteWeightSmoothingFactor {
Expand All @@ -115,7 +115,7 @@ func TestRemotesConvergence(t *testing.T) {

// crank it up
for i := 0; i < 10; i++ {
remotes.Observe(api.Peer{Addr: "one"}, 1)
remotes.Observe(api.Peer{Addr: "one"}, DefaultObservationWeight)
}

if float64(remotes.Weights()[api.Peer{Addr: "one"}]) < remoteWeightSmoothingFactor {
Expand All @@ -127,7 +127,7 @@ func TestRemotesConvergence(t *testing.T) {
}

// provided a poor review
remotes.Observe(api.Peer{Addr: "one"}, -1)
remotes.Observe(api.Peer{Addr: "one"}, -DefaultObservationWeight)

if remotes.Weights()[api.Peer{Addr: "one"}] > 0 {
t.Fatalf("should be below zero: %v", remotes.Weights()[api.Peer{Addr: "one"}])
Expand All @@ -149,7 +149,7 @@ func TestRemotesZeroWeights(t *testing.T) {
}

seen := map[api.Peer]struct{}{}
for i := 0; i < 25; i++ {
for i := 0; i < 1000; i++ {
peer, err := remotes.Select()
if err != nil {
t.Fatalf("unexpected error from Select: %v", err)
Expand All @@ -165,7 +165,7 @@ func TestRemotesZeroWeights(t *testing.T) {
}

// Pump up number 3!
remotes.Observe(api.Peer{Addr: "three"}, 10)
remotes.Observe(api.Peer{Addr: "three"}, DefaultObservationWeight)

count := map[api.Peer]int{}
for i := 0; i < 100; i++ {
Expand All @@ -178,7 +178,7 @@ func TestRemotesZeroWeights(t *testing.T) {
count[peer]++

// keep observing three
remotes.Observe(api.Peer{Addr: "three"}, 10)
remotes.Observe(api.Peer{Addr: "three"}, DefaultObservationWeight)
}

// here, we ensure that three is at least three times more likely to be
Expand Down Expand Up @@ -238,10 +238,10 @@ func TestRemotesDownweight(t *testing.T) {
}

for _, p := range peers {
remotes.Observe(p, 1)
remotes.Observe(p, DefaultObservationWeight)
}

remotes.Observe(peers[0], -1)
remotes.Observe(peers[0], -DefaultObservationWeight)

samples := 100000
choosen := 0
Expand All @@ -262,6 +262,67 @@ func TestRemotesDownweight(t *testing.T) {
}
}

// TestRemotesPractical ensures that under a single poor observation, such as
// an error, the likelihood of selecting the node dramatically decreases.
func TestRemotesPractical(t *testing.T) {
peers := []api.Peer{{Addr: "one"}, {Addr: "two"}, {Addr: "three"}}
remotes := NewRemotes(peers...)
seen := map[api.Peer]int{}
selections := 1000
tolerance := 0.20 // allow 20% delta to reduce test failure probability

// set a baseline, where selections should be even
for i := 0; i < selections; i++ {
peer, err := remotes.Select()
if err != nil {
t.Fatalf("error selecting peer: %v", err)
}

remotes.Observe(peer, DefaultObservationWeight)
seen[peer]++
}

expected, delta := selections/len(peers), int(tolerance*float64(selections))
low, high := expected-delta, expected+delta
for peer, count := range seen {
if !(count >= low && count <= high) {
t.Fatalf("weighted selection not balanced: %v selected %v/%v, expected range %v, %v", peer, count, selections, low, high)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the selection is randomized, isn't it possible for you to get unlucky and have one address be returned much more often than others? Wouldn't 1000 runs that return the same address be a valid, if improbable, result?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's extremely unlikely.

}
}

// one bad observation should mark the node as bad
remotes.Observe(peers[0], -DefaultObservationWeight)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we observe a few times with a positive weight before this downweighting to make sure that we're still unlikely to select this node even if its weight started as > 0 (as it will in practice)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can either have balanced downweighting have failures immediately reduce selection probability or have less negative than positive to require several observations before it is fully downweighted.

In this case, initial condition is DefaultObservationWeight and we downweight with -DefaultObservationWeight, expecting it to cross zero.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed the initial condition. Where is it set up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial loop converges it to DefaultObservationWeight.


seen = map[api.Peer]int{} // resut
for i := 0; i < selections; i++ {
peer, err := remotes.Select()
if err != nil {
t.Fatalf("error selecting peer: %v", err)
}

seen[peer]++
}

tolerance = 0.10 // switch to 10% tolerance for two peers
// same check as above, with only 2 peers, the bad peer should be unseen
expected, delta = selections/(len(peers)-1), int(tolerance*float64(selections))
low, high = expected-delta, expected+delta
for peer, count := range seen {
if peer == peers[0] {
// we have an *extremely* low probability of selecting this node
// (like 0.005%) once. Selecting this more than a few times will
// fail the test.
if count > 3 {
t.Fatalf("downweighted peer should not be selected, selected %v times", count)
}
}

if !(count >= low && count <= high) {
t.Fatalf("weighted selection not balanced: %v selected %v/%v, expected range %v, %v", peer, count, selections, low, high)
}
}
}

var peers = []api.Peer{
{Addr: "one"}, {Addr: "two"}, {Addr: "three"},
{Addr: "four"}, {Addr: "five"}, {Addr: "six"},
Expand Down Expand Up @@ -320,6 +381,6 @@ func benchmarkRemotesObserve(b *testing.B, peers ...api.Peer) {
remotes := NewRemotes(peers...)

for i := 0; i < b.N; i++ {
remotes.Observe(peers[i%len(peers)], 1.0)
remotes.Observe(peers[i%len(peers)], DefaultObservationWeight)
}
}