Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Commit

Permalink
Fix sender flow control and send/close race issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
vcabbage committed Dec 31, 2017
1 parent 18ca0db commit 0a9494d
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 78 deletions.
195 changes: 118 additions & 77 deletions client.go
Expand Up @@ -85,7 +85,7 @@ func (c *Client) NewSession() (*Session, error) {
s.txFrame(&performBegin{
NextOutgoingID: 0,
IncomingWindow: 1,
})
}, nil)

// wait for response
var fr frame
Expand Down Expand Up @@ -147,11 +147,12 @@ func (s *Session) Close() error {
}

// txFrame sends a frame to the connWriter
func (s *Session) txFrame(p frameBody) {
func (s *Session) txFrame(p frameBody, done chan struct{}) {
s.conn.wantWriteFrame(frame{
typ: frameTypeAMQP,
channel: s.remoteChannel,
body: p,
done: done,
})
}

Expand Down Expand Up @@ -219,6 +220,11 @@ func (s *Sender) Send(ctx context.Context, msg *Message) error {
for fr.More {
fr.Payload = append([]byte(nil), s.buf.Next(maxPayloadSize)...)
fr.More = s.buf.Len() > 0
if !fr.More {
// set done on last frame so it can be confirmed
// before returning to caller
fr.done = make(chan struct{})
}

select {
case s.link.transfers <- fr:
Expand All @@ -229,6 +235,15 @@ func (s *Sender) Send(ctx context.Context, msg *Message) error {
}
}

// wait for transfer to be confirmed
select {
case <-fr.done:
case <-s.link.done:
return s.link.err
case <-ctx.Done():
return errorWrapf(ctx.Err(), "awaiting send")
}

return nil
}

Expand Down Expand Up @@ -376,35 +391,39 @@ func (s *Session) mux() {
}

case fr := <-s.tx:
transfer, ok := fr.(*performTransfer)
if !ok {
s.txFrame(fr)
continue
}

id, ok := idsByDeliveryTag[string(transfer.DeliveryTag)]
if !ok {
// no entry for tag, allocate new DeliveryID
id = nextDeliveryID
nextDeliveryID++
switch fr := fr.(type) {
case *performTransfer:
id, ok := idsByDeliveryTag[string(fr.DeliveryTag)]
if !ok {
// no entry for tag, allocate new DeliveryID
id = nextDeliveryID
nextDeliveryID++

if transfer.More {
idsByDeliveryTag[string(transfer.DeliveryTag)] = id
if fr.More {
idsByDeliveryTag[string(fr.DeliveryTag)] = id
}
} else {
// existing entry indicates this isn't the first message,
// clear values that are only required on first message
fr.DeliveryTag = nil
fr.MessageFormat = nil

if !fr.More {
delete(idsByDeliveryTag, string(fr.DeliveryTag))
}
}
} else {
// existing entry indicates this isn't the first message,
// clear values that are only required on first message
transfer.DeliveryTag = nil
transfer.MessageFormat = nil

if !transfer.More {
delete(idsByDeliveryTag, string(transfer.DeliveryTag))
}
handlesByDeliveryID[id] = fr.Handle
fr.DeliveryID = &id
s.txFrame(fr, fr.done)
case *performFlow:
nextIncomingID := nextDeliveryID
fr.NextIncomingID = &nextIncomingID
fr.IncomingWindow = math.MaxUint32 // max number of transfer frames
s.txFrame(fr, nil)
default:
s.txFrame(fr, nil)
}

handlesByDeliveryID[id] = transfer.Handle
transfer.DeliveryID = &id
s.txFrame(transfer)
}
}
}
Expand Down Expand Up @@ -513,7 +532,7 @@ func newLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
}

// send Attach frame
s.txFrame(attach)
s.txFrame(attach, nil)

// wait for response
var fr frameBody
Expand Down Expand Up @@ -549,6 +568,51 @@ func (l *link) mux() {
isSender = !isReceiver
)

handleRx := func(fr frameBody) bool {
switch fr := fr.(type) {
// message frame
case *performTransfer:
if isSender {
// TODO: send error to remote
l.err = errorErrorf("Sender received transfer frame")
return false
}

l.transfers <- *fr

l.deliveryCount++
l.linkCredit--

// flow control frame
case *performFlow:
if isReceiver {
if fr.DeliveryCount != nil {
l.deliveryCount = *fr.DeliveryCount
}
} else {
l.linkCredit = *fr.LinkCredit - (l.deliveryCount - *fr.DeliveryCount)
}

// remote side is closing links
case *performDetach:
// don't currently support link detach and reattach
if !fr.Closed {
l.err = errorErrorf("non-closing detach not supported: %+v", fr)
return false
}

// set detach received and close link
l.detachReceived = true

l.err = errorWrapf(DetachError{fr.Error}, "received detach frame")
return false

default:
fmt.Printf("Unexpected frame: %T - %+v\n", fr, fr)
}
return true
}

