Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions experiment/ndt7/callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package ndt7

import "time"

type (
callbackJSON func(data []byte) error
callbackPerformance func(elapsed time.Duration, count int64)
)
10 changes: 10 additions & 0 deletions experiment/ndt7/callback_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ndt7

import "time"

func defaultCallbackJSON(data []byte) error {
return nil
}

func defaultCallbackPerformance(elapsed time.Duration, count int64) {
}
54 changes: 54 additions & 0 deletions experiment/ndt7/dial.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package ndt7

import (
"context"
"crypto/tls"
"net/http"
"net/url"

"github.com/gorilla/websocket"
)

type dialManager struct {
hostname string
port string
readBufferSize int
scheme string
tlsConfig *tls.Config
writeBufferSize int
}

func newDialManager(hostname string) dialManager {
return dialManager{
hostname: hostname,
port: "443",
readBufferSize: paramMaxMessageSize,
scheme: "wss",
writeBufferSize: paramMaxMessageSize,
}
}

func (mgr dialManager) dialWithTestName(ctx context.Context, testName string) (*websocket.Conn, error) {
dialer := websocket.Dialer{
ReadBufferSize: mgr.readBufferSize,
TLSClientConfig: mgr.tlsConfig,
WriteBufferSize: mgr.writeBufferSize,
}
URL := url.URL{
Scheme: mgr.scheme,
Host: mgr.hostname + ":" + mgr.port,
}
URL.Path = "/ndt/v7/" + testName
headers := http.Header{}
headers.Add("Sec-WebSocket-Protocol", "net.measurementlab.ndt.v7")
conn, _, err := dialer.DialContext(ctx, URL.String(), headers)
return conn, err
}

func (mgr dialManager) dialDownload(ctx context.Context) (*websocket.Conn, error) {
return mgr.dialWithTestName(ctx, "download")
}

func (mgr dialManager) dialUpload(ctx context.Context) (*websocket.Conn, error) {
return mgr.dialWithTestName(ctx, "upload")
}
33 changes: 33 additions & 0 deletions experiment/ndt7/dial_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package ndt7

import (
"context"
"strings"
"testing"
)

func TestDialDownloadWithCancelledContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel() // immediately halt
mgr := newDialManager("hostname.fake")
conn, err := mgr.dialDownload(ctx)
if err == nil || !strings.HasSuffix(err.Error(), "operation was canceled") {
t.Fatal("not the error we expected")
}
if conn != nil {
t.Fatal("expected nil conn here")
}
}

func TestDialUploadWithCancelledContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel() // immediately halt
mgr := newDialManager("hostname.fake")
conn, err := mgr.dialUpload(ctx)
if err == nil || !strings.HasSuffix(err.Error(), "operation was canceled") {
t.Fatal("not the error we expected")
}
if conn != nil {
t.Fatal("expected nil conn here")
}
}
71 changes: 71 additions & 0 deletions experiment/ndt7/download.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package ndt7

import (
"context"
"io"
"io/ioutil"
"time"

"github.com/gorilla/websocket"
)

type downloadManager struct {
conn mockableConn
maxMessageSize int64
maxRuntime time.Duration
measureInterval time.Duration
onJSON callbackJSON
onPerformance callbackPerformance
}

func newDownloadManager(
conn mockableConn, onPerformance callbackPerformance,
onJSON callbackJSON,
) downloadManager {
return downloadManager{
conn: conn,
maxMessageSize: paramMaxMessageSize,
maxRuntime: paramMaxRuntime,
measureInterval: paramMeasureInterval,
onJSON: onJSON,
onPerformance: onPerformance,
}
}

func (mgr downloadManager) run(ctx context.Context) error {
var total int64
start := time.Now()
if err := mgr.conn.SetReadDeadline(start.Add(mgr.maxRuntime)); err != nil {
return err
}
mgr.conn.SetReadLimit(mgr.maxMessageSize)
ticker := time.NewTicker(mgr.measureInterval)
defer ticker.Stop()
for ctx.Err() == nil {
kind, reader, err := mgr.conn.NextReader()
if err != nil {
return err
}
if kind == websocket.TextMessage {
data, err := ioutil.ReadAll(reader)
if err != nil {
return err
}
if err := mgr.onJSON(data); err != nil {
return err
}
}
n, err := io.Copy(ioutil.Discard, reader)
if err != nil {
return err
}
total += int64(n)
select {
case now := <-ticker.C:
mgr.onPerformance(now.Sub(start), total)
default:
// NOTHING
}
}
return nil
}
145 changes: 145 additions & 0 deletions experiment/ndt7/download_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package ndt7

import (
"context"
"encoding/json"
"errors"
"io"
"strings"
"testing"
"time"

"github.com/gorilla/websocket"
)

