Skip to content
This repository was archived by the owner on Jun 20, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions ipam/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,14 @@ func (g *allocate) Try(alloc *Allocator) bool {
}

// out of space
if donor, err := alloc.ring.ChoosePeerToAskForSpace(g.r.Start, g.r.End); err == nil {
alloc.debugln("Decided to ask peer", donor, "for space in range", g.r)
alloc.sendSpaceRequest(donor, g.r)
donors := alloc.ring.ChoosePeersToAskForSpace(g.r.Start, g.r.End)
for _, donor := range donors {
if err := alloc.sendSpaceRequest(donor, g.r); err != nil {
alloc.debugln("Problem asking peer", donor, "for space:", err)
} else {
alloc.debugln("Decided to ask peer", donor, "for space in range", g.r)
break
}
}

return false
Expand Down
4 changes: 2 additions & 2 deletions ipam/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,14 +575,14 @@ func (alloc *Allocator) propose() {
alloc.gossip.GossipBroadcast(alloc.Gossip())
}

func (alloc *Allocator) sendSpaceRequest(dest router.PeerName, r address.Range) {
func (alloc *Allocator) sendSpaceRequest(dest router.PeerName, r address.Range) error {
buf := new(bytes.Buffer)
enc := gob.NewEncoder(buf)
if err := enc.Encode(r); err != nil {
panic(err)
}
msg := router.Concat([]byte{msgSpaceRequest}, buf.Bytes())
alloc.gossip.GossipUnicast(dest, msg)
return alloc.gossip.GossipUnicast(dest, msg)
}

func (alloc *Allocator) sendRingUpdate(dest router.PeerName) {
Expand Down
41 changes: 25 additions & 16 deletions ipam/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,20 @@ func (r *Ring) ReportFree(freespace map[address.Address]address.Offset) {
}
}

// ChoosePeerToAskForSpace chooses a weighted-random peer to ask
// for space in the range [start, end). Assumes start<end.
func (r *Ring) ChoosePeerToAskForSpace(start, end address.Address) (result router.PeerName, err error) {
type weightedPeer struct {
weight float64
peername router.PeerName
}
type weightedPeers []weightedPeer

// Note Less is using > so that bigger weights sort earlier
func (ws weightedPeers) Less(i, j int) bool { return ws[i].weight > ws[j].weight }
func (ws weightedPeers) Len() int { return len(ws) }
func (ws weightedPeers) Swap(i, j int) { ws[i], ws[j] = ws[j], ws[i] }

// ChoosePeersToAskForSpace returns all peers we can ask for space in
// the range [start, end), in weighted-random order. Assumes start<end.
func (r *Ring) ChoosePeersToAskForSpace(start, end address.Address) []router.PeerName {
var (
sum address.Offset
totalSpacePerPeer = make(map[router.PeerName]address.Offset) // Compute total free space per peer
Expand Down Expand Up @@ -423,21 +434,19 @@ func (r *Ring) ChoosePeerToAskForSpace(start, end address.Address) (result route
sum += entry.Free
}

if sum == 0 {
err = ErrNoFreeSpace
return
}

// Pick random peer, weighted by total free space
rn := rand.Int63n(int64(sum))
// Compute weighted random numbers, then sort.
// This isn't perfect, e.g. an item with weight 2 will get chosen more than
// twice as often as an item with weight 1, but it's good enough for our purposes.
ws := make(weightedPeers, 0, len(totalSpacePerPeer))
for peername, space := range totalSpacePerPeer {
rn -= int64(space)
if rn < 0 {
return peername, nil
}
ws = append(ws, weightedPeer{weight: float64(space) * rand.Float64(), peername: peername})
}

panic("Should never reach this point")
sort.Sort(ws)
result := make([]router.PeerName, len(ws))
for i, wp := range ws {
result[i] = wp.peername
}
return result
}

func (r *Ring) PickPeerForTransfer() router.PeerName {
Expand Down
37 changes: 20 additions & 17 deletions ipam/ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,39 +345,42 @@ func TestGossip(t *testing.T) {
func TestFindFree(t *testing.T) {
ring1 := New(start, end, peer1name)

_, err := ring1.ChoosePeerToAskForSpace(start, end)
wt.AssertTrue(t, err == ErrNoFreeSpace, "Expected ErrNoFreeSpace")
peers := ring1.ChoosePeersToAskForSpace(start, end)
wt.AssertTrue(t, len(peers) == 0, "Expected no peers")

ring1.Entries = []*entry{{Token: start, Peer: peer1name}}
_, err = ring1.ChoosePeerToAskForSpace(start, end)
wt.AssertTrue(t, err == ErrNoFreeSpace, "Expected ErrNoFreeSpace")
peers = ring1.ChoosePeersToAskForSpace(start, end)
wt.AssertTrue(t, len(peers) == 0, "Expected no peers")

// We shouldn't return outselves
ring1.ReportFree(map[address.Address]address.Offset{start: 10})
_, err = ring1.ChoosePeerToAskForSpace(start, end)
wt.AssertTrue(t, err == ErrNoFreeSpace, "Expected ErrNoFreeSpace")
peers = ring1.ChoosePeersToAskForSpace(start, end)
wt.AssertTrue(t, len(peers) == 0, "Expected no peers")

ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 1},
{Token: middle, Peer: peer1name, Free: 1}}
_, err = ring1.ChoosePeerToAskForSpace(start, end)
wt.AssertTrue(t, err == ErrNoFreeSpace, "Expected ErrNoFreeSpace")
peers = ring1.ChoosePeersToAskForSpace(start, end)
wt.AssertTrue(t, len(peers) == 0, "Expected no peers")
ring1.assertInvariants()

// We should return others
ring1.Entries = []*entry{{Token: start, Peer: peer2name, Free: 1}}
peer, err := ring1.ChoosePeerToAskForSpace(start, end)
wt.AssertSuccess(t, err)
wt.AssertEquals(t, peer, peer2name)
peers = ring1.ChoosePeersToAskForSpace(start, end)
wt.AssertTrue(t, len(peers) == 1, "Expected one peer")
wt.AssertEquals(t, peers[0], peer2name)

ring1.Entries = []*entry{{Token: start, Peer: peer2name, Free: 1},
{Token: middle, Peer: peer3name, Free: 1}}
peer, err = ring1.ChoosePeerToAskForSpace(start, middle)
wt.AssertSuccess(t, err)
wt.AssertEquals(t, peer, peer2name)
peers = ring1.ChoosePeersToAskForSpace(start, middle)
wt.AssertTrue(t, len(peers) == 1, "Expected one peer")
wt.AssertEquals(t, peers[0], peer2name)

peer, err = ring1.ChoosePeerToAskForSpace(middle, end)
wt.AssertSuccess(t, err)
wt.AssertEquals(t, peer, peer3name)
peers = ring1.ChoosePeersToAskForSpace(middle, end)
wt.AssertTrue(t, len(peers) == 1, "Expected one peer")
wt.AssertEquals(t, peers[0], peer3name)

peers = ring1.ChoosePeersToAskForSpace(start, end)
wt.AssertTrue(t, len(peers) == 2, "Expected two peers")
ring1.assertInvariants()
}

Expand Down
16 changes: 11 additions & 5 deletions router/gossip_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package router
import (
"bytes"
"encoding/gob"
"fmt"
"log"
"sync"
)
Expand Down Expand Up @@ -58,7 +59,12 @@ func (c *GossipChannel) deliverUnicast(srcName PeerName, origPayload []byte, dec
return err
}
if c.ourself.Name != destName {
return c.relayUnicast(destName, origPayload)
if err := c.relayUnicast(destName, origPayload); err != nil {
// just log errors from relayUnicast; a problem between us and destination
// is not enough reason to break the connection from the source
c.log(err)
}
return nil
}
var payload []byte
if err := dec.Decode(&payload); err != nil {
Expand Down Expand Up @@ -160,15 +166,15 @@ func (c *GossipChannel) GossipBroadcast(update GossipData) error {
return c.relayBroadcast(c.ourself.Name, update)
}

func (c *GossipChannel) relayUnicast(dstPeerName PeerName, buf []byte) error {
func (c *GossipChannel) relayUnicast(dstPeerName PeerName, buf []byte) (err error) {
if relayPeerName, found := c.routes.UnicastAll(dstPeerName); !found {
c.log("unknown relay destination:", dstPeerName)
err = fmt.Errorf("unknown relay destination: %s", dstPeerName)
} else if conn, found := c.ourself.ConnectionTo(relayPeerName); !found {
c.log("unable to find connection to relay peer", relayPeerName)
err = fmt.Errorf("unable to find connection to relay peer %s", relayPeerName)
} else {
conn.(ProtocolSender).SendProtocolMsg(ProtocolMsg{ProtocolGossipUnicast, buf})
}
return nil
return err
}

func (c *GossipChannel) relayBroadcast(srcName PeerName, update GossipData) error {
Expand Down