Skip to content

Commit

Permalink
Merge e5f7962 into a5c690a
Browse files Browse the repository at this point in the history
  • Loading branch information
ondrej-fabry committed Feb 28, 2020
2 parents a5c690a + e5f7962 commit 0ec9579
Show file tree
Hide file tree
Showing 18 changed files with 464 additions and 113 deletions.
1 change: 1 addition & 0 deletions cmd/vpp-agent/app/vpp_agent.go
Expand Up @@ -106,6 +106,7 @@ func New() *VPPAgent {
}
orchestrator.DefaultPlugin.Watcher = watchers
orchestrator.DefaultPlugin.StatusPublisher = writers
orchestrator.EnabledGrpcMetrics()

ifplugin.DefaultPlugin.Watcher = watchers
ifplugin.DefaultPlugin.NotifyStates = ifStatePub
Expand Down
8 changes: 4 additions & 4 deletions docker/dev/Dockerfile
Expand Up @@ -89,15 +89,15 @@ COPY ./docker/dev/init_hook.sh /usr/bin/
# Install agent
WORKDIR /src/ligato/vpp-agent

COPY go.mod go.sum ./
RUN go mod download
COPY . ./

ARG VERSION
ARG COMMIT
ARG BRANCH
ARG BUILD_DATE

COPY go.mod go.sum ./
RUN go mod download
COPY . ./

RUN make install purge

ENV SUPERVISOR_CONFIG=/opt/vpp-agent/dev/supervisor.conf
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Expand Up @@ -25,6 +25,7 @@ require (
github.com/golang/protobuf v1.3.3
github.com/gorilla/mux v1.6.2
github.com/gorilla/websocket v1.4.1 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.11.3 // indirect
github.com/hashicorp/go-uuid v1.0.1 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
Expand All @@ -49,12 +50,12 @@ require (
github.com/vishvananda/netlink v0.0.0-20180910184128-56b1bd27a9a3
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc
github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036 // indirect
go.ligato.io/cn-infra/v2 v2.5.0-alpha.0.20200115100339-59da3fdeb0c7
go.ligato.io/cn-infra/v2 v2.5.0-alpha.0.20200224101443-4cf6c674e05d
go.uber.org/multierr v1.2.0 // indirect
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 // indirect
golang.org/x/net v0.0.0-20190620200207-3b0461eec859
golang.org/x/sys v0.0.0-20200117145432-59e60aa80a0c
golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 // indirect
google.golang.org/grpc v1.27.0
google.golang.org/grpc v1.27.1
gotest.tools v2.2.0+incompatible // indirect
)
8 changes: 4 additions & 4 deletions go.sum
Expand Up @@ -359,8 +359,8 @@ github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036/go.mod h1:gqRgreBU
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.ligato.io/cn-infra/v2 v2.5.0-alpha.0.20200115100339-59da3fdeb0c7 h1:B8GC+I0QFHcyTfpdWDszXcdNzqBsW6koR+30bySHAWY=
go.ligato.io/cn-infra/v2 v2.5.0-alpha.0.20200115100339-59da3fdeb0c7/go.mod h1:lrof6xgwrmjAtpkfKcrUr1MqaOmuPEXORWviyXyZA2w=
go.ligato.io/cn-infra/v2 v2.5.0-alpha.0.20200224101443-4cf6c674e05d h1:GLbxg/WhwjXiuw8KN5hZfYIW6VnkAjN9ZlWxi8uFje4=
go.ligato.io/cn-infra/v2 v2.5.0-alpha.0.20200224101443-4cf6c674e05d/go.mod h1:mYLtG2Bq3C/SOUUafEe8JOwdqohd4NVYl6Bu/nh/O8Y=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand Down Expand Up @@ -456,8 +456,8 @@ google.golang.org/grpc v1.19.0 h1:cfg4PD8YEdSFnm7qLV4++93WcmhH2nIUhMjhdCvl3j8=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.1 h1:zvIju4sqAGvwKspUQOhwnpcqSbzi7/H6QomNNjTL4sk=
google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
Expand Down
2 changes: 1 addition & 1 deletion plugins/govppmux/metrics.go
Expand Up @@ -25,7 +25,7 @@ import (
// Set of raw Prometheus metrics.
// Labels
// * message
// *
// * error
// Do not increment directly, use Report* methods.
var (
channelsCreated = prometheus.NewCounter(prometheus.CounterOpts{
Expand Down
116 changes: 116 additions & 0 deletions plugins/kvscheduler/metrics.go
@@ -0,0 +1,116 @@
// Copyright (c) 2020 Cisco and/or its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kvscheduler

import (
"time"

"github.com/prometheus/client_golang/prometheus"

kvs "go.ligato.io/vpp-agent/v3/plugins/kvscheduler/api"
)

// Set of raw Prometheus metrics.
// Labels
// * txn_type
// * slice
// Do not increment directly, use Report* methods.
var (
transactionsProcessed = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "ligato",
Subsystem: "kvscheduler",
Name: "txn_processed",
Help: "The total number of transactions processed.",
})
transactionsDropped = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "ligato",
Subsystem: "kvscheduler",
Name: "txn_dropped",
Help: "The total number of transactions dropped.",
})
queueCapacity = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "ligato",
Subsystem: "kvscheduler",
Name: "queue_capacity",
Help: "The capacity of the transactions queue.",
})
queueLength = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "ligato",
Subsystem: "kvscheduler",
Name: "queue_length",
Help: "The number of transactions in the queue.",
})
queueWaitSeconds = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "ligato",
Subsystem: "kvscheduler",
Name: "queue_wait_seconds",
Help: "Wait time in queue for transactions.",
MaxAge: time.Second * 30,
},
[]string{"txn_type"},
)
txnProcessDurationSeconds = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "ligato",
Subsystem: "kvscheduler",
Name: "txn_process_duration_seconds",
Help: "Processing time of transactions.",
MaxAge: time.Second * 30,
},
[]string{"slice"},
)
txnDurationSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "ligato",
Subsystem: "kvscheduler",
Name: "txn_duration_seconds",
Help: "Bucketed histogram of processing time of transactions by type.",
},
[]string{"txn_type"},
)
)

