Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Commit

Permalink
Allow receiver to trigger flow messages to avoid deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
vcabbage committed Jun 3, 2018
1 parent e26c879 commit cc837c7
Showing 1 changed file with 25 additions and 15 deletions.
40 changes: 25 additions & 15 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,7 @@ type link struct {
dynamicAddr bool // request a dynamic link address from the server
rx chan frameBody // sessions sends frames for this link on this channel
transfers chan performTransfer // sender uses for send; receiver uses for receive
receiverReady chan struct{} // receiver sends on channel to indicate ready for next frame
closeOnce sync.Once // closeOnce protects close from being closed multiple times
close chan struct{} // close signals the mux to shutdown
done chan struct{} // done is closed by mux/muxDetach when the link is fully detached
Expand Down Expand Up @@ -933,11 +934,12 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {

func newLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
l := &link{
name: randString(40),
session: s,
receiver: r,
close: make(chan struct{}),
done: make(chan struct{}),
name: randString(40),
session: s,
receiver: r,
close: make(chan struct{}),
done: make(chan struct{}),
receiverReady: make(chan struct{}, 1),
}

// configure options
Expand All @@ -961,18 +963,10 @@ func (l *link) mux() {

Loop:
for {
var outgoingTransfers chan performTransfer
switch {
// enable outgoing transfers case if sender and credits are available
case isSender && l.linkCredit > 0:
var outgoingTransfers chan performTransfer
if isSender && l.linkCredit > 0 {
outgoingTransfers = l.transfers

// if receiver && half maxCredits have been processed, send more credits
case isReceiver && l.linkCredit+uint32(len(l.transfers)) <= l.receiver.maxCredit/2:
l.err = l.muxFlow()
if l.err != nil {
return
}
}

select {
Expand Down Expand Up @@ -1011,6 +1005,15 @@ Loop:
}
}

case <-l.receiverReady:
// if more than half maxCredits have been processed, send more
if l.linkCredit <= l.receiver.maxCredit/2 {
l.err = l.muxFlow()
if l.err != nil {
return
}
}

case <-l.close:
l.err = ErrLinkClosed
return
Expand Down Expand Up @@ -1476,6 +1479,13 @@ type Receiver struct {
func (r *Receiver) Receive(ctx context.Context) (*Message, error) {
r.buf.reset()

// indicate ready (ensures adequate link credits issued)
// TODO: This seems less than ideal, but it works. Consider alternatives.
select {
case r.link.receiverReady <- struct{}{}:
default:
}

msg := Message{receiver: r} // message to be decoded into

var (
Expand Down

0 comments on commit cc837c7

Please sign in to comment.