diff --git a/experiment/ndt7/callback.go b/experiment/ndt7/callback.go new file mode 100644 index 00000000..00f72f39 --- /dev/null +++ b/experiment/ndt7/callback.go @@ -0,0 +1,8 @@ +package ndt7 + +import "time" + +type ( + callbackJSON func(data []byte) error + callbackPerformance func(elapsed time.Duration, count int64) +) diff --git a/experiment/ndt7/callback_test.go b/experiment/ndt7/callback_test.go new file mode 100644 index 00000000..1a786131 --- /dev/null +++ b/experiment/ndt7/callback_test.go @@ -0,0 +1,10 @@ +package ndt7 + +import "time" + +func defaultCallbackJSON(data []byte) error { + return nil +} + +func defaultCallbackPerformance(elapsed time.Duration, count int64) { +} diff --git a/experiment/ndt7/dial.go b/experiment/ndt7/dial.go new file mode 100644 index 00000000..2f82245c --- /dev/null +++ b/experiment/ndt7/dial.go @@ -0,0 +1,54 @@ +package ndt7 + +import ( + "context" + "crypto/tls" + "net/http" + "net/url" + + "github.com/gorilla/websocket" +) + +type dialManager struct { + hostname string + port string + readBufferSize int + scheme string + tlsConfig *tls.Config + writeBufferSize int +} + +func newDialManager(hostname string) dialManager { + return dialManager{ + hostname: hostname, + port: "443", + readBufferSize: paramMaxMessageSize, + scheme: "wss", + writeBufferSize: paramMaxMessageSize, + } +} + +func (mgr dialManager) dialWithTestName(ctx context.Context, testName string) (*websocket.Conn, error) { + dialer := websocket.Dialer{ + ReadBufferSize: mgr.readBufferSize, + TLSClientConfig: mgr.tlsConfig, + WriteBufferSize: mgr.writeBufferSize, + } + URL := url.URL{ + Scheme: mgr.scheme, + Host: mgr.hostname + ":" + mgr.port, + } + URL.Path = "/ndt/v7/" + testName + headers := http.Header{} + headers.Add("Sec-WebSocket-Protocol", "net.measurementlab.ndt.v7") + conn, _, err := dialer.DialContext(ctx, URL.String(), headers) + return conn, err +} + +func (mgr dialManager) dialDownload(ctx context.Context) (*websocket.Conn, error) { + return mgr.dialWithTestName(ctx, "download") +} + +func (mgr dialManager) dialUpload(ctx context.Context) (*websocket.Conn, error) { + return mgr.dialWithTestName(ctx, "upload") +} diff --git a/experiment/ndt7/dial_test.go b/experiment/ndt7/dial_test.go new file mode 100644 index 00000000..7aa38536 --- /dev/null +++ b/experiment/ndt7/dial_test.go @@ -0,0 +1,33 @@ +package ndt7 + +import ( + "context" + "strings" + "testing" +) + +func TestDialDownloadWithCancelledContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() // immediately halt + mgr := newDialManager("hostname.fake") + conn, err := mgr.dialDownload(ctx) + if err == nil || !strings.HasSuffix(err.Error(), "operation was canceled") { + t.Fatal("not the error we expected") + } + if conn != nil { + t.Fatal("expected nil conn here") + } +} + +func TestDialUploadWithCancelledContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() // immediately halt + mgr := newDialManager("hostname.fake") + conn, err := mgr.dialUpload(ctx) + if err == nil || !strings.HasSuffix(err.Error(), "operation was canceled") { + t.Fatal("not the error we expected") + } + if conn != nil { + t.Fatal("expected nil conn here") + } +} diff --git a/experiment/ndt7/download.go b/experiment/ndt7/download.go new file mode 100644 index 00000000..37977d14 --- /dev/null +++ b/experiment/ndt7/download.go @@ -0,0 +1,71 @@ +package ndt7 + +import ( + "context" + "io" + "io/ioutil" + "time" + + "github.com/gorilla/websocket" +) + +type downloadManager struct { + conn mockableConn + maxMessageSize int64 + maxRuntime time.Duration + measureInterval time.Duration + onJSON callbackJSON + onPerformance callbackPerformance +} + +func newDownloadManager( + conn mockableConn, onPerformance callbackPerformance, + onJSON callbackJSON, +) downloadManager { + return downloadManager{ + conn: conn, + maxMessageSize: paramMaxMessageSize, + maxRuntime: paramMaxRuntime, + measureInterval: paramMeasureInterval, + onJSON: onJSON, + onPerformance: onPerformance, + } +} + +func (mgr downloadManager) run(ctx context.Context) error { + var total int64 + start := time.Now() + if err := mgr.conn.SetReadDeadline(start.Add(mgr.maxRuntime)); err != nil { + return err + } + mgr.conn.SetReadLimit(mgr.maxMessageSize) + ticker := time.NewTicker(mgr.measureInterval) + defer ticker.Stop() + for ctx.Err() == nil { + kind, reader, err := mgr.conn.NextReader() + if err != nil { + return err + } + if kind == websocket.TextMessage { + data, err := ioutil.ReadAll(reader) + if err != nil { + return err + } + if err := mgr.onJSON(data); err != nil { + return err + } + } + n, err := io.Copy(ioutil.Discard, reader) + if err != nil { + return err + } + total += int64(n) + select { + case now := <-ticker.C: + mgr.onPerformance(now.Sub(start), total) + default: + // NOTHING + } + } + return nil +} diff --git a/experiment/ndt7/download_test.go b/experiment/ndt7/download_test.go new file mode 100644 index 00000000..bc7bcbeb --- /dev/null +++ b/experiment/ndt7/download_test.go @@ -0,0 +1,145 @@ +package ndt7 + +import ( + "context" + "encoding/json" + "errors" + "io" + "strings" + "testing" + "time" + + "github.com/gorilla/websocket" +) + +func TestUnitDownloadSetReadDeadlineFailure(t *testing.T) { + expected := errors.New("mocked error") + mgr := newDownloadManager( + &mockableConnMock{ + ReadDeadlineErr: expected, + }, + defaultCallbackPerformance, + defaultCallbackJSON, + ) + err := mgr.run(context.Background()) + if !errors.Is(err, expected) { + t.Fatal("not the error we expected") + } +} + +func TestUnitDownloadNextReaderFailure(t *testing.T) { + expected := errors.New("mocked error") + mgr := newDownloadManager( + &mockableConnMock{ + NextReaderErr: expected, + }, + defaultCallbackPerformance, + defaultCallbackJSON, + ) + err := mgr.run(context.Background()) + if !errors.Is(err, expected) { + t.Fatal("not the error we expected") + } +} + +func TestUnitDownloadTextMessageReadAllFailure(t *testing.T) { + expected := errors.New("mocked error") + mgr := newDownloadManager( + &mockableConnMock{ + NextReaderMsgType: websocket.TextMessage, + NextReaderReader: func() io.Reader { + return &alwaysFailingReader{ + Err: expected, + } + }, + }, + defaultCallbackPerformance, + defaultCallbackJSON, + ) + err := mgr.run(context.Background()) + if !errors.Is(err, expected) { + t.Fatal("not the error we expected") + } +} + +type alwaysFailingReader struct { + Err error +} + +func (r *alwaysFailingReader) Read(p []byte) (int, error) { + return 0, r.Err +} + +func TestUnitDownloadBinaryMessageReadAllFailure(t *testing.T) { + expected := errors.New("mocked error") + mgr := newDownloadManager( + &mockableConnMock{ + NextReaderMsgType: websocket.BinaryMessage, + NextReaderReader: func() io.Reader { + return &alwaysFailingReader{ + Err: expected, + } + }, + }, + defaultCallbackPerformance, + defaultCallbackJSON, + ) + err := mgr.run(context.Background()) + if !errors.Is(err, expected) { + t.Fatal("not the error we expected") + } +} + +func TestUnitDownloadOnJSONCallbackError(t *testing.T) { + mgr := newDownloadManager( + &mockableConnMock{ + NextReaderMsgType: websocket.TextMessage, + NextReaderReader: func() io.Reader { + return &invalidJSONReader{} + }, + }, + defaultCallbackPerformance, + func(data []byte) error { + var v interface{} + return json.Unmarshal(data, &v) + }, + ) + err := mgr.run(context.Background()) + if err == nil || !strings.HasSuffix(err.Error(), "unexpected end of JSON input") { + t.Fatal("not the error we expected") + } +} + +type invalidJSONReader struct{} + +func (r *invalidJSONReader) Read(p []byte) (int, error) { + return copy(p, []byte(`{`)), io.EOF +} + +func TestUnitDownloadOnJSONLoop(t *testing.T) { + mgr := newDownloadManager( + &mockableConnMock{ + NextReaderMsgType: websocket.TextMessage, + NextReaderReader: func() io.Reader { + return &goodJSONReader{} + }, + }, + defaultCallbackPerformance, + func(data []byte) error { + var v interface{} + return json.Unmarshal(data, &v) + }, + ) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + err := mgr.run(ctx) + if err != nil { + t.Fatal(err) + } +} + +type goodJSONReader struct{} + +func (r *goodJSONReader) Read(p []byte) (int, error) { + return copy(p, []byte(`{}`)), io.EOF +} diff --git a/experiment/ndt7/mockable.go b/experiment/ndt7/mockable.go new file mode 100644 index 00000000..4c0d6d77 --- /dev/null +++ b/experiment/ndt7/mockable.go @@ -0,0 +1,16 @@ +package ndt7 + +import ( + "io" + "time" + + "github.com/gorilla/websocket" +) + +type mockableConn interface { + NextReader() (int, io.Reader, error) + SetReadDeadline(time.Time) error + SetReadLimit(int64) + SetWriteDeadline(time.Time) error + WritePreparedMessage(*websocket.PreparedMessage) error +} diff --git a/experiment/ndt7/mockable_test.go b/experiment/ndt7/mockable_test.go new file mode 100644 index 00000000..7e0d61ca --- /dev/null +++ b/experiment/ndt7/mockable_test.go @@ -0,0 +1,39 @@ +package ndt7 + +import ( + "io" + "time" + + "github.com/gorilla/websocket" +) + +type mockableConnMock struct { + NextReaderMsgType int + NextReaderErr error + NextReaderReader func() io.Reader + ReadDeadlineErr error + WriteDeadlineErr error + WritePreparedMessageErr error +} + +func (c *mockableConnMock) NextReader() (int, io.Reader, error) { + var reader io.Reader + if c.NextReaderReader != nil { + reader = c.NextReaderReader() + } + return c.NextReaderMsgType, reader, c.NextReaderErr +} + +func (c *mockableConnMock) SetReadDeadline(time.Time) error { + return c.ReadDeadlineErr +} + +func (c *mockableConnMock) SetReadLimit(int64) {} + +func (c *mockableConnMock) SetWriteDeadline(time.Time) error { + return c.WriteDeadlineErr +} + +func (c *mockableConnMock) WritePreparedMessage(*websocket.PreparedMessage) error { + return c.WritePreparedMessageErr +} diff --git a/experiment/ndt7/ndt7.go b/experiment/ndt7/ndt7.go index 64bf940f..0af5a995 100644 --- a/experiment/ndt7/ndt7.go +++ b/experiment/ndt7/ndt7.go @@ -5,63 +5,62 @@ import ( "context" "encoding/json" "fmt" - "io" - "net/http" + "time" "github.com/dustin/go-humanize" - - upstream "github.com/m-lab/ndt7-client-go" - "github.com/m-lab/ndt7-client-go/mlabns" "github.com/m-lab/ndt7-client-go/spec" - + "github.com/ooni/probe-engine/internal/mlablocate" "github.com/ooni/probe-engine/model" ) const ( testName = "ndt7" - testVersion = "0.1.0" + testVersion = "0.2.0" ) // Config contains the experiment settings type Config struct{} +// Summary is the measurement summary +type Summary struct { + AvgRTT float64 `json:"avg_rtt"` // Average RTT [ms] + Download float64 `json:"download"` // download speed [kbit/s] + MSS int64 `json:"mss"` // MSS + MaxRTT float64 `json:"max_rtt"` // Max AvgRTT sample seen [ms] + MinRTT float64 `json:"min_rtt"` // Min RTT according to kernel [ms] + Ping float64 `json:"ping"` // Equivalent to MinRTT [ms] + RetransmitRate float64 `json:"retransmit_rate"` // bytes_retrans/bytes_sent [0..1] + Upload float64 `json:"upload"` // upload speed [kbit/s] +} + // TestKeys contains the test keys type TestKeys struct { - // Failure is the failure string - Failure string `json:"failure"` - // Download contains download results Download []spec.Measurement `json:"download"` + // Failure is the failure string + Failure *string `json:"failure"` + + // Summary contains the measurement summary + Summary Summary `json:"summary"` + // Upload contains upload results Upload []spec.Measurement `json:"upload"` } -func discover(ctx context.Context, sess model.ExperimentSession) (string, error) { - client := mlabns.NewClient("ndt7", sess.UserAgent()) - // Basically: (1) make sure we're using our tracing and possibly proxied - // client rather than default; (2) if we have an explicit proxy make sure - // we tell mlab-ns to use our IP address rather than the proxy one. - client.HTTPClient = sess.DefaultHTTPClient() - if sess.ExplicitProxy() { - client.RequestMaker = func( - method, url string, body io.Reader, - ) (*http.Request, error) { - req, err := http.NewRequest(method, url, body) - if err != nil { - return nil, err - } - values := req.URL.Query() - values.Set("ip", sess.ProbeIP()) - req.URL.RawQuery = values.Encode() - return req, nil - } - } - return client.Query(ctx) +type measurer struct { + config Config + jsonUnmarshal func(data []byte, v interface{}) error + preDownloadHook func() + preUploadHook func() } -type measurer struct { - config Config +func (m *measurer) discover(ctx context.Context, sess model.ExperimentSession) (string, error) { + client := mlablocate.NewClient(sess.DefaultHTTPClient(), sess.Logger(), sess.UserAgent()) + if sess.ExplicitProxy() { + client.NewRequest = mlablocate.NewRequestWithProxy(sess.ProbeIP()) + } + return client.Query(ctx, "ndt7") } func (m *measurer) ExperimentName() string { @@ -72,71 +71,133 @@ func (m *measurer) ExperimentVersion() string { return testVersion } -func (m *measurer) Run( +func (m *measurer) doDownload( ctx context.Context, sess model.ExperimentSession, - measurement *model.Measurement, callbacks model.ExperimentCallbacks, + callbacks model.ExperimentCallbacks, tk *TestKeys, + hostname string, ) error { - const maxRuntime = 15.0 // second (conservative) - testkeys := &TestKeys{} - measurement.TestKeys = testkeys - client := upstream.NewClient(sess.SoftwareName(), sess.SoftwareVersion()) - FQDN, err := discover(ctx, sess) + conn, err := newDialManager(hostname).dialDownload(ctx) if err != nil { - testkeys.Failure = err.Error() return err } - client.FQDN = FQDN // skip client's own mlabns call - sess.Logger().Debugf("ndt7: mlabns returned %s to us", FQDN) - ch, err := client.StartDownload(ctx) - if err != nil { - testkeys.Failure = err.Error() - return err - } - callbacks.OnProgress(0, fmt.Sprintf("server: %s", client.FQDN)) - for ev := range ch { - testkeys.Download = append(testkeys.Download, ev) - if ev.AppInfo != nil && ev.Origin == "client" { - elapsed := float64(ev.AppInfo.ElapsedTime) / 1e06 // to seconds + defer conn.Close() + mgr := newDownloadManager( + conn, + func(timediff time.Duration, count int64) { + elapsed := timediff.Seconds() // The percentage of completion of download goes from 0 to // 50% of the whole experiment, hence the `/2.0`. - percentage := elapsed / maxRuntime / 2.0 - speed := float64(ev.AppInfo.NumBytes) * 8.0 / elapsed - message := fmt.Sprintf( - "download-speed %s", humanize.SI(float64(speed), "bit/s"), - ) + percentage := elapsed / paramMaxRuntimeUpperBound / 2.0 + speed := float64(count) * 8.0 / elapsed + message := fmt.Sprintf("download-speed %s", humanize.SI(float64(speed), "bit/s")) + tk.Summary.Download = speed / 1e03 /* bit/s => kbit/s */ callbacks.OnProgress(percentage, message) - } - data, err := json.Marshal(ev) - if err != nil { - testkeys.Failure = err.Error() - return err - } - sess.Logger().Debugf("%s", string(data)) + tk.Download = append(tk.Download, spec.Measurement{ + AppInfo: &spec.AppInfo{ + ElapsedTime: int64(timediff / time.Microsecond), + NumBytes: count, + }, + Origin: "client", + Test: "download", + }) + }, + func(data []byte) error { + sess.Logger().Debugf("%s", string(data)) + var measurement spec.Measurement + if err := m.jsonUnmarshal(data, &measurement); err != nil { + return err + } + if measurement.TCPInfo != nil { + rtt := float64(measurement.TCPInfo.RTT) / 1e03 /* us => ms */ + tk.Summary.AvgRTT = rtt + tk.Summary.MSS = int64(measurement.TCPInfo.AdvMSS) + if tk.Summary.MaxRTT < rtt { + tk.Summary.MaxRTT = rtt + } + tk.Summary.MinRTT = float64(measurement.TCPInfo.MinRTT) / 1e03 /* us => ms */ + tk.Summary.Ping = tk.Summary.MinRTT + if measurement.TCPInfo.BytesSent > 0 { + tk.Summary.RetransmitRate = (float64(measurement.TCPInfo.BytesRetrans) / + float64(measurement.TCPInfo.BytesSent)) + } + measurement.BBRInfo = nil // don't encourage people to use it + measurement.ConnectionInfo = nil // do we need to save it? + measurement.Origin = "server" + measurement.Test = "download" + tk.Download = append(tk.Download, measurement) + } + return nil + }, + ) + if err := mgr.run(ctx); err != nil { + sess.Logger().Warnf("download: %s", err) } - ch, err = client.StartUpload(ctx) + return nil // failure is only when we cannot connect +} + +func (m *measurer) doUpload( + ctx context.Context, sess model.ExperimentSession, + callbacks model.ExperimentCallbacks, tk *TestKeys, + hostname string, +) error { + conn, err := newDialManager(hostname).dialUpload(ctx) if err != nil { - testkeys.Failure = err.Error() return err } - for ev := range ch { - testkeys.Upload = append(testkeys.Upload, ev) - if ev.AppInfo != nil && ev.Origin == "client" { - elapsed := float64(ev.AppInfo.ElapsedTime) / 1e06 // to seconds + defer conn.Close() + mgr := newUploadManager( + conn, + func(timediff time.Duration, count int64) { + elapsed := timediff.Seconds() // The percentage of completion of upload goes from 50% to 100% of // the whole experiment, hence `0.5 +` and `/2.0`. - percentage := 0.5 + elapsed/maxRuntime/2.0 - speed := float64(ev.AppInfo.NumBytes) * 8.0 / elapsed - message := fmt.Sprintf( - "upload-speed %s", humanize.SI(float64(speed), "bit/s"), - ) + percentage := 0.5 + elapsed/paramMaxRuntimeUpperBound/2.0 + speed := float64(count) * 8.0 / elapsed + message := fmt.Sprintf("upload-speed %s", humanize.SI(float64(speed), "bit/s")) + tk.Summary.Upload = speed / 1e03 /* bit/s => kbit/s */ callbacks.OnProgress(percentage, message) - } - data, err := json.Marshal(ev) - if err != nil { - testkeys.Failure = err.Error() - return err - } - sess.Logger().Debugf("%s", string(data)) + tk.Upload = append(tk.Upload, spec.Measurement{ + AppInfo: &spec.AppInfo{ + ElapsedTime: int64(timediff / time.Microsecond), + NumBytes: count, + }, + Origin: "client", + Test: "upload", + }) + }, + ) + if err := mgr.run(ctx); err != nil { + sess.Logger().Warnf("upload: %s", err) + } + return nil // failure is only when we cannot connect +} + +func (m *measurer) Run( + ctx context.Context, sess model.ExperimentSession, + measurement *model.Measurement, callbacks model.ExperimentCallbacks, +) error { + tk := new(TestKeys) + measurement.TestKeys = tk + hostname, err := m.discover(ctx, sess) + if err != nil { + tk.Failure = failureFromError(err) + return err + } + callbacks.OnProgress(0, fmt.Sprintf("downloading: %s", hostname)) + if m.preDownloadHook != nil { + m.preDownloadHook() + } + if err := m.doDownload(ctx, sess, callbacks, tk, hostname); err != nil { + tk.Failure = failureFromError(err) + return err + } + callbacks.OnProgress(0.5, fmt.Sprintf("uploading: %s", hostname)) + if m.preUploadHook != nil { + m.preUploadHook() + } + if err := m.doUpload(ctx, sess, callbacks, tk, hostname); err != nil { + tk.Failure = failureFromError(err) + return err } callbacks.OnProgress(1, "done") return nil @@ -144,5 +205,13 @@ func (m *measurer) Run( // NewExperimentMeasurer creates a new ExperimentMeasurer. func NewExperimentMeasurer(config Config) model.ExperimentMeasurer { - return &measurer{config: config} + return &measurer{config: config, jsonUnmarshal: json.Unmarshal} +} + +func failureFromError(err error) (failure *string) { + if err != nil { + s := err.Error() + failure = &s + } + return } diff --git a/experiment/ndt7/ndt7_test.go b/experiment/ndt7/ndt7_test.go index 17fcd186..52838f6b 100644 --- a/experiment/ndt7/ndt7_test.go +++ b/experiment/ndt7/ndt7_test.go @@ -1,18 +1,206 @@ -package ndt7_test +package ndt7 import ( + "context" + "errors" + "net/http" + "strings" "testing" - "github.com/ooni/probe-engine/experiment/mktesting" - "github.com/ooni/probe-engine/experiment/ndt7" + "github.com/apex/log" + "github.com/ooni/probe-engine/experiment/handler" + "github.com/ooni/probe-engine/internal/mockable" "github.com/ooni/probe-engine/model" ) +func TestUnitNewExperimentMeasurer(t *testing.T) { + measurer := NewExperimentMeasurer(Config{}) + if measurer.ExperimentName() != "ndt7" { + t.Fatal("unexpected name") + } + if measurer.ExperimentVersion() != "0.2.0" { + t.Fatal("unexpected version") + } +} + +func TestUnitDiscoverCancelledContext(t *testing.T) { + m := new(measurer) + sess := &mockable.ExperimentSession{ + MockableHTTPClient: http.DefaultClient, + MockableLogger: log.Log, + MockableUserAgent: "miniooni/0.1.0-dev", + } + ctx, cancel := context.WithCancel(context.Background()) + cancel() // immediately cancel + fqdn, err := m.discover(ctx, sess) + if !errors.Is(err, context.Canceled) { + t.Fatal("not the error we expected") + } + if fqdn != "" { + t.Fatal("not the fqdn we expected") + } +} + +func TestUnitDiscoverWithExplicitProxy(t *testing.T) { + m := new(measurer) + expected := errors.New("expected error") + sess := &mockable.ExperimentSession{ + MockableExplicitProxy: true, + MockableHTTPClient: &http.Client{ + Transport: &verifyRequestTransport{ + ExpectedError: expected, + }, + }, + MockableLogger: log.Log, + MockableProbeIP: "1.2.3.4", + MockableUserAgent: "miniooni/0.1.0-dev", + } + ctx, cancel := context.WithCancel(context.Background()) + cancel() // immediately cancel + fqdn, err := m.discover(ctx, sess) + if !errors.Is(err, expected) { + t.Fatal("not the error we expected") + } + if fqdn != "" { + t.Fatal("not the fqdn we expected") + } +} + +type verifyRequestTransport struct { + ExpectedError error +} + +func (txp *verifyRequestTransport) RoundTrip(req *http.Request) (*http.Response, error) { + if req.URL.RawQuery != "ip=1.2.3.4" { + return nil, errors.New("invalid req.URL.RawQuery") + } + return nil, txp.ExpectedError +} + +func TestUnitDoDownloadWithCancelledContext(t *testing.T) { + m := new(measurer) + sess := &mockable.ExperimentSession{ + MockableHTTPClient: http.DefaultClient, + MockableLogger: log.Log, + MockableUserAgent: "miniooni/0.1.0-dev", + } + ctx, cancel := context.WithCancel(context.Background()) + cancel() // immediately cancel + err := m.doDownload(ctx, sess, handler.NewPrinterCallbacks(log.Log), new(TestKeys), "host.name") + if err == nil || !strings.HasSuffix(err.Error(), "operation was canceled") { + t.Fatal("not the error we expected") + } +} + +func TestUnitDoUploadWithCancelledContext(t *testing.T) { + m := new(measurer) + sess := &mockable.ExperimentSession{ + MockableHTTPClient: http.DefaultClient, + MockableLogger: log.Log, + MockableUserAgent: "miniooni/0.1.0-dev", + } + ctx, cancel := context.WithCancel(context.Background()) + cancel() // immediately cancel + err := m.doUpload(ctx, sess, handler.NewPrinterCallbacks(log.Log), new(TestKeys), "host.name") + if err == nil || !strings.HasSuffix(err.Error(), "operation was canceled") { + t.Fatal("not the error we expected") + } +} + +func TestUnitRunWithCancelledContext(t *testing.T) { + m := new(measurer) + sess := &mockable.ExperimentSession{ + MockableHTTPClient: http.DefaultClient, + MockableLogger: log.Log, + MockableUserAgent: "miniooni/0.1.0-dev", + } + ctx, cancel := context.WithCancel(context.Background()) + cancel() // immediately cancel + err := m.Run(ctx, sess, new(model.Measurement), handler.NewPrinterCallbacks(log.Log)) + if !errors.Is(err, context.Canceled) { + t.Fatal("not the error we expected") + } +} + func TestIntegration(t *testing.T) { - err := mktesting.Run("", func() model.ExperimentMeasurer { - return ndt7.NewExperimentMeasurer(ndt7.Config{}) - }) + measurer := NewExperimentMeasurer(Config{}) + err := measurer.Run( + context.Background(), + &mockable.ExperimentSession{ + MockableHTTPClient: http.DefaultClient, + MockableLogger: log.Log, + }, + new(model.Measurement), + handler.NewPrinterCallbacks(log.Log), + ) + if err != nil { + t.Fatal(err) + } +} + +func TestIntegrationFailDownload(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + measurer := NewExperimentMeasurer(Config{}).(*measurer) + measurer.preDownloadHook = func() { + cancel() + } + err := measurer.Run( + ctx, + &mockable.ExperimentSession{ + MockableHTTPClient: http.DefaultClient, + MockableLogger: log.Log, + }, + new(model.Measurement), + handler.NewPrinterCallbacks(log.Log), + ) + if err == nil || !strings.HasSuffix(err.Error(), "operation was canceled") { + t.Fatal(err) + } +} + +func TestIntegrationFailUpload(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + measurer := NewExperimentMeasurer(Config{}).(*measurer) + measurer.preUploadHook = func() { + cancel() + } + err := measurer.Run( + ctx, + &mockable.ExperimentSession{ + MockableHTTPClient: http.DefaultClient, + MockableLogger: log.Log, + }, + new(model.Measurement), + handler.NewPrinterCallbacks(log.Log), + ) + if err == nil || !strings.HasSuffix(err.Error(), "operation was canceled") { + t.Fatal(err) + } +} + +func TestIntegrationDownloadJSONUnmarshalFail(t *testing.T) { + measurer := NewExperimentMeasurer(Config{}).(*measurer) + var seenError bool + expected := errors.New("expected error") + measurer.jsonUnmarshal = func(data []byte, v interface{}) error { + seenError = true + return expected + } + err := measurer.Run( + context.Background(), + &mockable.ExperimentSession{ + MockableHTTPClient: http.DefaultClient, + MockableLogger: log.Log, + }, + new(model.Measurement), + handler.NewPrinterCallbacks(log.Log), + ) if err != nil { t.Fatal(err) } + if !seenError { + t.Fatal("did not see expected error") + } } diff --git a/experiment/ndt7/param.go b/experiment/ndt7/param.go new file mode 100644 index 00000000..45e74e4d --- /dev/null +++ b/experiment/ndt7/param.go @@ -0,0 +1,13 @@ +package ndt7 + +import "time" + +const ( + paramFractionForScaling = 16 + paramMinMessageSize = 1 << 10 + paramMaxScaledMessageSize = 1 << 20 + paramMaxMessageSize = 1 << 24 + paramMaxRuntimeUpperBound = 15.0 // seconds + paramMaxRuntime = 10 * time.Second + paramMeasureInterval = 250 * time.Millisecond +) diff --git a/experiment/ndt7/upload.go b/experiment/ndt7/upload.go new file mode 100644 index 00000000..26637c97 --- /dev/null +++ b/experiment/ndt7/upload.go @@ -0,0 +1,75 @@ +package ndt7 + +import ( + "context" + "time" + + "github.com/gorilla/websocket" +) + +func newMessage(n int) (*websocket.PreparedMessage, error) { + return websocket.NewPreparedMessage(websocket.BinaryMessage, make([]byte, n)) +} + +type uploadManager struct { + conn mockableConn + fractionForScaling int64 + maxRuntime time.Duration + maxMessageSize int + maxScaledMessageSize int + measureInterval time.Duration + minMessageSize int + newMessage func(int) (*websocket.PreparedMessage, error) + onPerformance callbackPerformance +} + +func newUploadManager( + conn mockableConn, onPerformance callbackPerformance, +) uploadManager { + return uploadManager{ + conn: conn, + fractionForScaling: paramFractionForScaling, + maxRuntime: paramMaxRuntime, + maxMessageSize: paramMaxMessageSize, + maxScaledMessageSize: paramMaxScaledMessageSize, + measureInterval: paramMeasureInterval, + minMessageSize: paramMinMessageSize, + newMessage: newMessage, + onPerformance: onPerformance, + } +} + +func (mgr uploadManager) run(ctx context.Context) error { + var total int64 + start := time.Now() + if err := mgr.conn.SetWriteDeadline(time.Now().Add(mgr.maxRuntime)); err != nil { + return err + } + size := mgr.minMessageSize + message, err := mgr.newMessage(size) + if err != nil { + return err + } + ticker := time.NewTicker(mgr.measureInterval) + defer ticker.Stop() + for ctx.Err() == nil { + if err := mgr.conn.WritePreparedMessage(message); err != nil { + return err + } + total += int64(size) + select { + case now := <-ticker.C: + mgr.onPerformance(now.Sub(start), total) + default: + // NOTHING + } + if size >= mgr.maxScaledMessageSize || int64(size) >= (total/mgr.fractionForScaling) { + continue + } + size <<= 1 + if message, err = mgr.newMessage(size); err != nil { + return err + } + } + return nil +} diff --git a/experiment/ndt7/upload_test.go b/experiment/ndt7/upload_test.go new file mode 100644 index 00000000..1bf28f83 --- /dev/null +++ b/experiment/ndt7/upload_test.go @@ -0,0 +1,89 @@ +package ndt7 + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/gorilla/websocket" +) + +func TestUnitUploadSetWriteDeadlineFailure(t *testing.T) { + expected := errors.New("mocked error") + mgr := newUploadManager( + &mockableConnMock{ + WriteDeadlineErr: expected, + }, + defaultCallbackPerformance, + ) + err := mgr.run(context.Background()) + if !errors.Is(err, expected) { + t.Fatal("not the error we expected") + } +} + +func TestUnitUploadNewMessageFailure(t *testing.T) { + expected := errors.New("mocked error") + mgr := newUploadManager( + &mockableConnMock{}, + defaultCallbackPerformance, + ) + mgr.newMessage = func(int) (*websocket.PreparedMessage, error) { + return nil, expected + } + err := mgr.run(context.Background()) + if !errors.Is(err, expected) { + t.Fatal("not the error we expected") + } +} + +func TestUnitUploadWritePreparedMessageFailure(t *testing.T) { + expected := errors.New("mocked error") + mgr := newUploadManager( + &mockableConnMock{ + WritePreparedMessageErr: expected, + }, + defaultCallbackPerformance, + ) + err := mgr.run(context.Background()) + if !errors.Is(err, expected) { + t.Fatal("not the error we expected") + } +} + +func TestUnitUploadWritePreparedMessageSubsequentFailure(t *testing.T) { + expected := errors.New("mocked error") + mgr := newUploadManager( + &mockableConnMock{}, + defaultCallbackPerformance, + ) + var already bool + mgr.newMessage = func(int) (*websocket.PreparedMessage, error) { + if !already { + already = true + return new(websocket.PreparedMessage), nil + } + return nil, expected + } + err := mgr.run(context.Background()) + if !errors.Is(err, expected) { + t.Fatal("not the error we expected") + } +} + +func TestUnitUploadLoop(t *testing.T) { + mgr := newUploadManager( + &mockableConnMock{}, + defaultCallbackPerformance, + ) + mgr.newMessage = func(int) (*websocket.PreparedMessage, error) { + return new(websocket.PreparedMessage), nil + } + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + err := mgr.run(ctx) + if err != nil { + t.Fatal(err) + } +} diff --git a/go.mod b/go.mod index b6f627be..718adf71 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/gobwas/glob v0.2.4-0.20180402141543-f00a7392b439 // indirect github.com/golang/mock v1.3.1 // indirect github.com/google/gxui v0.0.0-20151028112939-f85e0a97b3a4 // indirect + github.com/gorilla/websocket v1.4.1 github.com/grafov/m3u8 v0.0.0-20171211212457-6ab8f28ed427 // indirect github.com/hashicorp/golang-lru v0.5.1 // indirect github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334