forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
writer.go
156 lines (135 loc) · 3.23 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package format
import (
"fmt"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/storage"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxql"
)
// Discard is a Writer where all write calls succeed. Every source cursor is also read to
// completion, which can be useful for testing performance.
var Discard Writer = &devNull{r: true}

// DevNull is a Writer where all write calls succeed, however, no source data is read.
var DevNull Writer = &devNull{r: false}
// Writer is the destination for exported data. It hands out one BucketWriter per
// time range; WriteBucket uses it to stream a ResultSet.
type Writer interface {
// NewBucket returns a BucketWriter for the time range described by start and end,
// or an error if one cannot be created.
NewBucket(start, end int64) (BucketWriter, error)
// Close closes the Writer and returns any resulting error.
Close() error
}
// BucketWriter receives the series data for a single bucket. The series and cursor
// methods do not return errors; failures are reported through Err, which WriteBucket
// checks after each series.
type BucketWriter interface {
// Err returns the error recorded by the writer, or nil if there is none.
Err() error
// BeginSeries marks the start of a series identified by name, field, type and tags.
// WriteBucket calls it before writing that series' cursors.
BeginSeries(name, field []byte, typ influxql.DataType, tags models.Tags)
// EndSeries marks the end of the series started by the preceding BeginSeries call.
EndSeries()
// The Write*Cursor methods each consume a cursor of the corresponding value type.
// WriteBucket closes the cursor after the call returns, so implementations are
// presumably not expected to close it themselves — confirm against implementers.
WriteIntegerCursor(cur tsdb.IntegerArrayCursor)
WriteFloatCursor(cur tsdb.FloatArrayCursor)
WriteUnsignedCursor(cur tsdb.UnsignedArrayCursor)
WriteBooleanCursor(cur tsdb.BooleanArrayCursor)
WriteStringCursor(cur tsdb.StringArrayCursor)
// Close closes the bucket and returns any resulting error.
Close() error
}
// WriteBucket reads data from rs covering the time range [start, end) and streams it to w.
// The ResultSet must guarantee series+field keys are produced in ascending lexicographical order and values in
// ascending time order.
//
// A BucketWriter is obtained from w.NewBucket and is always closed before WriteBucket
// returns. The first error encountered is returned: a failure from NewBucket, an error
// reported by the BucketWriter after a series has been written, or, if everything else
// succeeded, the error returned by closing the BucketWriter.
//
// WriteBucket panics if the ResultSet yields a cursor of an unknown type.
func WriteBucket(w Writer, start, end int64, rs *storage.ResultSet) (err error) {
	bw, err := w.NewBucket(start, end)
	if err != nil {
		return err
	}
	defer func() {
		// Report a failed Close rather than discarding it, but never let it mask
		// an earlier, more specific error.
		if cerr := bw.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	for rs.Next() {
		bw.BeginSeries(rs.Name(), rs.Field(), rs.FieldType(), rs.Tags())

		ci := rs.CursorIterator()
		for ci.Next() {
			cur := ci.Cursor()
			switch c := cur.(type) {
			case tsdb.IntegerArrayCursor:
				bw.WriteIntegerCursor(c)
			case tsdb.FloatArrayCursor:
				bw.WriteFloatCursor(c)
			case tsdb.UnsignedArrayCursor:
				bw.WriteUnsignedCursor(c)
			case tsdb.BooleanArrayCursor:
				bw.WriteBooleanCursor(c)
			case tsdb.StringArrayCursor:
				bw.WriteStringCursor(c)
			case nil:
				// no data for series key + field combination in this shard
				continue
			default:
				panic(fmt.Sprintf("unreachable: %T", c))
			}
			cur.Close()
		}

		bw.EndSeries()

		// The write methods return nothing, so errors surface only through Err;
		// stop at the first series that reports one.
		if werr := bw.Err(); werr != nil {
			return werr
		}
	}
	return nil
}
// devNull is a Writer and BucketWriter that discards everything written to it.
// Its NewBucket method returns the devNull itself as the BucketWriter.
type devNull struct {
// r selects whether the Write*Cursor methods read each cursor until it is
// exhausted (true) or return immediately without reading (false).
r bool
}
// NewBucket returns the receiver itself as the BucketWriter; start and end are ignored
// and the returned error is always nil.
func (w *devNull) NewBucket(start, end int64) (BucketWriter, error) {
	var bw BucketWriter = w
	return bw, nil
}
// BeginSeries is a no-op: all arguments are ignored.
func (w *devNull) BeginSeries(name, field []byte, typ influxql.DataType, tags models.Tags) {
	// Intentionally empty.
}
// EndSeries is a no-op.
func (w *devNull) EndSeries() {
	// Intentionally empty.
}
// Err always returns nil; a devNull writer never records an error.
func (w *devNull) Err() error {
	return nil
}
// Close does nothing and always returns nil.
func (w *devNull) Close() error {
	return nil
}
// WriteIntegerCursor discards the contents of cur. When w.r is set, the cursor is drained
// by calling Next until it returns an empty array; otherwise cur is not read at all.
func (w *devNull) WriteIntegerCursor(cur tsdb.IntegerArrayCursor) {
	if w.r {
		// Pull batches until an empty one is returned; the values themselves are dropped.
		for batch := cur.Next(); batch.Len() != 0; batch = cur.Next() {
		}
	}
}
// WriteFloatCursor discards the contents of cur. When w.r is set, the cursor is drained
// by calling Next until it returns an empty array; otherwise cur is not read at all.
func (w *devNull) WriteFloatCursor(cur tsdb.FloatArrayCursor) {
	if w.r {
		// Pull batches until an empty one is returned; the values themselves are dropped.
		for batch := cur.Next(); batch.Len() != 0; batch = cur.Next() {
		}
	}
}
// WriteUnsignedCursor discards the contents of cur. When w.r is set, the cursor is drained
// by calling Next until it returns an empty array; otherwise cur is not read at all.
func (w *devNull) WriteUnsignedCursor(cur tsdb.UnsignedArrayCursor) {
	if w.r {
		// Pull batches until an empty one is returned; the values themselves are dropped.
		for batch := cur.Next(); batch.Len() != 0; batch = cur.Next() {
		}
	}
}
// WriteBooleanCursor discards the contents of cur. When w.r is set, the cursor is drained
// by calling Next until it returns an empty array; otherwise cur is not read at all.
func (w *devNull) WriteBooleanCursor(cur tsdb.BooleanArrayCursor) {
	if w.r {
		// Pull batches until an empty one is returned; the values themselves are dropped.
		for batch := cur.Next(); batch.Len() != 0; batch = cur.Next() {
		}
	}
}
// WriteStringCursor discards the contents of cur. When w.r is set, the cursor is drained
// by calling Next until it returns an empty array; otherwise cur is not read at all.
func (w *devNull) WriteStringCursor(cur tsdb.StringArrayCursor) {
	if w.r {
		// Pull batches until an empty one is returned; the values themselves are dropped.
		for batch := cur.Next(); batch.Len() != 0; batch = cur.Next() {
		}
	}
}