-
Notifications
You must be signed in to change notification settings - Fork 101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PeerList][Part 5] SinglePeerList #403
Changes from all commits
812e161
00f341d
16506bb
8510cbd
47b001d
8712de5
e3c5ff2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
package peerlist | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
|
||
"go.uber.org/yarpc/transport" | ||
"go.uber.org/yarpc/transport/internal/errors" | ||
) | ||
|
||
// single is a PeerList that always routes to one statically configured peer.
type single struct {
	lock sync.RWMutex // guards peer and started

	initialPeerID transport.PeerIdentifier // identifier used to retain/release the peer from the agent
	peer          transport.Peer           // retained peer; nil until Start succeeds and after Stop succeeds
	agent         transport.Agent          // agent that manages the peer's lifecycle
	started       bool                     // true between a successful Start and the next Stop
}
|
||
// NewSingle creates a static PeerList with a single Peer | ||
func NewSingle(pid transport.PeerIdentifier, agent transport.Agent) transport.PeerList { | ||
return &single{ | ||
initialPeerID: pid, | ||
agent: agent, | ||
started: false, | ||
} | ||
} | ||
|
||
func (pl *single) Start() error { | ||
pl.lock.Lock() | ||
defer pl.lock.Unlock() | ||
if pl.started { | ||
return errors.ErrPeerListAlreadyStarted("single") | ||
} | ||
pl.started = true | ||
|
||
peer, err := pl.agent.RetainPeer(pl.initialPeerID, pl) | ||
if err != nil { | ||
pl.started = false | ||
return err | ||
} | ||
pl.peer = peer | ||
return nil | ||
} | ||
|
||
func (pl *single) Stop() error { | ||
pl.lock.Lock() | ||
defer pl.lock.Unlock() | ||
|
||
if !pl.started { | ||
return errors.ErrPeerListNotStarted("single") | ||
} | ||
pl.started = false | ||
|
||
err := pl.agent.ReleasePeer(pl.initialPeerID, pl) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
pl.peer = nil | ||
return nil | ||
} | ||
|
||
func (pl *single) ChoosePeer(context.Context, *transport.Request) (transport.Peer, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do I understand that every new request will call this function first locking the mutex on the way? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With a read lock yes, for this list it doesn't make as much sense, but once we start adding/removing peers I think it will be a good idea, or at least we'll need to be explicitly aware when we remove the mutexes (for performance reasons). |
||
pl.lock.RLock() | ||
defer pl.lock.RUnlock() | ||
|
||
if !pl.started { | ||
return nil, errors.ErrPeerListNotStarted("single") | ||
} | ||
return pl.peer, nil | ||
} | ||
|
||
// NotifyStatusChanged satisfies the peer-subscriber interface required
// by Agent.RetainPeer. The single list has only one peer and nothing to
// rebalance, so status changes are intentionally ignored.
func (pl *single) NotifyStatusChanged(transport.Peer) {}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,211 @@ | ||
package peerlist | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"testing" | ||
|
||
"go.uber.org/yarpc/transport" | ||
"go.uber.org/yarpc/transport/internal/errors" | ||
"go.uber.org/yarpc/transport/transporttest" | ||
|
||
"github.com/crossdock/crossdock-go/assert" | ||
"github.com/golang/mock/gomock" | ||
) | ||
|
||
func TestSingle(t *testing.T) { | ||
mockCtrl := gomock.NewController(t) | ||
defer mockCtrl.Finish() | ||
|
||
type expectedChooseResult struct { | ||
peer transport.Peer | ||
err error | ||
} | ||
|
||
type testStruct struct { | ||
msg string | ||
pid transport.PeerIdentifier | ||
agent *transporttest.MockAgent | ||
appliedFunc func(*single) error | ||
expectedPeerID transport.PeerIdentifier | ||
expectedPeer transport.Peer | ||
expectedAgent transport.Agent | ||
expectedStarted bool | ||
expectedErr error | ||
expectedChooseResults []expectedChooseResult | ||
} | ||
tests := []testStruct{ | ||
func() (s testStruct) { | ||
s.msg = "setup" | ||
s.pid = transporttest.NewMockPeerIdentifier(mockCtrl) | ||
s.agent = transporttest.NewMockAgent(mockCtrl) | ||
|
||
s.appliedFunc = func(pl *single) error { | ||
return nil | ||
} | ||
|
||
s.expectedPeerID = s.pid | ||
s.expectedAgent = s.agent | ||
s.expectedStarted = false | ||
return | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of combining all these tests, it's okay (preferable even) to split There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
}(), | ||
func() (s testStruct) { | ||
s.msg = "stop before start" | ||
s.pid = transporttest.NewMockPeerIdentifier(mockCtrl) | ||
s.agent = transporttest.NewMockAgent(mockCtrl) | ||
|
||
s.appliedFunc = func(pl *single) error { | ||
return pl.Stop() | ||
} | ||
|
||
s.expectedErr = errors.ErrPeerListNotStarted("single") | ||
s.expectedPeerID = s.pid | ||
s.expectedAgent = s.agent | ||
s.expectedStarted = false | ||
return | ||
}(), | ||
func() (s testStruct) { | ||
s.msg = "choose before start" | ||
s.pid = transporttest.NewMockPeerIdentifier(mockCtrl) | ||
s.agent = transporttest.NewMockAgent(mockCtrl) | ||
|
||
s.appliedFunc = func(pl *single) error { | ||
return nil | ||
} | ||
|
||
s.expectedPeerID = s.pid | ||
s.expectedAgent = s.agent | ||
s.expectedStarted = false | ||
s.expectedChooseResults = []expectedChooseResult{{ | ||
peer: nil, | ||
err: errors.ErrPeerListNotStarted("single"), | ||
}} | ||
return | ||
}(), | ||
func() (s testStruct) { | ||
s.msg = "start and choose" | ||
s.pid = transporttest.NewMockPeerIdentifier(mockCtrl) | ||
s.agent = transporttest.NewMockAgent(mockCtrl) | ||
|
||
s.expectedPeer = transporttest.NewMockPeer(mockCtrl) | ||
s.agent.EXPECT().RetainPeer(s.pid, gomock.Any()).Return(s.expectedPeer, nil) | ||
|
||
s.appliedFunc = func(pl *single) error { | ||
return pl.Start() | ||
} | ||
|
||
s.expectedPeerID = s.pid | ||
s.expectedAgent = s.agent | ||
s.expectedStarted = true | ||
s.expectedChooseResults = []expectedChooseResult{{ | ||
peer: s.expectedPeer, | ||
err: nil, | ||
}} | ||
return | ||
}(), | ||
func() (s testStruct) { | ||
s.msg = "start with agent error" | ||
s.pid = transporttest.NewMockPeerIdentifier(mockCtrl) | ||
s.agent = transporttest.NewMockAgent(mockCtrl) | ||
|
||
s.expectedErr = fmt.Errorf("test error") | ||
s.agent.EXPECT().RetainPeer(s.pid, gomock.Any()).Return(nil, s.expectedErr) | ||
|
||
s.appliedFunc = func(pl *single) error { | ||
return pl.Start() | ||
} | ||
|
||
s.expectedPeerID = s.pid | ||
s.expectedAgent = s.agent | ||
s.expectedStarted = false | ||
return | ||
}(), | ||
func() (s testStruct) { | ||
s.msg = "start twice" | ||
s.pid = transporttest.NewMockPeerIdentifier(mockCtrl) | ||
s.agent = transporttest.NewMockAgent(mockCtrl) | ||
|
||
s.expectedPeer = transporttest.NewMockPeer(mockCtrl) | ||
s.agent.EXPECT().RetainPeer(s.pid, gomock.Any()).Return(s.expectedPeer, nil) | ||
|
||
s.appliedFunc = func(pl *single) error { | ||
pl.Start() | ||
return pl.Start() | ||
} | ||
|
||
s.expectedErr = errors.ErrPeerListAlreadyStarted("single") | ||
s.expectedPeerID = s.pid | ||
s.expectedAgent = s.agent | ||
s.expectedStarted = true | ||
return | ||
}(), | ||
func() (s testStruct) { | ||
s.msg = "start stop" | ||
s.pid = transporttest.NewMockPeerIdentifier(mockCtrl) | ||
s.agent = transporttest.NewMockAgent(mockCtrl) | ||
|
||
peer := transporttest.NewMockPeer(mockCtrl) | ||
s.agent.EXPECT().RetainPeer(s.pid, gomock.Any()).Return(peer, nil) | ||
s.agent.EXPECT().ReleasePeer(s.pid, gomock.Any()).Return(nil) | ||
|
||
s.appliedFunc = func(pl *single) error { | ||
err := pl.Start() | ||
if err != nil { | ||
return err | ||
} | ||
return pl.Stop() | ||
} | ||
|
||
s.expectedErr = nil | ||
s.expectedPeerID = s.pid | ||
s.expectedPeer = nil | ||
s.expectedAgent = s.agent | ||
s.expectedStarted = false | ||
return | ||
}(), | ||
func() (s testStruct) { | ||
s.msg = "start stop release failure" | ||
s.pid = transporttest.NewMockPeerIdentifier(mockCtrl) | ||
s.agent = transporttest.NewMockAgent(mockCtrl) | ||
|
||
s.expectedPeer = transporttest.NewMockPeer(mockCtrl) | ||
s.agent.EXPECT().RetainPeer(s.pid, gomock.Any()).Return(s.expectedPeer, nil) | ||
|
||
s.expectedErr = errors.ErrAgentHasNoReferenceToPeer{} | ||
s.agent.EXPECT().ReleasePeer(s.pid, gomock.Any()).Return(s.expectedErr) | ||
|
||
s.appliedFunc = func(pl *single) error { | ||
err := pl.Start() | ||
if err != nil { | ||
return err | ||
} | ||
return pl.Stop() | ||
} | ||
|
||
s.expectedPeerID = s.pid | ||
s.expectedAgent = s.agent | ||
s.expectedStarted = false | ||
return | ||
}(), | ||
} | ||
|
||
for _, tt := range tests { | ||
pl := NewSingle(tt.pid, tt.agent).(*single) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. subtests :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
err := tt.appliedFunc(pl) | ||
|
||
assert.Equal(t, tt.expectedErr, err, tt.msg) | ||
assert.Equal(t, tt.expectedAgent, pl.agent, tt.msg) | ||
assert.Equal(t, tt.expectedPeerID, pl.initialPeerID, tt.msg) | ||
assert.Equal(t, tt.expectedPeer, pl.peer, tt.msg) | ||
assert.Equal(t, tt.expectedStarted, pl.started, tt.msg) | ||
|
||
for _, expectedResult := range tt.expectedChooseResults { | ||
peer, err := pl.ChoosePeer(context.Background(), &transport.Request{}) | ||
|
||
assert.Equal(t, expectedResult.peer, peer, tt.msg) | ||
assert.True(t, expectedResult.peer == peer, tt.msg) | ||
assert.Equal(t, expectedResult.err, err, tt.msg) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you want to use a RW Mutex instead. There's a race here where if I
call `Start` and `ChoosePeer` concurrently: `started` is set to true by
`Start`, but `ChoosePeer` returns the `nil` peer before `Start` has a
chance to fill it in.