Skip to content

Commit

Permalink
[PeerList][Part 5] SinglePeerList (#403)
Browse files Browse the repository at this point in the history
Summary: This PR creates a concrete peerlist implementation that has a
single peer implementation.  This can be used to shim the current existing http outbounds and replace them with a single PeerList
  • Loading branch information
willhug committed Nov 10, 2016
1 parent c310f7a commit 9dcf4d9
Show file tree
Hide file tree
Showing 3 changed files with 303 additions and 0 deletions.
17 changes: 17 additions & 0 deletions transport/internal/errors/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package errors

import (
"fmt"

"go.uber.org/yarpc/transport"
)

Expand Down Expand Up @@ -37,3 +38,19 @@ type ErrInvalidPeerType struct {
func (e ErrInvalidPeerType) Error() string {
return fmt.Sprintf("expected peer type (%s) but got peer (%v)", e.ExpectedType, e.PeerIdentifier)
}

// ErrPeerListAlreadyStarted represents a failure because Start() was already
// called on the peerlist.
type ErrPeerListAlreadyStarted string

func (e ErrPeerListAlreadyStarted) Error() string {
return fmt.Sprintf("%s has already been started", string(e))
}

// ErrPeerListNotStarted represents a failure because Start() was not called
// on a peerlist or if Stop() was called.
type ErrPeerListNotStarted string

func (e ErrPeerListNotStarted) Error() string {
return fmt.Sprintf("%s has not been started or was stopped", string(e))
}
75 changes: 75 additions & 0 deletions transport/peer/peerlist/single.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package peerlist

import (
"context"
"sync"

"go.uber.org/yarpc/transport"
"go.uber.org/yarpc/transport/internal/errors"
)

type single struct {
lock sync.RWMutex

initialPeerID transport.PeerIdentifier
peer transport.Peer
agent transport.Agent
started bool
}

// NewSingle creates a static PeerList with a single Peer
func NewSingle(pid transport.PeerIdentifier, agent transport.Agent) transport.PeerList {
return &single{
initialPeerID: pid,
agent: agent,
started: false,
}
}

func (pl *single) Start() error {
pl.lock.Lock()
defer pl.lock.Unlock()
if pl.started {
return errors.ErrPeerListAlreadyStarted("single")
}
pl.started = true

peer, err := pl.agent.RetainPeer(pl.initialPeerID, pl)
if err != nil {
pl.started = false
return err
}
pl.peer = peer
return nil
}

func (pl *single) Stop() error {
pl.lock.Lock()
defer pl.lock.Unlock()

if !pl.started {
return errors.ErrPeerListNotStarted("single")
}
pl.started = false

err := pl.agent.ReleasePeer(pl.initialPeerID, pl)
if err != nil {
return err
}

pl.peer = nil
return nil
}

func (pl *single) ChoosePeer(context.Context, *transport.Request) (transport.Peer, error) {
pl.lock.RLock()
defer pl.lock.RUnlock()

if !pl.started {
return nil, errors.ErrPeerListNotStarted("single")
}
return pl.peer, nil
}

// NotifyStatusChanged when the Peer status changes
func (pl *single) NotifyStatusChanged(transport.Peer) {}
211 changes: 211 additions & 0 deletions transport/peer/peerlist/single_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package peerlist

import (
"context"
"fmt"
"testing"

"go.uber.org/yarpc/transport"
"go.uber.org/yarpc/transport/internal/errors"
"go.uber.org/yarpc/transport/transporttest"

"github.com/crossdock/crossdock-go/assert"
"github.com/golang/mock/gomock"
)

func TestSingle(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

type expectedChooseResult struct {
peer transport.Peer
err error
}

type testStruct struct {
msg string
pid transport.PeerIdentifier
agent *transporttest.MockAgent
appliedFunc func(*single) error
expectedPeerID transport.PeerIdentifier
expectedPeer transport.Peer
expectedAgent transport.Agent
expectedStarted bool
expectedErr error
expectedChooseResults []expectedChooseResult
}
tests := []testStruct{
func() (s testStruct) {
s.msg = "setup"
s.pid = transporttest.NewMockPeerIdentifier(mockCtrl)
s.agent = transporttest.NewMockAgent(mockCtrl)

s.appliedFunc = func(pl *single) error {
return nil
}

s.expectedPeerID = s.pid
s.expectedAgent = s.agent
s.expectedStarted = false
return
}(),
func() (s testStruct) {
s.msg = "stop before start"
s.pid = transporttest.NewMockPeerIdentifier(mockCtrl)
s.agent = transporttest.NewMockAgent(mockCtrl)

s.appliedFunc = func(pl *single) error {
return pl.Stop()
}

s.expectedErr = errors.ErrPeerListNotStarted("single")
s.expectedPeerID = s.pid
s.expectedAgent = s.agent
s.expectedStarted = false
return
}(),
func() (s testStruct) {
s.msg = "choose before start"
s.pid = transporttest.NewMockPeerIdentifier(mockCtrl)
s.agent = transporttest.NewMockAgent(mockCtrl)

s.appliedFunc = func(pl *single) error {
return nil
}

s.expectedPeerID = s.pid
s.expectedAgent = s.agent
s.expectedStarted = false
s.expectedChooseResults = []expectedChooseResult{{
peer: nil,
err: errors.ErrPeerListNotStarted("single"),
}}
return
}(),
func() (s testStruct) {
s.msg = "start and choose"
s.pid = transporttest.NewMockPeerIdentifier(mockCtrl)
s.agent = transporttest.NewMockAgent(mockCtrl)

s.expectedPeer = transporttest.NewMockPeer(mockCtrl)
s.agent.EXPECT().RetainPeer(s.pid, gomock.Any()).Return(s.expectedPeer, nil)

s.appliedFunc = func(pl *single) error {
return pl.Start()
}

s.expectedPeerID = s.pid
s.expectedAgent = s.agent
s.expectedStarted = true
s.expectedChooseResults = []expectedChooseResult{{
peer: s.expectedPeer,
err: nil,
}}
return
}(),
func() (s testStruct) {
s.msg = "start with agent error"
s.pid = transporttest.NewMockPeerIdentifier(mockCtrl)
s.agent = transporttest.NewMockAgent(mockCtrl)

s.expectedErr = fmt.Errorf("test error")
s.agent.EXPECT().RetainPeer(s.pid, gomock.Any()).Return(nil, s.expectedErr)

s.appliedFunc = func(pl *single) error {
return pl.Start()
}

s.expectedPeerID = s.pid
s.expectedAgent = s.agent
s.expectedStarted = false
return
}(),
func() (s testStruct) {
s.msg = "start twice"
s.pid = transporttest.NewMockPeerIdentifier(mockCtrl)
s.agent = transporttest.NewMockAgent(mockCtrl)

s.expectedPeer = transporttest.NewMockPeer(mockCtrl)
s.agent.EXPECT().RetainPeer(s.pid, gomock.Any()).Return(s.expectedPeer, nil)

s.appliedFunc = func(pl *single) error {
pl.Start()
return pl.Start()
}

s.expectedErr = errors.ErrPeerListAlreadyStarted("single")
s.expectedPeerID = s.pid
s.expectedAgent = s.agent
s.expectedStarted = true
return
}(),
func() (s testStruct) {
s.msg = "start stop"
s.pid = transporttest.NewMockPeerIdentifier(mockCtrl)
s.agent = transporttest.NewMockAgent(mockCtrl)

peer := transporttest.NewMockPeer(mockCtrl)
s.agent.EXPECT().RetainPeer(s.pid, gomock.Any()).Return(peer, nil)
s.agent.EXPECT().ReleasePeer(s.pid, gomock.Any()).Return(nil)

s.appliedFunc = func(pl *single) error {
err := pl.Start()
if err != nil {
return err
}
return pl.Stop()
}

s.expectedErr = nil
s.expectedPeerID = s.pid
s.expectedPeer = nil
s.expectedAgent = s.agent
s.expectedStarted = false
return
}(),
func() (s testStruct) {
s.msg = "start stop release failure"
s.pid = transporttest.NewMockPeerIdentifier(mockCtrl)
s.agent = transporttest.NewMockAgent(mockCtrl)

s.expectedPeer = transporttest.NewMockPeer(mockCtrl)
s.agent.EXPECT().RetainPeer(s.pid, gomock.Any()).Return(s.expectedPeer, nil)

s.expectedErr = errors.ErrAgentHasNoReferenceToPeer{}
s.agent.EXPECT().ReleasePeer(s.pid, gomock.Any()).Return(s.expectedErr)

s.appliedFunc = func(pl *single) error {
err := pl.Start()
if err != nil {
return err
}
return pl.Stop()
}

s.expectedPeerID = s.pid
s.expectedAgent = s.agent
s.expectedStarted = false
return
}(),
}

for _, tt := range tests {
pl := NewSingle(tt.pid, tt.agent).(*single)

err := tt.appliedFunc(pl)

assert.Equal(t, tt.expectedErr, err, tt.msg)
assert.Equal(t, tt.expectedAgent, pl.agent, tt.msg)
assert.Equal(t, tt.expectedPeerID, pl.initialPeerID, tt.msg)
assert.Equal(t, tt.expectedPeer, pl.peer, tt.msg)
assert.Equal(t, tt.expectedStarted, pl.started, tt.msg)

for _, expectedResult := range tt.expectedChooseResults {
peer, err := pl.ChoosePeer(context.Background(), &transport.Request{})

assert.Equal(t, expectedResult.peer, peer, tt.msg)
assert.True(t, expectedResult.peer == peer, tt.msg)
assert.Equal(t, expectedResult.err, err, tt.msg)
}
}
}

0 comments on commit 9dcf4d9

Please sign in to comment.