func init() {
prometheus.MustRegister(transactionsProcessed)
prometheus.MustRegister(transactionsDropped)
prometheus.MustRegister(queueCapacity)
prometheus.MustRegister(queueLength)
prometheus.MustRegister(queueWaitSeconds)
prometheus.MustRegister(txnProcessDurationSeconds)
prometheus.MustRegister(txnDurationSeconds)
}

func reportTxnProcessed(typ kvs.TxnType, sec float64) {
transactionsProcessed.Inc()
txnDurationSeconds.WithLabelValues(typ.String()).Observe(sec)
}

func reportTxnDropped() {
transactionsDropped.Inc()
}

func reportQueueCap(c int) {
queueCapacity.Set(float64(c))
}

func reportQueued(n int) {
queueLength.Add(float64(n))
}

func reportQueueWait(typ kvs.TxnType, sec float64) {
queueWaitSeconds.WithLabelValues(typ.String()).Observe(sec)
}

func reportTxnProcessDuration(slice string, sec float64) {
txnProcessDurationSeconds.WithLabelValues(slice).Observe(sec)
}
3 changes: 3 additions & 0 deletions plugins/kvscheduler/plugin_scheduler.go
Expand Up @@ -180,6 +180,7 @@ func (s *Scheduler) Init() error {
s.registry = registry.NewRegistry()
// prepare channel for serializing transactions
s.txnQueue = make(chan *transaction, 100)
reportQueueCap(cap(s.txnQueue))
// register REST API handlers
s.registerHandlers(s.HTTPHandlers)
// initialize key-set used to mark values with updated status
Expand Down Expand Up @@ -292,6 +293,7 @@ func (s *Scheduler) TransactionBarrier() {
func (s *Scheduler) PushSBNotification(notif ...kvs.KVWithMetadata) error {
txn := &transaction{
txnType: kvs.SBNotification,
created: time.Now(),
}
for _, value := range notif {
txn.values = append(txn.values, kvForTxn{
Expand Down Expand Up @@ -441,6 +443,7 @@ func (txn *SchedulerTxn) Commit(ctx context.Context) (txnSeqNum uint64, err erro
txnType: kvs.NBTransaction,
nb: &nbTxn{},
values: make([]kvForTxn, 0, len(txn.values)),
created: time.Now(),
}

// collect values
Expand Down
1 change: 1 addition & 0 deletions plugins/kvscheduler/stats.go
Expand Up @@ -128,6 +128,7 @@ func trackTransactionMethod(m string) func() {
s := stats.TxnStats
return func() {
took := time.Since(t)
reportTxnProcessDuration(m, took.Seconds())
statsMu.Lock()
ms, tracked := s.Methods[m]
if !tracked {
Expand Down
3 changes: 3 additions & 0 deletions plugins/kvscheduler/txn_process.go
Expand Up @@ -39,6 +39,7 @@ type transaction struct {
values []kvForTxn
nb *nbTxn // defined for NB transactions
retry *retryTxn // defined for retry of failed operations
created time.Time
}

// kvForTxn represents a new value for a given key to be applied in a transaction.
Expand Down Expand Up @@ -92,7 +93,9 @@ func (s *Scheduler) consumeTransactions() {
if canceled {
return
}
reportQueueWait(txn.txnType, time.Since(txn.created).Seconds())
s.processTransaction(txn)
reportTxnProcessed(txn.txnType, time.Since(txn.created).Seconds())
}
}

Expand Down
7 changes: 6 additions & 1 deletion plugins/kvscheduler/txn_queue.go
Expand Up @@ -34,15 +34,18 @@ func (s *Scheduler) enqueueTxn(txn *transaction) error {
case <-s.ctx.Done():
return kvs.ErrClosedScheduler
case s.txnQueue <- txn:
reportQueued(1)
return nil
}
}
select {
case <-s.ctx.Done():
return kvs.ErrClosedScheduler
case s.txnQueue <- txn:
reportQueued(1)
return nil
default:
reportTxnDropped()
return kvs.ErrTxnQueueFull
}
}
Expand All @@ -53,6 +56,7 @@ func (s *Scheduler) dequeueTxn() (txn *transaction, canceled bool) {
case <-s.ctx.Done():
return nil, true
case txn = <-s.txnQueue:
reportQueued(-1)
//trace.Log(txn.ctx, "txn", "dequeue")
return txn, false
}
Expand All @@ -75,12 +79,13 @@ func (s *Scheduler) delayRetry(args *retryTxn) {
err := s.enqueueTxn(&transaction{
txnType: kvs.RetryFailedOps,
retry: args,
created: time.Now(),
})
if err != nil {
s.Log.WithFields(logging.Fields{
"txnSeqNum": args.txnSeqNum,
"err": err,
}).Warn("Failed to enqueue re-try for failed operations")
}).Warn("Failed to enqueue retry transaction for failed operations")
s.enqueueRetry(args) // try again with the same time period
}
}
Expand Down
5 changes: 2 additions & 3 deletions plugins/orchestrator/dispatcher.go
Expand Up @@ -15,16 +15,15 @@
package orchestrator

import (
"context"
"fmt"
"runtime/trace"
"sync"
"time"

"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"go.ligato.io/cn-infra/v2/logging"
"golang.org/x/net/context"

"github.com/golang/protobuf/proto"

"go.ligato.io/vpp-agent/v3/pkg/models"
kvs "go.ligato.io/vpp-agent/v3/plugins/kvscheduler/api"
Expand Down
8 changes: 7 additions & 1 deletion plugins/orchestrator/options.go
Expand Up @@ -15,6 +15,7 @@
package orchestrator

import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.ligato.io/cn-infra/v2/datasync/kvdbsync/local"
"go.ligato.io/cn-infra/v2/rpc/grpc"

Expand Down Expand Up @@ -45,8 +46,13 @@ func NewPlugin(opts ...Option) *Plugin {
// Option is a function that acts on a Plugin to inject Dependencies or configuration
type Option func(*Plugin)

func WithReflection(enabled bool) Option {
func UseReflection(enabled bool) Option {
return func(p *Plugin) {
p.reflection = enabled
}
}

func EnabledGrpcMetrics() {
grpc_prometheus.EnableHandlingTimeHistogram()
grpc.UsePromMetrics(grpc_prometheus.DefaultServerMetrics)(&grpc.DefaultPlugin)
}
6 changes: 4 additions & 2 deletions plugins/vpp/ifplugin/interface_state.go
Expand Up @@ -251,8 +251,10 @@ func (c *InterfaceStateUpdater) doUpdatesIfStateDetails() {

var ifIdxs []uint32
c.access.Lock()
for ifIdx := range c.ifsForUpdate {
ifIdxs = append(ifIdxs, ifIdx)
if len(c.ifsForUpdate) < 1000 {
for ifIdx := range c.ifsForUpdate {
ifIdxs = append(ifIdxs, ifIdx)
}
}
// clear interfaces for update
c.ifsForUpdate = make(map[uint32]struct{})
Expand Down
42 changes: 42 additions & 0 deletions scripts/run_perf_test.sh
@@ -0,0 +1,42 @@
#!/usr/bin/env bash
set -euo pipefail

# usage: ./scripts/run_perf_test.sh <num_req> <num_tunnels_per_req> <num_clients>

num_req=${1-10000}

image=${AGENT_IMG:-ligato/dev-vpp-agent}
reports=${REPORT_DIR:-report}
profiling_mode=${PROF_MODE-}

reports="$(cd $reports && pwd)"

runid=${RUN-"${num_req}-req"}
results="${reports}/perf-results-${runid}"

mkdir -p "$results"

echo "Starting perf test run: $runid"

cid=$(docker run -d -it --privileged \
--label perf-run="$runid" \
-v "$results":/report \
-e REPORT_DIR=/report \
-e ETCD_CONFIG=disabled \
-e INITIAL_LOGLVL=info \
-e DEBUG_ENABLED=y \
-e DEBUG_PROFILE_MODE="$profiling_mode" \
${DOCKER_EXTRA_ARGS:-} \
-- \
"$image" /bin/bash \
)

function on_exit() {
docker stop -t 1 "$cid"
exit
}
trap 'on_exit' EXIT

docker exec -it "$cid" bash ./tests/perf/perf_test.sh grpc-perf $*

echo "Test results stored in: $results"

0 comments on commit 0ec9579

Please sign in to comment.