Skip to content

Commit

Permalink
Merge 8ea6ce1 into 4cd1958
Browse files Browse the repository at this point in the history
  • Loading branch information
willhug committed Nov 21, 2016
2 parents 4cd1958 + 8ea6ce1 commit 0015ab3
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 19 deletions.
18 changes: 1 addition & 17 deletions transport/peer/x/peerlist/roundrobin/peerring.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package roundrobin

import (
"container/ring"
"sync"

"go.uber.org/yarpc/transport"
"go.uber.org/yarpc/transport/internal/errors"
Expand All @@ -37,19 +36,15 @@ func NewPeerRing(capacity int) *PeerRing {

// PeerRing provides a safe way to interact (Add/Remove/Get) with a potentially
// changing list of peer objects
// PeerRing is NOT Thread-safe, make sure to only call PeerRing functions with a lock
type PeerRing struct {
lock sync.Mutex

peerToNode map[string]*ring.Ring
nextNode *ring.Ring
}

// Add a transport.Peer to the end of the PeerRing, if the ring is empty
// it initializes the nextNode marker
func (pr *PeerRing) Add(peer transport.Peer) error {
pr.lock.Lock()
defer pr.lock.Unlock()

if _, ok := pr.peerToNode[peer.Identifier()]; ok {
// Peer Already in ring, ignore the add
return errors.ErrPeerAddAlreadyInList(peer.Identifier())
Expand Down Expand Up @@ -77,9 +72,6 @@ func newPeerRingNode(peer transport.Peer) *ring.Ring {
// Remove a peer PeerIdentifier from the PeerRing, if the PeerID is not
// in the ring return an error
func (pr *PeerRing) Remove(pid transport.PeerIdentifier) error {
pr.lock.Lock()
defer pr.lock.Unlock()

node, ok := pr.peerToNode[pid.Identifier()]
if !ok {
// Peer doesn't exist in the list
Expand All @@ -93,17 +85,13 @@ func (pr *PeerRing) Remove(pid transport.PeerIdentifier) error {

// RemoveAll pops all the peers from the ring and returns them in a list
func (pr *PeerRing) RemoveAll() []transport.Peer {
pr.lock.Lock()
defer pr.lock.Unlock()

peers := make([]transport.Peer, 0, len(pr.peerToNode))
for _, node := range pr.peerToNode {
peers = append(peers, pr.popNode(node))
}
return peers
}

// Must be run inside a mutex.Lock()
func (pr *PeerRing) popNode(node *ring.Ring) transport.Peer {
p := getPeerForRingNode(node)

Expand All @@ -122,17 +110,13 @@ func (pr *PeerRing) popNode(node *ring.Ring) transport.Peer {
return p
}

// Must be run inside a mutex.Lock()
func (pr *PeerRing) isNextNode(node *ring.Ring) bool {
return pr.nextNode == node
}

// Next returns the next peer in the ring, or nil if there is no peer in the ring
// after it has the next peer, it increments the nextPeer marker in the ring
func (pr *PeerRing) Next() transport.Peer {
pr.lock.Lock()
defer pr.lock.Unlock()

if pr.nextNode == nil {
return nil
}
Expand Down
30 changes: 28 additions & 2 deletions transport/peer/x/peerlist/roundrobin/roundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package roundrobin

import (
"context"
"sync"

yerrors "go.uber.org/yarpc/internal/errors"
"go.uber.org/yarpc/transport"
Expand All @@ -44,13 +45,18 @@ func New(peerIDs []transport.PeerIdentifier, agent transport.Agent) (*RoundRobin

// RoundRobin is a PeerList which rotates which peers are to be selected in a circle
type RoundRobin struct {
lock sync.Mutex

pr *PeerRing
peerAvailableEvent chan struct{}
agent transport.Agent
started atomic.Bool
}

func (pl *RoundRobin) addAll(peerIDs []transport.PeerIdentifier) error {
pl.lock.Lock()
defer pl.lock.Unlock()

var errs []error

for _, peerID := range peerIDs {
Expand All @@ -65,9 +71,13 @@ func (pl *RoundRobin) addAll(peerIDs []transport.PeerIdentifier) error {

// Add a peer identifier to the round robin
func (pl *RoundRobin) Add(pid transport.PeerIdentifier) error {
return pl.addPeer(pid)
pl.lock.Lock()
err := pl.addPeer(pid)
pl.lock.Unlock()
return err
}

// Must be run inside a mutex.Lock()
func (pl *RoundRobin) addPeer(pid transport.PeerIdentifier) error {
p, err := pl.agent.RetainPeer(pid, pl)
if err != nil {
Expand Down Expand Up @@ -99,6 +109,9 @@ func (pl *RoundRobin) Stop() error {
}

func (pl *RoundRobin) clearPeers() error {
pl.lock.Lock()
defer pl.lock.Unlock()

var errs []error

peers := pl.pr.RemoveAll()
Expand All @@ -113,6 +126,9 @@ func (pl *RoundRobin) clearPeers() error {

// Remove a peer identifier from the round robin
func (pl *RoundRobin) Remove(pid transport.PeerIdentifier) error {
pl.lock.Lock()
defer pl.lock.Unlock()

if err := pl.pr.Remove(pid); err != nil {
// The peer has already been removed
return err
Expand All @@ -128,7 +144,7 @@ func (pl *RoundRobin) ChoosePeer(ctx context.Context, req *transport.Request) (t
}

for {
if nextPeer := pl.pr.Next(); nextPeer != nil {
if nextPeer := pl.nextPeer(); nextPeer != nil {
pl.notifyPeerAvailable()
return nextPeer, nil
}
Expand All @@ -139,6 +155,15 @@ func (pl *RoundRobin) ChoosePeer(ctx context.Context, req *transport.Request) (t
}
}

// nextPeer grabs the next available peer from the PeerRing and returns it,
// if there are no available peers it returns nil
func (pl *RoundRobin) nextPeer() transport.Peer {
pl.lock.Lock()
peer := pl.pr.Next()
pl.lock.Unlock()
return peer
}

// notifyPeerAvailable writes to a channel indicating that a Peer is currently
// available for requests
func (pl *RoundRobin) notifyPeerAvailable() {
Expand All @@ -150,6 +175,7 @@ func (pl *RoundRobin) notifyPeerAvailable() {

// waitForPeerAddedEvent waits until a peer is added to the peer list or the
// given context finishes.
// Must NOT be run in a mutex.Lock()
func (pl *RoundRobin) waitForPeerAddedEvent(ctx context.Context) error {
if _, ok := ctx.Deadline(); !ok {
return errors.ErrChooseContextHasNoDeadline("RoundRobinList")
Expand Down

0 comments on commit 0015ab3

Please sign in to comment.