-
Notifications
You must be signed in to change notification settings - Fork 180
/
encode.go
109 lines (98 loc) · 2.84 KB
/
encode.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package queue
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/pydio/cells/v4/common"
"github.com/pydio/cells/v4/common/broker"
"github.com/pydio/cells/v4/common/service/context/metadata"
json "github.com/pydio/cells/v4/common/utils/jsonx"
)
// EncodeProtoWithContext serializes msg together with the metadata found in ctx
// into a single byte slice made of two length-prefixed segments: first the
// JSON-encoded metadata (left empty when metadata.FromContextCopy reports none),
// then msg wrapped in an anypb.Any and marshalled as protobuf.
//
// Errors from the JSON and protobuf marshalling steps are discarded; whatever
// bytes those calls returned are written as-is.
func EncodeProtoWithContext(ctx context.Context, msg proto.Message) []byte {
	var header []byte
	md, found := metadata.FromContextCopy(ctx)
	if found {
		header, _ = json.Marshal(md)
	}

	// Wrapping in Any keeps the type URL next to the payload.
	wrapped, _ := anypb.New(msg)
	payload, _ := proto.Marshal(wrapped)

	return joinWithLengthPrefix(header, payload)
}
// DecodeToBrokerMessage parses a payload made of exactly two length-prefixed
// segments: JSON-encoded metadata (possibly empty) followed by a marshalled
// anypb.Any. It returns a broker.Message backed by the decoded headers, the raw
// protobuf bytes and the parsed Any.
//
// An error is returned when the payload cannot be split, when it does not
// contain exactly two segments, when the metadata is not valid JSON, or when
// the second segment is not a valid anypb.Any.
func DecodeToBrokerMessage(msg []byte) (broker.Message, error) {
	parts, err := splitWithLengthPrefix(msg)
	if err != nil {
		return nil, fmt.Errorf("cannot split event %w", err)
	}
	if len(parts) != 2 {
		// The split itself succeeded, so report the real problem rather than a nil error.
		return nil, fmt.Errorf("cannot split event: expected 2 segments, got %d", len(parts))
	}
	headers, rawData := parts[0], parts[1]

	// Headers stay nil when the metadata segment is empty.
	var mm map[string]string
	if len(headers) > 0 {
		md := map[string]string{}
		if e := json.Unmarshal(headers, &md); e != nil {
			return nil, e
		}
		mm = md
	}

	a := &anypb.Any{}
	if e := proto.Unmarshal(rawData, a); e != nil {
		// Report the unmarshal error itself (not the split error, which is nil here).
		return nil, fmt.Errorf("expecting anypb: %w", e)
	}
	return &pulledMessage{hh: mm, data: rawData, a: a}, nil
}
// pulledMessage is the value returned by DecodeToBrokerMessage; it keeps the
// decoded headers alongside both the raw and the parsed form of the payload.
type pulledMessage struct {
// hh holds the metadata decoded from the JSON segment; nil when that segment was empty.
hh map[string]string
// data is the raw second segment, i.e. the bytes the Any was unmarshalled from.
data []byte
// a is the anypb.Any parsed from data.
a *anypb.Any
}
// Unmarshal unpacks the stored anypb.Any into target and returns a fresh
// context. When the message carries headers, they are attached to that context
// through metadata.NewContext, and the value stored under
// common.PydioContextUserKey (if any) is additionally set as a context value
// under the same key. Without headers, a bare background context is returned.
func (p *pulledMessage) Unmarshal(target proto.Message) (context.Context, error) {
	if err := p.a.UnmarshalTo(target); err != nil {
		return nil, err
	}

	base := context.Background()
	if p.hh == nil {
		return base, nil
	}

	base = metadata.NewContext(base, p.hh)
	// If X-Pydio-User found in meta, add it to context as well
	user, found := p.hh[common.PydioContextUserKey]
	if found {
		base = context.WithValue(base, common.PydioContextUserKey, user)
	}
	return base, nil
}
// RawData exposes the message without decoding it: the header map and the raw
// payload bytes, exactly as stored on the receiver (no copies are made).
func (p *pulledMessage) RawData() (map[string]string, []byte) {
	headers, payload := p.hh, p.data
	return headers, payload
}
// joinWithLengthPrefix concatenates the given chunks into one byte slice, each
// chunk preceded by its length encoded as a little-endian uint32. An empty or
// nil chunk contributes only its four-byte zero prefix.
func joinWithLengthPrefix(data ...[]byte) []byte {
	out := new(bytes.Buffer)
	var prefix [4]byte // scratch space reused for every length prefix
	for i := range data {
		binary.LittleEndian.PutUint32(prefix[:], uint32(len(data[i])))
		out.Write(prefix[:])
		out.Write(data[i])
	}
	return out.Bytes()
}
// splitWithLengthPrefix reverses joinWithLengthPrefix: it walks data, reading a
// little-endian uint32 length followed by that many bytes, and returns the
// chunks in order. A zero length yields an empty (non-nil) chunk.
//
// It fails when a length prefix cannot be read in full, or when a prefix
// announces more bytes than remain in data. Empty input yields no chunks and
// no error.
func splitWithLengthPrefix(data []byte) ([][]byte, error) {
	var chunks [][]byte
	reader := bytes.NewBuffer(data)
	for reader.Len() > 0 {
		var size uint32
		if err := binary.Read(reader, binary.LittleEndian, &size); err != nil {
			return nil, err
		}
		switch n := int(size); {
		case n == 0:
			chunks = append(chunks, []byte{})
		case n > reader.Len():
			return nil, fmt.Errorf("not enough data left for split")
		default:
			chunk := make([]byte, size)
			reader.Read(chunk) // enough bytes remain, checked by the case above
			chunks = append(chunks, chunk)
		}
	}
	return chunks, nil
}