Skip to content

Commit

Permalink
Merge 1e99314 into 276f3b1
Browse files Browse the repository at this point in the history
  • Loading branch information
pboothe committed May 7, 2019
2 parents 276f3b1 + 1e99314 commit 5ae95b9
Show file tree
Hide file tree
Showing 15 changed files with 703 additions and 617 deletions.
5 changes: 2 additions & 3 deletions TestDockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,15 @@ WORKDIR /ndt/src
RUN make web100clt

# Build the image in which the server will be tested.
FROM golang:1.11 as build
FROM golang:1.12 as build
COPY --from=ndtrawjson /ndt/src/web100clt /bin/web100clt-with-json-support
COPY --from=ndtrawnojson /ndt/src/web100clt /bin/web100clt-without-json-support
RUN curl -sL https://deb.nodesource.com/setup_10.x | bash -
RUN apt-get update && apt-get install -y nodejs libjansson4 libssl1.1 libssl1.0
ENV GOPATH=/go
RUN go get github.com/mattn/goveralls
ADD . /go/src/github.com/m-lab/ndt-server
RUN go get -v gopkg.in/m-lab/pipe.v3
RUN go get github.com/m-lab/ndt-server/cmd/ndt-client
RUN go get github.com/mattn/goveralls
WORKDIR /go/src/github.com/m-lab/ndt-server/testdata
RUN npm install .
WORKDIR /go/src/github.com/m-lab/ndt-server
Expand Down
15 changes: 10 additions & 5 deletions legacy/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
# legacy ndt-server code

All code in this directory tree is related to the support of the legacy NDT
protocol. We have many extant clients that use this protocol, and we don't want
to leave them high and dry, but new clients are encouraged to use the services
provided by the ndt7 subtree. The test is streamlined, the client is easier to
protocol. We have many extant clients that use this protocol, and we don't
want to leave them high and dry, but new clients are encouraged to use the
services provided by ndt7. The test is streamlined, the client is easier to
write, and basically everything about it is better.

In this tree, we support existing clients, but we will be adding no new
functionality.
In this subtree, we support existing clients, but we will be adding no new
functionality. If you are reading this and trying to decide how to implement
a speed test, use ndt7 and not the legacy protocol. The legacy protocol is
deprecated. It will be supported until usage drops to very low levels, but it
is also not recommended for new integrations or code.
172 changes: 60 additions & 112 deletions legacy/c2s/c2s.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,139 +2,87 @@ package c2s

import (
"context"
"fmt"
"log"
"net/http"
"strconv"
"time"

"github.com/m-lab/ndt-server/legacy/metrics"
"github.com/m-lab/go/warnonerror"
"github.com/m-lab/ndt-server/legacy/protocol"
"github.com/m-lab/ndt-server/legacy/testresponder"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/m-lab/ndt-server/legacy/singleserving"
)

const (
ready = float64(-1)
)

