Skip to content

Commit

Permalink
Add error to onFinish closure (required some refactors to avoid defer)
Browse files Browse the repository at this point in the history
  • Loading branch information
Will Hughes committed Dec 8, 2016
1 parent 06dc847 commit 771453b
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 10 deletions.
2 changes: 1 addition & 1 deletion peer/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Chooser interface {
Stop() error

// Choose a Peer for the next call, block until a peer is available (or timeout)
Choose(context.Context, *transport.Request) (peer Peer, onFinish func(), err error)
Choose(context.Context, *transport.Request) (peer Peer, onFinish func(error), err error)
}

// List listens to adds and removes of Peers from a PeerProvider
Expand Down
4 changes: 2 additions & 2 deletions peer/single.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ func NewSingle(pid Identifier, transport Transport) *Single {
}

// Choose returns the single peer
func (s *Single) Choose(context.Context, *transport.Request) (Peer, func(), error) {
func (s *Single) Choose(context.Context, *transport.Request) (Peer, func(error), error) {
s.p.StartRequest(s)
return s.p, s.onFinish, s.err
}

func (s *Single) onFinish() {
func (s *Single) onFinish(_ error) {
s.p.EndRequest(s)
}

Expand Down
14 changes: 9 additions & 5 deletions peer/x/roundrobin/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,19 +214,16 @@ func (pl *List) removeFromUnavailablePeers(p peer.Peer) {
}

// Choose selects the next available peer in the round robin
func (pl *List) Choose(ctx context.Context, req *transport.Request) (peer.Peer, func(), error) {
func (pl *List) Choose(ctx context.Context, req *transport.Request) (peer.Peer, func(error), error) {
if !pl.isRunning() {
return nil, nil, peer.ErrPeerListNotStarted("RoundRobinList")
}

for {
if nextPeer := pl.nextPeer(); nextPeer != nil {
pl.notifyPeerAvailable()
onFinish := func() {
nextPeer.EndRequest(pl)
}
nextPeer.StartRequest(pl)
return nextPeer, onFinish, nil
return nextPeer, pl.getOnFinishFunc(nextPeer), nil
}

if err := pl.waitForPeerAddedEvent(ctx); err != nil {
Expand Down Expand Up @@ -257,6 +254,13 @@ func (pl *List) notifyPeerAvailable() {
}
}

// getOnFinishFunc creates a closure that will be run at the end of the request
func (pl *List) getOnFinishFunc(p peer.Peer) func(error) {
return func(_ error) {
p.EndRequest(pl)
}
}

// waitForPeerAddedEvent waits until a peer is added to the peer list or the
// given context finishes.
// Must NOT be run in a mutex.Lock()
Expand Down
17 changes: 15 additions & 2 deletions transport/http/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,21 @@ func (o *Outbound) call(ctx context.Context, treq *transport.Request, start time
if err != nil {
return nil, err
}
defer onFinish()

resp, err := o.callWithPeer(ctx, treq, start, ttl, p)

// Call the onFinish method right before returning (with the error from call with peer)
onFinish(err)
return resp, err
}

func (o *Outbound) callWithPeer(
ctx context.Context,
treq *transport.Request,
start time.Time,
ttl time.Duration,
p *hostport.Peer,
) (*transport.Response, error) {
req, err := o.createRequest(p, treq)
if err != nil {
return nil, err
Expand Down Expand Up @@ -234,7 +247,7 @@ func (o *Outbound) call(ctx context.Context, treq *transport.Request, start time
return nil, getErrFromResponse(response)
}

func (o *Outbound) getPeerForRequest(ctx context.Context, treq *transport.Request) (*hostport.Peer, func(), error) {
func (o *Outbound) getPeerForRequest(ctx context.Context, treq *transport.Request) (*hostport.Peer, func(error), error) {
p, onFinish, err := o.chooser.Choose(ctx, treq)
if err != nil {
return nil, nil, err
Expand Down

0 comments on commit 771453b

Please sign in to comment.