Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Commit

Permalink
Wait for response disposition when Receiver w/ rcv-settle-mode second.
Browse files Browse the repository at this point in the history
Fixed some potential bugs due to receiver and sender dispostions
being treated them same in Session.mux.
  • Loading branch information
vcabbage committed Jun 18, 2018
1 parent 0438881 commit 37d5c26
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 52 deletions.
154 changes: 128 additions & 26 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@ func (s *Session) Close(ctx context.Context) error {
}

// txFrame sends a frame to the connWriter
func (s *Session) txFrame(p frameBody, done chan deliveryState) {
s.conn.wantWriteFrame(frame{
func (s *Session) txFrame(p frameBody, done chan deliveryState) error {
return s.conn.wantWriteFrame(frame{
type_: frameTypeAMQP,
channel: s.channel,
body: p,
Expand Down Expand Up @@ -455,8 +455,9 @@ func (s *Session) mux(remoteBegin *performBegin) {
linksByName = make(map[string]*link) // maping of names to links
handles = &bitmap{max: s.handleMax} // allocated handles

handlesByDeliveryID = make(map[uint32]uint32) // mapping of deliveryIDs to handles
deliveryIDByHandle = make(map[uint32]uint32) // mapping of handles to latest deliveryID
handlesByDeliveryID = make(map[uint32]uint32) // mapping of deliveryIDs to handles
deliveryIDByHandle = make(map[uint32]uint32) // mapping of handles to latest deliveryID
handlesByRemoteDeliveryID = make(map[uint32]uint32) // mapping of remote deliveryID to handles

settlementByDeliveryID = make(map[uint32]chan deliveryState)

Expand Down Expand Up @@ -542,13 +543,18 @@ func (s *Session) mux(remoteBegin *performBegin) {
end = *body.Last
}
for deliveryID := start; deliveryID <= end; deliveryID++ {
handle, ok := handlesByDeliveryID[deliveryID]
handles := handlesByDeliveryID
if body.Role == roleSender {
handles = handlesByRemoteDeliveryID
}

handle, ok := handles[deliveryID]
if !ok {
continue
}
delete(handlesByDeliveryID, deliveryID)
delete(handles, deliveryID)

if body.Settled {
if body.Settled && body.Role == roleReceiver {
// check if settlement confirmation was requested, if so
// confirm by closing channel
if done, ok := settlementByDeliveryID[deliveryID]; ok {
Expand Down Expand Up @@ -658,6 +664,11 @@ func (s *Session) mux(remoteBegin *performBegin) {
case link.rx <- fr.body:
}

// if this message is received unsettled and link rcv-settle-mode == second, add to handlesByRemoteDeliveryID
if !body.Settled && body.DeliveryID != nil && link.receiverSettleMode != nil && *link.receiverSettleMode == ModeSecond {
handlesByRemoteDeliveryID[*body.DeliveryID] = body.Handle
}

// Update peer's outgoing window if half has been consumed.
if remoteOutgoingWindow < s.incomingWindow/2 {
nID := nextIncomingID
Expand Down Expand Up @@ -1027,6 +1038,7 @@ Loop:

// muxFlow sends tr to the session mux.
func (l *link) muxFlow() error {
// copy because sent by pointer below; prevent race
var (
linkCredit = l.receiver.maxCredit - uint32(len(l.messages))
deliveryCount = l.deliveryCount
Expand Down Expand Up @@ -1068,7 +1080,7 @@ func (l *link) muxReceive(fr performTransfer) error {
// the first frame of the message
if !l.more {
if fr.DeliveryID != nil {
l.msg.id = deliveryID(*fr.DeliveryID)
l.msg.deliveryID = *fr.DeliveryID
}

if fr.MessageFormat != nil {
Expand Down Expand Up @@ -1191,6 +1203,11 @@ func (l *link) muxHandleFrame(fr frameBody) error {
case *performDisposition:
debug(3, "RX: %s", fr)

// Unblock receivers waiting for message disposition
if l.receiver != nil {
l.receiver.inFlight.remove(fr.First, fr.Last, nil)
}

// If sending async and a message is rejected, cause a link error.
//
// This isn't ideal, but there isn't a clear better way to handle it.
Expand Down Expand Up @@ -1261,8 +1278,13 @@ func (l *link) muxDetach() {
}
}

// signal other goroutines that links is done
// signal other goroutines that link is done
close(l.done)

// unblock any in flight message dispositions
if l.receiver != nil {
l.receiver.inFlight.clear(l.err)
}
}()

// "A peer closes a link by sending the detach frame with the
Expand Down Expand Up @@ -1513,6 +1535,7 @@ type Receiver struct {
batchMaxAge time.Duration // maximum time between the start n batch and sending the batch to the server
dispositions chan messageDisposition // message dispositions are sent on this channel when batching is enabled
maxCredit uint32 // maximum allowed inflight messages
inFlight inFlight // used to track message disposition when rcv-settle-mode == second
}

// Receive returns the next message from the sender.
Expand Down Expand Up @@ -1556,24 +1579,22 @@ func (r *Receiver) Close(ctx context.Context) error {
}

type messageDisposition struct {
id deliveryID
id uint32
state interface{}
}

type deliveryID uint32

func (r *Receiver) dispositionBatcher() {
// batch operations:
// Keep track of the first and last delivery ID, incrementing as
// Accept() is called. After last-first == linkCredit, send disposition.
// Accept() is called. After last-first == batchSize, send disposition.
// If Reject()/Release() is called, send one disposition for previously
// accepted, and one for the rejected/released message. If messages are
// accepted out of order, send any existing batch and the current message.
var (
batchSize = r.maxCredit
batchStarted bool
first deliveryID
last deliveryID
first uint32
last uint32
)

// create an unstarted timer
Expand All @@ -1591,12 +1612,18 @@ func (r *Receiver) dispositionBatcher() {
// send the current batch, if any
if batchStarted {
lastCopy := last
r.sendDisposition(first, &lastCopy, &stateAccepted{})
err := r.sendDisposition(first, &lastCopy, &stateAccepted{})
if err != nil {
r.inFlight.remove(first, &lastCopy, err)
}
batchStarted = false
}

// send the current message
r.sendDisposition(msgDis.id, nil, msgDis.state)
err := r.sendDisposition(msgDis.id, nil, msgDis.state)
if err != nil {
r.inFlight.remove(msgDis.id, nil, err)
}
continue
}

Expand All @@ -1612,9 +1639,12 @@ func (r *Receiver) dispositionBatcher() {
}

// send batch if current size == batchSize
if uint32(last-first+1) >= batchSize {
if last-first+1 >= batchSize {
lastCopy := last
r.sendDisposition(first, &lastCopy, &stateAccepted{})
err := r.sendDisposition(first, &lastCopy, &stateAccepted{})
if err != nil {
r.inFlight.remove(first, &lastCopy, err)
}
batchStarted = false
if !batchTimer.Stop() {
<-batchTimer.C // batch timer must be drained if stop returns false
Expand All @@ -1624,7 +1654,10 @@ func (r *Receiver) dispositionBatcher() {
// maxBatchAge elapsed, send batch
case <-batchTimer.C:
lastCopy := last
r.sendDisposition(first, &lastCopy, &stateAccepted{})
err := r.sendDisposition(first, &lastCopy, &stateAccepted{})
if err != nil {
r.inFlight.remove(first, &lastCopy, err)
}
batchStarted = false
batchTimer.Stop()

Expand All @@ -1635,25 +1668,94 @@ func (r *Receiver) dispositionBatcher() {
}

// sendDisposition sends a disposition frame to the peer
func (r *Receiver) sendDisposition(first deliveryID, last *deliveryID, state interface{}) {
func (r *Receiver) sendDisposition(first uint32, last *uint32, state interface{}) error {
fr := &performDisposition{
Role: roleReceiver,
First: uint32(first),
Last: (*uint32)(last),
First: first,
Last: last,
Settled: r.link.receiverSettleMode == nil || *r.link.receiverSettleMode == ModeFirst,
State: state,
}

debug(1, "TX: %s", fr)
r.link.session.txFrame(fr, nil)
return r.link.session.txFrame(fr, nil)
}

func (r *Receiver) messageDisposition(id deliveryID, state interface{}) {
func (r *Receiver) messageDisposition(id uint32, state interface{}) error {
var wait chan error
if r.link.receiverSettleMode != nil && *r.link.receiverSettleMode == ModeSecond {
wait = r.inFlight.add(id)
}

if r.batching {
r.dispositions <- messageDisposition{id: id, state: state}
} else {
err := r.sendDisposition(id, nil, state)
if err != nil {
return err
}
}

if wait == nil {
return nil
}

return <-wait
}

// inFlight tracks in-flight message dispositions allowing receivers
// to block waiting for the server to respond when an appropriate
// settlement mode is configured.
type inFlight struct {
mu sync.Mutex
m map[uint32]chan error
}

func (f *inFlight) add(id uint32) chan error {
wait := make(chan error, 1)

f.mu.Lock()
if f.m == nil {
f.m = map[uint32]chan error{id: wait}
} else {
f.m[id] = wait
}
f.mu.Unlock()

return wait
}

func (f *inFlight) remove(first uint32, last *uint32, err error) {
f.mu.Lock()

if f.m == nil {
f.mu.Unlock()
return
}
r.sendDisposition(id, nil, state)

ll := first
if last != nil {
ll = *last
}

for i := first; i <= ll; i++ {
wait, ok := f.m[i]
if ok {
wait <- err
delete(f.m, i)
}
}

f.mu.Unlock()
}

func (f *inFlight) clear(err error) {
f.mu.Lock()
for id, wait := range f.m {
wait <- err
delete(f.m, id)
}
f.mu.Unlock()
}

const maxTransferFrameHeader = 66 // determined by calcMaxTransferFrameHeader
Expand Down
4 changes: 3 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,10 +629,12 @@ var keepaliveFrame = []byte{0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}

// wantWriteFrame is used by sessions and links to send frame to
// connWriter.
func (c *conn) wantWriteFrame(fr frame) {
func (c *conn) wantWriteFrame(fr frame) error {
select {
case c.txFrame <- fr:
return nil
case <-c.done:
return c.getErr()
}
}

Expand Down

0 comments on commit 37d5c26

Please sign in to comment.