forked from ThreeDotsLabs/watermill-redisstream
/
marshaller.go
65 lines (52 loc) · 1.58 KB
/
marshaller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package redisstream
import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
"github.com/vmihailenco/msgpack"
)
const UUIDHeaderKey = "_watermill_message_uuid"
type Marshaller interface {
Marshal(topic string, msg *message.Message) (map[string]interface{}, error)
}
type Unmarshaller interface {
Unmarshal(values map[string]interface{}) (msg *message.Message, err error)
}
type MarshallerUnmarshaller interface {
Marshaller
Unmarshaller
}
type DefaultMarshallerUnmarshaller struct{}
func (DefaultMarshallerUnmarshaller) Marshal(_ string, msg *message.Message) (map[string]interface{}, error) {
if value := msg.Metadata.Get(UUIDHeaderKey); value != "" {
return nil, errors.Errorf("metadata %s is reserved by watermill for message UUID", UUIDHeaderKey)
}
var (
md []byte
err error
)
if len(msg.Metadata) > 0 {
if md, err = msgpack.Marshal(msg.Metadata); err != nil {
return nil, errors.Wrapf(err, "marshal metadata fail")
}
}
return map[string]interface{}{
UUIDHeaderKey: msg.UUID,
"metadata": md,
"payload": []byte(msg.Payload),
}, nil
}
func (DefaultMarshallerUnmarshaller) Unmarshal(values map[string]interface{}) (msg *message.Message, err error) {
msg = message.NewMessage(values[UUIDHeaderKey].(string), []byte(values["payload"].(string)))
md := values["metadata"]
if md != nil {
s := md.(string)
if s != "" {
metadata := make(message.Metadata)
if err := msgpack.Unmarshal([]byte(s), &metadata); err != nil {
return nil, errors.Wrapf(err, "unmarshal metadata fail")
}
msg.Metadata = metadata
}
}
return msg, nil
}