-
Notifications
You must be signed in to change notification settings - Fork 277
/
timeseries_store.go
159 lines (142 loc) · 4.93 KB
/
timeseries_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// Licensed to LinDB under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. LinDB licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package memdb
import (
"sort"
"github.com/lindb/lindb/flow"
"github.com/lindb/lindb/pkg/timeutil"
"github.com/lindb/lindb/series/field"
"github.com/lindb/lindb/tsdb/tblstore/metricsdata"
)
//go:generate mockgen -source ./timeseries_store.go -destination=./timeseries_store_mock.go -package memdb
const emptyTimeSeriesStoreSize = 24 // fStores slice
// tStoreINTF abstracts a time-series store
type tStoreINTF interface {
// Capacity returns the size of tStoreINTF without fields
Capacity() int
// GetFStore returns the fStore in field list by field id.
GetFStore(fieldID field.ID) (fStoreINTF, bool)
// InsertFStore inserts a new fStore to field list.
InsertFStore(fStore fStoreINTF)
// FlushFieldsTo flushes the field data segment.
FlushFieldsTo(flusher metricsdata.Flusher, flushCtx *flushContext) error
// load the time series data based on field ids
load(loadCtx *flow.DataLoadContext, seriesIdxFromQuery uint16, fields field.Metas, slotRange timeutil.SlotRange)
}
// fStoreNodes implements sort.Interface
type fStoreNodes []fStoreINTF
func (f fStoreNodes) Len() int { return len(f) }
func (f fStoreNodes) Less(i, j int) bool { return f[i].GetFieldID() < f[j].GetFieldID() }
func (f fStoreNodes) Swap(i, j int) { f[i], f[j] = f[j], f[i] }
// timeSeriesStore holds a mapping relation of field and fieldStore.
type timeSeriesStore struct {
fStoreNodes fStoreNodes // key: sorted fStore list by field-name, insert-only
}
// newTimeSeriesStore returns a new tStoreINTF.
func newTimeSeriesStore() tStoreINTF {
return &timeSeriesStore{}
}
// GetFStore returns the fStore in this list from field-id.
func (ts *timeSeriesStore) GetFStore(fieldID field.ID) (fStoreINTF, bool) {
fieldLength := len(ts.fStoreNodes)
if fieldLength == 1 {
if ts.fStoreNodes[0].GetFieldID() != fieldID {
return nil, false
}
return ts.fStoreNodes[0], true
}
// fast path
if fieldLength < 20 {
for idx := range ts.fStoreNodes {
if ts.fStoreNodes[idx].GetFieldID() == fieldID {
return ts.fStoreNodes[idx], true
}
}
return nil, false
}
idx := sort.Search(fieldLength, func(i int) bool {
return ts.fStoreNodes[i].GetFieldID() >= fieldID
})
if idx >= fieldLength || ts.fStoreNodes[idx].GetFieldID() != fieldID {
return nil, false
}
return ts.fStoreNodes[idx], true
}
func (ts *timeSeriesStore) Capacity() int {
return emptyTimeSeriesStoreSize + 8*cap(ts.fStoreNodes)
}
// InsertFStore inserts a new fStore to field list.
func (ts *timeSeriesStore) InsertFStore(fStore fStoreINTF) {
ts.fStoreNodes = append(ts.fStoreNodes, fStore)
if len(ts.fStoreNodes) > 1 {
sort.Sort(ts.fStoreNodes)
}
}
// FlushFieldsTo flushes the series data segment.
func (ts *timeSeriesStore) FlushFieldsTo(flusher metricsdata.Flusher, flushCtx *flushContext) error {
stores := ts.fStoreNodes
fStoreLen := len(stores)
// if no field store under current data family
if stores == nil || fStoreLen == 0 {
return nil
}
fieldMetas := flusher.GetFieldMetas()
idx := 0
for _, fieldMeta := range fieldMetas {
if idx < fStoreLen && fieldMeta.ID == stores[idx].GetFieldID() {
// flush field data
flushCtx.fieldIdx = idx
if err := stores[idx].FlushFieldTo(flusher, fieldMeta, flushCtx); err != nil {
return err
}
idx++
} else {
// must flush nil data for metric has multi-field.
// because each series need fill all field data in order.
_ = flusher.FlushField(nil)
}
}
return nil
}
// load the time series data based on key(family+field).
// NOTICE: field ids and fields aggregator must be in order.
func (ts *timeSeriesStore) load(loadCtx *flow.DataLoadContext,
seriesIdxFromQuery uint16,
fields field.Metas, slotRange timeutil.SlotRange,
) {
fieldLength := len(ts.fStoreNodes)
fieldCount := len(fields)
j := 0
for i := 0; i < fieldLength; i++ {
fieldStore := ts.fStoreNodes[i]
queryFieldID := fields[j].ID
storeFieldID := fieldStore.GetFieldID()
switch {
case storeFieldID == queryFieldID:
// load field data
fieldStore.Load(loadCtx, seriesIdxFromQuery, j, fields[j].Type, slotRange)
j++ // goto next query field id
// found all query fields return it
if fieldCount == j {
return
}
case storeFieldID > queryFieldID:
// store field id > query field id, return it
return
}
}
}