forked from influxdata/influxdb
/
digest_writer.go
112 lines (89 loc) · 2.39 KB
/
digest_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package tsm1
import (
"encoding/binary"
"encoding/json"
"io"
"github.com/golang/snappy"
)
// DigestWriter allows for writing a digest of a shard. A digest is a condensed
// representation of the contents of a shard. It can be scoped to one or more series
// keys, ranges of times or sets of files.
type DigestWriter struct {
w io.WriteCloser
sw *snappy.Writer
}
func NewDigestWriter(w io.WriteCloser) (*DigestWriter, error) {
return &DigestWriter{w: w, sw: snappy.NewBufferedWriter(w)}, nil
}
func (w *DigestWriter) WriteManifest(m *DigestManifest) error {
b, err := json.Marshal(m)
if err != nil {
return err
}
// Write length of manifest.
if err := binary.Write(w.sw, binary.BigEndian, uint32(len(b))); err != nil {
return err
}
// Write manifest.
_, err = w.sw.Write(b)
return err
}
func (w *DigestWriter) WriteTimeSpan(key string, t *DigestTimeSpan) error {
if err := binary.Write(w.sw, binary.BigEndian, uint16(len(key))); err != nil {
return err
}
if _, err := w.sw.Write([]byte(key)); err != nil {
return err
}
if err := binary.Write(w.sw, binary.BigEndian, uint32(t.Len())); err != nil {
return err
}
for _, tr := range t.Ranges {
if err := binary.Write(w.sw, binary.BigEndian, tr.Min); err != nil {
return err
}
if err := binary.Write(w.sw, binary.BigEndian, tr.Max); err != nil {
return err
}
if err := binary.Write(w.sw, binary.BigEndian, tr.CRC); err != nil {
return err
}
if err := binary.Write(w.sw, binary.BigEndian, uint16(tr.N)); err != nil {
return err
}
}
return nil
}
func (w *DigestWriter) Flush() error {
return w.sw.Flush()
}
func (w *DigestWriter) Close() error {
if err := w.Flush(); err != nil {
return err
}
if err := w.sw.Close(); err != nil {
return err
}
return w.w.Close()
}
type DigestTimeSpan struct {
Ranges []DigestTimeRange
}
func (a DigestTimeSpan) Len() int { return len(a.Ranges) }
func (a DigestTimeSpan) Swap(i, j int) { a.Ranges[i], a.Ranges[j] = a.Ranges[j], a.Ranges[i] }
func (a DigestTimeSpan) Less(i, j int) bool {
return a.Ranges[i].Min < a.Ranges[j].Min
}
func (t *DigestTimeSpan) Add(min, max int64, n int, crc uint32) {
for _, v := range t.Ranges {
if v.Min == min && v.Max == max && v.N == n && v.CRC == crc {
return
}
}
t.Ranges = append(t.Ranges, DigestTimeRange{Min: min, Max: max, N: n, CRC: crc})
}
type DigestTimeRange struct {
Min, Max int64
N int
CRC uint32
}