-
Notifications
You must be signed in to change notification settings - Fork 277
/
field_store.go
340 lines (303 loc) · 11.1 KB
/
field_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
// Licensed to LinDB under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. LinDB licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package memdb
import (
"encoding/binary"
"math"
commonencoding "github.com/lindb/common/pkg/encoding"
"github.com/lindb/lindb/flow"
"github.com/lindb/lindb/pkg/bit"
"github.com/lindb/lindb/pkg/encoding"
"github.com/lindb/lindb/pkg/logger"
"github.com/lindb/lindb/pkg/stream"
"github.com/lindb/lindb/pkg/timeutil"
"github.com/lindb/lindb/series/field"
"github.com/lindb/lindb/tsdb/tblstore/metricsdata"
)
//go:generate mockgen -source ./field_store.go -destination=./field_store_mock.go -package memdb
// memory layout as below:
// header: field id[2bytes]
// + start time[2bytes] + end time(delta of start time)[1byte] + mark container[2bytes]
// body: data points(value.....)
// the last bit of the mark container flags whether the buffer has any data written
const (
	// byte offsets of the header fields inside the write buffer
	fieldOffset = 0               // field id (2 bytes)
	startOffset = fieldOffset + 2 // start time slot (2 bytes)
	endOffset   = startOffset + 2 // end time, stored as delta of start (1 byte)
	markOffset  = endOffset + 1   // mark container (2 bytes, 1 bit per slot; last bit = "has data" flag)
	bodyOffset  = markOffset + 2  // first data point of the body
	// headLen is the header length used by timeWindow.
	// NOTE(review): bodyOffset evaluates to 7 while headLen is 8 — presumably
	// one spare byte is reserved by the allocator; confirm against the caller
	// that sizes the buffer.
	headLen   = 8
	valueSize = 8 // bytes per data point (float64)
	// markContainer is the number of flag bits per mark byte.
	markContainer       = 8
	emptyFieldStoreSize = 24 + // empty buf slice cost
		24 // empty compress slice cost
)
// fStoreINTF represents field-store,
// which abstracts a store for storing field data based on family start time + field id.
type fStoreINTF interface {
	// Capacity returns the size usage in bytes (write buffer + compressed data + slice overhead).
	Capacity() int
	// GetFieldID returns the field id of metric level.
	GetFieldID() field.ID
	// Write writes the field data into current buffer;
	// if time slot out of current time window, need compress time window then resets the current buffer;
	// if it has same time slot in current buffer, need do rollup operation by field type.
	Write(fieldType field.Type, slotIndex uint16, value float64)
	// FlushFieldTo flushes field store data into kv store, need align slot range in metric level.
	FlushFieldTo(tableFlusher metricsdata.Flusher, fieldMeta field.Meta, flushCtx *flushContext) error
	// Load loads field series data into the data-load context for the given series/field.
	Load(ctx *flow.DataLoadContext,
		seriesIdxFromQuery uint16, fieldIdx int,
		fieldType field.Type, slotRange timeutil.SlotRange,
	)
}
// fieldStore implements fStoreINTF interface.
// It keeps two buffers: a mutable write buffer holding the current time
// window, and an immutable compressed buffer holding older windows.
type fieldStore struct {
	buf      []byte // current write buffer, accepts written data (header + values, see layout above)
	compress []byte // immutable compressed data from previous time windows
}
// newFieldStore creates a new field store backed by the given write buffer,
// stamping the field id into the buffer header.
func newFieldStore(buf []byte, fieldID field.ID) fStoreINTF {
	fs := &fieldStore{buf: buf}
	stream.PutUint16(buf, fieldOffset, uint16(fieldID))
	return fs
}
// GetFieldID returns the field id of metric level, read from the buffer header.
func (fs *fieldStore) GetFieldID() field.ID {
	raw := stream.ReadUint16(fs.buf, fieldOffset)
	return field.ID(raw)
}
// Write writes the field data into the current write buffer.
// Three cases:
//  1. buffer is empty (last mark flag unset) => write the first point;
//  2. slotIndex falls outside the current time window => compress the current
//     buffer into fs.compress, then start a new window with this point;
//  3. slotIndex is inside the window => aggregate with an existing value at
//     that slot (rollup by field type) or store a new value.
func (fs *fieldStore) Write(fieldType field.Type, slotIndex uint16, value float64) {
	if fs.buf[markOffset+1] == 0 {
		// no data written before
		fs.writeFirstPoint(slotIndex, value)
		return
	}
	startTime := fs.getStart()
	if slotIndex < startTime || slotIndex > startTime+fs.timeWindow()-1 {
		// if current slot time out of current time window, need compress block data, start new time window
		fs.compact(fieldType, startTime)
		// write first point after compact
		fs.writeFirstPoint(slotIndex, value)
		return
	}
	// write data in current write buffer
	delta := slotIndex - startTime
	pos, markIdx, flagIdx := fs.position(delta)
	if fs.buf[markOffset+markIdx]&flagIdx != 0 {
		// there is same point of same time slot, rollup with the old value
		oldValue := math.Float64frombits(binary.LittleEndian.Uint64(fs.buf[pos:]))
		value = fieldType.AggType().Aggregate(oldValue, value)
	} else {
		// new data for time slot;
		// FIX: only advance the end delta when this slot extends the window —
		// an out-of-order write to an earlier (unwritten) slot previously
		// shrank the recorded end, hiding later slots from getEnd/merge.
		if delta > fs.getEnd() {
			fs.buf[endOffset] = byte(delta)
		}
		fs.buf[markOffset+markIdx] |= flagIdx // mark value exist
	}
	// finally, write value into the body of current write buffer
	binary.LittleEndian.PutUint64(fs.buf[pos:], math.Float64bits(value))
}
// FlushFieldTo flushes field store data into kv store, need align slot range in metric level.
// It merges the immutable compressed data (if any) with the current write
// buffer, re-encoding over the metric-level slot range (flushCtx.SlotRange).
// NOTE: a merge error is logged and nil is returned — the field's data is
// intentionally dropped rather than failing the whole flush.
func (fs *fieldStore) FlushFieldTo(tableFlusher metricsdata.Flusher, fieldMeta field.Meta, flushCtx *flushContext) error {
	var decoder *encoding.TSDDecoder
	if len(fs.compress) > 0 {
		// calc new start/end based on old compress values
		decoder = encoding.GetTSDDecoder()
		defer encoding.ReleaseTSDDecoder(decoder)
		decoder.Reset(fs.compress)
	}
	// reuse the flusher's per-field encoder, restarting it at the aligned start slot
	encoder := tableFlusher.GetEncoder(flushCtx.fieldIdx)
	encoder.RestWithStartTime(flushCtx.SlotRange.Start)
	data, err := fs.merge(fieldMeta.Type, encoder, decoder, fs.getStart(), flushCtx.SlotRange, false)
	if err != nil {
		memDBLogger.Error("flush field store err, data lost", logger.Error(err))
		return nil
	}
	return tableFlusher.FlushField(data)
}
// writeFirstPoint stores the very first data point of a fresh write buffer:
// it records slotIndex as the new window start, resets the end delta,
// marks slot 0 as present, and raises the "has data" flag in the last mark byte.
func (fs *fieldStore) writeFirstPoint(slotIndex uint16, value float64) {
	binary.LittleEndian.PutUint16(fs.buf[startOffset:], slotIndex) // new window start time
	fs.buf[endOffset] = 0                                          // end delta resets with the window
	pos, markIdx, flagIdx := fs.position(0)
	fs.buf[markOffset+markIdx] |= flagIdx // mark the first slot as written
	fs.buf[markOffset+1] |= 1             // last mark flag: buffer now has data
	binary.LittleEndian.PutUint64(fs.buf[pos:], math.Float64bits(value))
}
// timeWindow returns the number of time slots the current write buffer can hold.
func (fs *fieldStore) timeWindow() uint16 {
	bodyLen := len(fs.buf) - headLen
	return uint16(bodyLen / valueSize)
}
// resetBuf clears both mark-container bytes, making the current buffer
// look like a brand-new (empty) write buffer.
func (fs *fieldStore) resetBuf() {
	fs.buf[markOffset], fs.buf[markOffset+1] = 0, 0
}
// Capacity returns the memory usage of this field store in bytes.
// len (not cap) of buf is used deliberately, since buf is carved out of an
// allocated page.
func (fs *fieldStore) Capacity() int {
	return emptyFieldStoreSize + len(fs.buf) + cap(fs.compress)
}
// compact compresses the current write buffer into fs.compress, merging with
// any previously compressed data, then resets the write buffer so a new time
// window can start. startTime is the start slot of the current write buffer.
// NOTE: a merge error is only logged; fs.compress is still replaced with the
// (possibly empty) merge result, so the faulty window's data is dropped.
func (fs *fieldStore) compact(fieldType field.Type, startTime uint16) {
	length := len(fs.compress)
	thisSlotRange := fs.slotRange(startTime)
	var decoder *encoding.TSDDecoder
	if length > 0 {
		// if has compress data, create tsd decoder for merge compress
		decoder = encoding.GetTSDDecoder()
		defer encoding.ReleaseTSDDecoder(decoder)
		decoder.Reset(fs.compress)
	}
	// pooled encoder, released when compact returns
	encoder := encoding.TSDEncodeFunc(thisSlotRange.Start)
	defer encoding.ReleaseTSDEncoder(encoder)
	data, err := fs.merge(fieldType, encoder, decoder, startTime, thisSlotRange, true)
	if err != nil {
		memDBLogger.Error("compact field store data err", logger.Error(err))
	}
	// copy merged bytes into fs.compress, reusing its backing array when possible
	fs.compress = commonencoding.MustCopy(fs.compress, data)
	// !!!!! IMPORTANT: need reset current write buffer
	fs.resetBuf()
}
// position returns the point write position/mark index/flag index for a data
// point at the given delta from the window start.
//   - pos: byte offset in buf where the float64 value is stored
//   - markIdx: which mark-container byte tracks this slot
//   - flagIdx: the bit inside that byte flagging whether the slot holds a value
func (fs *fieldStore) position(deltaOfTime uint16) (pos, markIdx uint16, flagIdx uint8) {
	pos = bodyOffset + valueSize*deltaOfTime
	// each mark byte tracks markContainer slots (one bit per slot); the
	// previous code divided by valueSize, which only worked because both
	// constants happen to equal 8 — use markContainer to match flagIdx below.
	markIdx = deltaOfTime / markContainer
	flagIdx = uint8(1 << (markContainer - deltaOfTime%markContainer - 1))
	return
}
// getStart returns the start time slot recorded in the current write buffer header.
func (fs *fieldStore) getStart() uint16 {
	start := stream.ReadUint16(fs.buf, startOffset)
	return start
}
// getEnd returns the end time of the current write buffer, expressed as a
// delta of the start time.
func (fs *fieldStore) getEnd() uint16 {
	delta := fs.buf[endOffset]
	return uint16(delta)
}
// merge combines the values in the current write buffer with the previously
// compressed values over thisSlotRange, aggregating by the field's type when
// both sides hold a value for the same slot.
// startTime is the start slot of the current write buffer; when withTimeRange
// is true the encoded output embeds the time slot range, otherwise only the
// values are encoded.
func (fs *fieldStore) merge(
	fieldType field.Type,
	encoder *encoding.TSDEncoder,
	decoder *encoding.TSDDecoder,
	startTime uint16,
	thisSlotRange timeutil.SlotRange,
	withTimeRange bool,
) (compress []byte, err error) {
	for slot := thisSlotRange.Start; slot <= thisSlotRange.End; slot++ {
		curValue, hasCur := fs.getCurrentValue(startTime, slot)
		oldValue, hasOld := getOldFloatValue(decoder, slot)
		if !hasCur && !hasOld {
			// neither side has a value for this slot: append an empty marker
			encoder.AppendTime(bit.Zero)
			continue
		}
		encoder.AppendTime(bit.One)
		switch {
		case hasCur && hasOld:
			// both sides present: aggregate by field type before encoding
			encoder.AppendValue(math.Float64bits(fieldType.AggType().Aggregate(curValue, oldValue)))
		case hasCur:
			// only the current write buffer has a value
			encoder.AppendValue(math.Float64bits(curValue))
		default:
			// only the old compressed data has a value
			encoder.AppendValue(math.Float64bits(oldValue))
		}
	}
	if withTimeRange {
		compress, err = encoder.Bytes()
	} else {
		// get compress data without time slot range
		compress, err = encoder.BytesWithoutTime()
	}
	if err != nil {
		return nil, err
	}
	return compress, nil
}
// Load loads field series data into the data-load context:
// first down-samples the compressed data (if any) via the context's decoder,
// then down-samples the current write buffer (fieldStore itself acts as the
// value getter for the uncompressed points).
func (fs *fieldStore) Load(ctx *flow.DataLoadContext,
	seriesIdxFromQuery uint16, fieldIdx int,
	_ field.Type, slotRange timeutil.SlotRange,
) {
	if len(fs.compress) > 0 {
		decoder := ctx.Decoder
		decoder.Reset(fs.compress)
		ctx.DownSampling(slotRange, seriesIdxFromQuery, fieldIdx, decoder)
	}
	ctx.DownSampling(slotRange, seriesIdxFromQuery, fieldIdx, fs)
}
// GetValue returns the value stored at the given time slot in the current
// write buffer; the second result is false when the slot holds no value.
func (fs *fieldStore) GetValue(slot uint16) (float64, bool) {
	start := fs.getStart()
	return fs.getCurrentValue(start, slot)
}
// slotRange returns the union of the time slot ranges covered by the current
// write buffer and the compressed buffer (if any).
func (fs *fieldStore) slotRange(currentStart uint16) timeutil.SlotRange {
	curStart, curEnd := currentStart, currentStart+fs.getEnd()
	if len(fs.compress) > 0 {
		oldStart, oldEnd := encoding.DecodeTSDTime(fs.compress)
		return getTimeSlotRange(oldStart, oldEnd, curStart, curEnd)
	}
	return timeutil.NewSlotRange(curStart, curEnd)
}
// getTimeSlotRange returns the slot range covering both input ranges
// (minimum of the starts, maximum of the ends).
func getTimeSlotRange(startSlot1, endSlot1, startSlot2, endSlot2 uint16) timeutil.SlotRange {
	merged := timeutil.NewSlotRange(startSlot1, endSlot1)
	if startSlot2 < merged.Start {
		merged.Start = startSlot2
	}
	if endSlot2 > merged.End {
		merged.End = endSlot2
	}
	return merged
}
// getCurrentValue returns the value at timeSlot in the current write buffer;
// hasValue is false when the slot is outside the written range or its mark
// bit is unset.
func (fs *fieldStore) getCurrentValue(startTime, timeSlot uint16) (value float64, hasValue bool) {
	if timeSlot < startTime || timeSlot > startTime+fs.getEnd() {
		return 0, false // out of the written range
	}
	pos, markIdx, flagIdx := fs.position(timeSlot - startTime)
	if fs.buf[markOffset+markIdx]&flagIdx == 0 {
		return 0, false // slot never written
	}
	return math.Float64frombits(binary.LittleEndian.Uint64(fs.buf[pos:])), true
}
// getOldFloatValue returns the value at timeSlot from the compressed buffer's
// decoder; hasValue is false when there is no decoder or no value at the slot.
func getOldFloatValue(tsd *encoding.TSDDecoder, timeSlot uint16) (float64, bool) {
	if tsd == nil || !tsd.HasValueWithSlot(timeSlot) {
		return 0, false
	}
	return math.Float64frombits(tsd.Value()), true
}