Preserve router metrics across restarts

smarterclayton committed Jan 23, 2018
1 parent 9ac06b1 commit 8f0119bdd9c3b679cdfdf2962143435a95e08eae
Showing with 105 additions and 21 deletions.
  1. +7 −1 pkg/cmd/infra/router/template.go
  2. +76 −19 pkg/router/metrics/haproxy/haproxy.go
  3. +22 −1 test/extended/router/metrics.go
@@ -259,6 +259,8 @@ func (o *TemplateRouterOptions) Validate() error {
func (o *TemplateRouterOptions) Run() error {
glog.Infof("Starting template router (%s)", version.Get())

var reloadCallbacks []func()

statsPort := o.StatsPort
switch {
case o.MetricsType == "haproxy" && statsPort != 0:
@@ -301,7 +303,7 @@ func (o *TemplateRouterOptions) Run() error {
}
}

_, err := haproxy.NewPrometheusCollector(haproxy.PrometheusOptions{
collector, err := haproxy.NewPrometheusCollector(haproxy.PrometheusOptions{
// Only template router customizers who alter the image should need this
ScrapeURI: util.Env("ROUTER_METRICS_HAPROXY_SCRAPE_URI", ""),
// Only template router customizers who alter the image should need this
@@ -379,13 +381,17 @@ func (o *TemplateRouterOptions) Run() error {
})
}
l.Listen()

// on reload, invoke the collector to preserve whatever metrics we can
reloadCallbacks = append(reloadCallbacks, collector.CollectNow)
}

pluginCfg := templateplugin.TemplatePluginConfig{
WorkingDir: o.WorkingDir,
TemplatePath: o.TemplateFile,
ReloadScriptPath: o.ReloadScript,
ReloadInterval: o.ReloadInterval,
ReloadCallbacks: reloadCallbacks,
DefaultCertificate: o.DefaultCertificate,
DefaultCertificatePath: o.DefaultCertificatePath,
DefaultCertificateDir: o.DefaultCertificateDir,
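The ReloadCallbacks field above hands the collector's CollectNow to the template plugin, which invokes every registered callback after each HAProxy reload. A minimal standalone sketch of that pattern, assuming a hypothetical router type and reload method (the plugin's real internals differ):

package main

import "log"

// router stands in for the template plugin: it holds callbacks to invoke
// after every reload (illustrative type, not the plugin's actual struct).
type router struct {
	reloadCallbacks []func()
}

func (r *router) reload() {
	// ... the reload script would run here ...
	log.Println("haproxy reloaded")

	// Let each registered party react, e.g. the metrics collector
	// snapshotting counters before the old process's stats are lost.
	for _, fn := range r.reloadCallbacks {
		fn()
	}
}

func main() {
	r := &router{}
	r.reloadCallbacks = append(r.reloadCallbacks, func() {
		log.Println("collector.CollectNow() would run here")
	})
	r.reload()
}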
@@ -93,20 +93,30 @@ func (m metrics) Names() []int {
return keys
}

type metricID struct {
proxyType string
proxyName string
serverName string
}

// counterValuesByMetric is used to track values across reloads
type counterValuesByMetric map[metricID]map[int]int64

// defaultSelectedMetrics is the list of metrics included by default. For performance reasons it is a subset
// of the metrics haproxy_exporter exposes by default.
var defaultSelectedMetrics = []int{2, 4, 5, 7, 8, 9, 13, 14, 17, 21, 24, 33, 35, 40, 43, 60}

// defaultCounterMetrics is the list of metrics that are counters and should be preserved across
// restarts. Only add metrics to this list if they are a counter.
var defaultCounterMetrics = []int{7, 8, 9, 13, 14, 21, 24, 40, 43}

// Exporter collects HAProxy stats from the given URI and exports them using
// the prometheus metrics package.
type Exporter struct {
opts PrometheusOptions
mutex sync.RWMutex
fetch func() (io.ReadCloser, error)

// pendingScrape indicates that a scrape was triggered in the background, and so metrics should be
// reported without recollection
pendingScrape bool
// lastScrape is the time the last scrape was invoked, if any
lastScrape *time.Time
// scrapeInterval is a calculated value based on the number of rows returned by HAProxy
@@ -123,6 +133,13 @@ type Exporter struct {
totalScrapes, csvParseFailures prometheus.Counter
serverThresholdCurrent, serverThresholdLimit prometheus.Gauge
frontendMetrics, backendMetrics, serverMetrics map[int]*prometheus.GaugeVec

// baseCounterValues is added to the value of specific haproxy frontend, backend, or server counter
// metrics, which allows counters to be tracked across restarts. The map is updated whenever CollectNow
// is invoked.
baseCounterValues counterValuesByMetric
// counterMetrics is the set of metric indices to record into baseCounterValues
counterMetrics map[int]struct{}
}
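To illustrate the bookkeeping with invented names and values (field 8 is bin, bytes received, in the HAProxy stats CSV; the backend name and proxyType string here are made up):

package main

import "fmt"

type metricID struct {
	proxyType, proxyName, serverName string
}

type counterValuesByMetric map[metricID]map[int]int64

func main() {
	// A snapshot as CollectNow might record it just before a reload.
	id := metricID{proxyType: "backend", proxyName: "be_http:default:hello"}
	base := counterValuesByMetric{id: {8: 1048576}}

	// After the restart HAProxy reports 4096 fresh bytes for the same row;
	// the exporter publishes base + scraped so the counter never regresses.
	fmt.Println(base[id][8] + 4096) // 1052672
}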

// NewExporter returns an initialized Exporter. baseScrapeInterval is how often to scrape per 1000 entries
@@ -143,6 +160,11 @@ func NewExporter(opts PrometheusOptions) (*Exporter, error) {
return nil, fmt.Errorf("unsupported scheme: %q", u.Scheme)
}

counterMetrics := make(map[int]struct{})
for _, m := range defaultCounterMetrics {
counterMetrics[m] = struct{}{}
}

return &Exporter{
opts: opts,
fetch: fetch,
@@ -253,6 +275,7 @@ func NewExporter(opts PrometheusOptions) (*Exporter, error) {
44: newServerMetric("http_responses_total", "Total of HTTP responses.", prometheus.Labels{"code": "other"}),
60: newServerMetric("http_average_response_latency_milliseconds", "Average response latency of the last 1024 requests in milliseconds.", nil),
}),
counterMetrics: counterMetrics,
}, nil
}

@@ -285,16 +308,13 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {

now := time.Now()
switch {
case e.pendingScrape:
// CollectNow already performed a scrape; serve those metrics
e.pendingScrape = false
case e.lastScrape != nil && e.lastScrape.Add(e.scrapeInterval).After(now):
// do nothing, return the most recently scraped metrics
glog.V(6).Infof("Will not scrape HAProxy metrics more often than every %s", e.scrapeInterval)
default:
e.lastScrape = &now
e.resetMetrics()
e.scrape()
e.scrape(false)
}

ch <- e.up
@@ -312,9 +332,8 @@ func (e *Exporter) CollectNow() {
defer e.mutex.Unlock()

e.resetMetrics()
e.scrape()
e.scrape(true)
e.lastScrape = nil
e.pendingScrape = true
}
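The two methods cooperate: a reload callback invokes CollectNow, which scrapes the dying process immediately and sets pendingScrape, so the next Prometheus Collect serves those cached metrics rather than scraping the freshly restarted HAProxy, whose counters would read near zero. A standalone sketch of that decision, with exporter standing in for the real type:

package main

import (
	"fmt"
	"time"
)

// exporter mirrors only the fields Collect consults (sketch, not the real type).
type exporter struct {
	pendingScrape  bool
	lastScrape     *time.Time
	scrapeInterval time.Duration
}

// shouldScrape condenses Collect's switch: skip if CollectNow already
// scraped, throttle if the last scrape is still fresh, otherwise scrape.
func (e *exporter) shouldScrape(now time.Time) bool {
	switch {
	case e.pendingScrape:
		e.pendingScrape = false // serve the metrics CollectNow gathered
		return false
	case e.lastScrape != nil && e.lastScrape.Add(e.scrapeInterval).After(now):
		return false // previous scrape is recent enough; reuse it
	default:
		e.lastScrape = &now
		return true
	}
}

func main() {
	e := &exporter{pendingScrape: true, scrapeInterval: 5 * time.Second}
	fmt.Println(e.shouldScrape(time.Now())) // false: CollectNow ran first
	fmt.Println(e.shouldScrape(time.Now())) // true: normal scrape path
}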

func fetchHTTP(uri string, timeout time.Duration) func() (io.ReadCloser, error) {
@@ -359,9 +378,14 @@ func fetchUnix(u *url.URL, timeout time.Duration) func() (io.ReadCloser, error)
}
}

func (e *Exporter) scrape() {
func (e *Exporter) scrape(record bool) {
e.totalScrapes.Inc()

var targetValues counterValuesByMetric
if record {
targetValues = make(counterValuesByMetric)
}

body, err := e.fetch()
if err != nil {
e.up.Set(0)
@@ -411,7 +435,12 @@ loop:
}

rows++
e.parseRow(row)
e.parseRow(row, targetValues)
}

// swap the counter values
if record {
e.baseCounterValues = targetValues
}

e.serverLimited = servers > e.opts.ServerThreshold
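Because scrape builds targetValues as a fresh map and swaps it in wholesale when record is true, baselines for backends and servers that have since disappeared are dropped automatically instead of accumulating as stale entries.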
@@ -451,30 +480,34 @@ func (e *Exporter) collectMetrics(metrics chan<- prometheus.Metric) {
}
}

func (e *Exporter) parseRow(csvRow []string) {
// parseRow identifies which metrics to capture for a given row based on its type and the values of pxname and svname. If
// the proxy and server names match our conventions, they are labelled with the corresponding route, service, or pod; otherwise
// a generic label set is applied. If targetValues is non-nil, the map is populated with the updated counter state for each
// metric.
func (e *Exporter) parseRow(csvRow []string, targetValues counterValuesByMetric) {
pxname, svname, typ := csvRow[0], csvRow[1], csvRow[32]

switch typ {
case frontendType:
e.exportCsvFields(e.frontendMetrics, csvRow, pxname)
e.exportAndRecordRow(e.frontendMetrics, metricID{proxyType: frontendType, proxyName: pxname}, targetValues, csvRow, pxname)
case backendType:
if mode, value, ok := knownBackendSegment(pxname); ok {
if namespace, name, ok := parseNameSegment(value); ok {
e.exportCsvFields(e.backendMetrics, csvRow, mode, namespace, name)
e.exportAndRecordRow(e.backendMetrics, metricID{proxyType: backendType, proxyName: pxname}, targetValues, csvRow, mode, namespace, name)
return
}
}
e.exportCsvFields(e.backendMetrics, csvRow, "other/"+pxname, "", "")
e.exportAndRecordRow(e.backendMetrics, metricID{proxyType: backendType, proxyName: pxname}, targetValues, csvRow, "other/"+pxname, "", "")
case serverType:
pod, service, server, _ := knownServerSegment(svname)

if _, value, ok := knownBackendSegment(pxname); ok {
if namespace, name, ok := parseNameSegment(value); ok {
e.exportCsvFields(e.serverMetrics, csvRow, server, namespace, name, pod, service)
e.exportAndRecordRow(e.serverMetrics, metricID{serverType, pxname, svname}, targetValues, csvRow, server, namespace, name, pod, service)
return
}
}
e.exportCsvFields(e.serverMetrics, csvRow, server, "", "", pod, service)
e.exportAndRecordRow(e.serverMetrics, metricID{proxyType: serverType, proxyName: pxname, serverName: svname}, targetValues, csvRow, server, "", "", pod, service)
}
}
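In each fallback branch the row is still exported, just under a generic label set (an "other/"-prefixed proxy name, or empty namespace and name fields), so proxies that do not follow the router's naming conventions remain visible in the exported metrics.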

@@ -538,7 +571,26 @@ func parseStatusField(value string) int64 {
return 0
}

func (e *Exporter) exportCsvFields(metrics metrics, csvRow []string, labels ...string) {
// exportAndRecordRow parses the provided csvRow for the specified metrics and updates their values under the given labels. If targetValues is
// non-nil, the current value of each metric is recorded under rowID. This allows baseline values to be captured and reused across restarts of
// HAProxy.
func (e *Exporter) exportAndRecordRow(metrics metrics, rowID metricID, targetValues counterValuesByMetric, csvRow []string, labels ...string) {
updateValues := targetValues != nil
baseCounterValues := e.baseCounterValues[rowID]
if updateValues && baseCounterValues == nil {
baseCounterValues = make(map[int]int64)
}

exportCSVFields(e.csvParseFailures, metrics, baseCounterValues, updateValues, csvRow, labels)

if updateValues {
targetValues[rowID] = baseCounterValues
}
}

// exportCSVFields iterates over the returned CSV values and sets the appropriate metric in the map. Empty values or parse errors result in
// no metric being scraped.
func exportCSVFields(csvParseFailures prometheus.Counter, metrics metrics, baselineValues map[int]int64, updateBaseline bool, csvRow []string, labels []string) {
for fieldIdx, metric := range metrics {
valueStr := csvRow[fieldIdx]
if valueStr == "" {
@@ -554,10 +606,15 @@ func (e *Exporter) exportCsvFields(metrics metrics, csvRow []string, labels ...s
value, err = strconv.ParseInt(valueStr, 10, 64)
if err != nil {
utilruntime.HandleError(fmt.Errorf("can't parse CSV field value %s: %v", valueStr, err))
e.csvParseFailures.Inc()
csvParseFailures.Inc()
continue
}
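// fold in the baseline captured at the last recorded scrape so the exported counter stays monotonic across the reload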
value += baselineValues[fieldIdx]
}
if updateBaseline {
baselineValues[fieldIdx] = value
}

metric.WithLabelValues(labels...).Set(float64(value))
}
}
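To make the arithmetic concrete (numbers invented): a counter that read 1000 when the reload callback fired becomes the baseline; the restarted HAProxy counts again from zero, so a later scrape reading 42 is exported as 1042:

package main

import "fmt"

func main() {
	// Invented numbers: one counter across a single reload.
	baseline := int64(0)
	scraped := int64(1000) // value just before the reload callback fires

	// CollectNow records baseline+scraped as the new baseline.
	baseline += scraped

	// HAProxy restarts; its counter resets and climbs again.
	scraped = 42

	// A later scrape exports baseline+scraped: the series never regresses.
	fmt.Println(baseline + scraped) // 1042
}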
@@ -114,13 +114,13 @@ var _ = g.Describe("[Conformance][Area:Networking][Feature:Router] openshift rou
serverLabels := labels{"namespace": ns, "route": "weightedroute"}
var metrics map[string]*dto.MetricFamily
times := 10
p := expfmt.TextParser{}
var results string
defer func() { e2e.Logf("received metrics:\n%s", results) }()
err = wait.PollImmediate(2*time.Second, 240*time.Second, func() (bool, error) {
results, err = getAuthenticatedURLViaPod(ns, execPodName, fmt.Sprintf("http://%s:%d/metrics", host, statsPort), username, password)
o.Expect(err).NotTo(o.HaveOccurred())

p := expfmt.TextParser{}
metrics, err = p.TextToMetricFamilies(bytes.NewBufferString(results))
o.Expect(err).NotTo(o.HaveOccurred())
//e2e.Logf("Metrics:\n%s", results)
@@ -181,6 +181,27 @@ var _ = g.Describe("[Conformance][Area:Networking][Feature:Router] openshift rou
// router metrics
o.Expect(findMetricsWithLabels(metrics["template_router_reload_seconds"], nil)[0].Summary.GetSampleSum()).To(o.BeNumerically(">", 0))
o.Expect(findMetricsWithLabels(metrics["template_router_write_config_seconds"], nil)[0].Summary.GetSampleSum()).To(o.BeNumerically(">", 0))

// TODO: uncomment in a follow-on once this merges (and the router image supports reload)
// verify that across a reload metrics are preserved
// g.By("forcing a router restart after a pod deletion")

// // delete the pod
// err = oc.AdminKubeClient().CoreV1().Pods(ns).Delete("endpoint-2", nil)
// o.Expect(err).NotTo(o.HaveOccurred())

// g.By("waiting for the router to reload")
// time.Sleep(15 * time.Second)

// g.By("checking that some metrics are not reset to 0 after router restart")
// defer func() { e2e.Logf("received metrics:\n%s", results) }()
// results, err = getAuthenticatedURLViaPod(ns, execPodName, fmt.Sprintf("http://%s:%d/metrics", host, statsPort), username, password)
// o.Expect(err).NotTo(o.HaveOccurred())

// updatedMetrics, err := p.TextToMetricFamilies(bytes.NewBufferString(results))
// o.Expect(err).NotTo(o.HaveOccurred())
// o.Expect(findGaugesWithLabels(updatedMetrics["haproxy_backend_connections_total"], routeLabels)[0]).To(o.BeNumerically(">=", findGaugesWithLabels(metrics["haproxy_backend_connections_total"], routeLabels)[0]))
// o.Expect(findGaugesWithLabels(updatedMetrics["haproxy_server_bytes_in_total"], serverLabels)[0]).To(o.BeNumerically(">=", findGaugesWithLabels(metrics["haproxy_server_bytes_in_total"], serverLabels)[0]))
})

g.It("should expose the profiling endpoints", func() {
