-
Notifications
You must be signed in to change notification settings - Fork 103
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Summary: This PR creates a concrete peerlist implementation that has a single peer implementation.
- Loading branch information
Showing
2 changed files
with
272 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
package peers | ||
|
||
import ( | ||
"context" | ||
|
||
"go.uber.org/yarpc/internal/errors" | ||
"go.uber.org/yarpc/transport" | ||
|
||
"github.com/uber-go/atomic" | ||
) | ||
|
||
type singlePeerList struct { | ||
peerID transport.PeerIdentifier | ||
peer transport.Peer | ||
agent transport.PeerAgent | ||
started *atomic.Bool | ||
} | ||
|
||
// NewSinglePeerList creates a static PeerList with a single Peer | ||
func NewSinglePeerList(pi transport.PeerIdentifier, agent transport.PeerAgent) transport.PeerList { | ||
return &singlePeerList{ | ||
peerID: pi, | ||
agent: agent, | ||
started: atomic.NewBool(false), | ||
} | ||
} | ||
|
||
func (pl *singlePeerList) Start() error { | ||
if pl.started.Swap(true) { | ||
return errors.ErrOutboundAlreadyStarted("SinglePeerList") | ||
} | ||
peer, err := pl.agent.RetainPeer(pl.peerID, pl) | ||
if err != nil { | ||
pl.started.Swap(false) | ||
return err | ||
} | ||
pl.peer = peer | ||
return nil | ||
} | ||
|
||
func (pl *singlePeerList) Stop() error { | ||
if !pl.started.Swap(false) { | ||
return errors.ErrOutboundNotStarted("SinglePeerList") | ||
} | ||
err := pl.agent.ReleasePeer(pl.peerID, pl) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
pl.peer = nil | ||
return nil | ||
} | ||
|
||
func (pl *singlePeerList) ChoosePeer(context.Context, *transport.Request) (transport.Peer, error) { | ||
if !pl.started.Load() { | ||
return nil, errors.ErrOutboundNotStarted("peerlist was not started") | ||
} | ||
return pl.peer, nil | ||
} | ||
|
||
// NotifyAvailable when a Peer can accept requests | ||
func (pl *singlePeerList) NotifyAvailable(transport.Peer) error { | ||
return nil // Noop | ||
} | ||
|
||
// NotifyConnecting when a Peer is connecting | ||
func (pl *singlePeerList) NotifyConnecting(transport.Peer) error { | ||
return nil // Noop | ||
} | ||
|
||
// NotifyUnavailable when a Peer is cannot handle requests | ||
func (pl *singlePeerList) NotifyUnavailable(transport.Peer) error { | ||
return nil // Noop | ||
} | ||
|
||
// NotifyPending when the number of Pending requests changes | ||
func (pl *singlePeerList) NotifyPending(transport.Peer) {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
package peers | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"testing" | ||
|
||
"go.uber.org/yarpc/internal/errors" | ||
"go.uber.org/yarpc/transport" | ||
"go.uber.org/yarpc/transport/transporttest" | ||
|
||
"github.com/crossdock/crossdock-go/assert" | ||
"github.com/golang/mock/gomock" | ||
) | ||
|
||
func TestSinglePeerList(t *testing.T) { | ||
mockCtrl := gomock.NewController(t) | ||
defer mockCtrl.Finish() | ||
|
||
type testStruct struct { | ||
pl *singlePeerList | ||
appliedFunc func(transport.PeerList) | ||
assertFunc func(*singlePeerList) | ||
} | ||
tests := []testStruct{ | ||
func() (s testStruct) { | ||
msg := "setup" | ||
pi := transporttest.NewMockPeerIdentifier(mockCtrl) | ||
agent := transporttest.NewMockPeerAgent(mockCtrl) | ||
s.pl = NewSinglePeerList(pi, agent).(*singlePeerList) | ||
|
||
s.appliedFunc = func(pl transport.PeerList) {} | ||
s.assertFunc = func(pl *singlePeerList) { | ||
assert.Nil(t, pl.peer, msg) | ||
assert.Equal(t, agent, pl.agent, msg) | ||
assert.Equal(t, pi, pl.peerID, msg) | ||
assert.Equal(t, false, pl.started.Load(), msg) | ||
} | ||
return | ||
}(), | ||
func() (s testStruct) { | ||
msg := "stop before start" | ||
pi := transporttest.NewMockPeerIdentifier(mockCtrl) | ||
agent := transporttest.NewMockPeerAgent(mockCtrl) | ||
s.pl = NewSinglePeerList(pi, agent).(*singlePeerList) | ||
var err error | ||
s.appliedFunc = func(pl transport.PeerList) { | ||
err = pl.Stop() | ||
} | ||
s.assertFunc = func(pl *singlePeerList) { | ||
assert.NotNil(t, err, msg) | ||
assert.IsType(t, errors.ErrOutboundNotStarted(""), err, msg) | ||
assert.Equal(t, false, pl.started.Load(), msg) | ||
} | ||
return | ||
}(), | ||
func() (s testStruct) { | ||
msg := "choose before start" | ||
pi := transporttest.NewMockPeerIdentifier(mockCtrl) | ||
agent := transporttest.NewMockPeerAgent(mockCtrl) | ||
s.pl = NewSinglePeerList(pi, agent).(*singlePeerList) | ||
var err error | ||
var peer transport.Peer | ||
s.appliedFunc = func(pl transport.PeerList) { | ||
peer, err = pl.ChoosePeer(context.Background(), &transport.Request{}) | ||
} | ||
s.assertFunc = func(pl *singlePeerList) { | ||
assert.Nil(t, peer, msg) | ||
assert.NotNil(t, err, msg) | ||
assert.IsType(t, errors.ErrOutboundNotStarted(""), err, msg) | ||
assert.Equal(t, false, pl.started.Load(), msg) | ||
} | ||
return | ||
}(), | ||
func() (s testStruct) { | ||
msg := "start and choose" | ||
pi := transporttest.NewMockPeerIdentifier(mockCtrl) | ||
agent := transporttest.NewMockPeerAgent(mockCtrl) | ||
s.pl = NewSinglePeerList(pi, agent).(*singlePeerList) | ||
|
||
expectedPeer := transporttest.NewMockPeer(mockCtrl) | ||
agent.EXPECT().RetainPeer(pi, s.pl).Return(expectedPeer, nil) | ||
|
||
var startErr error | ||
var chooseErr error | ||
var peer transport.Peer | ||
s.appliedFunc = func(pl transport.PeerList) { | ||
startErr = pl.Start() | ||
peer, chooseErr = pl.ChoosePeer(context.Background(), &transport.Request{}) | ||
} | ||
|
||
s.assertFunc = func(pl *singlePeerList) { | ||
assert.Nil(t, startErr, msg) | ||
assert.Nil(t, chooseErr, msg) | ||
assert.Equal(t, expectedPeer, peer, msg) | ||
assert.Equal(t, true, pl.started.Load(), msg) | ||
} | ||
return | ||
}(), | ||
func() (s testStruct) { | ||
msg := "start with agent error" | ||
pi := transporttest.NewMockPeerIdentifier(mockCtrl) | ||
agent := transporttest.NewMockPeerAgent(mockCtrl) | ||
s.pl = NewSinglePeerList(pi, agent).(*singlePeerList) | ||
|
||
expectedErr := fmt.Errorf("test error") | ||
agent.EXPECT().RetainPeer(pi, s.pl).Return(nil, expectedErr) | ||
|
||
var startErr error | ||
s.appliedFunc = func(pl transport.PeerList) { | ||
startErr = pl.Start() | ||
} | ||
|
||
s.assertFunc = func(pl *singlePeerList) { | ||
assert.Equal(t, expectedErr, startErr, msg) | ||
assert.Equal(t, false, pl.started.Load(), msg) | ||
} | ||
return | ||
}(), | ||
func() (s testStruct) { | ||
msg := "start twice" | ||
pi := transporttest.NewMockPeerIdentifier(mockCtrl) | ||
agent := transporttest.NewMockPeerAgent(mockCtrl) | ||
s.pl = NewSinglePeerList(pi, agent).(*singlePeerList) | ||
|
||
peer := transporttest.NewMockPeer(mockCtrl) | ||
agent.EXPECT().RetainPeer(pi, s.pl).Return(peer, nil) | ||
|
||
var startErr error | ||
s.appliedFunc = func(pl transport.PeerList) { | ||
_ = pl.Start() | ||
startErr = pl.Start() | ||
} | ||
|
||
s.assertFunc = func(pl *singlePeerList) { | ||
assert.NotNil(t, startErr, msg) | ||
assert.IsType(t, errors.ErrOutboundAlreadyStarted(""), startErr, msg) | ||
} | ||
return | ||
}(), | ||
func() (s testStruct) { | ||
msg := "start stop" | ||
pi := transporttest.NewMockPeerIdentifier(mockCtrl) | ||
agent := transporttest.NewMockPeerAgent(mockCtrl) | ||
s.pl = NewSinglePeerList(pi, agent).(*singlePeerList) | ||
|
||
peer := transporttest.NewMockPeer(mockCtrl) | ||
agent.EXPECT().RetainPeer(pi, s.pl).Return(peer, nil) | ||
agent.EXPECT().ReleasePeer(pi, s.pl).Return(nil) | ||
|
||
var startErr error | ||
var stopErr error | ||
s.appliedFunc = func(pl transport.PeerList) { | ||
startErr = pl.Start() | ||
stopErr = pl.Stop() | ||
} | ||
|
||
s.assertFunc = func(pl *singlePeerList) { | ||
assert.Nil(t, startErr, msg) | ||
assert.Nil(t, stopErr, msg) | ||
assert.Equal(t, false, pl.started.Load(), msg) | ||
} | ||
return | ||
}(), | ||
func() (s testStruct) { | ||
msg := "start stop release failure" | ||
pi := transporttest.NewMockPeerIdentifier(mockCtrl) | ||
agent := transporttest.NewMockPeerAgent(mockCtrl) | ||
s.pl = NewSinglePeerList(pi, agent).(*singlePeerList) | ||
|
||
peer := transporttest.NewMockPeer(mockCtrl) | ||
agent.EXPECT().RetainPeer(pi, s.pl).Return(peer, nil) | ||
agent.EXPECT().ReleasePeer(pi, s.pl).Return(errors.ErrAgentHasNoReferenceToPeer{}) | ||
|
||
var startErr error | ||
var stopErr error | ||
s.appliedFunc = func(pl transport.PeerList) { | ||
startErr = pl.Start() | ||
stopErr = pl.Stop() | ||
} | ||
|
||
s.assertFunc = func(pl *singlePeerList) { | ||
assert.Nil(t, startErr, msg) | ||
assert.Equal(t, errors.ErrAgentHasNoReferenceToPeer{}, stopErr, msg) | ||
} | ||
return | ||
}(), | ||
} | ||
|
||
for _, tt := range tests { | ||
tt.appliedFunc(tt.pl) | ||
|
||
tt.assertFunc(tt.pl) | ||
} | ||
} |