Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Commit

Permalink
WIP: change Data to [][]byte and add helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
vcabbage committed Feb 17, 2018
1 parent 5551c26 commit 15d9747
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 78 deletions.
6 changes: 2 additions & 4 deletions example_test.go
Expand Up @@ -40,9 +40,7 @@ func Example() {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

// Send message
err = sender.Send(ctx, &amqp.Message{
Data: []byte("Hello!"),
})
err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!")))
if err != nil {
log.Fatal("Sending message:", err)
}
Expand Down Expand Up @@ -75,7 +73,7 @@ func Example() {
// Accept message
msg.Accept()

fmt.Printf("Message received: %s\n", msg.Data)
fmt.Printf("Message received: %s\n", msg.GetData())
}
}
}
4 changes: 1 addition & 3 deletions fuzz.go
Expand Up @@ -61,9 +61,7 @@ func FuzzConn(data []byte) int {
return 0
}

err = sender.Send(context.Background(), &Message{
Data: []byte(data),
})
err = sender.Send(context.Background(), NewMessage(data))
if err != nil {
return 0
}
Expand Down
12 changes: 4 additions & 8 deletions integration_test.go
Expand Up @@ -122,9 +122,7 @@ func TestIntegrationRoundTrip(t *testing.T) {

for i, data := range tt.data {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = sender.Send(ctx, &amqp.Message{
Data: []byte(data),
})
err = sender.Send(ctx, amqp.NewMessage([]byte(data)))
cancel()
if err != nil {
sendErr = fmt.Errorf("Error after %d sends: %+v", i, err)
Expand Down Expand Up @@ -161,8 +159,8 @@ func TestIntegrationRoundTrip(t *testing.T) {
// Accept message
msg.Accept()

if !bytes.Equal([]byte(data), msg.Data) {
receiveErr = fmt.Errorf("Expected received message %d to be %v, but it was %v", i+1, string(data), string(msg.Data))
if !bytes.Equal([]byte(data), msg.GetData()) {
receiveErr = fmt.Errorf("Expected received message %d to be %v, but it was %v", i+1, string(data), string(msg.GetData()))
}
}
}()
Expand Down Expand Up @@ -240,9 +238,7 @@ func TestIntegrationSend(t *testing.T) {

for i, data := range tt.data {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = sender.Send(ctx, &amqp.Message{
Data: []byte(data),
})
err = sender.Send(ctx, amqp.NewMessage([]byte(data)))
cancel()
if err != nil {
t.Fatalf("Error after %d sends: %+v", i, err)
Expand Down
4 changes: 3 additions & 1 deletion marshal_test.go
Expand Up @@ -471,7 +471,9 @@ var (
ApplicationProperties: map[string]interface{}{
"baz": "foo",
},
Data: []byte("A nice little data payload."),
Data: [][]byte{
[]byte("A nice little data payload."),
},
Value: uint8(42),
Footer: Annotations{
"hash": []uint8{0, 1, 2, 34, 5, 6, 7, 8, 9, 0},
Expand Down
111 changes: 49 additions & 62 deletions types.go
Expand Up @@ -1700,26 +1700,13 @@ type Message struct {
// the possibility of a null key) and the values are restricted to be of
// simple types only, that is, excluding map, list, and array types.

// Message data payload.
//
// Data is set when the message contains a single data payload.
// If multiple data payload are included MultiData will be set.
//
// It is an error to set both Data and MultiData when sending a mesage.
Data []byte
// Data payloads.
Data [][]byte
// A data section contains opaque binary data.
// TODO: this could be data(s), amqp-sequence(s), amqp-value rather than single data:
// "The body consists of one of the following three choices: one or more data
// sections, one or more amqp-sequence sections, or a single amqp-value section."

// Multiple data payloads.
//
// MultiData is set when the message contains a multiple data payloads.
// If a single data payload is included Data will be set.
//
// It is an error to set both Data and MultiData when sending a mesage.
MultiData [][]byte

// Value payload.
Value interface{}
// An amqp-value section contains a single AMQP value.
Expand All @@ -1735,35 +1722,24 @@ type Message struct {
settled bool // whether transfer was settled by sender
}

// Annotations keys must be of type string, int, or int64.
// NewMessage returns a *Message with data as the payload.
//
// String keys are encoded as AMQP Symbols.
type Annotations map[interface{}]interface{}

func (a Annotations) marshal(wr *buffer) error {
return writeMap(wr, a)
}

func (a *Annotations) unmarshal(r *buffer) error {
_, count, err := readMapHeader(r)
if err != nil {
return err
// This constructor is intended as a helper for basic Messages with a
// single data payload. It is valid to construct a Message directly for
// more complex usages.
func NewMessage(data []byte) *Message {
return &Message{
Data: [][]byte{data},
}
}

m := make(Annotations, count/2)
for i := uint32(0); i < count; i += 2 {
key, err := readAny(r)
if err != nil {
return err
}
value, err := readAny(r)
if err != nil {
return err
}
m[key] = value
// GetData returns the first []byte from the Data field
// or nil if Data is empty.
func (m *Message) GetData() []byte {
if len(m.Data) < 1 {
return nil
}
*a = m
return nil
return m.Data[0]
}

// Accept notifies the server that the message has been
Expand Down Expand Up @@ -1796,10 +1772,6 @@ func (m *Message) shouldSendDisposition() bool {
// TODO: add support for sending Modified disposition

func (m *Message) marshal(wr *buffer) error {
if m.Data != nil && m.MultiData != nil {
return errorNew("amqp: cannot marshal message with Data and MultiData")
}

if m.Header != nil {
err := m.Header.marshal(wr)
if err != nil {
Expand Down Expand Up @@ -1838,15 +1810,7 @@ func (m *Message) marshal(wr *buffer) error {
}
}

if m.Data != nil {
writeDescriptor(wr, typeCodeApplicationData)
err := writeBinary(wr, m.Data)
if err != nil {
return err
}
}

for _, data := range m.MultiData {
for _, data := range m.Data {
writeDescriptor(wr, typeCodeApplicationData)
err := writeBinary(wr, data)
if err != nil {
Expand Down Expand Up @@ -1916,15 +1880,7 @@ func (m *Message) unmarshal(r *buffer) error {
return err
}

switch {
case m.MultiData != nil:
m.MultiData = append(m.MultiData, data)
case m.Data != nil:
m.MultiData = [][]byte{m.Data, data}
m.Data = nil
default:
m.Data = data
}
m.Data = append(m.Data, data)
continue

case typeCodeFooter:
Expand Down Expand Up @@ -1993,6 +1949,37 @@ func tryReadNull(r *buffer) bool {
return false
}

// Annotations keys must be of type string, int, or int64.
//
// String keys are encoded as AMQP Symbols.
type Annotations map[interface{}]interface{}

func (a Annotations) marshal(wr *buffer) error {
return writeMap(wr, a)
}

func (a *Annotations) unmarshal(r *buffer) error {
_, count, err := readMapHeader(r)
if err != nil {
return err
}

m := make(Annotations, count/2)
for i := uint32(0); i < count; i += 2 {
key, err := readAny(r)
if err != nil {
return err
}
value, err := readAny(r)
if err != nil {
return err
}
m[key] = value
}
*a = m
return nil
}

/*
<type name="header" class="composite" source="list" provides="section">
<descriptor name="amqp:header:list" code="0x00000000:0x00000070"/>
Expand Down

0 comments on commit 15d9747

Please sign in to comment.