-
Notifications
You must be signed in to change notification settings - Fork 7
/
ndt7_result.go
228 lines (198 loc) · 6.92 KB
/
ndt7_result.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package parser
// This file defines the Parser subtype that handles NDT7Result data.
import (
"encoding/json"
"log"
"strings"
"time"
"cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
v2as "github.com/m-lab/annotation-service/api/v2"
"github.com/m-lab/etl/annotation"
"github.com/m-lab/etl/etl"
"github.com/m-lab/etl/metrics"
"github.com/m-lab/etl/row"
"github.com/m-lab/etl/schema"
"github.com/m-lab/go/logx"
"github.com/m-lab/ndt-server/ndt7/model"
)
//=====================================================================================
// NDT7Result Parser
//=====================================================================================

// NDT7ResultParser handles parsing of NDT7Result archives.
// It satisfies the etl.Parser interface by embedding row.Base and
// forwarding buffer/stats calls to it (see the methods below).
type NDT7ResultParser struct {
	// Base provides buffered row insertion, annotation, and commit statistics.
	*row.Base
	// table is the base BigQuery table name (returned by TableName).
	table string
	// suffix is appended to table by FullTableName (e.g. a template/partition
	// suffix — see FullTableName).
	suffix string
}
// NewNDT7ResultParser returns a parser for NDT7Result archives.
// When ann is nil, the default batch annotator is used.
func NewNDT7ResultParser(sink row.Sink, table, suffix string, ann v2as.Annotator) etl.Parser {
	if ann == nil {
		// Fall back to the standard batch annotation service.
		ann = v2as.GetAnnotator(annotation.BatchURL)
	}
	parser := &NDT7ResultParser{
		Base:   row.NewBase(table, sink, etl.NDT7.BQBufferSize(), ann),
		table:  table,
		suffix: suffix,
	}
	return parser
}
// TaskError returns non-nil if the task had enough failures to justify
// recording the entire task as in error. For now, this is any failure
// rate exceeding 10%.
func (dp *NDT7ResultParser) TaskError() error {
	stats := dp.GetStats()
	// Healthy path: failures are at most 10% of all accepted rows.
	if 10*stats.Failed <= stats.Total() {
		return nil
	}
	log.Printf("Warning: high row commit errors (more than 10%%): %d failed of %d accepted\n",
		stats.Failed, stats.Total())
	return etl.ErrHighInsertionFailureRate
}
// IsParsable returns the canonical test type and whether to parse data.
func (dp *NDT7ResultParser) IsParsable(testName string, data []byte) (string, bool) {
	// Files look like:
	// ndt7-{upload,download}-YYYYMMDDTHHMMSS.066461502Z.<UUID>.json.gz
	isJSON := strings.HasSuffix(testName, "json.gz") || strings.HasSuffix(testName, "json")
	if isJSON && strings.Contains(testName, "ndt7") {
		return "ndt7_result", true
	}
	logx.Debug.Println("ndt7 unknown file:", testName)
	return "unknown", false
}
// ParseAndInsert decodes the data.NDT7Result JSON and inserts it into BQ.
// meta must contain "filename" (string) and "date" (civil.Date); the type
// assertions below panic otherwise, matching the original behavior.
func (dp *NDT7ResultParser) ParseAndInsert(meta map[string]bigquery.Value, testName string, test []byte) error {
	// TODO: derive 'ndt5' (or 'ndt7') labels from testName.
	metrics.WorkerState.WithLabelValues(dp.TableName(), "ndt7_result").Inc()
	defer metrics.WorkerState.WithLabelValues(dp.TableName(), "ndt7_result").Dec()

	// Named resultRow rather than "row" so that the imported row package
	// (used via row.Sink/row.NewBase elsewhere in this file) is not shadowed.
	resultRow := schema.NDT7ResultRow{
		Parser: schema.ParseInfo{
			Version:    Version(),
			Time:       time.Now(),
			ArchiveURL: meta["filename"].(string),
			Filename:   testName,
		},
	}

	// Parse the test.
	err := json.Unmarshal(test, &resultRow.Raw)
	if err != nil {
		log.Println(meta["filename"].(string), testName, err)
		metrics.TestCount.WithLabelValues(dp.TableName(), "ndt7_result", "Unmarshal").Inc()
		return err
	}

	// This is a hack to deal with the ConnectionInfo fields that are not intended to be
	// exported to bigquery. With the GCS row.Sink, we convert to json, but we cannot
	// tag the json, because the json tag is already used for the NDT7 client comms.
	// NOTE: ranging over a nil ServerMeasurements slice is a no-op, so only the
	// pointer fields need a nil check.
	if resultRow.Raw.Download != nil {
		for i := range resultRow.Raw.Download.ServerMeasurements {
			resultRow.Raw.Download.ServerMeasurements[i].ConnectionInfo = nil
		}
	}
	if resultRow.Raw.Upload != nil {
		for i := range resultRow.Raw.Upload.ServerMeasurements {
			resultRow.Raw.Upload.ServerMeasurements[i].ConnectionInfo = nil
		}
	}

	// NOTE: Civil is not TZ adjusted. It takes the year, month, and date from
	// the given timestamp, regardless of the timestamp's timezone. Since we
	// run our systems in UTC, all timestamps will be relative to UTC and as
	// will these dates.
	resultRow.Date = meta["date"].(civil.Date)
	if resultRow.Raw.Download != nil {
		resultRow.A = downSummary(resultRow.Raw.Download)
	} else if resultRow.Raw.Upload != nil {
		resultRow.A = upSummary(resultRow.Raw.Upload)
	} else {
		metrics.WarningCount.WithLabelValues(
			dp.TableName(), "ndt7", "download and upload are both nil").Inc()
	}
	// NOTE(review): when both download and upload are nil, A is the zero
	// summary and ID is empty, but the row is still inserted — confirm this
	// is intended downstream.
	resultRow.ID = resultRow.A.UUID

	// Estimate the row size based on the input JSON size.
	metrics.RowSizeHistogram.WithLabelValues(
		dp.TableName()).Observe(float64(len(test)))

	// Insert the row.
	if err = dp.Base.Put(&resultRow); err != nil {
		return err
	}
	// Count successful inserts.
	metrics.TestCount.WithLabelValues(dp.TableName(), "ndt7_result", "ok").Inc()
	return nil
}
// downSummary builds the NDT7Summary for a download test from its archival
// data. Download tests use BBR congestion control on the server side.
func downSummary(down *model.ArchivalData) schema.NDT7Summary {
	measurements := down.ServerMeasurements
	summary := schema.NDT7Summary{
		UUID:              down.UUID,
		TestTime:          down.StartTime,
		CongestionControl: "bbr",
	}
	summary.MeanThroughputMbps = downRate(measurements)
	summary.MinRTT = minRTT(measurements)
	summary.LossRate = lossRate(measurements)
	return summary
}
// upSummary builds the NDT7Summary for an upload test from its archival data.
func upSummary(up *model.ArchivalData) schema.NDT7Summary {
	measurements := up.ServerMeasurements
	summary := schema.NDT7Summary{
		UUID:     up.UUID,
		TestTime: up.StartTime,
		// TODO: what is the right value here? (the sender is the client,
		// so "bbr" describes the server, not the upload path)
		CongestionControl:  "bbr",
		MeanThroughputMbps: upRate(measurements),
		MinRTT:             minRTT(measurements),
	}
	// TODO: what is the correct measure for upload? LossRate left at zero.
	summary.LossRate = 0
	return summary
}
// lossRate returns the fraction of sent bytes that were retransmitted,
// taken from the final measurement. It returns 0 when there are no
// measurements or no bytes were sent — the unguarded 0/0 division in the
// previous version produced NaN, which does not survive JSON/BigQuery
// round-trips.
func lossRate(m []model.Measurement) float64 {
	if len(m) == 0 {
		return 0
	}
	tcp := m[len(m)-1].TCPInfo
	if tcp.BytesSent == 0 {
		// Avoid 0/0 = NaN.
		return 0
	}
	return float64(tcp.BytesRetrans) / float64(tcp.BytesSent)
}
// downRate returns the mean download throughput in Mbps, computed from the
// final measurement. BytesAcked is in bytes and ElapsedTime in microseconds
// (per the "Convert to Mbps" intent of the original), so 8*bytes/usec yields
// Mbps directly. Returns 0 for empty input or zero elapsed time — the
// unguarded division previously produced +Inf/NaN, which does not survive
// JSON/BigQuery round-trips.
func downRate(m []model.Measurement) float64 {
	if len(m) == 0 {
		return 0
	}
	tcp := m[len(m)-1].TCPInfo
	if tcp.ElapsedTime == 0 {
		// Avoid division by zero.
		return 0
	}
	// Convert to Mbps.
	return 8 * float64(tcp.BytesAcked) / float64(tcp.ElapsedTime)
}
// upRate returns the mean upload throughput in Mbps, computed from the
// final measurement (BytesReceived, since the server receives uploads).
// Returns 0 for empty input or zero elapsed time — the unguarded division
// previously produced +Inf/NaN, which does not survive JSON/BigQuery
// round-trips.
func upRate(m []model.Measurement) float64 {
	if len(m) == 0 {
		return 0
	}
	tcp := m[len(m)-1].TCPInfo
	if tcp.ElapsedTime == 0 {
		// Avoid division by zero.
		return 0
	}
	// Convert to Mbps.
	return 8 * float64(tcp.BytesReceived) / float64(tcp.ElapsedTime)
}
// minRTT returns the minimum round-trip time in seconds from the final
// measurement, or 0 when no measurements are available.
func minRTT(m []model.Measurement) float64 {
	if len(m) == 0 {
		return 0
	}
	// MinRTT is in microseconds; convert to seconds.
	return float64(m[len(m)-1].TCPInfo.MinRTT) / 1000000
}
// NB: These functions are also required to complete the etl.Parser interface.
// For NDT7Result, we just forward the calls to the Inserter.

// Flush forwards to the embedded row.Base, committing any buffered rows.
func (dp *NDT7ResultParser) Flush() error {
	return dp.Base.Flush()
}
// TableName returns the base BigQuery table name (without suffix).
func (dp *NDT7ResultParser) TableName() string {
	return dp.table
}
// FullTableName returns the table name with the configured suffix appended.
func (dp *NDT7ResultParser) FullTableName() string {
	return dp.table + dp.suffix
}
// RowsInBuffer returns the count of rows currently in the buffer.
func (dp *NDT7ResultParser) RowsInBuffer() int {
	stats := dp.GetStats()
	return stats.Pending
}
// Committed returns the count of rows successfully committed to BQ.
func (dp *NDT7ResultParser) Committed() int {
	stats := dp.GetStats()
	return stats.Committed
}
// Accepted returns the count of all rows received through InsertRow(s).
func (dp *NDT7ResultParser) Accepted() int {
	stats := dp.GetStats()
	return stats.Total()
}
// Failed returns the count of all rows that could not be committed.
func (dp *NDT7ResultParser) Failed() int {
	stats := dp.GetStats()
	return stats.Failed
}