Skip to content

Commit

Permalink
[PeerRenames][Part 6] Move RoundRobin to 'x/roundrobin' package
Browse files Browse the repository at this point in the history
Summary: In following with our rename of list/single.go to
/single/list.go this diff updates the roundrobin outbound to do the same
  • Loading branch information
willhug committed Nov 29, 2016
1 parent 87336fa commit 0971c60
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 32 deletions.
58 changes: 29 additions & 29 deletions peer/x/list/roundrobin.go → peer/x/roundrobin/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package list
package roundrobin

import (
"context"
Expand All @@ -31,9 +31,9 @@ import (
"go.uber.org/atomic"
)

// NewRoundRobin creates a new round robin PeerList using
func NewRoundRobin(peerIDs []peer.Identifier, agent peer.Agent) (*RoundRobin, error) {
rr := &RoundRobin{
// New creates a new round robin PeerList
func New(peerIDs []peer.Identifier, agent peer.Agent) (*List, error) {
rr := &List{
unavailablePeers: make(map[string]peer.Peer, len(peerIDs)),
availablePeerRing: NewPeerRing(len(peerIDs)),
agent: agent,
Expand All @@ -44,8 +44,8 @@ func NewRoundRobin(peerIDs []peer.Identifier, agent peer.Agent) (*RoundRobin, er
return rr, err
}

// RoundRobin is a PeerList which rotates which peers are to be selected in a circle
type RoundRobin struct {
// List is a PeerList which rotates which peers are to be selected in a circle
type List struct {
lock sync.Mutex

unavailablePeers map[string]peer.Peer
Expand All @@ -55,7 +55,7 @@ type RoundRobin struct {
started atomic.Bool
}

func (pl *RoundRobin) addAll(peerIDs []peer.Identifier) error {
func (pl *List) addAll(peerIDs []peer.Identifier) error {
pl.lock.Lock()
defer pl.lock.Unlock()

Expand All @@ -71,15 +71,15 @@ func (pl *RoundRobin) addAll(peerIDs []peer.Identifier) error {
}

// Add a peer identifier to the round robin
func (pl *RoundRobin) Add(pid peer.Identifier) error {
func (pl *List) Add(pid peer.Identifier) error {
pl.lock.Lock()
err := pl.addPeerIdentifier(pid)
pl.lock.Unlock()
return err
}

// Must be run inside a mutex.Lock()
func (pl *RoundRobin) addPeerIdentifier(pid peer.Identifier) error {
func (pl *List) addPeerIdentifier(pid peer.Identifier) error {
p, err := pl.agent.RetainPeer(pid, pl)
if err != nil {
return err
Expand All @@ -89,7 +89,7 @@ func (pl *RoundRobin) addPeerIdentifier(pid peer.Identifier) error {
}

// Must be run in a mutex.Lock()
func (pl *RoundRobin) addPeer(p peer.Peer) error {
func (pl *List) addPeer(p peer.Peer) error {
if p.Status().ConnectionStatus != peer.Available {
return pl.addToUnavailablePeers(p)
}
Expand All @@ -98,13 +98,13 @@ func (pl *RoundRobin) addPeer(p peer.Peer) error {
}

// Must be run in a mutex.Lock()
func (pl *RoundRobin) addToUnavailablePeers(p peer.Peer) error {
func (pl *List) addToUnavailablePeers(p peer.Peer) error {
pl.unavailablePeers[p.Identifier()] = p
return nil
}

// Must be run in a mutex.Lock()
func (pl *RoundRobin) addToAvailablePeers(p peer.Peer) error {
func (pl *List) addToAvailablePeers(p peer.Peer) error {
if err := pl.availablePeerRing.Add(p); err != nil {
return err
}
Expand All @@ -113,23 +113,23 @@ func (pl *RoundRobin) addToAvailablePeers(p peer.Peer) error {
return nil
}

// Start notifies the RoundRobin that requests will start coming
func (pl *RoundRobin) Start() error {
// Start notifies the List that requests will start coming
func (pl *List) Start() error {
if pl.started.Swap(true) {
return peer.ErrPeerListAlreadyStarted("RoundRobinList")
}
return nil
}

// Stop notifies the RoundRobin that requests will stop coming
func (pl *RoundRobin) Stop() error {
// Stop notifies the List that requests will stop coming
func (pl *List) Stop() error {
if !pl.started.Swap(false) {
return peer.ErrPeerListNotStarted("RoundRobinList")
}
return pl.clearPeers()
}

func (pl *RoundRobin) clearPeers() error {
func (pl *List) clearPeers() error {
pl.lock.Lock()
defer pl.lock.Unlock()

Expand All @@ -147,7 +147,7 @@ func (pl *RoundRobin) clearPeers() error {
// removeAllUnavailable will clear the unavailablePeers list and
// return all the Peers in the list in a slice
// Must be run in a mutex.Lock()
func (pl *RoundRobin) removeAllUnavailable() []peer.Peer {
func (pl *List) removeAllUnavailable() []peer.Peer {
peers := make([]peer.Peer, 0, len(pl.unavailablePeers))
for id, p := range pl.unavailablePeers {
peers = append(peers, p)
Expand All @@ -158,7 +158,7 @@ func (pl *RoundRobin) removeAllUnavailable() []peer.Peer {

// releaseAll will iterate through a list of peers and call release
// on the agent
func (pl *RoundRobin) releaseAll(peers []peer.Peer) []error {
func (pl *List) releaseAll(peers []peer.Peer) []error {
var errs []error
for _, p := range peers {
if err := pl.agent.ReleasePeer(p, pl); err != nil {
Expand All @@ -169,7 +169,7 @@ func (pl *RoundRobin) releaseAll(peers []peer.Peer) []error {
}

// Remove a peer identifier from the round robin
func (pl *RoundRobin) Remove(pid peer.Identifier) error {
func (pl *List) Remove(pid peer.Identifier) error {
pl.lock.Lock()
defer pl.lock.Unlock()

Expand All @@ -184,7 +184,7 @@ func (pl *RoundRobin) Remove(pid peer.Identifier) error {
// removeByPeerIdentifier will search through the Available and Unavailable Peers
// for the PeerID and remove it
// Must be run in a mutex.Lock()
func (pl *RoundRobin) removeByPeerIdentifier(pid peer.Identifier) error {
func (pl *List) removeByPeerIdentifier(pid peer.Identifier) error {
if p := pl.availablePeerRing.GetPeer(pid); p != nil {
return pl.availablePeerRing.Remove(p)
}
Expand All @@ -200,12 +200,12 @@ func (pl *RoundRobin) removeByPeerIdentifier(pid peer.Identifier) error {
// removeFromUnavailablePeers remove a peer from the Unavailable Peers list
// the Peer should already be validated as non-nil and in the Unavailable list
// Must be run in a mutex.Lock()
func (pl *RoundRobin) removeFromUnavailablePeers(p peer.Peer) {
func (pl *List) removeFromUnavailablePeers(p peer.Peer) {
delete(pl.unavailablePeers, p.Identifier())
}

// ChoosePeer selects the next available peer in the round robin
func (pl *RoundRobin) ChoosePeer(ctx context.Context, req *transport.Request) (peer.Peer, error) {
func (pl *List) ChoosePeer(ctx context.Context, req *transport.Request) (peer.Peer, error) {
if !pl.started.Load() {
return nil, peer.ErrPeerListNotStarted("RoundRobinList")
}
Expand All @@ -224,7 +224,7 @@ func (pl *RoundRobin) ChoosePeer(ctx context.Context, req *transport.Request) (p

// nextPeer grabs the next available peer from the PeerRing and returns it,
// if there are no available peers it returns nil
func (pl *RoundRobin) nextPeer() peer.Peer {
func (pl *List) nextPeer() peer.Peer {
pl.lock.Lock()
p := pl.availablePeerRing.Next()
pl.lock.Unlock()
Expand All @@ -233,7 +233,7 @@ func (pl *RoundRobin) nextPeer() peer.Peer {

// notifyPeerAvailable writes to a channel indicating that a Peer is currently
// available for requests
func (pl *RoundRobin) notifyPeerAvailable() {
func (pl *List) notifyPeerAvailable() {
select {
case pl.peerAvailableEvent <- struct{}{}:
default:
Expand All @@ -243,7 +243,7 @@ func (pl *RoundRobin) notifyPeerAvailable() {
// waitForPeerAddedEvent waits until a peer is added to the peer list or the
// given context finishes.
// Must NOT be run in a mutex.Lock()
func (pl *RoundRobin) waitForPeerAddedEvent(ctx context.Context) error {
func (pl *List) waitForPeerAddedEvent(ctx context.Context) error {
if _, ok := ctx.Deadline(); !ok {
return peer.ErrChooseContextHasNoDeadline("RoundRobinList")
}
Expand All @@ -257,7 +257,7 @@ func (pl *RoundRobin) waitForPeerAddedEvent(ctx context.Context) error {
}

// NotifyStatusChanged when the peer's status changes
func (pl *RoundRobin) NotifyStatusChanged(pid peer.Identifier) {
func (pl *List) NotifyStatusChanged(pid peer.Identifier) {
pl.lock.Lock()
defer pl.lock.Unlock()

Expand All @@ -275,7 +275,7 @@ func (pl *RoundRobin) NotifyStatusChanged(pid peer.Identifier) {
// handleAvailablePeerStatusChange checks the connection status of a connected peer to potentially
// move that Peer from the PeerRing to the unavailable peer map
// Must be run in a mutex.Lock()
func (pl *RoundRobin) handleAvailablePeerStatusChange(p peer.Peer) error {
func (pl *List) handleAvailablePeerStatusChange(p peer.Peer) error {
if p.Status().ConnectionStatus == peer.Available {
// Peer is in the proper pool, ignore
return nil
Expand All @@ -293,7 +293,7 @@ func (pl *RoundRobin) handleAvailablePeerStatusChange(p peer.Peer) error {
// handleUnavailablePeerStatusChange checks the connection status of an unavailable peer to potentially
// move that Peer from the unavailablePeerMap into the available Peer Ring
// Must be run in a mutex.Lock()
func (pl *RoundRobin) handleUnavailablePeerStatusChange(p peer.Peer) error {
func (pl *List) handleUnavailablePeerStatusChange(p peer.Peer) error {
if p.Status().ConnectionStatus != peer.Available {
// Peer is in the proper pool, ignore
return nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package list
package roundrobin

import (
"context"
Expand Down Expand Up @@ -633,7 +633,7 @@ func TestRoundRobinList(t *testing.T) {
ExpectPeerRetainsWithError(agent, tt.errRetainedPeerIDs, tt.retainErr)
ExpectPeerReleases(agent, tt.errReleasedPeerIDs, tt.releaseErr)

pl, err := NewRoundRobin(pids, agent)
pl, err := New(pids, agent)
assert.Equal(t, tt.expectedCreateErr, err)

deps := ListActionDeps{
Expand Down
2 changes: 1 addition & 1 deletion peer/x/list/peerring.go → peer/x/roundrobin/peerring.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package list
package roundrobin

import (
"container/ring"
Expand Down

0 comments on commit 0971c60

Please sign in to comment.