diff --git a/.gitignore b/.gitignore index bb4a27e..f0a52e7 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /ndt7-client +/ndt7-prometheus-exporter diff --git a/.travis.yml b/.travis.yml index 58a3bbd..6995e99 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,5 +7,7 @@ before_script: - go install github.com/mattn/goveralls@latest script: -- go test -v -coverprofile=coverage.cov -coverpkg=./... ./... -- $GOPATH/bin/goveralls -coverprofile=coverage.cov -service=travis-ci +- go test -v -coverprofile=coverage.cov -coverpkg=./,./internal/...,./spec/... ./... +- go test -v -coverprofile=ndt7-client.cov ./cmd/ndt7-client/... +- go test -v -coverprofile=ndt7-prometheus-exporter.cov ./cmd/ndt7-prometheus-exporter/... +- $GOPATH/bin/goveralls -coverprofile=coverage.cov,ndt7-client.cov,ndt7-prometheus-exporter.cov -service=travis-ci diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..6e124fc --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM golang:1.18-alpine as ndt7-prometheus-exporter-build +WORKDIR /go/src/github.com/m-lab/ndt7-client +ADD . ./ +RUN go get ./cmd/ndt7-client/ndt7-prometheus-exporter +RUN go build ./cmd/ndt7-client/ndt7-prometheus-exporter + +FROM alpine:3.16 +WORKDIR /app +COPY --from=ndt7-prometheus-exporter-build /go/src/github.com/m-lab/ndt7-client/ndt7-prometheus-exporter ./ +EXPOSE 8080 +ENTRYPOINT ["./ndt7-prometheus-exporter", "--port=8080"] diff --git a/README.md b/README.md index 38eebbe..317000d 100644 --- a/README.md +++ b/README.md @@ -61,3 +61,19 @@ CLIENTNAME=my-custom-client-name go build -ldflags "-X main.ClientName=$CLIENTNAME" ./cmd/ndt7-client ``` + +### Building and running using Docker + +To build + +```bash +git clone https://github.com/m-lab/ndt7-client-go +docker build -t ndt7-prometheus-exporter . 
+``` + +To run tests repeatedly + +```bash +PORT=9191 +docker run -d -p ${PORT}:8080 ndt7-prometheus-exporter +``` diff --git a/cmd/ndt7-prometheus-exporter/main.go b/cmd/ndt7-prometheus-exporter/main.go new file mode 100644 index 0000000..3e04c7d --- /dev/null +++ b/cmd/ndt7-prometheus-exporter/main.go @@ -0,0 +1,266 @@ +// ndt7-prometheus-exporter is an ndt7 non-interactive prometheus exporting client +// +// Usage: +// +// ndt7-prometheus-exporter +// +// The default behavior is for ndt7-prometheus-exporter to discover a suitable server using +// Measurement Lab's locate service. This behavior may be overridden using +// either the `-server` or `-service-url` flags. +// +// The `-server ` flag specifies the server `name` for performing +// the ndt7 test. This option overrides `-service-url`. +// +// The `-service-url ` flag specifies a complete URL that specifies the +// scheme (e.g. "ws"), server name and port, protocol (e.g. /ndt/v7/download), +// and HTTP parameters. By default, upload and download measurements are run +// automatically. The `-service-url` specifies only one measurement direction. +// +// The `-no-verify` flag allows to skip TLS certificate verification. +// +// The `-scheme ` flag allows to override the default scheme, i.e., +// "wss", with another scheme. The only other supported scheme is "ws" +// and causes ndt7 to run unencrypted. +// +// The `-timeout ` flag specifies the time after which the +// whole test is interrupted. The flag value is a string suitable to +// be passed to time.ParseDuration, e.g., "15s". The default is a large +// enough value that should be suitable for common conditions. +// +// The `-port` flag starts an HTTP server to export summary results in a form +// that can be consumed by Prometheus (http://prometheus.io). +// +// The `-profile` flag defines the file where to write a CPU profile +// that later you can pass to `go tool pprof`. See https://blog.golang.org/pprof. 
+// +// Additionally, passing any unrecognized flag, such as `-help`, will +// cause ndt7-client to print a brief help message. +package main + +import ( + "context" + "crypto/tls" + "flag" + "fmt" + "log" + "net/http" + "os" + "runtime/pprof" + "strings" + "time" + + "github.com/m-lab/go/flagx" + "github.com/m-lab/go/memoryless" + "github.com/m-lab/ndt7-client-go" + "github.com/m-lab/ndt7-client-go/internal/emitter" + "github.com/m-lab/ndt7-client-go/internal/params" + "github.com/m-lab/ndt7-client-go/internal/runner" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "golang.org/x/sys/cpu" +) + +const ( + defaultTimeout = 55 * time.Second +) + +var ( + ClientName = "ndt7-client-go-cmd" + ClientVersion = "0.7.0" + flagProfile = flag.String("profile", "", + "file where to store pprof profile (see https://blog.golang.org/pprof)") + + flagScheme = flagx.Enum{ + Options: []string{"wss", "ws"}, + Value: defaultSchemeForArch(), + } + + flagNoVerify = flag.Bool("no-verify", false, "skip TLS certificate verification") + flagServer = flag.String("server", "", "optional ndt7 server hostname") + flagTimeout = flag.Duration( + "timeout", defaultTimeout, "time after which the test is aborted") + flagQuiet = flag.Bool("quiet", false, "emit summary and errors only") + flagService = flagx.URL{} + flagUpload = flag.Bool("upload", true, "perform upload measurement") + flagDownload = flag.Bool("download", true, "perform download measurement") + + // The flag values below implement rate limiting at the recommended rate + flagPeriodMean = flag.Duration("period_mean", 6 * time.Hour, "mean period, e.g. 6h, between speed tests, when running in daemon mode") + flagPeriodMin = flag.Duration("period_min", 36 * time.Minute, "minimum period, e.g. 36m, between speed tests, when running in daemon mode") + flagPeriodMax = flag.Duration("period_max", 15 * time.Hour, "maximum period, e.g. 
15h, between speed tests, when running in daemon mode") + + flagPort = flag.Int("port", 0, "if non-zero, start an HTTP server on this port to export prometheus metrics") +) + +func init() { + flag.Var( + &flagScheme, + "scheme", + `WebSocket scheme to use: either "wss" or "ws"`, + ) + flag.Var( + &flagService, + "service-url", + "Service URL specifies target hostname and other URL fields like access token. Overrides -server.", + ) +} + +// defaultSchemeForArch returns the default WebSocket scheme to use, depending +// on the architecture we are running on. A CPU without native AES instructions +// will perform poorly if TLS is enabled. +func defaultSchemeForArch() string { + if cpu.ARM64.HasAES || cpu.ARM.HasAES || cpu.X86.HasAES { + return "wss" + } + return "ws" +} + +var osExit = os.Exit + +func main() { + flag.Parse() + + if *flagProfile != "" { + log.Printf("warning: using -profile will reduce the performance") + fp, err := os.Create(*flagProfile) + if err != nil { + log.Fatal(err) + } + pprof.StartCPUProfile(fp) + defer pprof.StopCPUProfile() + } + + // If a service URL is given, then only one direction is possible. 
+ if flagService.URL != nil && strings.Contains(flagService.URL.Path, params.DownloadURLPath) { + *flagUpload = false + *flagDownload = true + } else if flagService.URL != nil && strings.Contains(flagService.URL.Path, params.UploadURLPath) { + *flagUpload = true + *flagDownload = false + } else if flagService.URL != nil { + fmt.Println("WARNING: ignoring unsupported service url") + flagService.URL = nil + } + + e := emitter.NewQuiet(emitter.NewHumanReadable()) + + if *flagPort > 0 { + downloadGauge := prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "ndt7", + Name: "download_rate_bps", + Help: "m-lab ndt7 download speed in bits/s", + }) + prometheus.MustRegister(downloadGauge) + uploadGauge := prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "ndt7", + Name: "upload_rate_bps", + Help: "m-lab ndt7 upload speed in bits/s", + }) + prometheus.MustRegister(uploadGauge) + rttGauge := prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "ndt7", + Name: "rtt_seconds", + Help: "m-lab ndt7 round-trip time in seconds", + }) + prometheus.MustRegister(rttGauge) + + // The result gauge captures the result of the last test attempt. + // + // Since its value is a timestamp, the following PromQL expression will + // give the most recent result for each upload and download test. + // + // time() - topk(1, ndt7_result_timestamp_seconds) without (result) + lastResultGauge := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ndt7", + Name: "result_timestamp_seconds", + Help: "m-lab ndt7 test completion time in seconds since 1970-01-01", + }, + []string{ + // which test completed + "test", + // test result + "result", + }) + prometheus.MustRegister(lastResultGauge) + + // The success gauge captures both the client IP and the server address (IP:port). + // + // Since its value is a timestamp, we can use it to determine the last + // client-server pair that successfully ran the tests. 
With some PromQL + // trickery, it is possible to join the client-server labels with test + // results. + // + // For example: + // + // - last download test result with client-server labels + // + // ndt7_download_rate_bps + on () group_left(client_ip, server) + // 0 * topk(1, ndt7_last_success_timestamp_seconds) without (client_ip, server) + // + // - last upload test result with client-server labels + // + // ndt7_upload_rate_bps + on () group_left(client_ip, server) + // 0 * topk(1, ndt7_last_success_timestamp_seconds) without (client_ip, server) + // + // - last rtt test result with client-server labels + // + // ndt7_rtt_seconds + on () group_left(client_ip, server) + // 0 * topk(1, ndt7_last_success_timestamp_seconds) without (client_ip, server) + // + lastSuccessGauge := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ndt7", + Name: "last_success_timestamp_seconds", + Help: "last successful m-lab ndt7 test completion time in seconds since 1970-01-01", + }, + []string{ + // client IP and remote server + "client_ip", + "server", + }) + prometheus.MustRegister(lastSuccessGauge) + e = emitter.NewPrometheus(e, downloadGauge, uploadGauge, rttGauge, lastResultGauge, lastSuccessGauge) + http.Handle("/metrics", promhttp.Handler()) + go func() { + log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *flagPort), nil)) + }() + } + + ticker, err := memoryless.NewTicker( + context.Background(), + memoryless.Config{ + Expected: *flagPeriodMean, + Min: *flagPeriodMin, + Max: *flagPeriodMax, + }) + if err != nil { + log.Fatalf("Failed to create memoryless.Ticker: %v", err) + } + defer ticker.Stop() + + r := runner.NewRunner( + runner.RunnerOptions{ + Download: *flagDownload, + Upload: *flagUpload, + Timeout: *flagTimeout, + ClientFactory: func() *ndt7.Client { + c := ndt7.NewClient(ClientName, ClientVersion) + c.ServiceURL = flagService.URL + c.Server = *flagServer + c.Scheme = flagScheme.Value + c.Dialer.TLSClientConfig = &tls.Config{ + InsecureSkipVerify: 
*flagNoVerify, + } + + return c + }, + }, + e, + ticker) + + osExit(r.RunTestsInLoop()) +} diff --git a/internal/emitter/prometheus.go b/internal/emitter/prometheus.go new file mode 100644 index 0000000..e886e31 --- /dev/null +++ b/internal/emitter/prometheus.go @@ -0,0 +1,84 @@ +package emitter + +import ( + "fmt" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/m-lab/ndt7-client-go/spec" +) + +// Prometheus tees summary metrics as prometheus metrics. +// The message is actually emitted by the embedded Emitter. +type Prometheus struct { + emitter Emitter + // Last download, upload speed in bits/s + download, upload prometheus.Gauge + // Last RTT in seconds + rtt prometheus.Gauge + // Last results + // Value: time in seconds since unix epoch + // labels: test, result + lastResult *prometheus.GaugeVec + // Last successful test + // Value: time in seconds since unix epoch + // labels: client_ip, server + lastSuccess *prometheus.GaugeVec +} + +// NewPrometheus returns a Summary emitter which emits messages +// via the passed Emitter. 
+func NewPrometheus(e Emitter, download, upload, rtt prometheus.Gauge, lastResult, lastSuccess *prometheus.GaugeVec) Emitter { + return &Prometheus{e, download, upload, rtt, lastResult, lastSuccess} +} + +// OnStarting emits the starting event +func (p Prometheus) OnStarting(test spec.TestKind) error { + return p.emitter.OnStarting(test) +} + +// OnError emits the error event +func (p Prometheus) OnError(test spec.TestKind, err error) error { + g := p.lastResult.WithLabelValues(string(test), "ERROR") + g.Set(float64(time.Now().Unix())) + return p.emitter.OnError(test, err) +} + +// OnConnected emits the connected event +func (p Prometheus) OnConnected(test spec.TestKind, fqdn string) error { + return p.emitter.OnConnected(test, fqdn) +} + +// OnDownloadEvent handles an event emitted during the download +func (p Prometheus) OnDownloadEvent(m *spec.Measurement) error { + return p.emitter.OnDownloadEvent(m) +} + +// OnUploadEvent handles an event emitted during the upload +func (p Prometheus) OnUploadEvent(m *spec.Measurement) error { + return p.emitter.OnUploadEvent(m) +} + +// OnComplete is the event signalling the end of the test +func (p Prometheus) OnComplete(test spec.TestKind) error { + g := p.lastResult.WithLabelValues(string(test), "OK") + g.Set(float64(time.Now().Unix())) + return p.emitter.OnComplete(test) +} + +// OnSummary handles the summary event, emitted after the test is over. +func (p *Prometheus) OnSummary(s *Summary) error { + // Note this assumes download and upload test result units are Mbit/s. 
+ p.download.Set(s.Download.Value * 1000.0 * 1000.0) + p.upload.Set(s.Upload.Value * 1000.0 * 1000.0) + + // Note this assumes RTT units are millisecs + p.rtt.Set(s.MinRTT.Value / 1000.0) + + success := p.lastSuccess.WithLabelValues( + s.ClientIP, + fmt.Sprintf("%s:%s", s.ServerIP, s.ServerPort)) + success.Set(float64(time.Now().Unix())) + + return p.emitter.OnSummary(s) +} diff --git a/internal/emitter/summary.go b/internal/emitter/summary.go index 8f29119..ee4a4e8 100644 --- a/internal/emitter/summary.go +++ b/internal/emitter/summary.go @@ -15,9 +15,15 @@ type Summary struct { // ServerIP is the (v4 or v6) IP address of the server. ServerIP string + // ServerPort is the port of the server. + ServerPort string + // ClientIP is the (v4 or v6) IP address of the client. ClientIP string + // ClientPort is the port of the client. + ClientPort string + // DownloadUUID is the UUID of the download test. // TODO: add UploadUUID after we start processing counterflow messages. DownloadUUID string