-
Notifications
You must be signed in to change notification settings - Fork 107
/
message.go
163 lines (143 loc) · 3.6 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package commitlog
import (
"errors"
"hash/crc32"
client "github.com/liftbridge-io/liftbridge-api/go"
)
// crc32cTable is the CRC-32 table built from the Castagnoli polynomial. It is
// the table crcField passes to crc32.Checksum when filling and checking the
// message digest.
var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
// Message is the object that gets serialized and written to the log.
//
// Encode writes MagicByte, Attributes, Key, Value and Headers, in that order,
// behind a crcField pushed onto the encoder. The fields listed under
// "Transient fields" are not referenced by Encode.
type Message struct {
// Crc is the message checksum. Encode does not read this field.
Crc int32
// MagicByte is the byte used for encoding protocol version detection.
MagicByte int8
// Attributes is the byte used for message flags.
Attributes int8
// Key is the message key, written with PutBytes.
Key []byte
// Value is the message value, written with PutBytes.
Value []byte
// Headers maps header names to header values. Encode writes the number of
// entries as a 16-bit integer followed by each name/value pair.
Headers map[string][]byte
// Transient fields
// NOTE(review): none of the fields below are written by Encode in this
// file; where they are consumed is not visible here.
Timestamp int64
LeaderEpoch uint64
AckInbox string
CorrelationID string
AckPolicy client.AckPolicy
Offset int64
}
// Encode serializes the Message into the given packetEncoder.
//
// The fields are emitted in this order: magic byte, attributes, key, value,
// header count (16-bit) and then every header as a name/value pair. The whole
// sequence is bracketed by Push(&crcField{}) and Pop.
// NOTE(review): presumably Push reserves the CRC slot and Pop fills it in;
// confirm against the packetEncoder implementation.
//
// Headers are emitted in map iteration order, which Go does not define, so
// the byte layout of the header section can differ between calls.
//
// The first error reported by the encoder is returned as-is; in that case
// Pop is not called.
func (m *Message) Encode(e packetEncoder) error {
	e.Push(&crcField{})

	e.PutInt8(m.MagicByte)
	e.PutInt8(m.Attributes)

	err := e.PutBytes(m.Key)
	if err != nil {
		return err
	}
	err = e.PutBytes(m.Value)
	if err != nil {
		return err
	}

	e.PutInt16(int16(len(m.Headers)))
	for name, val := range m.Headers {
		if err = e.PutString(name); err != nil {
			return err
		}
		if err = e.PutBytes(val); err != nil {
			return err
		}
	}

	e.Pop()
	return nil
}
// crcField tracks where a message's CRC32 digest lives inside an encode or
// decode buffer so the digest can be written or verified later.
type crcField struct {
	// StartOffset is the buffer index of the first byte of the digest slot.
	StartOffset int
}

// SaveOffset records in as the position of the CRC digest slot.
func (f *crcField) SaveOffset(in int) {
	f.StartOffset = in
}