for {
var outgoingTransfers chan performTransfer
if isSender && l.linkCredit > 0 {
Expand All @@ -558,19 +622,20 @@ func (l *link) mux() {
// if receiver and linkCredit is half used, send more
if isReceiver && l.linkCredit < l.receiver.maxCredit/2 {
var (
creditDiff = l.receiver.maxCredit - l.linkCredit
deliveryCount = l.deliveryCount // copy because sent by pointer below; prevent race
// copy because sent by pointer below; prevent race
linkCredit = l.receiver.maxCredit
deliveryCount = l.deliveryCount
)

// send flow
l.session.txFrame(&performFlow{
IncomingWindow: math.MaxUint32, // max number of transfer frames
fr := &performFlow{
NextOutgoingID: 0,
OutgoingWindow: 0,
Handle: &l.handle,
DeliveryCount: &deliveryCount,
LinkCredit: &creditDiff, // max number of messages
})
LinkCredit: &linkCredit, // max number of messages
}
l.session.txFrame(fr, nil)

// reset credit
l.linkCredit = l.receiver.maxCredit
Expand All @@ -579,7 +644,23 @@ func (l *link) mux() {
select {
// send data
case tr := <-outgoingTransfers:
l.session.tx <- &tr // TODO: don't block
// Ensure we never block the session mux
Loop:
for {
select {
case l.session.tx <- &tr:
break Loop
case fr := <-l.rx:
if !handleRx(fr) {
return
}
case <-l.close:
return
case <-l.session.conn.done:
l.err = l.session.conn.getErr()
return
}
}
l.deliveryCount++
l.linkCredit--

Expand Down Expand Up @@ -615,49 +696,9 @@ func (l *link) mux() {

// received frame
case fr := <-l.rx:
switch fr := fr.(type) {
// message frame
case *performTransfer:
if isSender {
// TODO: send error to remote
l.err = errorErrorf("Sender received transfer frame")
return
}

l.transfers <- *fr

l.deliveryCount++
l.linkCredit--

// flow control frame
case *performFlow:
if isReceiver {
// TODO: send error to remote
l.err = errorErrorf("Receiver received flow frame")
return
}

l.linkCredit = *fr.DeliveryCount + *fr.LinkCredit - l.deliveryCount
l.deliveryCount = 0

// remote side is closing links
case *performDetach:
// don't currently support link detach and reattach
if !fr.Closed {
l.err = errorErrorf("non-closing detach not supported: %+v", fr)
return
}

// set detach received and close link
l.detachReceived = true

l.err = errorWrapf(DetachError{fr.Error}, "received detach frame")
if !handleRx(fr) {
return

default:
fmt.Printf("Unexpected frame: %T - %+v\n", fr, fr)
}

case <-l.close:
return
case <-l.session.conn.done:
Expand Down Expand Up @@ -945,7 +986,7 @@ func (r *Receiver) sendDisposition(first deliveryID, last *deliveryID, disp disp
fr.State = new(stateReleased)
}

r.link.session.txFrame(fr)
r.link.session.txFrame(fr, nil)
}

func (r *Receiver) acceptMessage(id deliveryID) {
Expand Down
12 changes: 11 additions & 1 deletion conn.go
Expand Up @@ -276,7 +276,14 @@ func (c *conn) mux() {
c.err = errorErrorf("unexpected frame: %#v", fr.body)
continue
}
ch.rx <- fr

// TODO: handle session deletion while sending frame to
// session mux?
select {
case ch.rx <- fr:
case <-c.done:
return
}

// new session request
//
Expand Down Expand Up @@ -467,6 +474,9 @@ func (c *conn) connWriter() {
// frame write request
case fr := <-c.txFrame:
err = c.writeFrame(fr)
if err == nil && fr.done != nil {
close(fr.done)
}

// keepalive timer
case <-keepalive:
Expand Down
2 changes: 2 additions & 0 deletions integration_test.go
Expand Up @@ -160,6 +160,7 @@ func TestIntegrationRoundTrip(t *testing.T) {

checkLeaks() // this is done here because queuesClient starts additional goroutines

// Wait for Azure to update stats
time.Sleep(1 * time.Second)

q, err := queuesClient.Get(resourceGroup, namespace, queueName)
Expand Down Expand Up @@ -234,6 +235,7 @@ func TestIntegrationSend(t *testing.T) {

checkLeaks() // this is done here because queuesClient starts additional goroutines

// Wait for Azure to update stats
time.Sleep(1 * time.Second)

q, err := queuesClient.Get(resourceGroup, namespace, queueName)
Expand Down

0 comments on commit 0a9494d

Please sign in to comment.