diff --git a/example_test.go b/example_test.go index 2b10fc1f..6c385852 100644 --- a/example_test.go +++ b/example_test.go @@ -40,9 +40,7 @@ func Example() { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // Send message - err = sender.Send(ctx, &amqp.Message{ - Data: []byte("Hello!"), - }) + err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!"))) if err != nil { log.Fatal("Sending message:", err) } @@ -75,7 +73,7 @@ func Example() { // Accept message msg.Accept() - fmt.Printf("Message received: %s\n", msg.Data) + fmt.Printf("Message received: %s\n", msg.GetData()) } } } diff --git a/fuzz.go b/fuzz.go index 570ed239..5797d519 100644 --- a/fuzz.go +++ b/fuzz.go @@ -61,9 +61,7 @@ func FuzzConn(data []byte) int { return 0 } - err = sender.Send(context.Background(), &Message{ - Data: []byte(data), - }) + err = sender.Send(context.Background(), NewMessage(data)) if err != nil { return 0 } diff --git a/integration_test.go b/integration_test.go index f55ffb13..79a8c1c3 100644 --- a/integration_test.go +++ b/integration_test.go @@ -122,9 +122,7 @@ func TestIntegrationRoundTrip(t *testing.T) { for i, data := range tt.data { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - err = sender.Send(ctx, &amqp.Message{ - Data: []byte(data), - }) + err = sender.Send(ctx, amqp.NewMessage([]byte(data))) cancel() if err != nil { sendErr = fmt.Errorf("Error after %d sends: %+v", i, err) @@ -161,8 +159,8 @@ func TestIntegrationRoundTrip(t *testing.T) { // Accept message msg.Accept() - if !bytes.Equal([]byte(data), msg.Data) { - receiveErr = fmt.Errorf("Expected received message %d to be %v, but it was %v", i+1, string(data), string(msg.Data)) + if !bytes.Equal([]byte(data), msg.GetData()) { + receiveErr = fmt.Errorf("Expected received message %d to be %v, but it was %v", i+1, string(data), string(msg.GetData())) } } }() @@ -240,9 +238,7 @@ func TestIntegrationSend(t *testing.T) { for i, data := range tt.data { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - err = sender.Send(ctx, &amqp.Message{ - Data: []byte(data), - }) + err = sender.Send(ctx, amqp.NewMessage([]byte(data))) cancel() if err != nil { t.Fatalf("Error after %d sends: %+v", i, err) diff --git a/marshal_test.go b/marshal_test.go index 933051e7..fceb8a84 100644 --- a/marshal_test.go +++ b/marshal_test.go @@ -471,7 +471,9 @@ var ( ApplicationProperties: map[string]interface{}{ "baz": "foo", }, - Data: []byte("A nice little data payload."), + Data: [][]byte{ + []byte("A nice little data payload."), + }, Value: uint8(42), Footer: Annotations{ "hash": []uint8{0, 1, 2, 34, 5, 6, 7, 8, 9, 0}, diff --git a/types.go b/types.go index 73d15c7c..1f810ecc 100644 --- a/types.go +++ b/types.go @@ -1700,26 +1700,13 @@ type Message struct { // the possibility of a null key) and the values are restricted to be of // simple types only, that is, excluding map, list, and array types. - // Message data payload. - // - // Data is set when the message contains a single data payload. - // If multiple data payload are included MultiData will be set. - // - // It is an error to set both Data and MultiData when sending a mesage. - Data []byte + // Data payloads. + Data [][]byte // A data section contains opaque binary data. // TODO: this could be data(s), amqp-sequence(s), amqp-value rather than single data: // "The body consists of one of the following three choices: one or more data // sections, one or more amqp-sequence sections, or a single amqp-value section." - // Multiple data payloads. - // - // MultiData is set when the message contains a multiple data payloads. - // If a single data payload is included Data will be set. - // - // It is an error to set both Data and MultiData when sending a mesage. - MultiData [][]byte - // Value payload. Value interface{} // An amqp-value section contains a single AMQP value. @@ -1735,35 +1722,24 @@ type Message struct { settled bool // whether transfer was settled by sender } -// Annotations keys must be of type string, int, or int64. +// NewMessage returns a *Message with data as the payload. // -// String keys are encoded as AMQP Symbols. -type Annotations map[interface{}]interface{} - -func (a Annotations) marshal(wr *buffer) error { - return writeMap(wr, a) -} - -func (a *Annotations) unmarshal(r *buffer) error { - _, count, err := readMapHeader(r) - if err != nil { - return err +// This constructor is intended as a helper for basic Messages with a +// single data payload. It is valid to construct a Message directly for +// more complex usages. +func NewMessage(data []byte) *Message { + return &Message{ + Data: [][]byte{data}, } +} - m := make(Annotations, count/2) - for i := uint32(0); i < count; i += 2 { - key, err := readAny(r) - if err != nil { - return err - } - value, err := readAny(r) - if err != nil { - return err - } - m[key] = value +// GetData returns the first []byte from the Data field +// or nil if Data is empty. +func (m *Message) GetData() []byte { + if len(m.Data) < 1 { + return nil } - *a = m - return nil + return m.Data[0] } // Accept notifies the server that the message has been @@ -1796,10 +1772,6 @@ func (m *Message) shouldSendDisposition() bool { // TODO: add support for sending Modified disposition func (m *Message) marshal(wr *buffer) error { - if m.Data != nil && m.MultiData != nil { - return errorNew("amqp: cannot marshal message with Data and MultiData") - } - if m.Header != nil { err := m.Header.marshal(wr) if err != nil { @@ -1838,15 +1810,7 @@ func (m *Message) marshal(wr *buffer) error { } } - if m.Data != nil { - writeDescriptor(wr, typeCodeApplicationData) - err := writeBinary(wr, m.Data) - if err != nil { - return err - } - } - - for _, data := range m.MultiData { + for _, data := range m.Data { writeDescriptor(wr, typeCodeApplicationData) err := writeBinary(wr, data) if err != nil { @@ -1916,15 +1880,7 @@ func (m *Message) unmarshal(r *buffer) error { return err } - switch { - case m.MultiData != nil: - m.MultiData = append(m.MultiData, data) - case m.Data != nil: - m.MultiData = [][]byte{m.Data, data} - m.Data = nil - default: - m.Data = data - } + m.Data = append(m.Data, data) continue case typeCodeFooter: @@ -1993,6 +1949,37 @@ func tryReadNull(r *buffer) bool { return false } +// Annotations keys must be of type string, int, or int64. +// +// String keys are encoded as AMQP Symbols. +type Annotations map[interface{}]interface{} + +func (a Annotations) marshal(wr *buffer) error { + return writeMap(wr, a) +} + +func (a *Annotations) unmarshal(r *buffer) error { + _, count, err := readMapHeader(r) + if err != nil { + return err + } + + m := make(Annotations, count/2) + for i := uint32(0); i < count; i += 2 { + key, err := readAny(r) + if err != nil { + return err + } + value, err := readAny(r) + if err != nil { + return err + } + m[key] = value + } + *a = m + return nil +} + /*