Skip to content

Commit

Permalink
Merge e37980c into 595c7d6
Browse files Browse the repository at this point in the history
  • Loading branch information
fhltang committed Aug 19, 2022
2 parents 595c7d6 + e37980c commit 5171eb7
Show file tree
Hide file tree
Showing 7 changed files with 388 additions and 2 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1 +1,2 @@
/ndt7-client
/ndt7-prometheus-exporter
6 changes: 4 additions & 2 deletions .travis.yml
Expand Up @@ -7,5 +7,7 @@ before_script:
- go install github.com/mattn/goveralls@latest

script:
- go test -v -coverprofile=coverage.cov -coverpkg=./... ./...
- $GOPATH/bin/goveralls -coverprofile=coverage.cov -service=travis-ci
- go test -v -coverprofile=coverage.cov -coverpkg=./,./internal/...,./spec/... ./...
- go test -v -coverprofile=ndt7-client.cov ./cmd/ndt7-client/...
- go test -v -coverprofile=ndt7-prometheus-exporter.cov ./cmd/ndt7-prometheus-exporter/...
- $GOPATH/bin/goveralls -coverprofile=coverage.cov,ndt7-client.cov,ndt7-prometheus-exporter.cov -service=travis-ci
11 changes: 11 additions & 0 deletions Dockerfile
@@ -0,0 +1,11 @@
FROM golang:1.18-alpine as ndt7-prometheus-exporter-build
WORKDIR /go/src/github.com/m-lab/ndt7-client
ADD . ./
RUN go get ./cmd/ndt7-client/ndt7-prometheus-exporter
RUN go build ./cmd/ndt7-client/ndt7-prometheus-exporter

FROM alpine:3.16
WORKDIR /app
COPY --from=ndt7-prometheus-exporter-build /go/src/github.com/m-lab/ndt7-client/ndt7-prometheus-exporter ./
EXPOSE 8080
ENTRYPOINT ["./ndt7-prometheus-exporter", "--port=8080"]
16 changes: 16 additions & 0 deletions README.md
Expand Up @@ -61,3 +61,19 @@ CLIENTNAME=my-custom-client-name

