Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Commit

Permalink
Remove deliveryID type
Browse files Browse the repository at this point in the history
  • Loading branch information
vcabbage committed Jun 3, 2018
1 parent cc837c7 commit 2d09847
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 23 deletions.
30 changes: 14 additions & 16 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1509,7 +1509,7 @@ func (r *Receiver) Receive(ctx context.Context) (*Message, error) {
// the first frame of the message
if first {
if fr.DeliveryID != nil {
msg.id = deliveryID(*fr.DeliveryID)
msg.deliveryID = *fr.DeliveryID
}

if fr.MessageFormat != nil {
Expand Down Expand Up @@ -1565,12 +1565,10 @@ func (r *Receiver) Close(ctx context.Context) error {
}

type messageDisposition struct {
id deliveryID
id uint32
state interface{}
}

type deliveryID uint32

func (r *Receiver) dispositionBatcher() {
// batch operations:
// Keep track of the first and last delivery ID, incrementing as
Expand All @@ -1581,8 +1579,8 @@ func (r *Receiver) dispositionBatcher() {
var (
batchSize = r.maxCredit
batchStarted bool
first deliveryID
last deliveryID
first uint32
last uint32
)

// create an unstarted timer
Expand All @@ -1602,15 +1600,15 @@ func (r *Receiver) dispositionBatcher() {
lastCopy := last
err := r.sendDisposition(first, &lastCopy, &stateAccepted{})
if err != nil {
r.inFlight.remove(uint32(first), (*uint32)(&lastCopy), err)
r.inFlight.remove(first, &lastCopy, err)
}
batchStarted = false
}

// send the current message
err := r.sendDisposition(msgDis.id, nil, msgDis.state)
if err != nil {
r.inFlight.remove(uint32(msgDis.id), nil, err)
r.inFlight.remove(msgDis.id, nil, err)
}
continue
}
Expand All @@ -1627,11 +1625,11 @@ func (r *Receiver) dispositionBatcher() {
}

// send batch if current size == batchSize
if uint32(last-first+1) >= batchSize {
if last-first+1 >= batchSize {
lastCopy := last
err := r.sendDisposition(first, &lastCopy, &stateAccepted{})
if err != nil {
r.inFlight.remove(uint32(first), (*uint32)(&lastCopy), err)
r.inFlight.remove(first, &lastCopy, err)
}
batchStarted = false
if !batchTimer.Stop() {
Expand All @@ -1644,7 +1642,7 @@ func (r *Receiver) dispositionBatcher() {
lastCopy := last
err := r.sendDisposition(first, &lastCopy, &stateAccepted{})
if err != nil {
r.inFlight.remove(uint32(first), (*uint32)(&lastCopy), err)
r.inFlight.remove(first, &lastCopy, err)
}
batchStarted = false
batchTimer.Stop()
Expand All @@ -1656,11 +1654,11 @@ func (r *Receiver) dispositionBatcher() {
}

// sendDisposition sends a disposition frame to the peer
func (r *Receiver) sendDisposition(first deliveryID, last *deliveryID, state interface{}) error {
func (r *Receiver) sendDisposition(first uint32, last *uint32, state interface{}) error {
fr := &performDisposition{
Role: roleReceiver,
First: uint32(first),
Last: (*uint32)(last),
First: first,
Last: last,
Settled: r.link.receiverSettleMode == nil || *r.link.receiverSettleMode == ModeFirst,
State: state,
}
Expand All @@ -1669,10 +1667,10 @@ func (r *Receiver) sendDisposition(first deliveryID, last *deliveryID, state int
return r.link.session.txFrame(fr, nil)
}

func (r *Receiver) messageDisposition(id deliveryID, state interface{}) error {
func (r *Receiver) messageDisposition(id uint32, state interface{}) error {
var wait chan error
if r.link.receiverSettleMode != nil && *r.link.receiverSettleMode == ModeSecond {
wait = r.inFlight.add(uint32(id))
wait = r.inFlight.add(id)
}

if r.batching {
Expand Down
14 changes: 7 additions & 7 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1721,9 +1721,9 @@ type Message struct {
// encryption details).
Footer Annotations

receiver *Receiver // Receiver the message was received from
id deliveryID // used when sending disposition
settled bool // whether transfer was settled by sender
receiver *Receiver // Receiver the message was received from
deliveryID uint32 // used when sending disposition
settled bool // whether transfer was settled by sender
}

// NewMessage returns a *Message with data as the payload.
Expand Down Expand Up @@ -1752,7 +1752,7 @@ func (m *Message) Accept() error {
if !m.shouldSendDisposition() {
return nil
}
return m.receiver.messageDisposition(m.id, &stateAccepted{})
return m.receiver.messageDisposition(m.deliveryID, &stateAccepted{})
}

// Reject notifies the server that the message is invalid.
Expand All @@ -1762,7 +1762,7 @@ func (m *Message) Reject(e *Error) error {
if !m.shouldSendDisposition() {
return nil
}
return m.receiver.messageDisposition(m.id, &stateRejected{Error: e})
return m.receiver.messageDisposition(m.deliveryID, &stateRejected{Error: e})
}

// Release releases the message back to the server. The message
Expand All @@ -1771,7 +1771,7 @@ func (m *Message) Release() error {
if m.shouldSendDisposition() {
return nil
}
return m.receiver.messageDisposition(m.id, &stateReleased{})
return m.receiver.messageDisposition(m.deliveryID, &stateReleased{})
}

// Modify notifies the server that the message was not acted upon
Expand All @@ -1790,7 +1790,7 @@ func (m *Message) Modify(deliveryFailed, undeliverableHere bool, messageAnnotati
if !m.shouldSendDisposition() {
return nil
}
return m.receiver.messageDisposition(m.id, &stateModified{
return m.receiver.messageDisposition(m.deliveryID, &stateModified{
DeliveryFailed: deliveryFailed,
UndeliverableHere: undeliverableHere,
MessageAnnotations: messageAnnotations,
Expand Down

0 comments on commit 2d09847

Please sign in to comment.