Skip to content

Commit

Permalink
Merge b44b367 into 7f77d49
Browse files Browse the repository at this point in the history
  • Loading branch information
bassosimone authored Apr 10, 2019
2 parents 7f77d49 + b44b367 commit 26f937e
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 52 deletions.
9 changes: 5 additions & 4 deletions ndt-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/m-lab/ndt-server/logging"
"github.com/m-lab/ndt-server/metrics"
"github.com/m-lab/ndt-server/ndt7/listener"
"github.com/m-lab/ndt-server/ndt7/download"
"github.com/m-lab/ndt-server/ndt7/handler"
"github.com/m-lab/ndt-server/ndt7/spec"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -163,15 +163,16 @@ func main() {
ndt7Mux := http.NewServeMux()
ndt7Mux.HandleFunc("/", defaultHandler)
ndt7Mux.Handle("/static/", http.StripPrefix("/static", http.FileServer(http.Dir("html"))))
ndt7Handler := &handler.Handler{
DataDir: *dataDir,
}
ndt7Mux.Handle(
spec.DownloadURLPath,
promhttp.InstrumentHandlerInFlight(
metrics.CurrentTests.With(ndt7Label),
promhttp.InstrumentHandlerDuration(
metrics.TestDuration.MustCurryWith(ndt7Label),
&download.Handler{
DataDir: *dataDir,
})))
http.HandlerFunc(ndt7Handler.Download))))
ndt7Server := &http.Server{
Addr: *ndt7Port,
Handler: logging.MakeAccessLogHandler(ndt7Mux),
Expand Down
50 changes: 6 additions & 44 deletions ndt7/download/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,23 @@ package download

import (
"context"
"net/http"

"github.com/gorilla/websocket"
"github.com/m-lab/go/warnonerror"
"github.com/m-lab/ndt-server/logging"
"github.com/m-lab/ndt-server/ndt7/download/measurer"
"github.com/m-lab/ndt-server/ndt7/download/receiver"
"github.com/m-lab/ndt-server/ndt7/download/sender"
"github.com/m-lab/ndt-server/ndt7/results"
"github.com/m-lab/ndt-server/ndt7/saver"
"github.com/m-lab/ndt-server/ndt7/spec"
)

// Handler handles a download subtest from the server side.
type Handler struct {
Upgrader websocket.Upgrader
DataDir string
}

func warnAndClose(writer http.ResponseWriter, message string) {
logging.Logger.Warn(message)
writer.Header().Set("Connection", "Close")
writer.WriteHeader(http.StatusBadRequest)
}

// Handle handles the download subtest.
func (dl Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
logging.Logger.Debug("download: upgrading to WebSockets")
if request.Header.Get("Sec-WebSocket-Protocol") != spec.SecWebSocketProtocol {
warnAndClose(writer, "download: missing Sec-WebSocket-Protocol in request")
return
}
headers := http.Header{}
headers.Add("Sec-WebSocket-Protocol", spec.SecWebSocketProtocol)
conn, err := dl.Upgrader.Upgrade(writer, request, headers)
if err != nil {
warnAndClose(writer, "download: cannnot UPGRADE to WebSocket")
return
}
// TODO(bassosimone): an error before this point means that the *os.File
// will stay in cache until the cache pruning mechanism is triggered. This
// should be a small amount of seconds. If Golang does not call shutdown(2)
// and close(2), we'll end up keeping sockets that caused an error in the
// code above (e.g. because the handshake was not okay) alive for the time
// in which the corresponding *os.File is kept in cache.
defer warnonerror.Close(conn, "download: ignoring conn.Close result")
logging.Logger.Debug("download: opening results file")
resultfp, err := results.OpenFor(request, conn, dl.DataDir, "download")
if err != nil {
return // error already printed
}
defer warnonerror.Close(resultfp, "download: ignoring resultfp.Close result")
// Do implements the download subtest. The ctx argument is the parent
// context for the subtest. The conn argument is the open WebSocket
// connection. The resultfp argument is the file where to save results. Both
// arguments are owned by the caller of this function.
func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File) {
// Implementation note: use child context so that, if we cannot save the
// results in the loop below, we terminate the goroutines early
wholectx, cancel := context.WithCancel(request.Context())
wholectx, cancel := context.WithCancel(ctx)
defer cancel()
senderch := sender.Start(conn, measurer.Start(wholectx, conn))
receiverch := receiver.Start(wholectx, conn)
Expand Down
77 changes: 77 additions & 0 deletions ndt7/handler/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Package handler implements the WebSocket handler for ndt7.
package handler

import (
"context"
"net/http"

"github.com/gorilla/websocket"
"github.com/m-lab/go/warnonerror"
"github.com/m-lab/ndt-server/logging"
"github.com/m-lab/ndt-server/ndt7/download"
"github.com/m-lab/ndt-server/ndt7/results"
"github.com/m-lab/ndt-server/ndt7/spec"
)

// Handler handles ndt7 subtests.
type Handler struct {
// Upgrader is the WebSocket upgrader.
Upgrader websocket.Upgrader

// DataDir is the directory where results are saved.
DataDir string
}

// warnAndClose emits message as a warning and the sends a Bad Request
// response to the client using writer.
func warnAndClose(writer http.ResponseWriter, message string) {
logging.Logger.Warn(message)
writer.Header().Set("Connection", "Close")
writer.WriteHeader(http.StatusBadRequest)
}

// testerFunc is the function implementing a subtest. The first argument
// is the subtest context. The second argument is the connected websocket. The
// third argument is the open file where to write results. This function does
// not own the second or the third argument.
type testerFunc = func(context.Context, *websocket.Conn, *results.File)

// downloadOrUpload implements both download and upload. The writer argument
// is the HTTP response writer. The request argument is the HTTP request
// that we received. The kind argument must be spec.SubtestDownload or
// spec.SubtestUpload. The tester is a function actually implementing the
// requested ndt7 subtest.
func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Request, kind spec.SubtestKind, tester testerFunc) {
logging.Logger.Debug("downloadOrUpload: upgrading to WebSockets")
if request.Header.Get("Sec-WebSocket-Protocol") != spec.SecWebSocketProtocol {
warnAndClose(
writer, "downloadOrUpload: missing Sec-WebSocket-Protocol in request")
return
}
headers := http.Header{}
headers.Add("Sec-WebSocket-Protocol", spec.SecWebSocketProtocol)
conn, err := h.Upgrader.Upgrade(writer, request, headers)
if err != nil {
warnAndClose(writer, "downloadOrUpload: cannnot UPGRADE to WebSocket")
return
}
// TODO(bassosimone): an error before this point means that the *os.File
// will stay in cache until the cache pruning mechanism is triggered. This
// should be a small amount of seconds. If Golang does not call shutdown(2)
// and close(2), we'll end up keeping sockets that caused an error in the
// code above (e.g. because the handshake was not okay) alive for the time
// in which the corresponding *os.File is kept in cache.
defer warnonerror.Close(conn, "download: ignoring conn.Close result")
logging.Logger.Debug("downloadOrUpload: opening results file")
resultfp, err := results.OpenFor(request, conn, h.DataDir, kind)
if err != nil {
return // error already printed
}
defer warnonerror.Close(resultfp, "download: ignoring resultfp.Close result")
tester(request.Context(), conn, resultfp)
}

// Download handles the download subtest.
func (h Handler) Download(writer http.ResponseWriter, request *http.Request) {
h.downloadOrUpload(writer, request, spec.SubtestDownload, download.Do)
}
11 changes: 7 additions & 4 deletions ndt7/results/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (
"github.com/m-lab/ndt-server/fdcache"
"github.com/m-lab/ndt-server/logging"
"github.com/m-lab/ndt-server/ndt7/model"
"github.com/m-lab/ndt-server/ndt7/spec"
)

// File is the file where we save measurements.
type File struct {
// Writer is the gzip writer instance
Writer *gzip.Writer

// Fp is the underlying file
Fp *os.File
}
Expand Down Expand Up @@ -55,8 +57,9 @@ func newFile(datadir, what, uuid string) (*File, error) {
// containing the metadata. The conn argument is used to retrieve the local and
// the remote endpoints addresses. The "datadir" argument specifies the
// directory on disk to write the data into and the what argument should
// indicate whether this is a "download" or an "upload" ndt7 measurement.
func OpenFor(request *http.Request, conn *websocket.Conn, datadir, what string) (*File, error) {
// indicate whether this is a spec.SubtestDownload or a spec.SubtestUpload
// ndt7 measurement.
func OpenFor(request *http.Request, conn *websocket.Conn, datadir string, what spec.SubtestKind) (*File, error) {
meta := make(metadata)
netConn := conn.UnderlyingConn()
id, err := fdcache.GetUUID(netConn)
Expand All @@ -65,8 +68,8 @@ func OpenFor(request *http.Request, conn *websocket.Conn, datadir, what string)
return nil, err
}
initMetadata(&meta, conn.LocalAddr().String(), conn.RemoteAddr().String(), id,
request.URL.Query(), what)
resultfp, err := newFile(datadir, what, id)
request.URL.Query(), string(what))
resultfp, err := newFile(datadir, string(what), id)
if err != nil {
logging.Logger.WithError(err).Warn("newFile failed")
return nil, err
Expand Down
11 changes: 11 additions & 0 deletions ndt7/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,14 @@ const DefaultRuntime = 10 * time.Second

// MaxRuntime is the maximum runtime of a subtest
const MaxRuntime = 15 * time.Second

// SubtestKind indicates the subtest kind
type SubtestKind string

const (
// SubtestDownload is a download subtest
SubtestDownload = SubtestKind("download")

// SubtestUpload is a upload subtest
SubtestUpload = SubtestKind("upload")
)

0 comments on commit 26f937e

Please sign in to comment.