go build -ldflags "-X main.ClientName=$CLIENTNAME" ./cmd/ndt7-client
```

### Building and running using Docker

To build

```bash
git clone https://github.com/m-lab/ndt7-client-go
docker build -t ndt7-prometheus-exporter .
```

To run tests repeatedly

```bash
PORT=9191
docker run -d -p ${PORT}:8080 ndt7-prometheus-exporter
```
266 changes: 266 additions & 0 deletions cmd/ndt7-prometheus-exporter/main.go
@@ -0,0 +1,266 @@
// ndt7-promeheus-exporter is an ndt7 non-interactive prometheus exporting client
//
// Usage:
//
// ndt7-prometheus-exporter
//
// The default behavior is for ndt7-client to discover a suitable server using
// Measurement Lab's locate service. This behavior may be overridden using
// either the `-server` or `-service-url` flags.
//
// The `-server <name>` flag specifies the server `name` for performing
// the ndt7 test. This option overrides `-service-url`.
//
// The `-service-url <url>` flag specifies a complete URL that specifies the
// scheme (e.g. "ws"), server name and port, protocol (e.g. /ndt/v7/download),
// and HTTP parameters. By default, upload and download measurements are run
// automatically. The `-service-url` specifies only one measurement direction.
//
// The `-no-verify` flag allows to skip TLS certificate verification.
//
// The `-scheme <scheme>` flag allows to override the default scheme, i.e.,
// "wss", with another scheme. The only other supported scheme is "ws"
// and causes ndt7 to run unencrypted.
//
// The `-timeout <string>` flag specifies the time after which the
// whole test is interrupted. The `<string>` is a string suitable to
// be passed to time.ParseDuration, e.g., "15s". The default is a large
// enough value that should be suitable for common conditions.
//
// The `-port` flag starts an HTTP server to export summary results in a form
// that can be consumed by Prometheus (http://prometheus.io).
//
// The `-profile` flag defines the file where to write a CPU profile
// that later you can pass to `go tool pprof`. See https://blog.golang.org/pprof.
//
// Additionally, passing any unrecognized flag, such as `-help`, will
// cause ndt7-client to print a brief help message.
package main

import (
"context"
"crypto/tls"
"flag"
"fmt"
"log"
"net/http"
"os"
"runtime/pprof"
"strings"
"time"

"github.com/m-lab/go/flagx"
"github.com/m-lab/go/memoryless"
"github.com/m-lab/ndt7-client-go"
"github.com/m-lab/ndt7-client-go/internal/emitter"
"github.com/m-lab/ndt7-client-go/internal/params"
"github.com/m-lab/ndt7-client-go/internal/runner"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/sys/cpu"
)

const (
defaultTimeout = 55 * time.Second
)

var (
ClientName = "ndt7-client-go-cmd"
ClientVersion = "0.7.0"
flagProfile = flag.String("profile", "",
"file where to store pprof profile (see https://blog.golang.org/pprof)")

flagScheme = flagx.Enum{
Options: []string{"wss", "ws"},
Value: defaultSchemeForArch(),
}

flagNoVerify = flag.Bool("no-verify", false, "skip TLS certificate verification")
flagServer = flag.String("server", "", "optional ndt7 server hostname")
flagTimeout = flag.Duration(
"timeout", defaultTimeout, "time after which the test is aborted")
flagQuiet = flag.Bool("quiet", false, "emit summary and errors only")
flagService = flagx.URL{}
flagUpload = flag.Bool("upload", true, "perform upload measurement")
flagDownload = flag.Bool("download", true, "perform download measurement")

// The flag values below implement rate limiting at the recommended rate
flagPeriodMean = flag.Duration("period_mean", 6 * time.Hour, "mean period, e.g. 6h, between speed tests, when running in daemon mode")
flagPeriodMin = flag.Duration("period_min", 36 * time.Minute, "minimum period, e.g. 36m, between speed tests, when running in daemon mode")
flagPeriodMax = flag.Duration("period_max", 15 * time.Hour, "maximum period, e.g. 15h, between speed tests, when running in daemon mode")

flagPort = flag.Int("port", 0, "if non-zero, start an HTTP server on this port to export prometheus metrics")
)

func init() {
flag.Var(
&flagScheme,
"scheme",
`WebSocket scheme to use: either "wss" or "ws"`,
)
flag.Var(
&flagService,
"service-url",
"Service URL specifies target hostname and other URL fields like access token. Overrides -server.",
)
}

// defaultSchemeForArch returns the default WebSocket scheme to use, depending
// on the architecture we are running on. A CPU without native AES instructions
// will perform poorly if TLS is enabled.
func defaultSchemeForArch() string {
if cpu.ARM64.HasAES || cpu.ARM.HasAES || cpu.X86.HasAES {
return "wss"
}
return "ws"
}

var osExit = os.Exit

func main() {
flag.Parse()

if *flagProfile != "" {
log.Printf("warning: using -profile will reduce the performance")
fp, err := os.Create(*flagProfile)
if err != nil {
log.Fatal(err)
}
pprof.StartCPUProfile(fp)
defer pprof.StopCPUProfile()
}

// If a service URL is given, then only one direction is possible.
if flagService.URL != nil && strings.Contains(flagService.URL.Path, params.DownloadURLPath) {
*flagUpload = false
*flagDownload = true
} else if flagService.URL != nil && strings.Contains(flagService.URL.Path, params.UploadURLPath) {
*flagUpload = true
*flagDownload = false
} else if flagService.URL != nil {
fmt.Println("WARNING: ignoring unsupported service url")
flagService.URL = nil
}

e := emitter.NewQuiet(emitter.NewHumanReadable())

if *flagPort > 0 {
downloadGauge := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "ndt7",
Name: "download_rate_bps",
Help: "m-lab ndt7 download speed in bits/s",
})
prometheus.MustRegister(downloadGauge)
uploadGauge := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "ndt7",
Name: "upload_rate_bps",
Help: "m-lab ndt7 upload speed in bits/s",
})
prometheus.MustRegister(uploadGauge)
rttGauge := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "ndt7",
Name: "rtt_seconds",
Help: "m-lab ndt7 round-trip time in seconds",
})
prometheus.MustRegister(rttGauge)

// The result gauge captures the result of the last test attemp.
//
// Since its value is a timestamp, the following PromQL expression will
// give the most recent result for each upload and download test.
//
// time() - topk(1, ndt7_result_timestamp_seconds) without (result)
lastResultGauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ndt7",
Name: "result_timestamp_seconds",
Help: "m-lab ndt7 test completion time in seconds since 1970-01-01",
},
[]string{
// which test completed
"test",
// test result
"result",
})
prometheus.MustRegister(lastResultGauge)

// The success gauge captures both the client IP and the server FQDN.
//
// Since its value is a timestamp, we can use it to determine the last
// client-server pair that successfully ran the tests. With some PromQL
// trickery, it is possible to join the client-server labels with test
// results.
//
// For example:
//
// - last download test result with client-server labels
//
// ndt7_download_rate_bps + on () group_left(client_ip, server)
// 0 * topk(1, ndt7_last_success_timestamp_seconds) without (client_ip, server)
//
// - last upload test result with client-server labels
//
// ndt7_upload_rate_bps + on () group_left(client_ip, server)
// 0 * topk(1, ndt7_last_success_timestamp_seconds) without (client_ip, server)
//
// - last rtt test result with client-server labels
//
// ndt7_rtt_seconds + on () group_left(client_ip, server)
// 0 * topk(1, ndt7_last_success_timestamp_seconds) without (client_ip, server)
//
lastSuccessGauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ndt7",
Name: "last_success_timestamp_seconds",
Help: "last successful m-lab ndt7 test completion time in seconds since 1970-01-01",
},
[]string{
// client IP and remote server
"client_ip",
"server",
})
prometheus.MustRegister(lastSuccessGauge)
e = emitter.NewPrometheus(e, downloadGauge, uploadGauge, rttGauge, lastResultGauge, lastSuccessGauge)
http.Handle("/metrics", promhttp.Handler())
go func() {
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *flagPort), nil))
}()
}

ticker, err := memoryless.NewTicker(
context.Background(),
memoryless.Config{
Expected: *flagPeriodMean,
Min: *flagPeriodMin,
Max: *flagPeriodMax,
})
if err != nil {
log.Fatalf("Failed to create memoryless.Ticker: %v", err)
}
defer ticker.Stop()

r := runner.NewRunner(
runner.RunnerOptions{
Download: *flagDownload,
Upload: *flagUpload,
Timeout: *flagTimeout,
ClientFactory: func() *ndt7.Client {
c := ndt7.NewClient(ClientName, ClientVersion)
c.ServiceURL = flagService.URL
c.Server = *flagServer
c.Scheme = flagScheme.Value
c.Dialer.TLSClientConfig = &tls.Config{
InsecureSkipVerify: *flagNoVerify,
}

return c
},
},
e,
ticker)

osExit(r.RunTestsInLoop())
}
84 changes: 84 additions & 0 deletions internal/emitter/prometheus.go
@@ -0,0 +1,84 @@
package emitter

import (
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/m-lab/ndt7-client-go/spec"
)

// Prometheus tees summary metrics as prometheus metrics.
// The message is actually emitted by the embedded Emitter.
type Prometheus struct {
emitter Emitter
// Last download, upload speed in bits/s
download, upload prometheus.Gauge
// Last RTT in seconds
rtt prometheus.Gauge
// Last results
// Value: time in seconds since unix epoch
// labels: test, result
lastResult *prometheus.GaugeVec
// Last successful test
// Value: time in seconds since unix epoch
// labels: client_ip, server
lastSuccess *prometheus.GaugeVec
}

// NewPrometheus returns a Summary emitter which emits messages
// via the passed Emitter.
func NewPrometheus(e Emitter, download, upload, rtt prometheus.Gauge, lastResult, lastSuccess *prometheus.GaugeVec) Emitter {
return &Prometheus{e, download, upload, rtt, lastResult, lastSuccess}
}

// OnStarting emits the starting event
func (p Prometheus) OnStarting(test spec.TestKind) error {
return p.emitter.OnStarting(test)
}

// OnError emits the error event
func (p Prometheus) OnError(test spec.TestKind, err error) error {
g := p.lastResult.WithLabelValues(string(test), "ERROR")
g.Set(float64(time.Now().Unix()))
return p.emitter.OnError(test, err)
}

// OnConnected emits the connected event
func (p Prometheus) OnConnected(test spec.TestKind, fqdn string) error {
return p.emitter.OnConnected(test, fqdn)
}

// OnDownloadEvent handles an event emitted during the download
func (p Prometheus) OnDownloadEvent(m *spec.Measurement) error {
return p.emitter.OnDownloadEvent(m)
}

// OnUploadEvent handles an event emitted during the upload
func (p Prometheus) OnUploadEvent(m *spec.Measurement) error {
return p.emitter.OnUploadEvent(m)
}

// OnComplete is the event signalling the end of the test
func (p Prometheus) OnComplete(test spec.TestKind) error {
g := p.lastResult.WithLabelValues(string(test), "OK")
g.Set(float64(time.Now().Unix()))
return p.emitter.OnComplete(test)
}

// OnSummary handles the summary event, emitted after the test is over.
func (p *Prometheus) OnSummary(s *Summary) error {
// Note this assumes download and upload test result units are Mbit/s.
p.download.Set(s.Download.Value * 1000.0 * 1000.0)
p.upload.Set(s.Upload.Value * 1000.0 * 1000.0)

// Note this assumes RTT units are millisecs
p.rtt.Set(s.MinRTT.Value / 1000.0)

success := p.lastSuccess.WithLabelValues(
s.ClientIP,
fmt.Sprintf("%s:%s", s.ServerIP, s.ServerPort))
success.Set(float64(time.Now().Unix()))

return p.emitter.OnSummary(s)
}

0 comments on commit 5171eb7

Please sign in to comment.