Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redefine piped stats model #190

Merged
merged 1 commit into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/google/uuid v1.1.1
github.com/hashicorp/golang-lru v0.5.1
github.com/prometheus/client_golang v1.6.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5
Expand Down
2 changes: 2 additions & 0 deletions pkg/app/piped/statsreporter/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ go_library(
"//pkg/app/api/service/pipedservice:go_default_library",
"//pkg/model:go_default_library",
"//pkg/version:go_default_library",
"@com_github_prometheus_client_model//go:go_default_library",
"@com_github_prometheus_common//expfmt:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_uber_go_zap//:go_default_library",
],
Expand Down
125 changes: 57 additions & 68 deletions pkg/app/piped/statsreporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@
package statsreporter

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"go.uber.org/zap"
"google.golang.org/grpc"

Expand Down Expand Up @@ -74,19 +71,19 @@ L:
break L

case now := <-ticker.C:
stats, err := r.collect()
metrics, err := r.collect()
if err != nil {
continue
}
if len(stats) == 0 {
r.logger.Info("there are no stats to report")
if len(metrics) == 0 {
r.logger.Info("there are no metrics to report")
continue
}
if r.report(ctx, stats, now); err != nil {
if r.report(ctx, metrics, now); err != nil {
continue
}
r.logger.Info("successfully collected and reported stats",
zap.Int("num", len(stats)),
r.logger.Info("successfully collected and reported metrics",
zap.Int("num", len(metrics)),
zap.Duration("duration", time.Since(now)),
)
}
Expand All @@ -96,29 +93,29 @@ L:
return nil
}

func (r *reporter) collect() ([]*model.PipedStats_PrometheusMetrics, error) {
func (r *reporter) collect() ([]*model.PrometheusMetrics, error) {
resp, err := r.httpClient.Get(r.metricsURL)
if err != nil {
r.logger.Error("failed to collect prometheus metrics", zap.Error(err))
return nil, err
}
defer resp.Body.Close()

stats, err := parsePrometheusMetrics(resp.Body)
metrics, err := parsePrometheusMetrics(resp.Body)
if err != nil {
r.logger.Error("failed to parse prometheus metrics", zap.Error(err))
return nil, err
}

return stats, nil
return metrics, nil
}

func (r *reporter) report(ctx context.Context, stats []*model.PipedStats_PrometheusMetrics, now time.Time) error {
func (r *reporter) report(ctx context.Context, metrics []*model.PrometheusMetrics, now time.Time) error {
req := &pipedservice.PingRequest{
PipedStats: &model.PipedStats{
Version: version.Get().Version,
Timestamp: now.Unix(),
PrometheusStats: stats,
Version: version.Get().Version,
Timestamp: now.Unix(),
PrometheusMetrics: metrics,
},
}
if _, err := r.apiClient.Ping(ctx, req); err != nil {
Expand All @@ -128,68 +125,60 @@ func (r *reporter) report(ctx context.Context, stats []*model.PipedStats_Prometh
return nil
}

var helpPrefix = []byte("# HELP")

const typePrefix = "# TYPE"
var parser expfmt.TextParser

// TODO: Add a metrics whitelist and fiter out not needed ones.
func parsePrometheusMetrics(reader io.Reader) ([]*model.PipedStats_PrometheusMetrics, error) {
var (
curType = model.PipedStats_PrometheusMetrics_UNKNOWN
metrics []*model.PipedStats_PrometheusMetrics
)
scanner := bufio.NewScanner(reader)

for scanner.Scan() {
lb := scanner.Bytes()
if len(lb) == 0 {
continue
func parsePrometheusMetrics(reader io.Reader) ([]*model.PrometheusMetrics, error) {
metricFamily, err := parser.TextToMetricFamilies(reader)
if err != nil {
return nil, err
}

metrics := make([]*model.PrometheusMetrics, 0, len(metricFamily))

L:
for _, mf := range metricFamily {
var metricType model.PrometheusMetrics_Type

switch mf.GetType() {
case dto.MetricType_COUNTER:
metricType = model.PrometheusMetrics_COUNTER
case dto.MetricType_GAUGE:
metricType = model.PrometheusMetrics_GAUGE
default:
continue L
}

// Ignore all HELP line.
if bytes.HasPrefix(lb, helpPrefix) {
continue
metric := &model.PrometheusMetrics{
Name: *mf.Name,
Type: metricType,
}

// Extract current type from TYPE line.
line := string(lb)
if strings.HasPrefix(line, typePrefix) {
parts := strings.Split(line, " ")
if len(parts) < 3 {
return nil, fmt.Errorf("malformed TYPE line %s", line)
for _, m := range mf.Metric {
sample := &model.PrometheusMetrics_Sample{
Labels: make([]*model.PrometheusMetrics_LabelPair, 0, len(m.Label)),
}
switch parts[len(parts)-1] {
case "gauge":
curType = model.PipedStats_PrometheusMetrics_GAUGE
case "counter":
curType = model.PipedStats_PrometheusMetrics_COUNTER
default:
curType = model.PipedStats_PrometheusMetrics_UNKNOWN
metric.Samples = append(metric.Samples, sample)

for _, l := range m.Label {
sample.Labels = append(sample.Labels, &model.PrometheusMetrics_LabelPair{
Name: l.GetName(),
Value: l.GetValue(),
})
}
continue
}

if curType == model.PipedStats_PrometheusMetrics_UNKNOWN {
continue
switch metric.Type {
case model.PrometheusMetrics_COUNTER:
sample.Value = m.Counter.GetValue()
case model.PrometheusMetrics_GAUGE:
sample.Value = m.Gauge.GetValue()
}
}

// Extract metrics data.
lastSpaceIndex := strings.LastIndexByte(line, ' ')
if lastSpaceIndex < 0 {
continue
}
value, err := strconv.ParseFloat(line[lastSpaceIndex+1:], 64)
if err != nil {
return nil, err
if len(metric.Samples) > 0 {
metrics = append(metrics, metric)
}
metrics = append(metrics, &model.PipedStats_PrometheusMetrics{
Type: curType,
Name: line[:lastSpaceIndex],
Value: value,
})
}
if err := scanner.Err(); err != nil {
return nil, err
}

return metrics, nil
}
3 changes: 2 additions & 1 deletion pkg/app/piped/statsreporter/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ func TestParsePrometheusMetrics(t *testing.T) {

metrics, err := parsePrometheusMetrics(f)
require.NoError(t, err)
assert.Equal(t, 27, len(metrics))

assert.Equal(t, 30, len(metrics))
}
71 changes: 41 additions & 30 deletions pkg/app/piped/statsreporter/testdata/metrics.txt
Original file line number Diff line number Diff line change
@@ -1,63 +1,63 @@
# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 3.8513e-05
go_gc_duration_seconds{quantile="0.25"} 4.4435e-05
go_gc_duration_seconds{quantile="0.5"} 5.9408e-05
go_gc_duration_seconds{quantile="0.75"} 0.00010296
go_gc_duration_seconds{quantile="1"} 0.000199262
go_gc_duration_seconds_sum 0.000722282
go_gc_duration_seconds_count 9
go_gc_duration_seconds{quantile="0"} 3.8913e-05
go_gc_duration_seconds{quantile="0.25"} 4.4122e-05
go_gc_duration_seconds{quantile="0.5"} 6.918e-05
go_gc_duration_seconds{quantile="0.75"} 0.00014006
go_gc_duration_seconds{quantile="1"} 0.00017785
go_gc_duration_seconds_sum 0.000994407
go_gc_duration_seconds_count 11
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 308
go_goroutines 305
# HELP go_info Information about the Go environment.
# TYPE go_info gauge
go_info{version="go1.14.4"} 1
# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use.
# TYPE go_memstats_alloc_bytes gauge
go_memstats_alloc_bytes 1.7628976e+07
go_memstats_alloc_bytes 1.2961024e+07
# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed.
# TYPE go_memstats_alloc_bytes_total counter
go_memstats_alloc_bytes_total 5.3620752e+07
go_memstats_alloc_bytes_total 5.0056888e+07
# HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table.
# TYPE go_memstats_buck_hash_sys_bytes gauge
go_memstats_buck_hash_sys_bytes 1.469857e+06
go_memstats_buck_hash_sys_bytes 1.467122e+06
# HELP go_memstats_frees_total Total number of frees.
# TYPE go_memstats_frees_total counter
go_memstats_frees_total 206594
go_memstats_frees_total 225496
# HELP go_memstats_gc_cpu_fraction The fraction of this program's available CPU time used by the GC since the program started.
# TYPE go_memstats_gc_cpu_fraction gauge
go_memstats_gc_cpu_fraction 0.0006767014484907021
go_memstats_gc_cpu_fraction 0.000716180943817101
# HELP go_memstats_gc_sys_bytes Number of bytes used for garbage collection system metadata.
# TYPE go_memstats_gc_sys_bytes gauge
go_memstats_gc_sys_bytes 3.602696e+06
go_memstats_gc_sys_bytes 3.5986e+06
# HELP go_memstats_heap_alloc_bytes Number of heap bytes allocated and still in use.
# TYPE go_memstats_heap_alloc_bytes gauge
go_memstats_heap_alloc_bytes 1.7628976e+07
go_memstats_heap_alloc_bytes 1.2961024e+07
# HELP go_memstats_heap_idle_bytes Number of heap bytes waiting to be used.
# TYPE go_memstats_heap_idle_bytes gauge
go_memstats_heap_idle_bytes 4.448256e+07
go_memstats_heap_idle_bytes 4.8766976e+07
# HELP go_memstats_heap_inuse_bytes Number of heap bytes that are in use.
# TYPE go_memstats_heap_inuse_bytes gauge
go_memstats_heap_inuse_bytes 2.0365312e+07
go_memstats_heap_inuse_bytes 1.6048128e+07
# HELP go_memstats_heap_objects Number of allocated objects.
# TYPE go_memstats_heap_objects gauge
go_memstats_heap_objects 142521
go_memstats_heap_objects 119395
# HELP go_memstats_heap_released_bytes Number of heap bytes released to OS.
# TYPE go_memstats_heap_released_bytes gauge
go_memstats_heap_released_bytes 3.9714816e+07
go_memstats_heap_released_bytes 4.3614208e+07
# HELP go_memstats_heap_sys_bytes Number of heap bytes obtained from system.
# TYPE go_memstats_heap_sys_bytes gauge
go_memstats_heap_sys_bytes 6.4847872e+07
go_memstats_heap_sys_bytes 6.4815104e+07
# HELP go_memstats_last_gc_time_seconds Number of seconds since 1970 of last garbage collection.
# TYPE go_memstats_last_gc_time_seconds gauge
go_memstats_last_gc_time_seconds 1.592872471600576e+09
go_memstats_last_gc_time_seconds 1.592924191333674e+09
# HELP go_memstats_lookups_total Total number of pointer lookups.
# TYPE go_memstats_lookups_total counter
go_memstats_lookups_total 0
# HELP go_memstats_mallocs_total Total number of mallocs.
# TYPE go_memstats_mallocs_total counter
go_memstats_mallocs_total 349115
go_memstats_mallocs_total 344891
# HELP go_memstats_mcache_inuse_bytes Number of bytes in use by mcache structures.
# TYPE go_memstats_mcache_inuse_bytes gauge
go_memstats_mcache_inuse_bytes 27776
Expand All @@ -66,25 +66,36 @@ go_memstats_mcache_inuse_bytes 27776
go_memstats_mcache_sys_bytes 32768
# HELP go_memstats_mspan_inuse_bytes Number of bytes in use by mspan structures.
# TYPE go_memstats_mspan_inuse_bytes gauge
go_memstats_mspan_inuse_bytes 323000
go_memstats_mspan_inuse_bytes 302872
# HELP go_memstats_mspan_sys_bytes Number of bytes used for mspan structures obtained from system.
# TYPE go_memstats_mspan_sys_bytes gauge
go_memstats_mspan_sys_bytes 344064
go_memstats_mspan_sys_bytes 327680
# HELP go_memstats_next_gc_bytes Number of heap bytes when next garbage collection will take place.
# TYPE go_memstats_next_gc_bytes gauge
go_memstats_next_gc_bytes 2.5871136e+07
go_memstats_next_gc_bytes 2.3010352e+07
# HELP go_memstats_other_sys_bytes Number of bytes used for other system allocations.
# TYPE go_memstats_other_sys_bytes gauge
go_memstats_other_sys_bytes 3.283543e+06
go_memstats_other_sys_bytes 3.568902e+06
# HELP go_memstats_stack_inuse_bytes Number of bytes in use by the stack allocator.
# TYPE go_memstats_stack_inuse_bytes gauge
go_memstats_stack_inuse_bytes 2.260992e+06
go_memstats_stack_inuse_bytes 2.29376e+06
# HELP go_memstats_stack_sys_bytes Number of bytes obtained from system for stack allocator.
# TYPE go_memstats_stack_sys_bytes gauge
go_memstats_stack_sys_bytes 2.260992e+06
go_memstats_stack_sys_bytes 2.29376e+06
# HELP go_memstats_sys_bytes Number of bytes obtained from system.
# TYPE go_memstats_sys_bytes gauge
go_memstats_sys_bytes 7.5841792e+07
go_memstats_sys_bytes 7.6103936e+07
# HELP go_threads Number of OS threads created.
# TYPE go_threads gauge
go_threads 23
go_threads 23
# HELP livestatestore_kubernetes_api_requests_total Number of requests sent to kubernetes api server.
# TYPE livestatestore_kubernetes_api_requests_total counter
livestatestore_kubernetes_api_requests_total{code="200",host="34.84.220.84",method="GET"} 110
# HELP promhttp_metric_handler_requests_in_flight Current number of scrapes being served.
# TYPE promhttp_metric_handler_requests_in_flight gauge
promhttp_metric_handler_requests_in_flight 1
# HELP promhttp_metric_handler_requests_total Total number of scrapes by HTTP status code.
# TYPE promhttp_metric_handler_requests_total counter
promhttp_metric_handler_requests_total{code="200"} 0
promhttp_metric_handler_requests_total{code="500"} 0
promhttp_metric_handler_requests_total{code="503"} 0
31 changes: 19 additions & 12 deletions pkg/model/piped_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,27 @@ option go_package = "github.com/pipe-cd/pipe/pkg/model";

import "validate/validate.proto";

message PipedStats {
message PrometheusMetrics {
enum Type {
GAUGE = 0;
COUNTER = 1;
UNKNOWN = 2;
}

string name = 1 [(validate.rules).string.min_len = 1];
Type type = 2 [(validate.rules).enum.defined_only = true];
double value = 3;
message PrometheusMetrics {
enum Type {
GAUGE = 0;
COUNTER = 1;
}
message LabelPair {
string name = 1;
string value = 2;
}
message Sample {
repeated LabelPair labels = 1;
double value = 2;
}

string name = 1 [(validate.rules).string.min_len = 1];
Type type = 2 [(validate.rules).enum.defined_only = true];
repeated Sample samples = 3;
}

message PipedStats {
string version = 1 [(validate.rules).string.min_len = 1];
int64 timestamp = 2 [(validate.rules).int64.gt = 0];
repeated PrometheusMetrics prometheus_stats = 3;
repeated PrometheusMetrics prometheus_metrics = 3;
}