/
encoding.go
239 lines (194 loc) · 5.32 KB
/
encoding.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
package ingester
import (
"time"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/grafana/loki/pkg/logproto"
)
// RecordType represents the type of a WAL/Checkpoint record.
//
// The value is written as the first byte of every encoded record (see
// encodeSeries and encodeEntries) and switched on by decodeWALRecord, so
// changing or reordering these values would make previously encoded records
// decode as the wrong type.
type RecordType byte

const (
	_ = iota // ignore first value so the zero value doesn't look like a record type.

	// WALRecordSeries is the type for the WAL record for series.
	WALRecordSeries RecordType = iota
	// WALRecordEntries is the type for the WAL record for log entries.
	WALRecordEntries
	// CheckpointRecord is the type for the Checkpoint record based on protos.
	CheckpointRecord
)
// WALRecord is the in-memory form of a user's WAL data: the series
// definitions (Series) and the log entries grouped by fingerprint
// (RefEntries). encodeSeries and encodeEntries serialize the two halves as
// separate records, each carrying its own type byte and UserID.
type WALRecord struct {
// UserID is written into the header of every record encoded from this value
// and restored by decodeWALRecord.
UserID string
// Series is serialized by encodeSeries.
Series []record.RefSeries
// entryIndexMap coordinates the RefEntries index associated with a particular fingerprint.
// This is helpful for constant time lookups during ingestion and is ignored when restoring
// from the WAL.
// It maps fingerprint -> index into RefEntries (maintained by AddEntries) and
// is replaced with a fresh map by Reset.
entryIndexMap map[uint64]int
// RefEntries holds the entries for each fingerprint; serialized by
// encodeEntries.
RefEntries []RefEntries
}
// IsEmpty reports whether the record carries no series and no entry groups.
// An entry group with zero entries still counts as content.
func (r *WALRecord) IsEmpty() bool {
	if len(r.Series) != 0 {
		return false
	}
	return len(r.RefEntries) == 0
}
// Reset prepares r for reuse.
//
// It clears UserID, truncates Series and RefEntries to zero length (keeping
// their backing arrays), passes every group's Entries slice to
// recordPool.PutEntries (presumably returning it to a pool — the record must
// not touch those slices afterwards), and installs a new, empty fingerprint
// index.
func (r *WALRecord) Reset() {
	r.UserID = ""

	if len(r.Series) != 0 {
		r.Series = r.Series[:0]
	}

	for i := range r.RefEntries {
		recordPool.PutEntries(r.RefEntries[i].Entries)
	}
	r.RefEntries = r.RefEntries[:0]

	r.entryIndexMap = make(map[uint64]int)
}
// AddEntries appends entries to the group for fingerprint fp.
//
// If fp already has a group in RefEntries, the entries are appended to it,
// so each fingerprint appears at most once. Otherwise a new group is appended.
// The entries slice is stored as-is (not copied), so the record takes it
// over. Reset later hands it to recordPool.PutEntries.
func (r *WALRecord) AddEntries(fp uint64, entries ...logproto.Entry) {
	if r.entryIndexMap == nil {
		// A record that was never Reset (a zero value, or one populated by
		// decodeEntries) has no index. Writing to a nil map would panic, so
		// build the index from whatever RefEntries already holds. That way
		// existing fingerprints are still merged into.
		r.entryIndexMap = make(map[uint64]int, len(r.RefEntries))
		for i := range r.RefEntries {
			r.entryIndexMap[r.RefEntries[i].Ref] = i
		}
	}

	if idx, ok := r.entryIndexMap[fp]; ok {
		r.RefEntries[idx].Entries = append(r.RefEntries[idx].Entries, entries...)
		return
	}

	r.entryIndexMap[fp] = len(r.RefEntries)
	r.RefEntries = append(r.RefEntries, RefEntries{
		Ref:     fp,
		Entries: entries,
	})
}
// RefEntries groups log entries under a single fingerprint, Ref.
type RefEntries struct {
// Ref is the fingerprint; encodeEntries writes it as a big-endian uint64.
Ref uint64
// Entries are kept in the order they were appended (by AddEntries or
// decodeEntries).
Entries []logproto.Entry
}
// encodeSeries appends a WALRecordSeries record for r to b and returns the
// extended slice.
//
// The layout is the type byte, then UserID as a uvarint-length-prefixed
// string, then r.Series as encoded by the Prometheus record.Encoder.
func (r *WALRecord) encodeSeries(b []byte) []byte {
	header := EncWith(b)
	header.PutByte(byte(WALRecordSeries))
	header.PutUvarintStr(r.UserID)
	out := header.Get()

	// Hand the encoder a zero-length view positioned just past the header.
	// It appends into out's spare capacity, so appending its result back
	// onto out lands the series bytes directly after the header.
	var seriesEnc record.Encoder
	tail := out[len(out):]
	return append(out, seriesEnc.Series(r.Series, tail)...)
}
// encodeEntries appends a WALRecordEntries record for r to b and returns the
// extended slice.
//
// Layout:
//   - the WALRecordEntries type byte;
//   - UserID as a uvarint-length-prefixed string;
//   - only if at least one entry exists: the first entry's timestamp
//     (UnixNano) as a big-endian int64;
//   - for every RefEntries with at least one entry: its fingerprint as a
//     big-endian uint64, the entry count as a uvarint, then per entry the
//     timestamp as a varint delta from the first timestamp followed by the
//     line as a uvarint-length-prefixed byte string.
//
// Groups with no entries are omitted.
func (r *WALRecord) encodeEntries(b []byte) []byte {
	buf := EncWith(b)
	buf.PutByte(byte(WALRecordEntries))
	buf.PutUvarintStr(r.UserID)

	// All timestamps are stored as deltas against the first entry's timestamp
	// to keep the varints small. The base is only written when there is at
	// least one entry. decodeEntries treats an empty payload as "no entries".
	var first int64
	for _, ref := range r.RefEntries {
		if len(ref.Entries) > 0 {
			first = ref.Entries[0].Timestamp.UnixNano()
			buf.PutBE64int64(first)
			break
		}
	}

	for _, ref := range r.RefEntries {
		if len(ref.Entries) == 0 {
			continue
		}
		// fingerprint, then number of entries
		buf.PutBE64(ref.Ref)
		buf.PutUvarint(len(ref.Entries))
		for _, e := range ref.Entries {
			buf.PutVarint64(e.Timestamp.UnixNano() - first)
			// Writes uvarint(len(line)) followed by the line bytes. This is the
			// same encoding as PutUvarint + PutBytes([]byte(line)), without
			// allocating a []byte copy of every line.
			buf.PutUvarintStr(e.Line)
		}
	}
	return buf.Get()
}
// decodeEntries parses the payload that follows the type byte and UserID in a
// WALRecordEntries record (as laid out by encodeEntries). It appends one
// RefEntries per encoded fingerprint to rec.RefEntries. rec.entryIndexMap is
// not touched.
//
// An empty payload means the record had no entries and is not an error. If
// decoding fails, groups decoded before the failure stay appended to rec.
func decodeEntries(b []byte, rec *WALRecord) error {
	if len(b) == 0 {
		return nil
	}

	d := DecWith(b)
	base := d.Be64int64() // every timestamp below is a delta from this

	for d.Err() == nil && len(d.B) > 0 {
		group := RefEntries{Ref: d.Be64()}
		want := d.Uvarint()

		// got counts loop iterations, including one that hit a decode error.
		got := 0
		for got < want && d.Err() == nil {
			delta := d.Varint64()
			size := d.Uvarint()
			raw := d.Bytes(size)
			group.Entries = append(group.Entries, logproto.Entry{
				Timestamp: time.Unix(0, base+delta),
				// string() copies, so the entry does not alias b.
				Line: string(raw),
			})
			got++
		}
		if err := d.Err(); err != nil {
			return errors.Wrapf(err, "entry decode error after %d RefEntries", got)
		}
		rec.RefEntries = append(rec.RefEntries, group)
	}

	switch {
	case d.Err() != nil:
		return errors.Wrap(d.Err(), "refEntry decode error")
	case len(d.B) > 0:
		return errors.Errorf("unexpected %d bytes left in entry", len(d.B))
	}
	return nil
}
// decodeWALRecord decodes one record produced by encodeSeries or
// encodeEntries into walRec.
//
// For a series record, walRec.Series is passed to the Prometheus
// record.Decoder as the destination slice. It presumably appends, so callers
// should Reset walRec first to reuse its capacity; confirm against the
// record package. For an entries record, decoded groups are appended to
// walRec.RefEntries.
//
// On success walRec.UserID is set and walRec.Series is replaced with the
// decoded series (nil for an entries record). It returns an error for an
// empty/truncated header, an unknown type byte, or a payload decode failure.
// When the header itself is corrupt, the payload is not decoded and walRec is
// left untouched.
func decodeWALRecord(b []byte, walRec *WALRecord) (err error) {
	var (
		userID  string
		dec     record.Decoder
		rSeries []record.RefSeries

		decbuf = DecWith(b)
		t      = RecordType(decbuf.Byte())
	)
	// An empty buffer is a truncated record. Report the underlying decode
	// error instead of misclassifying it as an unknown record type.
	if decbuf.Err() != nil {
		return decbuf.Err()
	}

	switch t {
	case WALRecordSeries:
		userID = decbuf.UvarintStr()
		// Don't decode a payload whose header is already corrupt.
		if decbuf.Err() != nil {
			return decbuf.Err()
		}
		rSeries, err = dec.Series(decbuf.B, walRec.Series)
	case WALRecordEntries:
		userID = decbuf.UvarintStr()
		// Checking here prevents decodeEntries from appending garbage groups
		// to walRec.RefEntries before the header error is reported.
		if decbuf.Err() != nil {
			return decbuf.Err()
		}
		err = decodeEntries(decbuf.B, walRec)
	default:
		return errors.New("unknown record type")
	}
	if err != nil {
		return err
	}

	walRec.UserID = userID
	walRec.Series = rSeries
	return nil
}
// EncWith returns an Encbuf whose writes are appended to b. b is not copied,
// so its spare capacity is reused.
func EncWith(b []byte) Encbuf {
	var e Encbuf
	e.B = b
	return e
}
// Encbuf extends encoding.Encbuf with support for multi byte encoding
// (PutBytes, which appends a raw byte slice without a length prefix). All
// methods of the embedded Prometheus tsdb encoding.Encbuf remain available.
type Encbuf struct {
encoding.Encbuf
}
// PutBytes appends c to the buffer verbatim. No length prefix is written, so
// a reader must learn the length some other way (e.g. a preceding uvarint).
func (e *Encbuf) PutBytes(c []byte) {
	e.B = append(e.B, c...)
}
// DecWith returns a Decbuf that reads from b. b is not copied; decoded byte
// slices may alias it.
func DecWith(b []byte) Decbuf {
	var d Decbuf
	d.B = b
	return d
}
// Decbuf extends encoding.Decbuf with support for multi byte decoding
// (Bytes, which consumes a fixed number of raw bytes). All methods of the
// embedded Prometheus tsdb encoding.Decbuf remain available, and errors are
// accumulated in the embedded E field in the same way.
type Decbuf struct {
encoding.Decbuf
}
// Bytes consumes and returns the next n bytes of the buffer.
//
// It returns nil without consuming anything in three cases:
//   - the buffer already holds an error;
//   - n is negative;
//   - fewer than n bytes remain.
//
// In the last two cases it also records encoding.ErrInvalidSize.
//
// The returned slice aliases the underlying buffer, but its capacity is
// capped at n, so appending to it cannot overwrite the bytes that follow.
func (d *Decbuf) Bytes(n int) []byte {
	if d.E != nil {
		return nil
	}
	// In decodeEntries, n comes from Uvarint() on bytes read from the WAL. A
	// corrupt length that does not fit in an int comes back negative, which
	// would slip past the length check and panic when slicing. Treat it as a
	// decode error instead.
	if n < 0 || len(d.B) < n {
		d.E = encoding.ErrInvalidSize
		return nil
	}
	x := d.B[:n:n]
	d.B = d.B[n:]
	return x
}