forked from influxdata/influxdb
/
stats.go
233 lines (197 loc) · 6.63 KB
/
stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
package tsi1
import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"sort"
"github.com/influxdata/influxdb/pkg/binaryutil"
)
const (
// MeasurementCardinalityStatsMagicNumber is written as the first 4 bytes
// of a data file to identify the file as a tsi1 cardinality file.
MeasurementCardinalityStatsMagicNumber string = "TSIS"
// MeasurementCardinalityVersion indicates the version of the TSIC file format.
MeasurementCardinalityStatsVersion byte = 1
)
// MeasurementCardinalityStats represents a set of measurement sizes.
type MeasurementCardinalityStats map[string]int
// NewMeasurementCardinality returns a new instance of MeasurementCardinality.
func NewMeasurementCardinalityStats() MeasurementCardinalityStats {
return make(MeasurementCardinalityStats)
}
// MeasurementNames returns a list of sorted measurement names.
func (s MeasurementCardinalityStats) MeasurementNames() []string {
a := make([]string, 0, len(s))
for name := range s {
a = append(a, name)
}
sort.Strings(a)
return a
}
// Inc increments a measurement count by 1.
func (s MeasurementCardinalityStats) Inc(name []byte) {
s[string(name)]++
}
// Dec decrements a measurement count by 1. Deleted if zero.
func (s MeasurementCardinalityStats) Dec(name []byte) {
v := s[string(name)]
if v <= 1 {
delete(s, string(name))
} else {
s[string(name)] = v - 1
}
}
// Add adds the values of all measurements in other to s.
func (s MeasurementCardinalityStats) Add(other MeasurementCardinalityStats) {
for name, v := range other {
s[name] += v
}
}
// Sub subtracts the values of all measurements in other from s.
func (s MeasurementCardinalityStats) Sub(other MeasurementCardinalityStats) {
for name, v := range other {
s[name] -= v
}
}
// Clone returns a copy of s.
func (s MeasurementCardinalityStats) Clone() MeasurementCardinalityStats {
other := make(MeasurementCardinalityStats, len(s))
for k, v := range s {
other[k] = v
}
return other
}
// ReadFrom reads stats from r in a binary format. Reader must also be an io.ByteReader.
func (s MeasurementCardinalityStats) ReadFrom(r io.Reader) (n int64, err error) {
br, ok := r.(io.ByteReader)
if !ok {
return 0, fmt.Errorf("tsm1.MeasurementCardinalityStats.ReadFrom: ByteReader required")
}
// Read & verify magic.
magic := make([]byte, 4)
nn, err := io.ReadFull(r, magic)
if n += int64(nn); err != nil {
return n, fmt.Errorf("tsm1.MeasurementCardinalityStats.ReadFrom: cannot read stats magic: %s", err)
} else if string(magic) != MeasurementCardinalityStatsMagicNumber {
return n, fmt.Errorf("tsm1.MeasurementCardinalityStats.ReadFrom: invalid tsm1 stats file")
}
// Read & verify version.
version := make([]byte, 1)
nn, err = io.ReadFull(r, version)
if n += int64(nn); err != nil {
return n, fmt.Errorf("tsm1.MeasurementCardinalityStats.ReadFrom: cannot read stats version: %s", err)
} else if version[0] != MeasurementCardinalityStatsVersion {
return n, fmt.Errorf("tsm1.MeasurementCardinalityStats.ReadFrom: incompatible tsm1 stats version: %d", version[0])
}
// Read checksum.
checksum := make([]byte, 4)
nn, err = io.ReadFull(r, checksum)
if n += int64(nn); err != nil {
return n, fmt.Errorf("tsm1.MeasurementCardinalityStats.ReadFrom: cannot read checksum: %s", err)
}
// Read measurement count.
measurementN, err := binary.ReadVarint(br)
if err != nil {
return n, fmt.Errorf("tsm1.MeasurementCardinalityStats.ReadFrom: cannot read stats measurement count: %s", err)
}
n += int64(binaryutil.VarintSize(measurementN))
// Read measurements.
for i := int64(0); i < measurementN; i++ {
nn64, err := s.readMeasurementFrom(r)
if n += nn64; err != nil {
return n, err
}
}
// Expect end-of-file.
buf := make([]byte, 1)
if _, err := r.Read(buf); err != io.EOF {
return n, fmt.Errorf("tsm1.MeasurementCardinalityStats.ReadFrom: file too large, expected EOF")
}
return n, nil
}
// readMeasurementFrom reads a measurement stat from r in a binary format.
func (s MeasurementCardinalityStats) readMeasurementFrom(r io.Reader) (n int64, err error) {
br, ok := r.(io.ByteReader)
if !ok {
return 0, fmt.Errorf("tsm1.MeasurementCardinalityStats.readMeasurementFrom: ByteReader required")
}
// Read measurement name length.
nameLen, err := binary.ReadVarint(br)
if err != nil {
return n, fmt.Errorf("tsm1.MeasurementCardinalityStats.readMeasurementFrom: cannot read stats measurement name length: %s", err)
}
n += int64(binaryutil.VarintSize(nameLen))
// Read measurement name. Use large capacity so it can usually be stack allocated.
// Go allocates unescaped variables smaller than 64KB on the stack.
name := make([]byte, nameLen)
nn, err := io.ReadFull(r, name)
if n += int64(nn); err != nil {
return n, fmt.Errorf("tsm1.MeasurementCardinalityStats.readMeasurementFrom: cannot read stats measurement name: %s", err)
}
// Read size.
sz, err := binary.ReadVarint(br)
if err != nil {
return n, fmt.Errorf("tsm1.MeasurementCardinalityStats.readMeasurementFrom: cannot read stats measurement size: %s", err)
}
n += int64(binaryutil.VarintSize(sz))
// Insert into map.
s[string(name)] = int(sz)
return n, nil
}
// WriteTo writes stats to w in a binary format.
func (s MeasurementCardinalityStats) WriteTo(w io.Writer) (n int64, err error) {
// Write magic & version.
nn, err := io.WriteString(w, MeasurementCardinalityStatsMagicNumber)
if n += int64(nn); err != nil {
return n, err
}
nn, err = w.Write([]byte{MeasurementCardinalityStatsVersion})
if n += int64(nn); err != nil {
return n, err
}
// Write measurement count.
var buf bytes.Buffer
b := make([]byte, binary.MaxVarintLen64)
if _, err = buf.Write(b[:binary.PutVarint(b, int64(len(s)))]); err != nil {
return n, err
}
// Write all measurements in sorted order.
for _, name := range s.MeasurementNames() {
if _, err := s.writeMeasurementTo(&buf, name, s[name]); err != nil {
return n, err
}
}
data := buf.Bytes()
// Compute & write checksum.
if err := binary.Write(w, binary.BigEndian, crc32.ChecksumIEEE(data)); err != nil {
return n, err
}
n += 4
// Write buffer.
nn, err = w.Write(data)
if n += int64(nn); err != nil {
return n, err
}
return n, err
}
func (s MeasurementCardinalityStats) writeMeasurementTo(w io.Writer, name string, sz int) (n int64, err error) {
// Write measurement name length.
buf := make([]byte, binary.MaxVarintLen64)
nn, err := w.Write(buf[:binary.PutVarint(buf, int64(len(name)))])
if n += int64(nn); err != nil {
return n, err
}
// Write measurement name.
nn, err = io.WriteString(w, name)
if n += int64(nn); err != nil {
return n, err
}
// Write size.
nn, err = w.Write(buf[:binary.PutVarint(buf, int64(sz))])
if n += int64(nn); err != nil {
return n, err
}
return n, err
}