// Responder responds to c2s tests.
type Responder struct {
testresponder.TestResponder
Response chan float64
}
// ManageTest manages the c2s test lifecycle.
func ManageTest(ctx context.Context, conn protocol.Connection, f singleserving.Factory) (float64, error) {
localContext, localCancel := context.WithTimeout(ctx, 30*time.Second)
defer localCancel()

// TestServer performs the NDT c2s test.
func (tr *Responder) TestServer(w http.ResponseWriter, r *http.Request) {
upgrader := testresponder.MakeNdtUpgrader([]string{"c2s"})
wsc, err := upgrader.Upgrade(w, r, nil)
srv, err := f.SingleServingServer("c2s")
if err != nil {
// Upgrade should have already returned an HTTP error code.
log.Println("ERROR C2S: upgrader", err)
return
log.Println("Could not start SingleServingServer", err)
return 0, err
}
ws := protocol.AdaptWsConn(wsc)
tr.performTest(ws)
}

func (tr *Responder) performTest(ws protocol.MeasuredConnection) {
tr.Response <- ready
bytesPerSecond := tr.recvC2SUntil(ws)
tr.Response <- bytesPerSecond
go func() {
// After the test is supposedly over, let the socket drain a bit to not
// confuse poorly-written clients by closing unexpectedly when there is still
// buffered data. We make the judgement call that if the clients are so poorly
// written that they still have data buffered after 5 seconds and are confused
// when the c2s socket closes when buffered data is still in flight, then it
// is okay to break them.
ws.DrainUntil(time.Now().Add(5 * time.Second))
ws.Close()
}()
}

func (tr *Responder) recvC2SUntil(ws protocol.Connection) float64 {
done := make(chan float64)

go func() {
startTime := time.Now()
endTime := startTime.Add(10 * time.Second)
totalBytes, err := ws.DrainUntil(endTime)
if err != nil {
tr.Close()
return
}
bytesPerSecond := float64(totalBytes) / float64(time.Since(startTime)/time.Second)
done <- bytesPerSecond
}()

log.Println("C2S: Waiting for test to complete or timeout")
select {
case <-tr.Ctx.Done():
log.Println("C2S: Context Done!", tr.Ctx.Err())
ws.Close()
// Return zero on error.
return 0
case bytesPerSecond := <-done:
return bytesPerSecond
err = protocol.SendJSONMessage(protocol.TestPrepare, strconv.Itoa(srv.Port()), conn)
if err != nil {
log.Println("Could not send TestPrepare", err)
return 0, err
}
}

// ManageTest manages the c2s test lifecycle.
func ManageTest(ws protocol.Connection, config *testresponder.Config) (float64, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Create a testResponder instance.
testResponder := &Responder{
Response: make(chan float64),
testConn, err := srv.ServeOnce(localContext)
if err != nil {
log.Println("Could not successfully ServeOnce", err)
return 0, err
}
testResponder.Config = config
defer warnonerror.Close(testConn, "Could not close test connection")

// Create a TLS server for running the C2S test.
serveMux := http.NewServeMux()
serveMux.HandleFunc("/ndt_protocol",
promhttp.InstrumentHandlerCounter(
metrics.TestCount.MustCurryWith(prometheus.Labels{"direction": "c2s"}),
http.HandlerFunc(testResponder.TestServer)))
err := testResponder.StartAsync(ctx, serveMux, testResponder.performTest, "C2S")
err = protocol.SendJSONMessage(protocol.TestStart, "", conn)
if err != nil {
log.Println("Could not send TestStart", err)
return 0, err
}
defer testResponder.Close()

done := make(chan float64)
go func() {
// Wait for test to run.
// Send the server port to the client.
protocol.SendJSONMessage(protocol.TestPrepare, strconv.Itoa(testResponder.Port), ws)
c2sReady := <-testResponder.Response
if c2sReady != ready {
log.Println("ERROR C2S: Bad value received on the c2s channel", c2sReady)
cancel()
return
seconds := float64(10)
startTime := time.Now()
endTime := startTime.Add(10 * time.Second)
errorTime := endTime.Add(5 * time.Second)
err = testConn.SetReadDeadline(errorTime)
if err != nil {
log.Println("Could not set deadline", err)
return 0, err
}
byteCount, err := testConn.DrainUntil(endTime)
if err != nil {
if byteCount == 0 {
log.Println("Could not drain the test connection", byteCount, err)
return 0, err
}
// Tell the client to start the test.
protocol.SendJSONMessage(protocol.TestStart, "", ws)

// Wait for results to be generated.
c2sBytesPerSecond := <-testResponder.Response
c2sKbps := 8 * c2sBytesPerSecond / 1000.0
// It is possible for the client to reach 10 seconds slightly before the server does.
seconds = time.Now().Sub(startTime).Seconds()
if seconds < 9 {
log.Printf("C2S test only uploaded for %f seconds\n", seconds)
return 0, err
} else if seconds > 11 {
log.Printf("C2S test uploaded-read-loop exited late (%f seconds) because the read stalled. We will continue with the test.\n", seconds)
} else {
log.Printf("C2S test had an error after %f seconds, which is within acceptable bounds. We will continue with the test.\n", seconds)
}
} else {
// Empty out the buffer for poorly-behaved clients.
testConn.DrainUntil(errorTime)
}
throughputValue := 8 * float64(byteCount) / 1000 / 10

// Finish the test.
protocol.SendJSONMessage(protocol.TestMsg, fmt.Sprintf("%.4f", c2sKbps), ws)
protocol.SendJSONMessage(protocol.TestFinalize, "", ws)
log.Println("C2S: server rate:", c2sKbps)
done <- c2sKbps
}()
err = protocol.SendJSONMessage(protocol.TestMsg, strconv.FormatFloat(throughputValue, 'g', -1, 64), conn)
if err != nil {
log.Println("Could not send TestMsg with C2S results", err)
return 0, err
}

select {
case <-ctx.Done():
log.Println("C2S: ctx Done!")
return 0, ctx.Err()
case value := <-done:
log.Println("C2S: finished ", value)
return value, nil
err = protocol.SendJSONMessage(protocol.TestFinalize, "", conn)
if err != nil {
log.Println("Could not send TestFinalize", err)
return throughputValue, err
}

return throughputValue, nil
}
67 changes: 67 additions & 0 deletions legacy/handler/wshandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package handler

import (
"log"
"net/http"

"github.com/m-lab/go/warnonerror"
"github.com/m-lab/ndt-server/legacy"
"github.com/m-lab/ndt-server/legacy/protocol"
"github.com/m-lab/ndt-server/legacy/singleserving"
"github.com/m-lab/ndt-server/legacy/ws"
)

type httpFactory struct{}

func (hf *httpFactory) SingleServingServer(dir string) (singleserving.Server, error) {
return singleserving.StartWS(dir)
}

// httpHandler handles requests that come in over HTTP or HTTPS. It should be
// created with MakeHTTPHandler() or MakeHTTPSHandler().
type httpHandler struct {
serverFactory singleserving.Factory
}

// ServeHTTP is the command channel for the NDT-WS or NDT-WSS test. All
// subsequent client communication is synchronized with this method. Returning
// closes the websocket connection, so only occurs after all tests complete or
// an unrecoverable error. It is called ServeHTTP to make sure that the Server
// implements the http.Handler interface.
func (s *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
upgrader := ws.Upgrader("ndt")
wsc, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println("ERROR SERVER:", err)
return
}
ws := protocol.AdaptWsConn(wsc)
defer warnonerror.Close(ws, "Could not close connection")
legacy.HandleControlChannel(ws, s.serverFactory)
}

// NewWS returns a handler suitable for http-based connections.
func NewWS() http.Handler {
return &httpHandler{
serverFactory: &httpFactory{},
}
}

type httpsFactory struct {
certFile string
keyFile string
}

func (hf *httpsFactory) SingleServingServer(dir string) (singleserving.Server, error) {
return singleserving.StartWSS(dir, hf.certFile, hf.keyFile)
}

// NewWSS returns a handler suitable for https-based connections.
func NewWSS(certFile, keyFile string) http.Handler {
return &httpHandler{
serverFactory: &httpsFactory{
certFile: certFile,
keyFile: keyFile,
},
}
}
Loading

0 comments on commit 5ae95b9

Please sign in to comment.