-
Notifications
You must be signed in to change notification settings - Fork 75
/
appender.go
79 lines (74 loc) · 2.24 KB
/
appender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package coder
import (
"github.com/lesismal/arpc"
"github.com/lesismal/arpc/util"
)
// Appender .
type Appender struct {
AppenderName string
FlagBitIndex int
valueToBytes func(interface{}) ([]byte, error)
bytesToValue func([]byte) (interface{}, error)
}
// Encode implements arpc MessageCoder.
func (ap *Appender) Encode(client *arpc.Client, msg *arpc.Message) *arpc.Message {
if ap.AppenderName == "" || ap.valueToBytes == nil {
return msg
}
key := ap.AppenderName
value, ok := msg.Get(key)
if !ok {
return msg
}
if err := msg.SetFlagBit(ap.FlagBitIndex, true); err != nil {
return msg
}
valueData, err := ap.valueToBytes(value)
if err != nil {
return msg
}
msg.Buffer = append(msg.Buffer, make([]byte, len(key)+len(valueData)+2)...)
appendData := msg.Buffer[len(msg.Buffer)-len(key)-len(valueData)-2:]
copy(appendData, key)
copy(appendData[len(key):], valueData)
appendLen := uint16(len(appendData))
appendData[appendLen-2], appendData[appendLen-1] = byte(appendLen>>8), byte(appendLen&0xFF)
msg.SetBodyLen(len(msg.Buffer) - 16)
return msg
}
// Decode implements arpc MessageCoder.
func (ap *Appender) Decode(client *arpc.Client, msg *arpc.Message) *arpc.Message {
if msg.IsFlagBitSet(ap.FlagBitIndex) {
bufLen := len(msg.Buffer)
if bufLen > 2 && ap.bytesToValue != nil {
key := ap.AppenderName
appendLen := (int(msg.Buffer[bufLen-2]) << 8) | int(msg.Buffer[bufLen-1])
if bufLen >= appendLen {
appenderName := util.BytesToStr(msg.Buffer[bufLen-appendLen : bufLen-appendLen+len(key)])
if appenderName != key {
return msg
}
payloadBody := msg.Buffer[bufLen-appendLen+len(appenderName) : bufLen-2]
if value, err := ap.bytesToValue(payloadBody); err == nil {
msg.Set(key, value)
}
}
msg.Buffer = msg.Buffer[:len(msg.Buffer)-appendLen]
msg.SetFlagBit(ap.FlagBitIndex, false)
msg.SetBodyLen(len(msg.Buffer) - 16)
}
}
return msg
}
// NewAppender returns the trace coding middleware.
func NewAppender(appenderName string,
flagBitIndex int,
valueToBytes func(interface{}) ([]byte, error),
bytesToValue func([]byte) (interface{}, error)) *Appender {
return &Appender{
AppenderName: appenderName,
FlagBitIndex: flagBitIndex,
valueToBytes: valueToBytes,
bytesToValue: bytesToValue,
}
}