// ReserveSize reports how many bytes must be set aside for the CRC digest.
func (f *crcField) ReserveSize() int {
	// A CRC32 digest is stored as a 32-bit value.
	const digestBytes = 4
	return digestBytes
}
// Fill computes the CRC32 (Castagnoli table) of the bytes that follow the
// 4-byte digest slot, up to but excluding curOffset, and stores the result in
// the slot at StartOffset. It always returns nil.
func (f *crcField) Fill(curOffset int, buf []byte) error {
	// The digest covers everything after its own slot.
	covered := buf[f.StartOffset+4 : curOffset]
	digest := crc32.Checksum(covered, crc32cTable)

	slot := buf[f.StartOffset:]
	encoding.PutUint32(slot, digest)
	return nil
}
// Check recomputes the CRC32 (Castagnoli table) of the bytes between the end
// of the 4-byte digest slot and curOffset, and compares it with the digest
// stored at StartOffset. It returns an error when the two differ and nil
// otherwise.
func (f *crcField) Check(curOffset int, buf []byte) error {
	computed := crc32.Checksum(buf[f.StartOffset+4:curOffset], crc32cTable)
	stored := encoding.Uint32(buf[f.StartOffset:])
	if computed == stored {
		return nil
	}
	return errors.New("crc didn't match")
}
// SerializedMessage is a serialized message read from the log.
//
// The accessor methods read fields directly from the byte slice at the
// following positions: bytes 0-3 hold the CRC32 digest, byte 4 the magic
// byte, byte 5 the attributes, and byte 6 onward a 4-byte-size-prefixed key,
// a 4-byte-size-prefixed value, and the headers section. A size prefix of -1
// marks an absent key or value.
type SerializedMessage []byte
// Crc returns the CRC32 digest stored in the first four bytes of the message.
func (m SerializedMessage) Crc() uint32 {
	digest := encoding.Uint32(m)
	return digest
}
// MagicByte returns the byte used for encoding protocol version detection.
// It is the byte immediately after the 4-byte CRC digest.
func (m SerializedMessage) MagicByte() int8 {
	const magicIndex = 4
	return int8(m[magicIndex])
}
// Attributes returns the byte used for message flags. It is the byte
// immediately after the magic byte.
func (m SerializedMessage) Attributes() int8 {
	const attributesIndex = 5
	return int8(m[attributesIndex])
}
// Key returns the message key, or nil when the key's size prefix is -1.
// The returned slice aliases the message's own bytes.
func (m SerializedMessage) Key() []byte {
	prefixAt, dataEnd, length := m.keyOffsets()
	if length != -1 {
		// Skip the 4-byte size prefix to reach the key bytes.
		return m[prefixAt+4 : dataEnd]
	}
	return nil
}
// Value returns the message value, or nil when the value's size prefix is
// -1. The returned slice aliases the message's own bytes.
func (m SerializedMessage) Value() []byte {
	prefixAt, dataEnd, length := m.valueOffsets()
	if length != -1 {
		// Skip the 4-byte size prefix to reach the value bytes.
		return m[prefixAt+4 : dataEnd]
	}
	return nil
}
// Headers returns the message headers map.
//
// The headers section starts right after the value. It consists of a 16-bit
// header count followed, for each header, by a 16-bit key length, the key
// bytes, a 32-bit value length and the value bytes. Keys are copied into
// strings; values are slices that alias the message's own bytes. The returned
// map is never nil.
//
// A value length of -1 denotes an absent value and is returned as a nil
// slice. keyOffsets and valueOffsets apply the same convention to the key and
// value, and Message.Encode writes header values with the same PutBytes call
// it uses for those fields, so the sentinel has to be honored here as well;
// slicing with a length of -1 would otherwise panic.
func (m SerializedMessage) Headers() map[string][]byte {
	_, pos, _ := m.valueOffsets()

	numHeaders := encoding.Uint16(m[pos:])
	pos += 2

	headers := make(map[string][]byte, numHeaders)
	for i := uint16(0); i < numHeaders; i++ {
		keySize := int32(encoding.Uint16(m[pos:]))
		pos += 2
		key := string(m[pos : pos+keySize])
		pos += keySize

		valueSize := int32(encoding.Uint32(m[pos:]))
		pos += 4
		if valueSize == -1 {
			// Absent value: no payload bytes follow the size prefix.
			headers[key] = nil
			continue
		}
		headers[key] = m[pos : pos+valueSize]
		pos += valueSize
	}
	return headers
}
// keyOffsets locates the key inside the serialized message.
//
// start is the index of the key's 4-byte size prefix, which sits right after
// the CRC digest (4 bytes), magic byte and attributes byte. size is that
// prefix decoded as an int32. end is the index just past the key bytes, or
// just past the size prefix when size is -1 (no key bytes are present).
func (m SerializedMessage) keyOffsets() (start, end, size int32) {
	const keyPrefixAt = 6

	length := int32(encoding.Uint32(m[keyPrefixAt:]))
	stop := int32(keyPrefixAt + 4)
	if length != -1 {
		stop += length
	}
	return keyPrefixAt, stop, length
}
// valueOffsets locates the value inside the serialized message.
//
// start is the index of the value's 4-byte size prefix, which begins where
// the key ends. size is that prefix decoded as an int32. end is the index
// just past the value bytes, or just past the size prefix when size is -1
// (no value bytes are present).
func (m SerializedMessage) valueOffsets() (start, end, size int32) {
	_, prefixAt, _ := m.keyOffsets()

	length := int32(encoding.Uint32(m[prefixAt:]))
	stop := prefixAt + 4
	if length != -1 {
		stop += length
	}
	return prefixAt, stop, length
}