func TestUnitDownloadSetReadDeadlineFailure(t *testing.T) {
expected := errors.New("mocked error")
mgr := newDownloadManager(
&mockableConnMock{
ReadDeadlineErr: expected,
},
defaultCallbackPerformance,
defaultCallbackJSON,
)
err := mgr.run(context.Background())
if !errors.Is(err, expected) {
t.Fatal("not the error we expected")
}
}

func TestUnitDownloadNextReaderFailure(t *testing.T) {
expected := errors.New("mocked error")
mgr := newDownloadManager(
&mockableConnMock{
NextReaderErr: expected,
},
defaultCallbackPerformance,
defaultCallbackJSON,
)
err := mgr.run(context.Background())
if !errors.Is(err, expected) {
t.Fatal("not the error we expected")
}
}

func TestUnitDownloadTextMessageReadAllFailure(t *testing.T) {
expected := errors.New("mocked error")
mgr := newDownloadManager(
&mockableConnMock{
NextReaderMsgType: websocket.TextMessage,
NextReaderReader: func() io.Reader {
return &alwaysFailingReader{
Err: expected,
}
},
},
defaultCallbackPerformance,
defaultCallbackJSON,
)
err := mgr.run(context.Background())
if !errors.Is(err, expected) {
t.Fatal("not the error we expected")
}
}

type alwaysFailingReader struct {
Err error
}

func (r *alwaysFailingReader) Read(p []byte) (int, error) {
return 0, r.Err
}

func TestUnitDownloadBinaryMessageReadAllFailure(t *testing.T) {
expected := errors.New("mocked error")
mgr := newDownloadManager(
&mockableConnMock{
NextReaderMsgType: websocket.BinaryMessage,
NextReaderReader: func() io.Reader {
return &alwaysFailingReader{
Err: expected,
}
},
},
defaultCallbackPerformance,
defaultCallbackJSON,
)
err := mgr.run(context.Background())
if !errors.Is(err, expected) {
t.Fatal("not the error we expected")
}
}

func TestUnitDownloadOnJSONCallbackError(t *testing.T) {
mgr := newDownloadManager(
&mockableConnMock{
NextReaderMsgType: websocket.TextMessage,
NextReaderReader: func() io.Reader {
return &invalidJSONReader{}
},
},
defaultCallbackPerformance,
func(data []byte) error {
var v interface{}
return json.Unmarshal(data, &v)
},
)
err := mgr.run(context.Background())
if err == nil || !strings.HasSuffix(err.Error(), "unexpected end of JSON input") {
t.Fatal("not the error we expected")
}
}

type invalidJSONReader struct{}

func (r *invalidJSONReader) Read(p []byte) (int, error) {
return copy(p, []byte(`{`)), io.EOF
}

func TestUnitDownloadOnJSONLoop(t *testing.T) {
mgr := newDownloadManager(
&mockableConnMock{
NextReaderMsgType: websocket.TextMessage,
NextReaderReader: func() io.Reader {
return &goodJSONReader{}
},
},
defaultCallbackPerformance,
func(data []byte) error {
var v interface{}
return json.Unmarshal(data, &v)
},
)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err := mgr.run(ctx)
if err != nil {
t.Fatal(err)
}
}

type goodJSONReader struct{}

func (r *goodJSONReader) Read(p []byte) (int, error) {
return copy(p, []byte(`{}`)), io.EOF
}
16 changes: 16 additions & 0 deletions experiment/ndt7/mockable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package ndt7

import (
"io"
"time"

"github.com/gorilla/websocket"
)

type mockableConn interface {
NextReader() (int, io.Reader, error)
SetReadDeadline(time.Time) error
SetReadLimit(int64)
SetWriteDeadline(time.Time) error
WritePreparedMessage(*websocket.PreparedMessage) error
}
39 changes: 39 additions & 0 deletions experiment/ndt7/mockable_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package ndt7

import (
"io"
"time"

"github.com/gorilla/websocket"
)

type mockableConnMock struct {
NextReaderMsgType int
NextReaderErr error
NextReaderReader func() io.Reader
ReadDeadlineErr error
WriteDeadlineErr error
WritePreparedMessageErr error
}

func (c *mockableConnMock) NextReader() (int, io.Reader, error) {
var reader io.Reader
if c.NextReaderReader != nil {
reader = c.NextReaderReader()
}
return c.NextReaderMsgType, reader, c.NextReaderErr
}

func (c *mockableConnMock) SetReadDeadline(time.Time) error {
return c.ReadDeadlineErr
}

func (c *mockableConnMock) SetReadLimit(int64) {}

func (c *mockableConnMock) SetWriteDeadline(time.Time) error {
return c.WriteDeadlineErr
}

func (c *mockableConnMock) WritePreparedMessage(*websocket.PreparedMessage) error {
return c.WritePreparedMessageErr
}
Loading