diff --git a/peer/x/list/roundrobin.go b/peer/x/roundrobin/list.go similarity index 80% rename from peer/x/list/roundrobin.go rename to peer/x/roundrobin/list.go index 8bb81e45f..821300d9d 100644 --- a/peer/x/list/roundrobin.go +++ b/peer/x/roundrobin/list.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package list +package roundrobin import ( "context" @@ -31,9 +31,9 @@ import ( "go.uber.org/atomic" ) -// NewRoundRobin creates a new round robin PeerList using -func NewRoundRobin(peerIDs []peer.Identifier, agent peer.Agent) (*RoundRobin, error) { - rr := &RoundRobin{ +// New creates a new round robin PeerList +func New(peerIDs []peer.Identifier, agent peer.Agent) (*List, error) { + rr := &List{ unavailablePeers: make(map[string]peer.Peer, len(peerIDs)), availablePeerRing: NewPeerRing(len(peerIDs)), agent: agent, @@ -44,8 +44,8 @@ func NewRoundRobin(peerIDs []peer.Identifier, agent peer.Agent) (*RoundRobin, er return rr, err } -// RoundRobin is a PeerList which rotates which peers are to be selected in a circle -type RoundRobin struct { +// List is a PeerList which rotates which peers are to be selected in a circle +type List struct { lock sync.Mutex unavailablePeers map[string]peer.Peer @@ -55,7 +55,7 @@ type RoundRobin struct { started atomic.Bool } -func (pl *RoundRobin) addAll(peerIDs []peer.Identifier) error { +func (pl *List) addAll(peerIDs []peer.Identifier) error { pl.lock.Lock() defer pl.lock.Unlock() @@ -71,7 +71,7 @@ func (pl *RoundRobin) addAll(peerIDs []peer.Identifier) error { } // Add a peer identifier to the round robin -func (pl *RoundRobin) Add(pid peer.Identifier) error { +func (pl *List) Add(pid peer.Identifier) error { pl.lock.Lock() err := pl.addPeerIdentifier(pid) pl.lock.Unlock() @@ -79,7 +79,7 @@ func (pl *RoundRobin) Add(pid peer.Identifier) error { } // Must be run inside a mutex.Lock() -func (pl *RoundRobin) addPeerIdentifier(pid peer.Identifier) error { +func (pl *List) addPeerIdentifier(pid peer.Identifier) error { p, err := pl.agent.RetainPeer(pid, pl) if err != nil { return err @@ -89,7 +89,7 @@ func (pl *RoundRobin) addPeerIdentifier(pid peer.Identifier) error { } // Must be run in a mutex.Lock() -func (pl *RoundRobin) addPeer(p peer.Peer) error { +func (pl *List) addPeer(p peer.Peer) error { if p.Status().ConnectionStatus != peer.Available { return pl.addToUnavailablePeers(p) } @@ -98,13 +98,13 @@ func (pl *RoundRobin) addPeer(p peer.Peer) error { } // Must be run in a mutex.Lock() -func (pl *RoundRobin) addToUnavailablePeers(p peer.Peer) error { +func (pl *List) addToUnavailablePeers(p peer.Peer) error { pl.unavailablePeers[p.Identifier()] = p return nil } // Must be run in a mutex.Lock() -func (pl *RoundRobin) addToAvailablePeers(p peer.Peer) error { +func (pl *List) addToAvailablePeers(p peer.Peer) error { if err := pl.availablePeerRing.Add(p); err != nil { return err } @@ -113,23 +113,23 @@ func (pl *RoundRobin) addToAvailablePeers(p peer.Peer) error { return nil } -// Start notifies the RoundRobin that requests will start coming -func (pl *RoundRobin) Start() error { +// Start notifies the List that requests will start coming +func (pl *List) Start() error { if pl.started.Swap(true) { return peer.ErrPeerListAlreadyStarted("RoundRobinList") } return nil } -// Stop notifies the RoundRobin that requests will stop coming -func (pl *RoundRobin) Stop() error { +// Stop notifies the List that requests will stop coming +func (pl *List) Stop() error { if !pl.started.Swap(false) { return peer.ErrPeerListNotStarted("RoundRobinList") } return pl.clearPeers() } -func (pl *RoundRobin) clearPeers() error { +func (pl *List) clearPeers() error { pl.lock.Lock() defer pl.lock.Unlock() @@ -147,7 +147,7 @@ func (pl *RoundRobin) clearPeers() error { // removeAllUnavailable will clear the unavailablePeers list and // return all the Peers in the list in a slice // Must be run in a mutex.Lock() -func (pl *RoundRobin) removeAllUnavailable() []peer.Peer { +func (pl *List) removeAllUnavailable() []peer.Peer { peers := make([]peer.Peer, 0, len(pl.unavailablePeers)) for id, p := range pl.unavailablePeers { peers = append(peers, p) @@ -158,7 +158,7 @@ func (pl *RoundRobin) removeAllUnavailable() []peer.Peer { // releaseAll will iterate through a list of peers and call release // on the agent -func (pl *RoundRobin) releaseAll(peers []peer.Peer) []error { +func (pl *List) releaseAll(peers []peer.Peer) []error { var errs []error for _, p := range peers { if err := pl.agent.ReleasePeer(p, pl); err != nil { @@ -169,7 +169,7 @@ func (pl *RoundRobin) releaseAll(peers []peer.Peer) []error { } // Remove a peer identifier from the round robin -func (pl *RoundRobin) Remove(pid peer.Identifier) error { +func (pl *List) Remove(pid peer.Identifier) error { pl.lock.Lock() defer pl.lock.Unlock() @@ -184,7 +184,7 @@ func (pl *RoundRobin) Remove(pid peer.Identifier) error { // removeByPeerIdentifier will search through the Available and Unavailable Peers // for the PeerID and remove it // Must be run in a mutex.Lock() -func (pl *RoundRobin) removeByPeerIdentifier(pid peer.Identifier) error { +func (pl *List) removeByPeerIdentifier(pid peer.Identifier) error { if p := pl.availablePeerRing.GetPeer(pid); p != nil { return pl.availablePeerRing.Remove(p) } @@ -200,12 +200,12 @@ func (pl *RoundRobin) removeByPeerIdentifier(pid peer.Identifier) error { // removeFromUnavailablePeers remove a peer from the Unavailable Peers list // the Peer should already be validated as non-nil and in the Unavailable list // Must be run in a mutex.Lock() -func (pl *RoundRobin) removeFromUnavailablePeers(p peer.Peer) { +func (pl *List) removeFromUnavailablePeers(p peer.Peer) { delete(pl.unavailablePeers, p.Identifier()) } // ChoosePeer selects the next available peer in the round robin -func (pl *RoundRobin) ChoosePeer(ctx context.Context, req *transport.Request) (peer.Peer, error) { +func (pl *List) ChoosePeer(ctx context.Context, req *transport.Request) (peer.Peer, error) { if !pl.started.Load() { return nil, peer.ErrPeerListNotStarted("RoundRobinList") } @@ -224,7 +224,7 @@ func (pl *RoundRobin) ChoosePeer(ctx context.Context, req *transport.Request) (p // nextPeer grabs the next available peer from the PeerRing and returns it, // if there are no available peers it returns nil -func (pl *RoundRobin) nextPeer() peer.Peer { +func (pl *List) nextPeer() peer.Peer { pl.lock.Lock() p := pl.availablePeerRing.Next() pl.lock.Unlock() @@ -233,7 +233,7 @@ func (pl *RoundRobin) nextPeer() peer.Peer { // notifyPeerAvailable writes to a channel indicating that a Peer is currently // available for requests -func (pl *RoundRobin) notifyPeerAvailable() { +func (pl *List) notifyPeerAvailable() { select { case pl.peerAvailableEvent <- struct{}{}: default: @@ -243,7 +243,7 @@ func (pl *RoundRobin) notifyPeerAvailable() { // waitForPeerAddedEvent waits until a peer is added to the peer list or the // given context finishes. // Must NOT be run in a mutex.Lock() -func (pl *RoundRobin) waitForPeerAddedEvent(ctx context.Context) error { +func (pl *List) waitForPeerAddedEvent(ctx context.Context) error { if _, ok := ctx.Deadline(); !ok { return peer.ErrChooseContextHasNoDeadline("RoundRobinList") } @@ -257,7 +257,7 @@ func (pl *RoundRobin) waitForPeerAddedEvent(ctx context.Context) error { } // NotifyStatusChanged when the peer's status changes -func (pl *RoundRobin) NotifyStatusChanged(pid peer.Identifier) { +func (pl *List) NotifyStatusChanged(pid peer.Identifier) { pl.lock.Lock() defer pl.lock.Unlock() @@ -275,7 +275,7 @@ func (pl *RoundRobin) NotifyStatusChanged(pid peer.Identifier) { // handleAvailablePeerStatusChange checks the connection status of a connected peer to potentially // move that Peer from the PeerRing to the unavailable peer map // Must be run in a mutex.Lock() -func (pl *RoundRobin) handleAvailablePeerStatusChange(p peer.Peer) error { +func (pl *List) handleAvailablePeerStatusChange(p peer.Peer) error { if p.Status().ConnectionStatus == peer.Available { // Peer is in the proper pool, ignore return nil @@ -293,7 +293,7 @@ func (pl *RoundRobin) handleAvailablePeerStatusChange(p peer.Peer) error { // handleUnavailablePeerStatusChange checks the connection status of an unavailable peer to potentially // move that Peer from the unavailablePeerMap into the available Peer Ring // Must be run in a mutex.Lock() -func (pl *RoundRobin) handleUnavailablePeerStatusChange(p peer.Peer) error { +func (pl *List) handleUnavailablePeerStatusChange(p peer.Peer) error { if p.Status().ConnectionStatus != peer.Available { // Peer is in the proper pool, ignore return nil diff --git a/peer/x/list/roundrobin_test.go b/peer/x/roundrobin/list_test.go similarity index 99% rename from peer/x/list/roundrobin_test.go rename to peer/x/roundrobin/list_test.go index dd70cf489..5ca3285de 100644 --- a/peer/x/list/roundrobin_test.go +++ b/peer/x/roundrobin/list_test.go @@ -1,4 +1,4 @@ -package list +package roundrobin import ( "context" @@ -633,7 +633,7 @@ func TestRoundRobinList(t *testing.T) { ExpectPeerRetainsWithError(agent, tt.errRetainedPeerIDs, tt.retainErr) ExpectPeerReleases(agent, tt.errReleasedPeerIDs, tt.releaseErr) - pl, err := NewRoundRobin(pids, agent) + pl, err := New(pids, agent) assert.Equal(t, tt.expectedCreateErr, err) deps := ListActionDeps{ diff --git a/peer/x/list/peerring.go b/peer/x/roundrobin/peerring.go similarity index 99% rename from peer/x/list/peerring.go rename to peer/x/roundrobin/peerring.go index 3b2d44730..6bf5a8cdd 100644 --- a/peer/x/list/peerring.go +++ b/peer/x/roundrobin/peerring.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package list +package roundrobin import ( "container/ring"