Skip to content

Commit

Permalink
Add heap based PeerList (#714)
Browse files Browse the repository at this point in the history
  • Loading branch information
prashantv committed Jan 31, 2017
1 parent 96ee15b commit 324ea65
Show file tree
Hide file tree
Showing 6 changed files with 1,201 additions and 3 deletions.
10 changes: 7 additions & 3 deletions api/peer/peertest/peerlistaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type StopAction struct {
// Apply runs "Stop" on the peerList and validates the error
func (a StopAction) Apply(t *testing.T, pl peer.Chooser, deps ListActionDeps) {
err := pl.Stop()
assert.Equal(t, a.ExpectedErr, err)
assert.Equal(t, a.ExpectedErr, err, "Stop action expected error %v, got %v", a.ExpectedErr, err)
}

// ChooseMultiAction will run Choose multiple times on the PeerList
Expand All @@ -77,7 +77,8 @@ type ChooseMultiAction struct {
func (a ChooseMultiAction) Apply(t *testing.T, pl peer.Chooser, deps ListActionDeps) {
for _, expectedPeer := range a.ExpectedPeers {
action := ChooseAction{
ExpectedPeer: expectedPeer,
ExpectedPeer: expectedPeer,
InputContextTimeout: 20 * time.Millisecond,
}
action.Apply(t, pl, deps)
}
Expand All @@ -104,7 +105,10 @@ func (a ChooseAction) Apply(t *testing.T, pl peer.Chooser, deps ListActionDeps)
defer cancel()
}

p, _, err := pl.Choose(ctx, a.InputRequest)
p, finish, err := pl.Choose(ctx, a.InputRequest)
if err == nil {
finish(nil)
}

if a.ExpectedErr != nil {
// Note that we're not verifying anything about ExpectedPeer here because
Expand Down
105 changes: 105 additions & 0 deletions peer/x/peerheap/heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package peerheap

import (
"container/heap"
"fmt"

"go.uber.org/yarpc/api/peer"
)

type peerHeap struct {
transport peer.Transport
peers []*peerScore

// next is an incrementing counter for every push, which is compared when
// scores are equal. This ends up implementing round-robin when scores are
// equal.
next int
}

func (ph *peerHeap) Len() int {
return len(ph.peers)
}

// Less returns whether the left peer has a lower score. If the scores are
// equal, it returns the older peer (where "last" is lower.)
func (ph *peerHeap) Less(i, j int) bool {
p1 := ph.peers[i]
p2 := ph.peers[j]
if p1.score == p2.score {
return p1.last < p2.last
}
return p1.score < p2.score
}

// Swap implements the heap.Interface. Do NOT use this method directly.
func (ph *peerHeap) Swap(i, j int) {
p1 := ph.peers[i]
p2 := ph.peers[j]

ph.peers[i], ph.peers[j] = ph.peers[j], ph.peers[i]
p1.idx = j
p2.idx = i
}

// Push implements the heap.Interface. Do NOT use this method directly.
// Use pushPeer instead.
func (ph *peerHeap) Push(x interface{}) {
ps := x.(*peerScore)
ps.idx = len(ph.peers)
ph.peers = append(ph.peers, ps)
}

// Pop implements the heap.Interface. Do NOT use this method directly.
// Use popPeer instead.
func (ph *peerHeap) Pop() interface{} {
lastIdx := len(ph.peers) - 1
last := ph.peers[lastIdx]
ph.peers = ph.peers[:lastIdx]
return last
}

func (ph *peerHeap) delete(idx int) {
// Swap the element we want to delete with the last element, then pop it off.
ph.Swap(idx, ph.Len()-1)
ph.Pop()

// If the original index still exists in the list, it contains a different
// element so update the heap.
if idx < ph.Len() {
ph.update(idx)
}
}

func (ph *peerHeap) validate(ps *peerScore) error {
if ps.idx < 0 || ps.idx >= ph.Len() || ph.peers[ps.idx] != ps {
return fmt.Errorf("peerHeap bug: %+v has bad index %v (len %v)", ps, ps.idx, ph.Len())
}
return nil
}

func (ph *peerHeap) pushPeer(ps *peerScore) {
ph.next++
ps.last = ph.next
heap.Push(ph, ps)
}

func (ph *peerHeap) peekPeer() (*peerScore, bool) {
if ph.Len() == 0 {
return nil, false
}
return ph.peers[0], true
}

func (ph *peerHeap) popPeer() (*peerScore, bool) {
if ph.Len() == 0 {
return nil, false
}

peer := heap.Pop(ph).(*peerScore)
return peer, true
}

func (ph *peerHeap) update(i int) {
heap.Fix(ph, i)
}
124 changes: 124 additions & 0 deletions peer/x/peerheap/heap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package peerheap

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestPeerHeapEmpty(t *testing.T) {
var ph peerHeap
assert.Zero(t, ph.Len(), "New peer heap should be empty")
popAndVerifyHeap(t, &ph)
}

func TestPeerHeapOrdering(t *testing.T) {
p1 := &peerScore{score: 1}
p2 := &peerScore{score: 2}
p3 := &peerScore{score: 3}

// same score as p3, but always pushed after p3, so it will be returned last.
p4 := &peerScore{score: 3}

want := []*peerScore{p1, p2, p3, p4}
tests := [][]*peerScore{
{p1, p2, p3, p4},
{p3, p4, p2, p1},
{p3, p1, p2, p4},
}

for _, tt := range tests {
var h peerHeap
for _, ps := range tt {
h.pushPeer(ps)
}

popped := popAndVerifyHeap(t, &h)
assert.Equal(t, want, popped, "Unexpected ordering of peers")
}
}

func TestPeerHeapUpdate(t *testing.T) {
var h peerHeap
p1 := &peerScore{score: 1}
p2 := &peerScore{score: 2}
p3 := &peerScore{score: 3}

h.pushPeer(p3)
h.pushPeer(p1)
h.pushPeer(p2)

ps, ok := h.popPeer()
require.True(t, ok, "pop with non-empty heap should succeed")
assert.Equal(t, p1, ps, "Wrong peer")

// Now update p2's score to be higher than p3.
p2.score = 10
h.update(p2.idx)

popped := popAndVerifyHeap(t, &h)
assert.Equal(t, []*peerScore{p3, p2}, popped, "Unexpected order after p2 update")
}

func TestPeerHeapDelete(t *testing.T) {
const numPeers = 10

var h peerHeap
peers := make([]*peerScore, numPeers)
for i := range peers {
peers[i] = &peerScore{score: int64(i)}
h.pushPeer(peers[i])
}

// The first peer is the lowest, remove it so it swaps with the last peer.
h.delete(0)

// Now when we pop peers, we expect peers 1 to N.
want := peers[1:]
popped := popAndVerifyHeap(t, &h)
assert.Equal(t, want, popped, "Unexpected peers after delete peer 0")
}

func TestPeerHeapValidate(t *testing.T) {
var h peerHeap
h.pushPeer(&peerScore{score: 1})

for _, i := range []int{0, -1, 5} {
ps := &peerScore{idx: i}
assert.Error(t, h.validate(ps), "peer %v should not validate", ps)
}
}

func popAndVerifyHeap(t *testing.T, h *peerHeap) []*peerScore {
var popped []*peerScore

lastScore := int64(-1)
for h.Len() > 0 {
verifyIndexes(t, h)

ps, ok := h.popPeer()
require.True(t, ok, "pop with non-empty heap should succeed")
popped = append(popped, ps)

if lastScore == -1 {
lastScore = ps.score
continue
}

if ps.score < lastScore {
t.Fatalf("heap returned peer %v with lower score than %v", ps, lastScore)
}
lastScore = ps.score
}

_, ok := h.popPeer()
require.False(t, ok, "Expected no peers to be returned with empty list")
return popped
}

func verifyIndexes(t *testing.T, h *peerHeap) {
for i := range h.peers {
assert.Equal(t, i, h.peers[i].idx, "wrong index for peer %v", h.peers[i])
}
}
Loading

0 comments on commit 324ea65

Please sign in to comment.