From 6e08611f03f4908010981735ada197d6ceb3fcfd Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Mon, 2 Mar 2020 09:27:25 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20Add=20prometheus=20metrics=20to=20kvsch?= =?UTF-8?q?eduler=20and=20improve=20perf=20te=E2=80=A6=20(#1630)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Put args after downloading deps to prevent cache bust on builds Signed-off-by: Ondrej Fabry * Enable grpc metrics by default Signed-off-by: Ondrej Fabry * Update deps Signed-off-by: Ondrej Fabry * Improve perf tests This commit also adds script for running perf tests using docker. Signed-off-by: Ondrej Fabry * Define prometheus metrics for kvscheduler Signed-off-by: Ondrej Fabry * Dump single interfaces only when length <1000 Signed-off-by: Ondrej Fabry * Fix running >65k interfaces Signed-off-by: Ondrej Fabry * Fix setting reports dir Signed-off-by: Ondrej Fabry * Improve perf test reporting Signed-off-by: Ondrej Fabry * Fix report dir Signed-off-by: Ondrej Fabry * Allow to customize all perf test settings Signed-off-by: Ondrej Fabry --- cmd/vpp-agent/app/vpp_agent.go | 1 + docker/dev/Dockerfile | 8 +- go.mod | 5 +- go.sum | 8 +- plugins/govppmux/metrics.go | 2 +- plugins/kvscheduler/metrics.go | 116 ++++++++++++ plugins/kvscheduler/plugin_scheduler.go | 3 + plugins/kvscheduler/stats.go | 1 + plugins/kvscheduler/txn_process.go | 3 + plugins/kvscheduler/txn_queue.go | 7 +- plugins/orchestrator/dispatcher.go | 5 +- plugins/orchestrator/options.go | 8 +- plugins/vpp/ifplugin/interface_state.go | 6 +- scripts/run_perf_test.sh | 42 +++++ tests/perf/grpc-perf/main.go | 236 ++++++++++++++++++------ tests/perf/grpc-perf/vpp.conf | 26 +++ tests/perf/perf_test.sh | 96 ++++++---- tests/perf/run_all.sh | 4 +- 18 files changed, 464 insertions(+), 113 deletions(-) create mode 100644 plugins/kvscheduler/metrics.go create mode 100755 scripts/run_perf_test.sh create mode 100644 tests/perf/grpc-perf/vpp.conf diff --git a/cmd/vpp-agent/app/vpp_agent.go b/cmd/vpp-agent/app/vpp_agent.go index 7304d84461..9762a7f4d4 100644 --- a/cmd/vpp-agent/app/vpp_agent.go +++ b/cmd/vpp-agent/app/vpp_agent.go @@ -106,6 +106,7 @@ func New() *VPPAgent { } orchestrator.DefaultPlugin.Watcher = watchers orchestrator.DefaultPlugin.StatusPublisher = writers + orchestrator.EnabledGrpcMetrics() ifplugin.DefaultPlugin.Watcher = watchers ifplugin.DefaultPlugin.NotifyStates = ifStatePub diff --git a/docker/dev/Dockerfile b/docker/dev/Dockerfile index fb41e4d887..21d61bd97b 100644 --- a/docker/dev/Dockerfile +++ b/docker/dev/Dockerfile @@ -89,15 +89,15 @@ COPY ./docker/dev/init_hook.sh /usr/bin/ # Install agent WORKDIR /src/ligato/vpp-agent +COPY go.mod go.sum ./ +RUN go mod download +COPY . ./ + ARG VERSION ARG COMMIT ARG BRANCH ARG BUILD_DATE -COPY go.mod go.sum ./ -RUN go mod download -COPY . 
./ - RUN make install purge ENV SUPERVISOR_CONFIG=/opt/vpp-agent/dev/supervisor.conf diff --git a/go.mod b/go.mod index 87699a7303..466785d4e6 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/golang/protobuf v1.3.3 github.com/gorilla/mux v1.6.2 github.com/gorilla/websocket v1.4.1 // indirect + github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/grpc-gateway v1.11.3 // indirect github.com/hashicorp/go-uuid v1.0.1 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect @@ -49,12 +50,12 @@ require ( github.com/vishvananda/netlink v0.0.0-20180910184128-56b1bd27a9a3 github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036 // indirect - go.ligato.io/cn-infra/v2 v2.5.0-alpha.0.20200115100339-59da3fdeb0c7 + go.ligato.io/cn-infra/v2 v2.5.0-alpha.0.20200224101443-4cf6c674e05d go.uber.org/multierr v1.2.0 // indirect golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 // indirect golang.org/x/net v0.0.0-20190620200207-3b0461eec859 golang.org/x/sys v0.0.0-20200117145432-59e60aa80a0c golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 // indirect - google.golang.org/grpc v1.27.0 + google.golang.org/grpc v1.27.1 gotest.tools v2.2.0+incompatible // indirect ) diff --git a/go.sum b/go.sum index df7a8bcad5..84ab0f8819 100644 --- a/go.sum +++ b/go.sum @@ -359,8 +359,8 @@ github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036/go.mod h1:gqRgreBU go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= -go.ligato.io/cn-infra/v2 v2.5.0-alpha.0.20200115100339-59da3fdeb0c7 h1:B8GC+I0QFHcyTfpdWDszXcdNzqBsW6koR+30bySHAWY= -go.ligato.io/cn-infra/v2 v2.5.0-alpha.0.20200115100339-59da3fdeb0c7/go.mod h1:lrof6xgwrmjAtpkfKcrUr1MqaOmuPEXORWviyXyZA2w= +go.ligato.io/cn-infra/v2 v2.5.0-alpha.0.20200224101443-4cf6c674e05d h1:GLbxg/WhwjXiuw8KN5hZfYIW6VnkAjN9ZlWxi8uFje4= +go.ligato.io/cn-infra/v2 v2.5.0-alpha.0.20200224101443-4cf6c674e05d/go.mod h1:mYLtG2Bq3C/SOUUafEe8JOwdqohd4NVYl6Bu/nh/O8Y= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -456,8 +456,8 @@ google.golang.org/grpc v1.19.0 h1:cfg4PD8YEdSFnm7qLV4++93WcmhH2nIUhMjhdCvl3j8= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= -google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg= -google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.27.1 h1:zvIju4sqAGvwKspUQOhwnpcqSbzi7/H6QomNNjTL4sk= +google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= diff --git a/plugins/govppmux/metrics.go b/plugins/govppmux/metrics.go index f8cf4551dc..73816780bf 100644 --- 
a/plugins/govppmux/metrics.go +++ b/plugins/govppmux/metrics.go @@ -25,7 +25,7 @@ import ( // Set of raw Prometheus metrics. // Labels // * message -// * +// * error // Do not increment directly, use Report* methods. var ( channelsCreated = prometheus.NewCounter(prometheus.CounterOpts{ diff --git a/plugins/kvscheduler/metrics.go b/plugins/kvscheduler/metrics.go new file mode 100644 index 0000000000..4acd854225 --- /dev/null +++ b/plugins/kvscheduler/metrics.go @@ -0,0 +1,116 @@ +// Copyright (c) 2020 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kvscheduler + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + + kvs "go.ligato.io/vpp-agent/v3/plugins/kvscheduler/api" +) + +// Set of raw Prometheus metrics. +// Labels +// * txn_type +// * slice +// Do not increment directly, use Report* methods. +var ( + transactionsProcessed = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "ligato", + Subsystem: "kvscheduler", + Name: "txn_processed", + Help: "The total number of transactions processed.", + }) + transactionsDropped = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "ligato", + Subsystem: "kvscheduler", + Name: "txn_dropped", + Help: "The total number of transactions dropped.", + }) + queueCapacity = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "ligato", + Subsystem: "kvscheduler", + Name: "queue_capacity", + Help: "The capacity of the transactions queue.", + }) + queueLength = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "ligato", + Subsystem: "kvscheduler", + Name: "queue_length", + Help: "The number of transactions in the queue.", + }) + queueWaitSeconds = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "ligato", + Subsystem: "kvscheduler", + Name: "queue_wait_seconds", + Help: "Wait time in queue for transactions.", + MaxAge: time.Second * 30, + }, + []string{"txn_type"}, + ) + txnProcessDurationSeconds = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "ligato", + Subsystem: "kvscheduler", + Name: "txn_process_duration_seconds", + Help: "Processing time of transactions.", + MaxAge: time.Second * 30, + }, + []string{"slice"}, + ) + txnDurationSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "ligato", + Subsystem: "kvscheduler", + Name: "txn_duration_seconds", + Help: "Bucketed histogram of processing time of transactions by type.", + }, + []string{"txn_type"}, + ) +) + +func init() { + prometheus.MustRegister(transactionsProcessed) + prometheus.MustRegister(transactionsDropped) + prometheus.MustRegister(queueCapacity) + prometheus.MustRegister(queueLength) + prometheus.MustRegister(queueWaitSeconds) + prometheus.MustRegister(txnProcessDurationSeconds) + prometheus.MustRegister(txnDurationSeconds) +} + +func reportTxnProcessed(typ kvs.TxnType, sec float64) { + transactionsProcessed.Inc() + txnDurationSeconds.WithLabelValues(typ.String()).Observe(sec) +} + +func reportTxnDropped() { + transactionsDropped.Inc() +} + +func 
reportQueueCap(c int) { + queueCapacity.Set(float64(c)) +} + +func reportQueued(n int) { + queueLength.Add(float64(n)) +} + +func reportQueueWait(typ kvs.TxnType, sec float64) { + queueWaitSeconds.WithLabelValues(typ.String()).Observe(sec) +} + +func reportTxnProcessDuration(slice string, sec float64) { + txnProcessDurationSeconds.WithLabelValues(slice).Observe(sec) +} diff --git a/plugins/kvscheduler/plugin_scheduler.go b/plugins/kvscheduler/plugin_scheduler.go index 3f78061009..6b8ec8571b 100644 --- a/plugins/kvscheduler/plugin_scheduler.go +++ b/plugins/kvscheduler/plugin_scheduler.go @@ -180,6 +180,7 @@ func (s *Scheduler) Init() error { s.registry = registry.NewRegistry() // prepare channel for serializing transactions s.txnQueue = make(chan *transaction, 100) + reportQueueCap(cap(s.txnQueue)) // register REST API handlers s.registerHandlers(s.HTTPHandlers) // initialize key-set used to mark values with updated status @@ -292,6 +293,7 @@ func (s *Scheduler) TransactionBarrier() { func (s *Scheduler) PushSBNotification(notif ...kvs.KVWithMetadata) error { txn := &transaction{ txnType: kvs.SBNotification, + created: time.Now(), } for _, value := range notif { txn.values = append(txn.values, kvForTxn{ @@ -441,6 +443,7 @@ func (txn *SchedulerTxn) Commit(ctx context.Context) (txnSeqNum uint64, err erro txnType: kvs.NBTransaction, nb: &nbTxn{}, values: make([]kvForTxn, 0, len(txn.values)), + created: time.Now(), } // collect values diff --git a/plugins/kvscheduler/stats.go b/plugins/kvscheduler/stats.go index 2a34256612..b24e077ecc 100644 --- a/plugins/kvscheduler/stats.go +++ b/plugins/kvscheduler/stats.go @@ -128,6 +128,7 @@ func trackTransactionMethod(m string) func() { s := stats.TxnStats return func() { took := time.Since(t) + reportTxnProcessDuration(m, took.Seconds()) statsMu.Lock() ms, tracked := s.Methods[m] if !tracked { diff --git a/plugins/kvscheduler/txn_process.go b/plugins/kvscheduler/txn_process.go index 1a87137843..2da5260f5b 100644 --- a/plugins/kvscheduler/txn_process.go +++ b/plugins/kvscheduler/txn_process.go @@ -39,6 +39,7 @@ type transaction struct { values []kvForTxn nb *nbTxn // defined for NB transactions retry *retryTxn // defined for retry of failed operations + created time.Time } // kvForTxn represents a new value for a given key to be applied in a transaction. 
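The counters, gauges and summaries above are registered into Prometheus' default registry (prometheus.MustRegister in metrics.go), so once the agent is up they can be checked with a plain scrape. A minimal sketch, assuming the agent exposes the registry on the REST endpoint :9191 that perf_test.sh scrapes further below:

# List the new kvscheduler metric families (assumes the default REST port 9191).
curl -sSfL http://127.0.0.1:9191/metrics | grep '^ligato_kvscheduler_'
# Expected families, per metrics.go above: txn_processed, txn_dropped,
# queue_capacity, queue_length, queue_wait_seconds{txn_type},
# txn_process_duration_seconds{slice}, txn_duration_seconds{txn_type}.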
@@ -92,7 +93,9 @@ func (s *Scheduler) consumeTransactions() { if canceled { return } + reportQueueWait(txn.txnType, time.Since(txn.created).Seconds()) s.processTransaction(txn) + reportTxnProcessed(txn.txnType, time.Since(txn.created).Seconds()) } } diff --git a/plugins/kvscheduler/txn_queue.go b/plugins/kvscheduler/txn_queue.go index c97c3e8cd3..a540c32f5d 100644 --- a/plugins/kvscheduler/txn_queue.go +++ b/plugins/kvscheduler/txn_queue.go @@ -34,6 +34,7 @@ func (s *Scheduler) enqueueTxn(txn *transaction) error { case <-s.ctx.Done(): return kvs.ErrClosedScheduler case s.txnQueue <- txn: + reportQueued(1) return nil } } @@ -41,8 +42,10 @@ func (s *Scheduler) enqueueTxn(txn *transaction) error { case <-s.ctx.Done(): return kvs.ErrClosedScheduler case s.txnQueue <- txn: + reportQueued(1) return nil default: + reportTxnDropped() return kvs.ErrTxnQueueFull } } @@ -53,6 +56,7 @@ func (s *Scheduler) dequeueTxn() (txn *transaction, canceled bool) { case <-s.ctx.Done(): return nil, true case txn = <-s.txnQueue: + reportQueued(-1) //trace.Log(txn.ctx, "txn", "dequeue") return txn, false } @@ -75,12 +79,13 @@ func (s *Scheduler) delayRetry(args *retryTxn) { err := s.enqueueTxn(&transaction{ txnType: kvs.RetryFailedOps, retry: args, + created: time.Now(), }) if err != nil { s.Log.WithFields(logging.Fields{ "txnSeqNum": args.txnSeqNum, "err": err, - }).Warn("Failed to enqueue re-try for failed operations") + }).Warn("Failed to enqueue retry transaction for failed operations") s.enqueueRetry(args) // try again with the same time period } } diff --git a/plugins/orchestrator/dispatcher.go b/plugins/orchestrator/dispatcher.go index afe3db69a4..e0cdd3955e 100644 --- a/plugins/orchestrator/dispatcher.go +++ b/plugins/orchestrator/dispatcher.go @@ -15,16 +15,15 @@ package orchestrator import ( + "context" "fmt" "runtime/trace" "sync" "time" + "github.com/golang/protobuf/proto" "github.com/pkg/errors" "go.ligato.io/cn-infra/v2/logging" - "golang.org/x/net/context" - - "github.com/golang/protobuf/proto" "go.ligato.io/vpp-agent/v3/pkg/models" kvs "go.ligato.io/vpp-agent/v3/plugins/kvscheduler/api" diff --git a/plugins/orchestrator/options.go b/plugins/orchestrator/options.go index 1295461a91..a7bc84170e 100644 --- a/plugins/orchestrator/options.go +++ b/plugins/orchestrator/options.go @@ -15,6 +15,7 @@ package orchestrator import ( + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "go.ligato.io/cn-infra/v2/datasync/kvdbsync/local" "go.ligato.io/cn-infra/v2/rpc/grpc" @@ -45,8 +46,13 @@ func NewPlugin(opts ...Option) *Plugin { // Option is a function that acts on a Plugin to inject Dependencies or configuration type Option func(*Plugin) -func WithReflection(enabled bool) Option { +func UseReflection(enabled bool) Option { return func(p *Plugin) { p.reflection = enabled } } + +func EnabledGrpcMetrics() { + grpc_prometheus.EnableHandlingTimeHistogram() + grpc.UsePromMetrics(grpc_prometheus.DefaultServerMetrics)(&grpc.DefaultPlugin) +} diff --git a/plugins/vpp/ifplugin/interface_state.go b/plugins/vpp/ifplugin/interface_state.go index 419f8c4908..1a53a14e16 100644 --- a/plugins/vpp/ifplugin/interface_state.go +++ b/plugins/vpp/ifplugin/interface_state.go @@ -251,8 +251,10 @@ func (c *InterfaceStateUpdater) doUpdatesIfStateDetails() { var ifIdxs []uint32 c.access.Lock() - for ifIdx := range c.ifsForUpdate { - ifIdxs = append(ifIdxs, ifIdx) + if len(c.ifsForUpdate) < 1000 { + for ifIdx := range c.ifsForUpdate { + ifIdxs = append(ifIdxs, ifIdx) + } } // clear interfaces for update c.ifsForUpdate = 
make(map[uint32]struct{})
diff --git a/scripts/run_perf_test.sh b/scripts/run_perf_test.sh
new file mode 100755
index 0000000000..bd43ca487a
--- /dev/null
+++ b/scripts/run_perf_test.sh
@@ -0,0 +1,42 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+# usage: ./scripts/run_perf_test.sh [num_requests] [tunnels_per_request] [clients]
+
+num_req=${1-10000}
+
+image=${AGENT_IMG:-ligato/dev-vpp-agent}
+reports=${REPORT_DIR:-report}
+profiling_mode=${PROF_MODE-}
+
+reports="$(mkdir -p "$reports" && cd "$reports" && pwd)"
+
+runid=${RUN-"${num_req}-req"}
+results="${reports}/perf-results-${runid}"
+
+mkdir -p "$results"
+
+echo "Starting perf test run: $runid"
+
+cid=$(docker run -d -it --privileged \
+	--label perf-run="$runid" \
+	-v "$results":/report \
+	-e REPORT_DIR=/report \
+	-e ETCD_CONFIG=disabled \
+	-e INITIAL_LOGLVL=info \
+	-e DEBUG_ENABLED=y \
+	-e DEBUG_PROFILE_MODE="$profiling_mode" \
+	${DOCKER_EXTRA_ARGS:-} \
+	-- \
+	"$image" /bin/bash \
+)
+
+function on_exit() {
+	docker stop -t 1 "$cid"
+	exit
+}
+trap 'on_exit' EXIT
+
+docker exec -it "$cid" bash ./tests/perf/perf_test.sh grpc-perf "$@"
+
+echo "Test results stored in: $results"
diff --git a/tests/perf/grpc-perf/main.go b/tests/perf/grpc-perf/main.go
index 8324493994..6210224535 100644
--- a/tests/perf/grpc-perf/main.go
+++ b/tests/perf/grpc-perf/main.go
@@ -18,20 +18,26 @@ package main
 import (
 	"context"
 	"fmt"
+	"io/ioutil"
 	"log"
 	"net"
+	"net/http"
 	"sync"
 	"time"
 
 	"github.com/golang/protobuf/jsonpb"
 	"github.com/golang/protobuf/proto"
+	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
 	"github.com/namsral/flag"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/sirupsen/logrus"
 	"go.ligato.io/cn-infra/v2/agent"
 	"go.ligato.io/cn-infra/v2/infra"
 	"go.ligato.io/cn-infra/v2/logging"
 	"google.golang.org/grpc"
 
+	"go.ligato.io/vpp-agent/v3/pkg/version"
 	"go.ligato.io/vpp-agent/v3/proto/ligato/configurator"
 	"go.ligato.io/vpp-agent/v3/proto/ligato/vpp"
 	interfaces "go.ligato.io/vpp-agent/v3/proto/ligato/vpp/interfaces"
@@ -40,14 +46,41 @@ import (
 )
 
 var (
-	address       = flag.String("address", "127.0.0.1:9111", "address of GRPC server")
-	socketType    = flag.String("socket-type", "tcp", "socket type [tcp, tcp4, tcp6, unix, unixpacket]")
-	numClients    = flag.Int("clients", 1, "number of concurrent grpc clients")
-	numTunnels    = flag.Int("tunnels", 100, "number of tunnels to stress per client")
-	numPerRequest = flag.Int("numperreq", 10, "number of tunnels/routes per grpc request")
-	withIPs       = flag.Bool("with-ips", false, "configure IP address for each tunnel on memif at the end")
-	debug         = flag.Bool("debug", false, "turn on debug dump")
-	timeout       = flag.Uint("timeout", 300, "timeout for requests (in seconds)")
+	reg              = prometheus.NewRegistry()
+	grpcMetrics      = grpc_prometheus.NewClientMetrics()
+	perfTestSettings *prometheus.GaugeVec
+)
+
+func init() {
+	flag.Parse()
+
+	grpcMetrics.EnableClientHandlingTimeHistogram()
+	reg.MustRegister(grpcMetrics)
+	perfTestSettings = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "ligato",
+		Subsystem: "perf_test",
+		Name:      "client_settings",
+		Help:      "Settings of the perf test client.",
+		ConstLabels: map[string]string{
+			"num_clients": fmt.Sprint(*numClients),
+			"num_tunnels": fmt.Sprint(*numTunnels),
+			"num_per_req": fmt.Sprint(*numPerRequest),
+		},
+	}, []string{"start_time"})
+	reg.MustRegister(perfTestSettings)
+}
+
+var (
+	address        = flag.String("address", "127.0.0.1:9111", "address of GRPC server")
+	socketType     = flag.String("socket-type", "tcp", "socket type [tcp, tcp4, tcp6, unix, unixpacket]")
+	numClients     = flag.Int("clients", 1, "number of concurrent grpc clients")
+	numTunnels     = flag.Int("tunnels", 100, "number of tunnels to stress per client")
+	numPerRequest  = flag.Int("numperreq", 1, "number of tunnels/routes per grpc request")
+	withIPs        = flag.Bool("with-ips", false, "configure IP address for each tunnel on memif at the end")
+	debug          = flag.Bool("debug", false, "turn on debug dump")
+	dumpMetrics    = flag.Bool("dumpmetrics", false, "dump metrics before exit")
+	timeout        = flag.Uint("timeout", 300, "timeout for requests (in seconds)")
+	reportProgress = flag.Uint("progress", 20, "progress report interval (in percent)")
 
 	dialTimeout = time.Second * 3
 	reqTimeout  = time.Second * 300
@@ -58,13 +91,19 @@ func main() {
 		logging.DefaultLogger.SetLevel(logging.DebugLevel)
 	}
 
+	perfTestSettings.WithLabelValues(time.Now().Format(time.Stamp)).Set(1)
+
+	go serveMetrics()
+
 	quit := make(chan struct{})
 	ep := NewGRPCStressPlugin()
+	ver, rev, date := version.Data()
 	a := agent.NewAgent(
 		agent.AllPlugins(ep),
 		agent.QuitOnClose(quit),
+		agent.Version(ver, date, rev),
 	)
 
 	if err := a.Start(); err != nil {
@@ -77,6 +116,39 @@
 	if err := a.Stop(); err != nil {
 		log.Fatalln(err)
 	}
+
+	if *dumpMetrics {
+		resp, err := http.Get("http://localhost:9094/metrics")
+		if err != nil {
+			log.Fatalln(err)
+		}
+		if b, err := ioutil.ReadAll(resp.Body); err != nil {
+			log.Fatalln(err)
+		} else {
+			fmt.Println("----------------------")
+			fmt.Println("-> CLIENT METRICS")
+			fmt.Println("----------------------")
+			fmt.Print(string(b))
+			fmt.Println("----------------------")
+		}
+	}
+
+	time.Sleep(time.Second * 5)
+}
+
+func serveMetrics() {
+	h := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
+
+	// Create an HTTP server for exposing the Prometheus registry.
+	httpServer := &http.Server{
+		Handler: h,
+		Addr:    fmt.Sprintf(":%d", 9094),
+	}
+
+	// Start the HTTP server for Prometheus scraping.
+	if err := httpServer.ListenAndServe(); err != nil {
+		log.Println("Unable to start HTTP server:", err)
+	}
+}
 
 // GRPCStressPlugin makes use of the remoteclient to locally CRUD ipsec tunnels and routes.
@@ -84,6 +156,7 @@ type GRPCStressPlugin struct { infra.PluginName Log *logrus.Logger + conn *grpc.ClientConn conns []*grpc.ClientConn wg sync.WaitGroup @@ -104,6 +177,9 @@ func (p *GRPCStressPlugin) Init() error { return nil } func (p *GRPCStressPlugin) Close() error { + if p.conn != nil { + return p.conn.Close() + } return nil } @@ -121,15 +197,35 @@ func (p *GRPCStressPlugin) setupInitial() { conn, err := grpc.Dial("unix", grpc.WithInsecure(), grpc.WithDialer(dialer(*socketType, *address, dialTimeout)), + grpc.WithUnaryInterceptor(grpcMetrics.UnaryClientInterceptor()), + grpc.WithStreamInterceptor(grpcMetrics.StreamClientInterceptor()), ) if err != nil { log.Fatal(err) } + p.conn = conn reqTimeout = time.Second * time.Duration(*timeout) client := configurator.NewConfiguratorServiceClient(conn) + p.Log.Infof("Requesting get..") + cfg, err := client.Get(context.Background(), &configurator.GetRequest{}) + if err != nil { + log.Fatalln(err) + } + out, _ := (&jsonpb.Marshaler{Indent: " "}).MarshalToString(cfg) + fmt.Printf("Config:\n %+v\n", out) + + p.Log.Infof("Requesting dump..") + dump, err := client.Dump(context.Background(), &configurator.DumpRequest{}) + if err != nil { + log.Fatalln(err) + } + fmt.Printf("Dump:\n %+v\n", proto.MarshalTextString(dump)) + + time.Sleep(time.Second * 1) + // create a conn/client to create the red/black interfaces // that each tunnel will reference p.runGRPCCreateRedBlackMemifs(client) @@ -199,17 +295,21 @@ func (p *GRPCStressPlugin) runGRPCCreateRedBlackMemifs(client configurator.Confi } func (p *GRPCStressPlugin) runAllClients() { - p.Log.Debugf("numTunnels: %d, numPerRequest: %d, numClients=%d", - *numTunnels, *numPerRequest, *numClients) - - p.Log.Infof("Running for %d clients", *numClients) + p.Log.Infof("----------------------------------------") + p.Log.Infof(" SETTINGS:") + p.Log.Infof("----------------------------------------") + p.Log.Infof(" -> Clients: %d", *numClients) + p.Log.Infof(" -> Requests: %d", *numTunnels) + p.Log.Infof(" -> Tunnels per request: %d", *numPerRequest) + p.Log.Infof("----------------------------------------") + p.Log.Infof("Launching all clients..") t := time.Now() p.wg.Add(*numClients) for i := 0; i < *numClients; i++ { // Set up connection to the server. 
- conn, err := grpc.Dial("unix", + /*conn, err := grpc.Dial("unix", grpc.WithInsecure(), grpc.WithDialer(dialer(*socketType, *address, dialTimeout)), ) @@ -217,42 +317,73 @@ func (p *GRPCStressPlugin) runAllClients() { log.Fatal(err) } p.conns = append(p.conns, conn) - client := configurator.NewConfiguratorServiceClient(p.conns[i]) + client := configurator.NewConfiguratorServiceClient(p.conns[i])*/ + + client := configurator.NewConfiguratorServiceClient(p.conn) go p.runGRPCStressCreate(i, client, *numTunnels) } p.Log.Debugf("Waiting for clients..") + p.wg.Wait() + took := time.Since(t) - perSec := float64(*numTunnels) / took.Seconds() + perSec := float64((*numTunnels)*(*numClients)) / took.Seconds() p.Log.Infof("All clients done!") - p.Log.Infof("----------------------------------------") - p.Log.Infof(" -> Took: %.3fs", took.Seconds()) - p.Log.Infof(" -> Clients: %d", *numClients) - p.Log.Infof(" -> Requests: %d", *numTunnels) - p.Log.Infof(" -> PERFORMANCE: %.1f req/sec", perSec) - p.Log.Infof("----------------------------------------") - - for i := 0; i < *numClients; i++ { + p.Log.Infof("========================================") + p.Log.Infof(" RESULTS:") + p.Log.Infof("========================================") + p.Log.Infof(" Elapsed: %.2f sec", took.Seconds()) + p.Log.Infof(" Average: %.1f req/sec", perSec) + p.Log.Infof("========================================") + + /*for i := 0; i < *numClients; i++ { if err := p.conns[i].Close(); err != nil { log.Fatal(err) } + }*/ + + if *debug { + client := configurator.NewConfiguratorServiceClient(p.conn) + + time.Sleep(time.Second * 5) + p.Log.Infof("Requesting get..") + + cfg, err := client.Get(context.Background(), &configurator.GetRequest{}) + if err != nil { + log.Fatalln(err) + } + out, _ := (&jsonpb.Marshaler{Indent: " "}).MarshalToString(cfg) + fmt.Printf("Config:\n %+v\n", out) + + time.Sleep(time.Second * 5) + p.Log.Infof("Requesting dump..") + + dump, err := client.Dump(context.Background(), &configurator.DumpRequest{}) + if err != nil { + log.Fatalln(err) + } + fmt.Printf("Dump:\n %+v\n", proto.MarshalTextString(dump)) } } // runGRPCStressCreate creates 1 tunnel and 1 route ... emulating what strongswan does on a per remote warrior -func (p *GRPCStressPlugin) runGRPCStressCreate(id int, client configurator.ConfiguratorServiceClient, numTunnels int) { +func (p *GRPCStressPlugin) runGRPCStressCreate(clientId int, client configurator.ConfiguratorServiceClient, numTunnels int) { defer p.wg.Done() - p.Log.Debugf("Creating %d tunnels/routes ... for client %d, ", numTunnels, id) + p.Log.Debugf("Creating %d tunnels/routes ... 
for client %d, ", numTunnels, clientId) startTime := time.Now() ips := []string{"10.0.0.1/24"} + report := 0.0 + lastNumTunnels := 0 + lastReport := startTime + for tunNum := 0; tunNum < numTunnels; { if tunNum == numTunnels { break @@ -266,14 +397,15 @@ func (p *GRPCStressPlugin) runGRPCStressCreate(id int, client configurator.Confi break } - tunID := id*numTunnels + tunNum + tunID := clientId*numTunnels + tunNum tunNum++ ipsecTunnelName := fmt.Sprintf("ipsec-%d", tunID) + ipPart0 := 100 + (uint32(tunID)>>16)&0xFF ipPart := gen2octets(uint32(tunID)) - localIP := fmt.Sprintf("100.%s.1", ipPart) - remoteIP := fmt.Sprintf("100.%s.254", ipPart) + localIP := fmt.Sprintf("%d.%s.1", ipPart0, ipPart) + remoteIP := fmt.Sprintf("%d.%s.254", ipPart0, ipPart) ips = append(ips, localIP+"/24") @@ -308,7 +440,7 @@ func (p *GRPCStressPlugin) runGRPCStressCreate(id int, client configurator.Confi OutgoingInterface: ipsecTunnelName, } - //p.Log.Infof("Creating %s ... client: %d, tunNum: %d", ipsecTunnelName, id, tunNum) + //p.Log.Infof("Creating %s ... client: %d, tunNum: %d", ipsecTunnelName, clientId, tunNum) ifaces = append(ifaces, ipsecTunnel) routes = append(routes, route) @@ -325,7 +457,22 @@ func (p *GRPCStressPlugin) runGRPCStressCreate(id int, client configurator.Confi }, }) if err != nil { - log.Fatalf("Error creating tun/route: id/tun=%d/%d, err: %s", id, tunNum, err) + log.Fatalf("Error creating tun/route: clientId/tun=%d/%d, err: %s", clientId, tunNum, err) + } + + progress := (float64(tunNum) / float64(numTunnels)) * 100 + if uint(progress-report) >= *reportProgress { + tunNumReport := tunNum - lastNumTunnels + + took := time.Since(lastReport) + perSec := float64(tunNumReport) / took.Seconds() + + p.Log.Infof("client #%d - progress % 3.0f%% -> %d tunnels took %.3fs (%.1f tunnels/sec)", + clientId, progress, tunNumReport, took.Seconds(), perSec) + + report = progress + lastReport = time.Now() + lastNumTunnels = tunNum } } @@ -361,32 +508,11 @@ func (p *GRPCStressPlugin) runGRPCStressCreate(id int, client configurator.Confi } } - endTime := time.Now() - - p.Log.Infof("Client #%d done, %d tunnels took %s", - id, numTunnels, endTime.Sub(startTime).Round(time.Millisecond)) - - if *debug { - time.Sleep(time.Second * 5) - p.Log.Infof("Requesting get..") - - cfg, err := client.Get(context.Background(), &configurator.GetRequest{}) - if err != nil { - log.Fatalln(err) - } - out, _ := (&jsonpb.Marshaler{Indent: " "}).MarshalToString(cfg) - fmt.Printf("Config:\n %+v\n", out) - - time.Sleep(time.Second * 5) - p.Log.Infof("Requesting dump..") - - dump, err := client.Dump(context.Background(), &configurator.DumpRequest{}) - if err != nil { - log.Fatalln(err) - } - fmt.Printf("Dump:\n %+v\n", proto.MarshalTextString(dump)) - } + took := time.Since(startTime) + perSec := float64(numTunnels) / took.Seconds() + p.Log.Infof("client #%d done => %d tunnels took %.3fs (%.1f tunnels/sec)", + clientId, numTunnels, took.Seconds(), perSec) } func gen3octets(num uint32) string { diff --git a/tests/perf/grpc-perf/vpp.conf b/tests/perf/grpc-perf/vpp.conf new file mode 100644 index 0000000000..b82befd127 --- /dev/null +++ b/tests/perf/grpc-perf/vpp.conf @@ -0,0 +1,26 @@ +unix { + nodaemon + log /tmp/vpp.log + full-coredump + cli-listen 0.0.0.0:5002 + cli-no-pager +} +api-trace { + on +} +api-segment { + global-size 1G +} +socksvr { + socket-name /run/vpp/api.sock +} +statseg { + socket-name /run/vpp/stats.sock + size 128M +} +plugins { + plugin dpdk_plugin.so { disable } +} +ip { + heap-size 64M +} \ No newline at end of 
file
diff --git a/tests/perf/perf_test.sh b/tests/perf/perf_test.sh
index bcead8a1a3..dbc97f2646 100755
--- a/tests/perf/perf_test.sh
+++ b/tests/perf/perf_test.sh
@@ -11,58 +11,74 @@ BASH_ENTRY_DIR="$(dirname $(readlink -e "${BASH_SOURCE[0]}"))"
 
 _test="${1}"
 _requests="${2-1000}"
+_perreq="${3-5}"
+_numclients="${4-1}"
+
+_test_client="$SCRIPT_DIR/$_test"
+_vpp_config="$_test_client/vpp.conf"
 
 [ -z ${REPORT_DIR-} ] && REPORT_DIR="${SCRIPT_DIR}/reports/$_test"
 export DEBUG_PROFILE_PATH="${REPORT_DIR}"
 
 log_report="${REPORT_DIR}/report.log"
-
 log_vpp="${REPORT_DIR}/vpp.log"
 log_agent="${REPORT_DIR}/agent.log"
 sys_info="${REPORT_DIR}/sys-info.txt"
 vpp_info="${REPORT_DIR}/vpp-info.txt"
-agent_info="${REPORT_DIR}/agent-info.txt"
+agent_info="${REPORT_DIR}/agent-info"
 cpuprof="${REPORT_DIR}/cpu.pprof"
 memprof="${REPORT_DIR}/mem.pprof"
 
 rest_addr="${REST_ADDR:-http://127.0.0.1:9191}"
+sleep_extra="${SLEEP_EXTRA:-5}"
 
 # -------
 #  test
 # -------
 
 function run_test() {
+	echo "Preparing PERF testing.."
+
 	# create report directory
 	rm -vrf ${REPORT_DIR}/* 2>/dev/null
 	mkdir --mode=777 -p ${REPORT_DIR}
 
-	perftest $* 2>&1 | tee $log_report
+	perftest "$@" 2>&1 | tee "$log_report"
 }
 
 function perftest() {
 	local perftest="$1"
 	local requests="$2"
-
+	local tunnels="$3"
+	local clients="$4"
+
 	echo "================================================================================"
-	echo " PERF TEST: $perftest"
-	echo " - num requests: ${requests}"
-	echo " - report dir: ${REPORT_DIR}"
+	echo " PERF TEST - ${perftest}"
 	echo "================================================================================"
-
+	echo "report dir: ${REPORT_DIR}"
+	echo
+	echo "settings:"
+	echo " - requests per client: ${requests}"
+	echo " - tunnels per request: ${_perreq}"
+	echo " - clients: ${_numclients}"
+	echo "--------------------------------------------------------------------------------"
+
	prepare_test
 
 	trap 'on_exit' EXIT
-
 	start_vpp
 	start_agent
 
-	echo "-> running $perftest test.."
+	echo "-> sleeping for $sleep_extra seconds before starting test"
+	sleep "$sleep_extra"
+
+	echo "-> starting $perftest test.."
 	echo "--------------------------------------------------------------"
 	test_result=0
-	$_test_client/$_test ${CLIENT_PARAMS-} --tunnels=$requests || test_result=$?
+	"$_test_client"/"$_test" --tunnels=$requests --numperreq=$tunnels --clients=$clients ${CLIENT_PARAMS:-} || test_result=$?
 	echo "--------------------------------------------------------------"
 	echo "-> $_test test finished (exit code: $test_result)"
 
@@ -71,39 +87,47 @@
 	check_vpp
 	check_agent
 
+	set +e
+
+	#curl -sSfL "http://127.0.0.1:9094/metrics" > "${REPORT_DIR}/metrics_client.txt" || true
+
 	echo "-> collecting system info to: $sys_info"
-	sysinfo "uname -a" > $sys_info
-	sysinfo "env" >> $sys_info
 	sysinfo "pwd" >> $sys_info
+	sysinfo "env | sort" >> $sys_info
+	sysinfo "uname -a" >> $sys_info
 	sysinfo "lscpu" >> $sys_info
-	sysinfo "ip addr" >> $sys_info
 	sysinfo "free -h" >> $sys_info
 	sysinfo "df -h" >> $sys_info
+	sysinfo "ip -br link" >> $sys_info
+	sysinfo "ip -br addr" >> $sys_info
+	sysinfo "ip -br route" >> $sys_info
 	sysinfo "ps faux" >> $sys_info
 
-	echo "-> collecting agent info to: $agent_info"
-	grep -B 6 "Starting agent version" $log_agent > $agent_info
-	agentrest "scheduler/stats" >> $agent_info
-	agentrest "govppmux/stats" >> $agent_info
+	echo "-> collecting agent data.."
+ curljson "$rest_addr/scheduler/stats" > "${REPORT_DIR}/agent-stats_scheduler.json" + curljson "$rest_addr/govppmux/stats" > "${REPORT_DIR}/agent-stats_govppmux.json" + curl -sSfL "$rest_addr/metrics" > "${REPORT_DIR}/metrics_agent.txt" - echo "-> collecting VPP info to: $vpp_info" + echo "-> collecting VPP data to: $vpp_info" echo -e "VPP info:\n\n" > $vpp_info + vppcli "show clock" >> $vpp_info vppcli "show version verbose" >> $vpp_info - vppcli "show version cmdline" >> $vpp_info vppcli "show plugins" >> $vpp_info - vppcli "show clock" >> $vpp_info - vppcli "show threads" >> $vpp_info vppcli "show cpu" >> $vpp_info + vppcli "show version cmdline" >> $vpp_info + vppcli "show threads" >> $vpp_info vppcli "show physmem" >> $vpp_info - vppcli "show memory verbose" >> $vpp_info - vppcli "show api clients" >> $vpp_info + vppcli "show memory main-heap verbose" >> $vpp_info + vppcli "show memory api-segment verbose" >> $vpp_info + vppcli "show memory stats-segment verbose" >> $vpp_info vppcli "show api histogram" >> $vpp_info - vppcli "show api trace-status" >> $vpp_info vppcli "show api ring-stats" >> $vpp_info + vppcli "show api trace-status" >> $vpp_info vppcli "api trace status" >> $vpp_info - vppcli "show event-logger" >> $vpp_info - vppcli "show unix errors" >> $vpp_info + vppcli "show api clients" >> $vpp_info vppcli "show unix files" >> $vpp_info + vppcli "show unix errors" >> $vpp_info + vppcli "show event-logger" >> $vpp_info vppcli "show ip fib summary" >> $vpp_info if [[ "$test_result" == "0" ]]; then @@ -113,6 +137,9 @@ function perftest() { else fail "Test client failure (exit code: $test_result)" fi + + echo "-> sleeping for $sleep_extra seconds before stopping" + sleep "$sleep_extra" trap - EXIT stop_agent @@ -134,7 +161,6 @@ function perftest() { function prepare_test() { #cd ${BASH_ENTRY_DIR} # build test client - _test_client="$SCRIPT_DIR/$_test" #[[ -e "./$_test" ]] || { echo "-> compiling test client $_test.." @@ -188,8 +214,8 @@ function start_vpp() { [[ -e "$_vpp" ]] || fail "VPP not found!" echo -n "-> starting VPP ($_vpp).. " - rm -f /dev/shm/db /dev/shm/global_vm /dev/shm/vpe-api - $_vpp -c /etc/vpp/vpp.conf > "$log_vpp" 2>&1 & + rm -vf /dev/shm/db /dev/shm/global_vm /dev/shm/vpe-api + $_vpp -c "${_vpp_config}" > "$log_vpp" 2>&1 & pid_vpp="$!" timeout "${wait_vpp_boot}" grep -q "vlib_plugin_early_init" <(tail -qF $log_vpp) echo "ok! (PID:${pid_vpp})" @@ -299,18 +325,14 @@ function check_agent() { fi } -function agentrest() { - local url="$rest_addr/$1" +function curljson() { + local url="$1" - echo "----------------------------------------------------" - echo "GET $url" - echo "----------------------------------------------------" curl -sSfL -H "Content-Type: application/json" "$url" - echo } # skip running test if no argument is given (source) #[ -z "${1-}" ] || run_test $1 -run_test "$_test" "$_requests" +run_test "$_test" "$_requests" "$_perreq" "$_numclients" diff --git a/tests/perf/run_all.sh b/tests/perf/run_all.sh index 9d3169e427..90bd15c968 100755 --- a/tests/perf/run_all.sh +++ b/tests/perf/run_all.sh @@ -1,13 +1,11 @@ #!/bin/bash - set -euo pipefail SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" - cd $SCRIPT_DIR function run() { - test="$1" + test="${1}" typ="${2-basic}" requests="${3-${REQUESTS-1000}}"
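Putting the reworked scripts together, a typical run looks like the following sketch (the positional arguments follow perf_test.sh above: requests, tunnels per request, clients; REPORT_DIR and RUN are the variables run_perf_test.sh actually reads):

# Dockerized run: 10000 requests, 10 tunnels per request, 4 clients.
REPORT_DIR=./report RUN=10k-x4 ./scripts/run_perf_test.sh 10000 10 4

# Or drive the test directly inside a prepared environment:
./tests/perf/perf_test.sh grpc-perf 10000 10 4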