Skip to content

Commit

Permalink
Eliminate application-level quantization of throughput measurement.
Browse files Browse the repository at this point in the history
  • Loading branch information
pboothe committed Sep 3, 2019
1 parent 1a6d6ce commit ee00c22
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 77 deletions.
73 changes: 28 additions & 45 deletions ndt5/c2s/c2s.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strconv"
"time"

"github.com/m-lab/ndt-server/ndt5/web100"

"github.com/m-lab/ndt-server/ndt5/metrics"

"github.com/m-lab/go/warnonerror"
Expand Down Expand Up @@ -94,7 +96,7 @@ func ManageTest(ctx context.Context, controlConn protocol.Connection, s ndt.Serv
}

record.StartTime = time.Now()
byteCount, err := drainForeverButMeasureFor(testConn, 10*time.Second)
byteCount, err := drainForeverButMeasureFor(ctx, testConn, 10*time.Second)
record.EndTime = time.Now()
seconds := record.EndTime.Sub(record.StartTime).Seconds()
log.Println("Ended C2S test on", testConn)
Expand Down Expand Up @@ -139,58 +141,39 @@ func ManageTest(ctx context.Context, controlConn protocol.Connection, s ndt.Serv
// measuring the connection for the first part of the drain. This method does
// not close the passed-in Connection, and starts a goroutine which runs until
// that Connection is closed.
func drainForeverButMeasureFor(conn protocol.Connection, d time.Duration) (int64, error) {
type measurement struct {
totalByteCount int64
err error
}
measurements := make(chan measurement)
func drainForeverButMeasureFor(ctx context.Context, conn protocol.MeasuredConnection, d time.Duration) (int64, error) {
derivedCtx, derivedCancel := context.WithTimeout(ctx, d)
defer derivedCancel()

conn.StartMeasuring(derivedCtx)

errs := make(chan error, 1)
// This is the "drain forever" part of this function. Read the passed-in
// connection until the passed-in connection is closed. Only send measurements
// on the measurement channel if the channel can be written to without
// blocking.
// connection until the passed-in connection is closed.
go func() {
var totalByteCount int64
var err error
var connErr error
// Read the connection until it is closed. Reading on a closed
// connection returns an error, which terminates the loop and the goroutine.
for err == nil {
var byteCount int64
byteCount, err = conn.ReadBytes()
totalByteCount += byteCount
// Only write to the channel if it won't block, to ensure the reading process
// goes as fast as possible.
select {
case measurements <- measurement{totalByteCount, err}:
default:
}
for connErr == nil {
_, connErr = conn.ReadBytes()
}
// After we get an error, drain the channel and then close it.
fullChannel := true
for fullChannel {
select {
case <-measurements:
default:
fullChannel = false
}
}
close(measurements)
errs <- connErr
}()

// Read the measurements channel until the timer goes off.
timer := time.NewTimer(d)
var bytesRead int64
var socketStats *web100.Metrics
var err error
timerActive := true
for timerActive {
select {
case m := <-measurements:
bytesRead = m.totalByteCount
err = m.err
case <-timer.C:
timerActive = false
}
select {
case <-derivedCtx.Done(): // Wait for timeout
log.Println("Timed out")
socketStats, err = conn.StopMeasuring()
case err = <-errs: // Error in c2s transfer
log.Println("C2S error:", err)
socketStats, _ = conn.StopMeasuring()
}
if socketStats == nil {
return 0, err
}
return bytesRead, err
// The TCPInfo element of socketStats is a value, not a pointer, so this is safe
// if socketStats is not nil.
return socketStats.TCPInfo.BytesReceived, err
}
25 changes: 15 additions & 10 deletions ndt5/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ type MeasuredConnection interface {
Measurable
}

// The measurer struct is a hack to ensure that we only have to write the
// complicated measurement code at most once.
// measurer allows all types of connections to embed this struct and be measured
// in the same way. It also means that we have to write the complicated
// measurement code at most once.
type measurer struct {
measurements chan *web100.Metrics
cancelMeasurementContext context.CancelFunc
Expand All @@ -136,22 +137,26 @@ type measurer struct {
// StartMeasuring starts a polling measurement goroutine that runs until the ctx
// expires. After measurement is complete, the given `fd` is closed.
func (m *measurer) StartMeasuring(ctx context.Context, fd *os.File) {
m.measurements = make(chan *web100.Metrics)
m.measurements = make(chan *web100.Metrics, 1)
var newctx context.Context
newctx, m.cancelMeasurementContext = context.WithCancel(ctx)
go func() {
defer fd.Close()
web100.MeasureViaPolling(newctx, fd, m.measurements)
}()
go web100.MeasureViaPolling(newctx, fd, m.measurements)
}

// StopMeasuring stops the measurement process. The measurement process can also
// be stopped by cancelling the context that was passed in to StartMeasuring().
func (m *measurer) StopMeasuring() (*web100.Metrics, error) {
m.cancelMeasurementContext()
info, ok := <-m.measurements
if !ok {
select {
case info := <-m.measurements:
if info == nil {
return nil, errors.New("No data returned from web100.MeasureViaPolling")
}
return info, nil
case <-time.After(time.Second):
log.Println("No data received from the measurement goroutine")
return nil, errors.New("No data")
}
return info, nil
}

// wsConnection wraps a websocket connection to allow it to be used as a
Expand Down
38 changes: 16 additions & 22 deletions ndt5/web100/web100_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,45 +22,39 @@ func summarize(snaps []*tcp.LinuxTCPInfo) (*Metrics, error) {
}
}
info := &Metrics{
TCPInfo: *snaps[len(snaps)-1], // Save the last snapshot into the metric struct.
TCPInfo: *snaps[len(snaps)-1], // Save the last snapshot of TCPInfo data into the metric struct.
MinRTT: minrtt / 1000, // Convert microseconds to milliseconds for legacy compatibility.
}
log.Println("Summarized data:", info)
return info, nil
}

// MeasureViaPolling collects all required data by polling. It is required for
// non-BBR connections because MinRTT is one of our critical metrics.
// MeasureViaPolling collects all required data by polling. This function, when
// it exits, will always close the passed-in channel. It may or may not send
// socket information along the channel before it closes, depending on whether
// or not an error occurred. If you want to avoid blocking and potential
// goroutine leaks (and you almost certainly do), then the passed-in channel
// should have a capacity of at least 1.
func MeasureViaPolling(ctx context.Context, fp *os.File, c chan *Metrics) {
log.Println("Measuring via polling")
defer close(c)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
snaps := make([]*tcp.LinuxTCPInfo, 0, 100)
snaps := make([]*tcp.LinuxTCPInfo, 0, 200) // Enough space for 20 seconds of data.
// Poll until the context is canceled.
for {
for ctx.Err() == nil {
// Get the tcp_cc metrics
info, err := tcpinfox.GetTCPInfo(fp)
if err == nil {
snaps = append(snaps, info)
} else {
log.Println("Getsockopt error:", err)
}
select {
case <-ticker.C:
continue
case <-ctx.Done():
info, err := summarize(snaps)
if err == nil {
c <- info
}
return
}
// Wait for the next time interval
<-ticker.C
}
info, err := summarize(snaps)
if err == nil {
c <- info
}
}

// TODO: Implement BBR support for ndt5 clients.
/*
func MeasureBBR(ctx context.Context, fp *os.File) (Metrics, error) {
return Metrics{}, errors.New("MeasureBBR is unimplemented")
}
*/

0 comments on commit ee00c22

Please sign in to comment.