diff --git a/core/msg/message.go b/core/msg/message.go index 4afe072..dd17dc7 100644 --- a/core/msg/message.go +++ b/core/msg/message.go @@ -15,6 +15,9 @@ package msg import ( "bytes" + "encoding/binary" + "errors" + "io" "github.com/golang/protobuf/proto" "github.com/wolfstudy/pulsar-client-go/pkg/api" @@ -38,3 +41,54 @@ func (m *Message) Equal(other *Message) bool { proto.Equal(m.Meta, other.Meta) && bytes.Equal(m.Payload, other.Payload) } + +// SingleMessage represents one of the elements of the batch type payload +type SingleMessage struct { + SingleMetaSize uint32 + SingleMeta *api.SingleMessageMetadata + SinglePayload []byte +} + +// DecodeBatchMessage decode message if num_messages_in_batch exist and bigger than 0 +func DecodeBatchMessage(msg *Message) ([]*SingleMessage, error) { + num := msg.Meta.GetNumMessagesInBatch() + if num == 0 { + return nil, errors.New("num_message_in_batch is nil or 0") + } + return DecodeBatchPayload(msg.Payload, num) +} + +// DecodeBatchPayload parses the payload of the batch type +// If the producer uses the batch function, msg.Payload will be a SingleMessage array structure. +func DecodeBatchPayload(bp []byte, batchNum int32) ([]*SingleMessage, error) { + buf32 := make([]byte, 4) + rdBuf := bytes.NewReader(bp) + list := make([]*SingleMessage, 0, batchNum) + for i := int32(0); i < batchNum; i++ { + // singleMetaSize + if _, err := io.ReadFull(rdBuf, buf32); err != nil { + return nil, err + } + singleMetaSize := binary.BigEndian.Uint32(buf32) + // singleMeta + singleMetaBuf := make([]byte, singleMetaSize) + if _, err := io.ReadFull(rdBuf, singleMetaBuf); err != nil { + return nil, err + } + singleMeta := new(api.SingleMessageMetadata) + if err := proto.Unmarshal(singleMetaBuf, singleMeta); err != nil { + return nil, err + } + // payload + singlePayload := make([]byte, singleMeta.GetPayloadSize()) + if _, err := io.ReadFull(rdBuf, singlePayload); err != nil { + return nil, err + } + d := &SingleMessage{} + d.SingleMetaSize = singleMetaSize + d.SingleMeta = singleMeta + d.SinglePayload = singlePayload + list = append(list, d) + } + return list, nil +} diff --git a/core/msg/message_test.go b/core/msg/message_test.go new file mode 100644 index 0000000..bfcff94 --- /dev/null +++ b/core/msg/message_test.go @@ -0,0 +1,21 @@ +package msg + +import ( + "testing" +) + +func TestDecodeBatchPayload(t *testing.T) { + payload := []byte{0, 0, 0, 2, 24, 12, 104, 101, 108, 108, 111, 45, 112, 117, 108, 115, 97, 114} // hello-pulsar + list, err := DecodeBatchPayload(payload, 1) + if err != nil { + t.Fatal(err) + } + if get, want := len(list), 1; get != want { + t.Errorf("want %v, but get %v", get, want) + } + + m := list[0] + if get, want := string(m.SinglePayload), "hello-pulsar"; get != want { + t.Errorf("want %v, but get %v", get, want) + } +}