From 9f784b3c316d557e45345044a96ec5443d7941c4 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 25 Jun 2015 12:22:53 +0100 Subject: [PATCH 1/6] Report errors in unicasting gossip --- ipam/allocator.go | 4 ++-- router/gossip_channel.go | 17 ++++++++++++----- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/ipam/allocator.go b/ipam/allocator.go index 8b609b96dd..98f36f3de8 100644 --- a/ipam/allocator.go +++ b/ipam/allocator.go @@ -575,14 +575,14 @@ func (alloc *Allocator) propose() { alloc.gossip.GossipBroadcast(alloc.Gossip()) } -func (alloc *Allocator) sendSpaceRequest(dest router.PeerName, r address.Range) { +func (alloc *Allocator) sendSpaceRequest(dest router.PeerName, r address.Range) error { buf := new(bytes.Buffer) enc := gob.NewEncoder(buf) if err := enc.Encode(r); err != nil { panic(err) } msg := router.Concat([]byte{msgSpaceRequest}, buf.Bytes()) - alloc.gossip.GossipUnicast(dest, msg) + return alloc.gossip.GossipUnicast(dest, msg) } func (alloc *Allocator) sendRingUpdate(dest router.PeerName) { diff --git a/router/gossip_channel.go b/router/gossip_channel.go index da7dc0ec75..bbc177db15 100644 --- a/router/gossip_channel.go +++ b/router/gossip_channel.go @@ -3,6 +3,7 @@ package router import ( "bytes" "encoding/gob" + "fmt" "log" "sync" ) @@ -58,7 +59,10 @@ func (c *GossipChannel) deliverUnicast(srcName PeerName, origPayload []byte, dec return err } if c.ourself.Name != destName { - return c.relayUnicast(destName, origPayload) + c.relayUnicast(destName, origPayload) + // ignore errors in passing on data; a problem between us and destination + // is not enough reason to break the connection from the source + return nil } var payload []byte if err := dec.Decode(&payload); err != nil { @@ -160,15 +164,18 @@ func (c *GossipChannel) GossipBroadcast(update GossipData) error { return c.relayBroadcast(c.ourself.Name, update) } -func (c *GossipChannel) relayUnicast(dstPeerName PeerName, buf []byte) error { +func (c *GossipChannel) relayUnicast(dstPeerName PeerName, buf []byte) (err error) { if relayPeerName, found := c.routes.UnicastAll(dstPeerName); !found { - c.log("unknown relay destination:", dstPeerName) + err = fmt.Errorf("unknown relay destination: %s", dstPeerName) } else if conn, found := c.ourself.ConnectionTo(relayPeerName); !found { - c.log("unable to find connection to relay peer", relayPeerName) + err = fmt.Errorf("unable to find connection to relay peer %s", relayPeerName) } else { conn.(ProtocolSender).SendProtocolMsg(ProtocolMsg{ProtocolGossipUnicast, buf}) } - return nil + if err != nil { + c.log(err) + } + return err } func (c *GossipChannel) relayBroadcast(srcName PeerName, update GossipData) error { From af9668f130f6b115a2d7b9a38c1384a90c19cdf8 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 25 Jun 2015 12:47:38 +0100 Subject: [PATCH 2/6] Keep trying peers to ask for space until we find one that doesn't error --- ipam/allocate.go | 7 +++++-- ipam/ring/ring.go | 31 +++++++++++++++---------------- ipam/ring/ring_test.go | 37 ++++++++++++++++++++----------------- 3 files changed, 40 insertions(+), 35 deletions(-) diff --git a/ipam/allocate.go b/ipam/allocate.go index 130380e90a..c1dcda6654 100644 --- a/ipam/allocate.go +++ b/ipam/allocate.go @@ -42,9 +42,12 @@ func (g *allocate) Try(alloc *Allocator) bool { } // out of space - if donor, err := alloc.ring.ChoosePeerToAskForSpace(g.r.Start, g.r.End); err == nil { + donors := alloc.ring.ChoosePeersToAskForSpace(g.r.Start, g.r.End) + for _, donor := range donors { alloc.debugln("Decided to ask peer", donor, "for space in range", g.r) - alloc.sendSpaceRequest(donor, g.r) + if err := alloc.sendSpaceRequest(donor, g.r); err == nil { + break + } } return false diff --git a/ipam/ring/ring.go b/ipam/ring/ring.go index fcdb362e63..4908e57180 100644 --- a/ipam/ring/ring.go +++ b/ipam/ring/ring.go @@ -392,9 +392,9 @@ func (r *Ring) ReportFree(freespace map[address.Address]address.Offset) { } } -// ChoosePeerToAskForSpace chooses a weighted-random peer to ask -// for space in the range [start, end). Assumes start 0 { + // Pick random peer, weighted by total free space + rn := rand.Int63n(int64(sum)) // note using 64-bit because rand.Intn uses signed int + for peername, space := range totalSpacePerPeer { + rn -= int64(space) + if rn < 0 { + result = append(result, peername) + sum -= space + delete(totalSpacePerPeer, peername) + break + } } } - - panic("Should never reach this point") + return } func (r *Ring) PickPeerForTransfer() router.PeerName { diff --git a/ipam/ring/ring_test.go b/ipam/ring/ring_test.go index ce63a38ef6..7c23ccdd48 100644 --- a/ipam/ring/ring_test.go +++ b/ipam/ring/ring_test.go @@ -345,39 +345,42 @@ func TestGossip(t *testing.T) { func TestFindFree(t *testing.T) { ring1 := New(start, end, peer1name) - _, err := ring1.ChoosePeerToAskForSpace(start, end) - wt.AssertTrue(t, err == ErrNoFreeSpace, "Expected ErrNoFreeSpace") + peers := ring1.ChoosePeersToAskForSpace(start, end) + wt.AssertTrue(t, len(peers) == 0, "Expected no peers") ring1.Entries = []*entry{{Token: start, Peer: peer1name}} - _, err = ring1.ChoosePeerToAskForSpace(start, end) - wt.AssertTrue(t, err == ErrNoFreeSpace, "Expected ErrNoFreeSpace") + peers = ring1.ChoosePeersToAskForSpace(start, end) + wt.AssertTrue(t, len(peers) == 0, "Expected no peers") // We shouldn't return outselves ring1.ReportFree(map[address.Address]address.Offset{start: 10}) - _, err = ring1.ChoosePeerToAskForSpace(start, end) - wt.AssertTrue(t, err == ErrNoFreeSpace, "Expected ErrNoFreeSpace") + peers = ring1.ChoosePeersToAskForSpace(start, end) + wt.AssertTrue(t, len(peers) == 0, "Expected no peers") ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 1}, {Token: middle, Peer: peer1name, Free: 1}} - _, err = ring1.ChoosePeerToAskForSpace(start, end) - wt.AssertTrue(t, err == ErrNoFreeSpace, "Expected ErrNoFreeSpace") + peers = ring1.ChoosePeersToAskForSpace(start, end) + wt.AssertTrue(t, len(peers) == 0, "Expected no peers") ring1.assertInvariants() // We should return others ring1.Entries = []*entry{{Token: start, Peer: peer2name, Free: 1}} - peer, err := ring1.ChoosePeerToAskForSpace(start, end) - wt.AssertSuccess(t, err) - wt.AssertEquals(t, peer, peer2name) + peers = ring1.ChoosePeersToAskForSpace(start, end) + wt.AssertTrue(t, len(peers) == 1, "Expected one peer") + wt.AssertEquals(t, peers[0], peer2name) ring1.Entries = []*entry{{Token: start, Peer: peer2name, Free: 1}, {Token: middle, Peer: peer3name, Free: 1}} - peer, err = ring1.ChoosePeerToAskForSpace(start, middle) - wt.AssertSuccess(t, err) - wt.AssertEquals(t, peer, peer2name) + peers = ring1.ChoosePeersToAskForSpace(start, middle) + wt.AssertTrue(t, len(peers) == 1, "Expected one peer") + wt.AssertEquals(t, peers[0], peer2name) - peer, err = ring1.ChoosePeerToAskForSpace(middle, end) - wt.AssertSuccess(t, err) - wt.AssertEquals(t, peer, peer3name) + peers = ring1.ChoosePeersToAskForSpace(middle, end) + wt.AssertTrue(t, len(peers) == 1, "Expected one peer") + wt.AssertEquals(t, peers[0], peer3name) + + peers = ring1.ChoosePeersToAskForSpace(start, end) + wt.AssertTrue(t, len(peers) == 2, "Expected two peers") ring1.assertInvariants() } From c58cc449c07941f6f38b49730e246c4822102902 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 25 Jun 2015 13:19:49 +0100 Subject: [PATCH 3/6] More complex ChoosePeersToAskForSpace, with better complexity --- ipam/ring/ring.go | 45 +++++++++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/ipam/ring/ring.go b/ipam/ring/ring.go index 4908e57180..c411b163f6 100644 --- a/ipam/ring/ring.go +++ b/ipam/ring/ring.go @@ -395,10 +395,7 @@ func (r *Ring) ReportFree(freespace map[address.Address]address.Offset) { // ChoosePeerToAskForSpace returns all peers we can ask for space in // the range [start, end), in weighted-random order. Assumes start 0 { - // Pick random peer, weighted by total free space - rn := rand.Int63n(int64(sum)) // note using 64-bit because rand.Intn uses signed int - for peername, space := range totalSpacePerPeer { - rn -= int64(space) - if rn < 0 { - result = append(result, peername) - sum -= space - delete(totalSpacePerPeer, peername) - break - } + } + + // Compute cumulative distribution of spaces + type spaceCDF struct { + sum address.Offset + peername router.PeerName + } + var cdf []spaceCDF + var sum address.Offset + for peername, space := range totalSpacePerPeer { + sum += space + cdf = append(cdf, spaceCDF{sum: sum, peername: peername}) + } + + for len(cdf) > 0 { + // Pick a number up to remaining free space + // (note using 64-bit because rand.Intn is signed int) + rn := rand.Int63n(int64(cdf[len(cdf)-1].sum)) + // Find where this number lands in the CDF, and take that peer + i := sort.Search(len(cdf), func(j int) bool { return int64(cdf[j].sum) > rn }) + result = append(result, cdf[i].peername) + // Remove this entry from the CDF and adjust all following sums + space := totalSpacePerPeer[cdf[i].peername] + for j := i; j < len(cdf); j++ { + cdf[j].sum -= space } + cdf = append(cdf[:i], cdf[i+1:]...) } return } From b472da18f04b65470e7197521e5c5bbd68918f50 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 25 Jun 2015 14:04:37 +0100 Subject: [PATCH 4/6] Improve logging of unicast gossip errors --- ipam/allocate.go | 6 ++++-- router/gossip_channel.go | 11 +++++------ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/ipam/allocate.go b/ipam/allocate.go index c1dcda6654..f2da3c7470 100644 --- a/ipam/allocate.go +++ b/ipam/allocate.go @@ -44,8 +44,10 @@ func (g *allocate) Try(alloc *Allocator) bool { // out of space donors := alloc.ring.ChoosePeersToAskForSpace(g.r.Start, g.r.End) for _, donor := range donors { - alloc.debugln("Decided to ask peer", donor, "for space in range", g.r) - if err := alloc.sendSpaceRequest(donor, g.r); err == nil { + if err := alloc.sendSpaceRequest(donor, g.r); err != nil { + alloc.debugln("Problem asking peer", donor, "for space:", err) + } else { + alloc.debugln("Decided to ask peer", donor, "for space in range", g.r) break } } diff --git a/router/gossip_channel.go b/router/gossip_channel.go index bbc177db15..39de093a88 100644 --- a/router/gossip_channel.go +++ b/router/gossip_channel.go @@ -59,9 +59,11 @@ func (c *GossipChannel) deliverUnicast(srcName PeerName, origPayload []byte, dec return err } if c.ourself.Name != destName { - c.relayUnicast(destName, origPayload) - // ignore errors in passing on data; a problem between us and destination - // is not enough reason to break the connection from the source + if err := c.relayUnicast(destName, origPayload); err != nil { + // just log errors from relayUnicast; a problem between us and destination + // is not enough reason to break the connection from the source + c.log(err) + } return nil } var payload []byte @@ -172,9 +174,6 @@ func (c *GossipChannel) relayUnicast(dstPeerName PeerName, buf []byte) (err erro } else { conn.(ProtocolSender).SendProtocolMsg(ProtocolMsg{ProtocolGossipUnicast, buf}) } - if err != nil { - c.log(err) - } return err } From c76f640e2e67d9dfeb345269dc0671eaf0ec31f4 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 25 Jun 2015 14:48:17 +0100 Subject: [PATCH 5/6] Simpler implementation of weighted-random order --- ipam/ring/ring.go | 52 +++++++++++++++++++++++------------------------ 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/ipam/ring/ring.go b/ipam/ring/ring.go index c411b163f6..da65014b1f 100644 --- a/ipam/ring/ring.go +++ b/ipam/ring/ring.go @@ -392,10 +392,23 @@ func (r *Ring) ReportFree(freespace map[address.Address]address.Offset) { } } -// ChoosePeerToAskForSpace returns all peers we can ask for space in +type weightedPeer struct { + weight float64 + peername router.PeerName +} +type weightedPeers []weightedPeer + +func (ws weightedPeers) Len() int { return len(ws) } +func (ws weightedPeers) Less(i, j int) bool { return ws[i].weight < ws[j].weight } +func (ws weightedPeers) Swap(i, j int) { ws[i], ws[j] = ws[j], ws[i] } + +// ChoosePeersToAskForSpace returns all peers we can ask for space in // the range [start, end), in weighted-random order. Assumes start 0 { - // Pick a number up to remaining free space - // (note using 64-bit because rand.Intn is signed int) - rn := rand.Int63n(int64(cdf[len(cdf)-1].sum)) - // Find where this number lands in the CDF, and take that peer - i := sort.Search(len(cdf), func(j int) bool { return int64(cdf[j].sum) > rn }) - result = append(result, cdf[i].peername) - // Remove this entry from the CDF and adjust all following sums - space := totalSpacePerPeer[cdf[i].peername] - for j := i; j < len(cdf); j++ { - cdf[j].sum -= space - } - cdf = append(cdf[:i], cdf[i+1:]...) + ws = append(ws, weightedPeer{weight: float64(space) * rand.Float64(), peername: peername}) + } + sort.Sort(ws) + // Reverse order (bigger weights come last after sort) and copy to result + for i := range ws { + result = append(result, ws[len(ws)-i-1].peername) } return } From 8a4b0d536eb92c2bbcc06eaa18299aabe362255f Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 25 Jun 2015 15:30:49 +0100 Subject: [PATCH 6/6] Updated after review feedback --- ipam/ring/ring.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/ipam/ring/ring.go b/ipam/ring/ring.go index da65014b1f..127175e8a2 100644 --- a/ipam/ring/ring.go +++ b/ipam/ring/ring.go @@ -398,13 +398,14 @@ type weightedPeer struct { } type weightedPeers []weightedPeer +// Note Less is using > so that bigger weights sort earlier +func (ws weightedPeers) Less(i, j int) bool { return ws[i].weight > ws[j].weight } func (ws weightedPeers) Len() int { return len(ws) } -func (ws weightedPeers) Less(i, j int) bool { return ws[i].weight < ws[j].weight } func (ws weightedPeers) Swap(i, j int) { ws[i], ws[j] = ws[j], ws[i] } // ChoosePeersToAskForSpace returns all peers we can ask for space in // the range [start, end), in weighted-random order. Assumes start