forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
conflictwriter.go
176 lines (148 loc) · 3.87 KB
/
conflictwriter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package format
import (
"bytes"
"fmt"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxql"
)
// ConflictWriter is a Writer that redirects conflicting data to an alternate output.
//
// Data whose cursor type matches the type first recorded for a field within a
// measurement is sent to the primary Writer; cursors of any other type are
// sent to the conflict Writer instead.
type ConflictWriter struct {
// w is the primary Writer.
w Writer
// c is the Writer that receives conflicting data.
c Writer
// bw is the single bucket writer handed out by NewBucket; it is reused for
// every bucket rather than allocated per call.
bw aggregateBucketWriter
}
// NewConflictWriter builds a ConflictWriter that forwards regular point data
// to w and diverts point data with a conflicting type to conflict.
func NewConflictWriter(w, conflict Writer) *ConflictWriter {
	cw := new(ConflictWriter)
	cw.w = w
	cw.c = conflict
	return cw
}
// NewBucket opens a bucket for the given start and end on both the primary
// and the conflict Writer and returns the shared aggregate bucket writer that
// wraps them.
//
// Only the wrapped bucket writers are replaced; the remaining tracking state
// of the shared writer is left untouched. If the conflict bucket cannot be
// opened, the already-opened primary bucket is closed and the error from the
// conflict Writer is returned.
func (cw *ConflictWriter) NewBucket(start, end int64) (bw BucketWriter, err error) {
	if cw.bw.w, err = cw.w.NewBucket(start, end); err != nil {
		return nil, err
	}

	if cw.bw.c, err = cw.c.NewBucket(start, end); err != nil {
		// Release the primary bucket; its Close error is deliberately
		// discarded in favour of the error that caused the failure.
		_ = cw.bw.w.Close()
		return nil, err
	}

	return &cw.bw, nil
}
// Close closes the conflict Writer and then the primary Writer. Both are
// always closed. An error from the primary Writer takes precedence; the
// conflict Writer's error is returned only when the primary closed cleanly.
func (cw *ConflictWriter) Close() error {
	conflictErr := cw.c.Close()
	primaryErr := cw.w.Close()
	if primaryErr != nil {
		return primaryErr
	}
	return conflictErr
}
// bucketState records where an aggregateBucketWriter is in the
// BeginSeries / Write*Cursor / EndSeries sequence.
type bucketState int
const (
// beginSeriesBucketState is the zero value and the state EndSeries restores:
// no series is currently open.
beginSeriesBucketState bucketState = iota
// writeBucketState is set by BeginSeries: a series is open on the primary
// bucket writer only.
writeBucketState
// writeConflictsBucketState is set by conflictState: the current series has
// also been begun on the conflict bucket writer.
writeConflictsBucketState
)
// aggregateBucketWriter fans a stream of series out to two bucket writers:
// cursors whose type matches the type recorded for the field go to w, all
// other cursors go to c.
type aggregateBucketWriter struct {
// w is the primary bucket writer.
w BucketWriter
// c is the bucket writer that receives conflicting cursors.
c BucketWriter
// state tracks whether a series is open and whether it has been begun on c.
state bucketState
// current series
// name is a private copy of the current measurement name.
name []byte
// field is a private copy of the current field name.
field []byte
// typ is the type recorded in mf for the current field, i.e. the type passed
// to the first BeginSeries for that field within the current measurement.
typ influxql.DataType
// tags holds the tags passed to the most recent BeginSeries (stored as
// given, not copied).
tags models.Tags
// mf maps field name to its first-seen type for the current measurement; it
// is replaced with a fresh map whenever the measurement name changes.
mf map[string]influxql.DataType
}
// Err reports the first error held by the wrapped bucket writers. The
// primary writer's error takes precedence over the conflict writer's; nil is
// returned when neither reports an error.
func (bw *aggregateBucketWriter) Err() error {
	if bw.w.Err() != nil {
		return bw.w.Err()
	}
	if bw.c.Err() != nil {
		return bw.c.Err()
	}
	return nil
}
// BeginSeries starts a new series on the primary bucket writer and records
// the series identity so that later Write*Cursor calls can detect type
// conflicts.
//
// The first type seen for a field within a measurement becomes that field's
// expected type; it is kept until a series with a different measurement name
// arrives, at which point the field-type table is reset. name and field are
// copied into reusable buffers; tags is stored as given.
func (bw *aggregateBucketWriter) BeginSeries(name, field []byte, typ influxql.DataType, tags models.Tags) {
	bw.w.BeginSeries(name, field, typ, tags)

	// Reset the field-type table on a new measurement. The nil check covers
	// the very first call with an empty name: bytes.Equal treats the unset
	// bw.name and an empty name as equal, which would otherwise leave mf nil
	// and make the map write below panic.
	if bw.mf == nil || !bytes.Equal(bw.name, name) {
		bw.name = append(bw.name[:0], name...)
		bw.mf = make(map[string]influxql.DataType)
	}

	bw.field = append(bw.field[:0], field...)
	bw.tags = tags

	if known, ok := bw.mf[string(field)]; ok {
		// The field was seen before in this measurement: keep its first type.
		bw.typ = known
	} else {
		bw.mf[string(field)] = typ
		bw.typ = typ
	}

	bw.state = writeBucketState
}
// EndSeries finishes the current series and returns the writer to the
// "no series open" state.
//
// The primary bucket writer is always ended, because BeginSeries always
// begins the series on it. The conflict bucket writer is ended only if the
// series was also begun there. EndSeries panics when no series is open.
func (bw *aggregateBucketWriter) EndSeries() {
	switch bw.state {
	case writeBucketState:
		bw.w.EndSeries()
	case writeConflictsBucketState:
		bw.w.EndSeries()
		bw.c.EndSeries()
	default:
		panic(fmt.Sprintf("ConflictWriter state: got=%v, exp=%v,%v", bw.state, writeBucketState, writeConflictsBucketState))
	}

	bw.state = beginSeriesBucketState
}
// conflictState makes sure the current series has been begun on the conflict
// bucket writer before conflicting data is written to it. It acts only on
// the first conflicting cursor of a series; later calls are no-ops.
//
// other is the type of the conflicting cursor about to be written. The
// conflict series is declared with that type rather than bw.typ: callers
// only reach this path when the cursor type differs from bw.typ, so
// declaring the series as bw.typ would never match the data written to it.
func (bw *aggregateBucketWriter) conflictState(other influxql.DataType) {
	if bw.state != writeBucketState {
		return
	}
	bw.c.BeginSeries(bw.name, bw.field, other, bw.tags)
	bw.state = writeConflictsBucketState
}
// WriteIntegerCursor writes cur to the primary bucket writer when the current
// field's recorded type is Integer; otherwise the cursor is treated as a
// conflict and written to the conflict bucket writer.
func (bw *aggregateBucketWriter) WriteIntegerCursor(cur tsdb.IntegerArrayCursor) {
	if bw.typ != influxql.Integer {
		bw.conflictState(influxql.Integer)
		bw.c.WriteIntegerCursor(cur)
		return
	}
	bw.w.WriteIntegerCursor(cur)
}
// WriteFloatCursor writes cur to the primary bucket writer when the current
// field's recorded type is Float; otherwise the cursor is treated as a
// conflict and written to the conflict bucket writer.
func (bw *aggregateBucketWriter) WriteFloatCursor(cur tsdb.FloatArrayCursor) {
	if bw.typ != influxql.Float {
		bw.conflictState(influxql.Float)
		bw.c.WriteFloatCursor(cur)
		return
	}
	bw.w.WriteFloatCursor(cur)
}
// WriteUnsignedCursor writes cur to the primary bucket writer when the
// current field's recorded type is Unsigned; otherwise the cursor is treated
// as a conflict and written to the conflict bucket writer.
func (bw *aggregateBucketWriter) WriteUnsignedCursor(cur tsdb.UnsignedArrayCursor) {
	if bw.typ != influxql.Unsigned {
		bw.conflictState(influxql.Unsigned)
		bw.c.WriteUnsignedCursor(cur)
		return
	}
	bw.w.WriteUnsignedCursor(cur)
}
// WriteBooleanCursor writes cur to the primary bucket writer when the current
// field's recorded type is Boolean; otherwise the cursor is treated as a
// conflict and written to the conflict bucket writer.
func (bw *aggregateBucketWriter) WriteBooleanCursor(cur tsdb.BooleanArrayCursor) {
	if bw.typ != influxql.Boolean {
		bw.conflictState(influxql.Boolean)
		bw.c.WriteBooleanCursor(cur)
		return
	}
	bw.w.WriteBooleanCursor(cur)
}
// WriteStringCursor writes cur to the primary bucket writer when the current
// field's recorded type is String; otherwise the cursor is treated as a
// conflict and written to the conflict bucket writer.
func (bw *aggregateBucketWriter) WriteStringCursor(cur tsdb.StringArrayCursor) {
	if bw.typ != influxql.String {
		bw.conflictState(influxql.String)
		bw.c.WriteStringCursor(cur)
		return
	}
	bw.w.WriteStringCursor(cur)
}
// Close closes the conflict bucket writer and then the primary bucket writer.
// Both are always closed. An error from the primary takes precedence; the
// conflict writer's error is returned only when the primary closed cleanly.
func (bw *aggregateBucketWriter) Close() error {
	conflictErr := bw.c.Close()
	primaryErr := bw.w.Close()
	if primaryErr != nil {
		return primaryErr
	}
	return conflictErr
}