Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Commit

Permalink
Support multiple data sections.
Browse files Browse the repository at this point in the history
* `Message.Data` changed to `[][]byte`.
* Added `NewMessage(data []byte)` and `Message.Data() []byte` helpers
  for setting/retrieving single data section messages.
  • Loading branch information
vcabbage committed Feb 17, 2018
1 parent 54320c8 commit 08befdd
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 47 deletions.
6 changes: 2 additions & 4 deletions example_test.go
Expand Up @@ -40,9 +40,7 @@ func Example() {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

// Send message
err = sender.Send(ctx, &amqp.Message{
Data: []byte("Hello!"),
})
err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!")))
if err != nil {
log.Fatal("Sending message:", err)
}
Expand Down Expand Up @@ -75,7 +73,7 @@ func Example() {
// Accept message
msg.Accept()

fmt.Printf("Message received: %s\n", msg.Data)
fmt.Printf("Message received: %s\n", msg.GetData())
}
}
}
4 changes: 1 addition & 3 deletions fuzz.go
Expand Up @@ -61,9 +61,7 @@ func FuzzConn(data []byte) int {
return 0
}

err = sender.Send(context.Background(), &Message{
Data: []byte(data),
})
err = sender.Send(context.Background(), NewMessage(data))
if err != nil {
return 0
}
Expand Down
12 changes: 4 additions & 8 deletions integration_test.go
Expand Up @@ -122,9 +122,7 @@ func TestIntegrationRoundTrip(t *testing.T) {

for i, data := range tt.data {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = sender.Send(ctx, &amqp.Message{
Data: []byte(data),
})
err = sender.Send(ctx, amqp.NewMessage([]byte(data)))
cancel()
if err != nil {
sendErr = fmt.Errorf("Error after %d sends: %+v", i, err)
Expand Down Expand Up @@ -161,8 +159,8 @@ func TestIntegrationRoundTrip(t *testing.T) {
// Accept message
msg.Accept()

if !bytes.Equal([]byte(data), msg.Data) {
receiveErr = fmt.Errorf("Expected received message %d to be %v, but it was %v", i+1, string(data), string(msg.Data))
if !bytes.Equal([]byte(data), msg.GetData()) {
receiveErr = fmt.Errorf("Expected received message %d to be %v, but it was %v", i+1, string(data), string(msg.GetData()))
}
}
}()
Expand Down Expand Up @@ -240,9 +238,7 @@ func TestIntegrationSend(t *testing.T) {

for i, data := range tt.data {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = sender.Send(ctx, &amqp.Message{
Data: []byte(data),
})
err = sender.Send(ctx, amqp.NewMessage([]byte(data)))
cancel()
if err != nil {
t.Fatalf("Error after %d sends: %+v", i, err)
Expand Down
5 changes: 4 additions & 1 deletion marshal_test.go
Expand Up @@ -471,7 +471,10 @@ var (
ApplicationProperties: map[string]interface{}{
"baz": "foo",
},
Data: []byte("A nice little data payload."),
Data: [][]byte{
[]byte("A nice little data payload."),
[]byte("More payload."),
},
Value: uint8(42),
Footer: Annotations{
"hash": []uint8{0, 1, 2, 34, 5, 6, 7, 8, 9, 0},
Expand Down
91 changes: 60 additions & 31 deletions types.go
Expand Up @@ -1700,10 +1700,10 @@ type Message struct {
// the possibility of a null key) and the values are restricted to be of
// simple types only, that is, excluding map, list, and array types.

// Message payload.
Data []byte
// Data payloads.
Data [][]byte
// A data section contains opaque binary data.
// TODO: this could be data(s), amqp-sequence(s), amqp-value rather than singe data:
// TODO: this could be data(s), amqp-sequence(s), amqp-value rather than single data:
// "The body consists of one of the following three choices: one or more data
// sections, one or more amqp-sequence sections, or a single amqp-value section."

Expand All @@ -1722,35 +1722,24 @@ type Message struct {
settled bool // whether transfer was settled by sender
}

// Annotations keys must be of type string, int, or int64.
// NewMessage returns a *Message with data as the payload.
//
// String keys are encoded as AMQP Symbols.
type Annotations map[interface{}]interface{}

func (a Annotations) marshal(wr *buffer) error {
return writeMap(wr, a)
}

func (a *Annotations) unmarshal(r *buffer) error {
_, count, err := readMapHeader(r)
if err != nil {
return err
// This constructor is intended as a helper for basic Messages with a
// single data payload. It is valid to construct a Message directly for
// more complex usages.
func NewMessage(data []byte) *Message {
return &Message{
Data: [][]byte{data},
}
}

m := make(Annotations, count/2)
for i := uint32(0); i < count; i += 2 {
key, err := readAny(r)
if err != nil {
return err
}
value, err := readAny(r)
if err != nil {
return err
}
m[key] = value
// GetData returns the first []byte from the Data field
// or nil if Data is empty.
func (m *Message) GetData() []byte {
if len(m.Data) < 1 {
return nil
}
*a = m
return nil
return m.Data[0]
}

// Accept notifies the server that the message has been
Expand Down Expand Up @@ -1821,9 +1810,9 @@ func (m *Message) marshal(wr *buffer) error {
}
}

if m.Data != nil {
for _, data := range m.Data {
writeDescriptor(wr, typeCodeApplicationData)
err := writeBinary(wr, m.Data)
err := writeBinary(wr, data)
if err != nil {
return err
}
Expand Down Expand Up @@ -1883,7 +1872,16 @@ func (m *Message) unmarshal(r *buffer) error {
section = &m.ApplicationProperties

case typeCodeApplicationData:
section = &m.Data
r.skip(3)

var data []byte
err = unmarshal(r, &data)
if err != nil {
return err
}

m.Data = append(m.Data, data)
continue

case typeCodeFooter:
section = &m.Footer
Expand Down Expand Up @@ -1951,6 +1949,37 @@ func tryReadNull(r *buffer) bool {
return false
}

// Annotations keys must be of type string, int, or int64.
//
// String keys are encoded as AMQP Symbols.
type Annotations map[interface{}]interface{}

func (a Annotations) marshal(wr *buffer) error {
return writeMap(wr, a)
}

func (a *Annotations) unmarshal(r *buffer) error {
_, count, err := readMapHeader(r)
if err != nil {
return err
}

m := make(Annotations, count/2)
for i := uint32(0); i < count; i += 2 {
key, err := readAny(r)
if err != nil {
return err
}
value, err := readAny(r)
if err != nil {
return err
}
m[key] = value
}
*a = m
return nil
}

/*
<type name="header" class="composite" source="list" provides="section">
<descriptor name="amqp:header:list" code="0x00000000:0x00000070"/>
Expand Down

0 comments on commit 08befdd

Please sign in to comment.