Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Peer] Adjust choose interface to return a onFinish closure #522

Merged
merged 2 commits into from
Dec 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion peer/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Chooser interface {
Stop() error

// Choose a Peer for the next call, block until a peer is available (or timeout)
Choose(context.Context, *transport.Request) (Peer, error)
Choose(context.Context, *transport.Request) (peer Peer, onFinish func(error), err error)
}

// List listens to adds and removes of Peers from a PeerProvider
Expand Down
2 changes: 1 addition & 1 deletion peer/peertest/peerlistaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (a ChooseAction) Apply(t *testing.T, pl peer.Chooser, deps ListActionDeps)
defer cancel()
}

p, err := pl.Choose(ctx, a.InputRequest)
p, _, err := pl.Choose(ctx, a.InputRequest)

if a.ExpectedErr != nil {
// Note that we're not verifying anything about ExpectedPeer here because
Expand Down
9 changes: 7 additions & 2 deletions peer/single.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,13 @@ func NewSingle(pid Identifier, transport Transport) *Single {
}

// Choose returns the single peer
func (s *Single) Choose(context.Context, *transport.Request) (Peer, error) {
return s.p, s.err
func (s *Single) Choose(context.Context, *transport.Request) (Peer, func(error), error) {
s.p.StartRequest(s)
return s.p, s.onFinish, s.err
}

func (s *Single) onFinish(_ error) {
s.p.EndRequest(s)
}

// NotifyStatusChanged receives notifications from the transport when the peer
Expand Down
16 changes: 12 additions & 4 deletions peer/x/roundrobin/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,19 +214,20 @@ func (pl *List) removeFromUnavailablePeers(p peer.Peer) {
}

// Choose selects the next available peer in the round robin
func (pl *List) Choose(ctx context.Context, req *transport.Request) (peer.Peer, error) {
func (pl *List) Choose(ctx context.Context, req *transport.Request) (peer.Peer, func(error), error) {
if !pl.isRunning() {
return nil, peer.ErrPeerListNotStarted("RoundRobinList")
return nil, nil, peer.ErrPeerListNotStarted("RoundRobinList")
}

for {
if nextPeer := pl.nextPeer(); nextPeer != nil {
pl.notifyPeerAvailable()
return nextPeer, nil
nextPeer.StartRequest(pl)
return nextPeer, pl.getOnFinishFunc(nextPeer), nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should cache this on the ring next to the node it effects.

}

if err := pl.waitForPeerAddedEvent(ctx); err != nil {
return nil, err
return nil, nil, err
}
}
}
Expand All @@ -253,6 +254,13 @@ func (pl *List) notifyPeerAvailable() {
}
}

// getOnFinishFunc creates a closure that will be run at the end of the request
func (pl *List) getOnFinishFunc(p peer.Peer) func(error) {
return func(_ error) {
p.EndRequest(pl)
}
}

// waitForPeerAddedEvent waits until a peer is added to the peer list or the
// given context finishes.
// Must NOT be run in a mutex.Lock()
Expand Down
28 changes: 20 additions & 8 deletions transport/http/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,25 @@ func (o *Outbound) CallOneway(ctx context.Context, treq *transport.Request) (tra
}

func (o *Outbound) call(ctx context.Context, treq *transport.Request, start time.Time, ttl time.Duration) (*transport.Response, error) {
p, err := o.getPeerForRequest(ctx, treq)
p, onFinish, err := o.getPeerForRequest(ctx, treq)
if err != nil {
return nil, err
}
p.StartRequest(nil)
defer p.EndRequest(nil)

resp, err := o.callWithPeer(ctx, treq, start, ttl, p)

// Call the onFinish method right before returning (with the error from call with peer)
onFinish(err)
return resp, err
}

func (o *Outbound) callWithPeer(
ctx context.Context,
treq *transport.Request,
start time.Time,
ttl time.Duration,
p *hostport.Peer,
) (*transport.Response, error) {
req, err := o.createRequest(p, treq)
if err != nil {
return nil, err
Expand Down Expand Up @@ -247,21 +259,21 @@ func (o *Outbound) call(ctx context.Context, treq *transport.Request, start time
return nil, getErrFromResponse(response)
}

func (o *Outbound) getPeerForRequest(ctx context.Context, treq *transport.Request) (*hostport.Peer, error) {
p, err := o.chooser.Choose(ctx, treq)
func (o *Outbound) getPeerForRequest(ctx context.Context, treq *transport.Request) (*hostport.Peer, func(error), error) {
p, onFinish, err := o.chooser.Choose(ctx, treq)
if err != nil {
return nil, err
return nil, nil, err
}

hpPeer, ok := p.(*hostport.Peer)
if !ok {
return nil, peer.ErrInvalidPeerConversion{
return nil, nil, peer.ErrInvalidPeerConversion{
Peer: p,
ExpectedType: "*hostport.Peer",
}
}

return hpPeer, nil
return hpPeer, onFinish, nil
}

func (o *Outbound) createRequest(p *hostport.Peer, treq *transport.Request) (*http.Request, error) {
Expand Down