-
Notifications
You must be signed in to change notification settings - Fork 7
/
base.go
241 lines (209 loc) · 6.54 KB
/
base.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
package parser
// TODO integrate this functionality into the parser.go code.
// Probably should have Base implement Parser.
import (
"context"
"errors"
"log"
"reflect"
"time"
"github.com/prometheus/client_golang/prometheus"
v2as "github.com/m-lab/annotation-service/api/v2"
"github.com/m-lab/etl/etl"
"github.com/m-lab/etl/metrics"
"github.com/m-lab/etl/row"
)
// Errors that may be returned by RowBuffer and Base functions.
var (
// ErrAnnotationError is returned when the annotation response carries no
// annotation map.
ErrAnnotationError = errors.New("Annotation error")
// ErrNotAnnotatable is returned when a row does not implement row.Annotatable.
ErrNotAnnotatable = errors.New("object does not implement Annotatable")
// ErrRowNotPointer is not referenced in this file; presumably used by
// callers that require pointer rows — TODO confirm.
ErrRowNotPointer = errors.New("Row should be a pointer type")
)
// RowBuffer provides all basic functionality generally needed for buffering and annotating
// rows that implement Annotatable. Rows are handed off for insertion via TakeRows.
// Its methods are documented as not thread-safe.
type RowBuffer struct {
// bufferSize is the maximum number of rows AddRow accepts before it
// returns etl.ErrBufferFull; it is also the capacity of a fresh rows slice.
bufferSize int
rows []interface{} // Actually these are Annotatable, but we cast them later.
// ann is the annotator queried for client and server IP annotations.
ann v2as.Annotator
}
// AddRow appends a row to the buffer.
//
// The row must implement row.Annotatable. If it does not (including when r
// is nil), its type is logged and ErrNotAnnotatable is returned. If the
// buffer already holds bufferSize rows, etl.ErrBufferFull is returned and
// the row is not added.
//
// Not thread-safe. Should only be called by owning thread.
func (buf *RowBuffer) AddRow(r interface{}) error {
	// A type assertion performs the same dynamic-type check as
	// reflect.Type.Implements, but is also safe for a nil interface value:
	// reflect.TypeOf(nil) is a nil Type, and calling Implements on it panics.
	if _, ok := r.(row.Annotatable); !ok {
		log.Println(reflect.TypeOf(r), "not Annotatable")
		return ErrNotAnnotatable
	}
	if len(buf.rows) >= buf.bufferSize {
		return etl.ErrBufferFull
	}
	buf.rows = append(buf.rows, r)
	return nil
}
// NumRowsForTest reports how many rows are currently held in the buffer.
// It exists so that tests can inspect the buffer state.
func (buf *RowBuffer) NumRowsForTest() int {
	count := len(buf.rows)
	return count
}
// TakeRows hands the buffered rows to the caller and leaves the buffer
// empty, backed by a newly allocated slice with capacity bufferSize.
// Not thread-safe. Should only be called by owning thread.
func (buf *RowBuffer) TakeRows() []interface{} {
	taken, fresh := buf.rows, make([]interface{}, 0, buf.bufferSize)
	buf.rows = fresh
	return taken
}
// annotateServers looks up annotations for the distinct, non-empty server IPs
// of all buffered rows and applies them to the rows that have a match.
//
// label is used to label metrics and errors in GetAnnotations.
//
// It returns ErrNotAnnotatable if any row does not implement
// row.Annotatable, the error from GetAnnotations if the request fails, and
// ErrAnnotationError if the response is nil or carries no annotation map.
// When no row has a server IP, it returns nil without making a request.
//
// TODO update this to use local cache of high quality annotations.
func (buf *RowBuffer) annotateServers(label string) error {
	// Assert each row once, up front, so the apply loop below needs no
	// further type checks.
	rows := make([]row.Annotatable, 0, len(buf.rows))
	serverIPs := make(map[string]struct{})
	var logTime time.Time
	for i := range buf.rows {
		r, ok := buf.rows[i].(row.Annotatable)
		if !ok {
			return ErrNotAnnotatable
		}
		rows = append(rows, r)

		// Only ask for the IP if it is non-empty.
		if ip := r.GetServerIP(); ip != "" {
			serverIPs[ip] = struct{}{}
		}
		// Use the first non-zero log time found as the request date.
		if logTime.IsZero() {
			logTime = r.GetLogTime()
		}
	}

	if len(serverIPs) == 0 {
		return nil
	}
	ipSlice := make([]string, 0, len(serverIPs))
	for ip := range serverIPs {
		ipSlice = append(ipSlice, ip)
	}

	response, err := buf.ann.GetAnnotations(context.Background(), logTime, ipSlice, label)
	if err != nil {
		log.Println("error in server GetAnnotations: ", err)
		metrics.AnnotationErrorCount.With(prometheus.
			Labels{"source": "Server IP: RPC err in GetAnnotations."}).Inc()
		return err
	}

	// Guard against a nil response as well as a nil map, so that a
	// misbehaving annotator cannot cause a nil-pointer dereference.
	if response == nil || response.Annotations == nil {
		log.Println("empty server annotation response")
		metrics.AnnotationErrorCount.With(prometheus.
			Labels{"source": "Server IP: empty response"}).Inc()
		return ErrAnnotationError
	}
	annMap := response.Annotations

	for _, r := range rows {
		ip := r.GetServerIP()
		if ip == "" {
			continue
		}
		if ann, ok := annMap[ip]; ok {
			r.AnnotateServer(ann)
		}
	}
	return nil
}
// annotateClients requests annotations for the client IPs of all buffered
// rows and passes the resulting annotation map to every row.
//
// label is used to label metrics and errors in GetAnnotations.
//
// It returns ErrNotAnnotatable if any row does not implement
// row.Annotatable, the error from GetAnnotations if the request fails, and
// ErrAnnotationError if the response is nil or carries no annotation map.
// Unlike annotateServers, the request is issued even when no client IPs were
// collected, and IPs are not de-duplicated.
func (buf *RowBuffer) annotateClients(label string) error {
	// Assert each row once, up front, so the apply loop below needs no
	// further type checks.
	rows := make([]row.Annotatable, 0, len(buf.rows))
	ipSlice := make([]string, 0, 2*len(buf.rows)) // This may be inadequate, but it's a reasonable start.
	var logTime time.Time
	for i := range buf.rows {
		r, ok := buf.rows[i].(row.Annotatable)
		if !ok {
			return ErrNotAnnotatable
		}
		rows = append(rows, r)
		ipSlice = append(ipSlice, r.GetClientIPs()...)
		// Use the first non-zero log time found as the request date.
		if logTime.IsZero() {
			logTime = r.GetLogTime()
		}
	}

	response, err := buf.ann.GetAnnotations(context.Background(), logTime, ipSlice, label)
	if err != nil {
		log.Println("error in client GetAnnotations: ", err)
		metrics.AnnotationErrorCount.With(prometheus.
			Labels{"source": "Client IP: RPC err in GetAnnotations."}).Inc()
		return err
	}

	// Guard against a nil response as well as a nil map, so that a
	// misbehaving annotator cannot cause a nil-pointer dereference.
	if response == nil || response.Annotations == nil {
		log.Println("empty client annotation response")
		metrics.AnnotationErrorCount.With(prometheus.
			Labels{"source": "Client IP: empty response"}).Inc()
		return ErrAnnotationError
	}
	annMap := response.Annotations

	for _, r := range rows {
		// annMap is known to be non-nil here because of the check above.
		r.AnnotateClients(annMap)
	}
	return nil
}
// Annotate fetches annotations for all rows in the buffer, first for client
// IPs and then for server IPs.
//
// metricLabel labels the worker-state gauge, the annotation-time summary,
// and the annotation requests. Both annotation passes are always attempted;
// if both fail, the client error is returned in preference to the server
// error. An empty buffer returns nil without recording a timing sample.
//
// Not thread-safe. Should only be called by owning thread.
// TODO should convert this to operate on the rows, instead of the buffer.
// Then we can do it after TakeRows().
func (buf *RowBuffer) Annotate(metricLabel string) error {
	metrics.WorkerState.WithLabelValues(metricLabel, "annotate").Inc()
	defer metrics.WorkerState.WithLabelValues(metricLabel, "annotate").Dec()
	if len(buf.rows) == 0 {
		return nil
	}

	start := time.Now()
	// The observation must live inside a closure: the arguments of a
	// deferred call are evaluated when the defer statement executes, so
	// deferring Observe(time.Since(start)) directly would always record a
	// near-zero duration instead of the time spent annotating.
	defer func() {
		metrics.AnnotationTimeSummary.With(prometheus.Labels{"test_type": metricLabel}).
			Observe(float64(time.Since(start).Nanoseconds()))
	}()

	// TODO Consider doing these in parallel?
	clientErr := buf.annotateClients(metricLabel)
	serverErr := buf.annotateServers(metricLabel)

	if clientErr != nil {
		return clientErr
	}
	return serverErr
}
// Base provides common parser functionality.
// It embeds an etl.Inserter, which receives the rows, and a RowBuffer, which
// buffers and annotates them before they are handed to the Inserter.
type Base struct {
etl.Inserter
RowBuffer
}
// NewBase builds a parser.Base that sends rows to ins, buffers at most
// bufSize rows, and uses ann for annotation lookups. The result is generally
// embedded in a type specific parser.
func NewBase(ins etl.Inserter, bufSize int, ann v2as.Annotator) *Base {
	return &Base{
		Inserter: ins,
		RowBuffer: RowBuffer{
			bufferSize: bufSize,
			rows:       make([]interface{}, 0, bufSize),
			ann:        ann,
		},
	}
}
// TaskError returns the task level error, based on failed rows, or any other criteria.
// This implementation always returns nil.
func (pb *Base) TaskError() error {
return nil
}
// Flush drains the row buffer into the Inserter with Put and then flushes
// the Inserter synchronously, returning the Inserter's flush error.
// Caller should generally call Annotate first, or use AnnotateAndFlush.
func (pb *Base) Flush() error {
	// NOTE(review): if Inserter.Put returns an error, it is not checked
	// here — confirm against the etl.Inserter interface.
	pb.Put(pb.TakeRows())
	return pb.Inserter.Flush()
}
// AnnotateAndFlush annotates the buffered rows and then synchronously pushes
// them through the Inserter. The rows are flushed even if annotation fails.
// A flush error takes precedence; otherwise the annotation error, if any,
// is returned.
func (pb *Base) AnnotateAndFlush(metricLabel string) error {
	annErr := pb.Annotate(metricLabel)
	if err := pb.Flush(); err != nil {
		return err
	}
	return annErr
}
// AnnotateAndPutAsync annotates the buffered rows synchronously, then hands
// them to the Inserter with PutAsync. The rows are handed off even if
// annotation fails; the annotation error, if any, is returned.
func (pb *Base) AnnotateAndPutAsync(metricLabel string) error {
	annErr := pb.Annotate(metricLabel)
	pb.PutAsync(pb.TakeRows())
	return annErr
}