From 38dc475d29ae2445a9a18522b8d557ff44f75c3e Mon Sep 17 00:00:00 2001 From: Ivo Meixner Date: Mon, 31 Aug 2020 12:56:22 +0200 Subject: [PATCH] Limit ALERTS metric to 1000; Count total alerts --- pkg/gather/clusterconfig/clusterconfig.go | 106 +++++++++++++++++++++- 1 file changed, 103 insertions(+), 3 deletions(-) diff --git a/pkg/gather/clusterconfig/clusterconfig.go b/pkg/gather/clusterconfig/clusterconfig.go index 914cafcb4..ffb9494b9 100644 --- a/pkg/gather/clusterconfig/clusterconfig.go +++ b/pkg/gather/clusterconfig/clusterconfig.go @@ -1,8 +1,11 @@ package clusterconfig import ( + "bytes" "context" "fmt" + "io" + "io/ioutil" "regexp" "sort" "strings" @@ -31,6 +34,14 @@ import ( "github.com/openshift/insights-operator/pkg/record" ) +const ( + // metricsAlertsLinesLimit is the maximal number of lines read from monitoring Prometheus + // 500 KiB of alerts is limit, one alert line has typically 450 bytes => 1137 lines. + // This number has been rounded to 1000 for simplicity. + // Formerly, the `500 * 1024 / 450` expression was used instead. + metricsAlertsLinesLimit = 1000 +) + var ( serializer = scheme.Codecs.LegacyCodec(configv1.SchemeGroupVersion) kubeSerializer = kubescheme.Codecs.LegacyCodec(corev1.SchemeGroupVersion) @@ -38,6 +49,9 @@ var ( // maxEventTimeInterval represents the "only keep events that are maximum 1h old" // TODO: make this dynamic like the reporting window based on configured interval maxEventTimeInterval = 1 * time.Hour + + // lineSep is the line separator used by the alerts metric + lineSep = []byte{'\n'} ) // Gatherer is a driving instance invoking collection of data @@ -70,7 +84,6 @@ func (i *Gatherer) Gather(ctx context.Context, recorder record.Interface) error return nil, nil } data, err := i.metricsClient.Get().AbsPath("federate"). - Param("match[]", "ALERTS"). Param("match[]", "etcd_object_counts"). Param("match[]", "cluster_installer"). DoRaw() @@ -79,9 +92,39 @@ func (i *Gatherer) Gather(ctx context.Context, recorder record.Interface) error klog.Errorf("Unable to retrieve most recent metrics: %v", err) return []record.Record{{Name: "config/metrics", Item: RawByte(fmt.Sprintf("# error: %v\n", err))}}, nil } - return []record.Record{ + + rsp, err := i.metricsClient.Get().AbsPath("federate"). + Param("match[]", "ALERTS"). + Stream() + if err != nil { + // write metrics errors to the file format as a comment + klog.Errorf("Unable to retrieve most recent alerts from metrics: %v", err) + return []record.Record{{Name: "config/metrics", Item: RawByte(fmt.Sprintf("# error: %v\n", err))}}, nil + } + r := NewLineLimitReader(rsp, metricsAlertsLinesLimit) + alerts, err := ioutil.ReadAll(r) + if err != nil && err != io.EOF { + klog.Errorf("Unable to read most recent alerts from metrics: %v", err) + return nil, []error{err} + } + + remainingAlertLines, err := countLines(rsp) + if err != nil && err != io.EOF { + klog.Errorf("Unable to count truncated lines of alerts metric: %v", err) + return nil, []error{err} + } + totalAlertCount := r.GetTotalLinesRead() + remainingAlertLines + + // # ALERTS / + // The total number of alerts will typically be greater than the true number of alerts by 2 + // because the `# TYPE ALERTS untyped` header and the final empty line are counter in. + data = append(data, []byte(fmt.Sprintf("# ALERTS %d/%d\n", totalAlertCount, metricsAlertsLinesLimit))...) + data = append(data, alerts...) + records := []record.Record{ {Name: "config/metrics", Item: RawByte(data)}, - }, nil + } + + return records, nil }, func() ([]record.Record, []error) { config, err := i.client.ClusterOperators().List(metav1.ListOptions{}) @@ -603,3 +646,60 @@ ANONYMIZED } return sb.String() } + +// NewLineLimitReader returns a Reader that reads from `r` but stops with EOF after `n` lines. +func NewLineLimitReader(r io.Reader, n int) *LineLimitedReader { return &LineLimitedReader{r, n, 0} } + +// A LineLimitedReader reads from R but limits the amount of +// data returned to just N lines. Each call to Read +// updates N to reflect the new amount remaining. +// Read returns EOF when N <= 0 or when the underlying R returns EOF. +type LineLimitedReader struct { + reader io.Reader // underlying reader + maxLinesLimit int // max lines remaining + // totalLinesRead is the total number of line separators already read by the underlying reader. + totalLinesRead int +} + +func (l *LineLimitedReader) Read(p []byte) (int, error) { + if l.maxLinesLimit <= 0 { + return 0, io.EOF + } + + rc, err := l.reader.Read(p) + l.totalLinesRead += bytes.Count(p[:rc], lineSep) + + lc := 0 + for { + lineSepIdx := bytes.Index(p[lc:rc], lineSep) + if lineSepIdx == -1 { + return rc, err + } + if l.maxLinesLimit <= 0 { + return lc, io.EOF + } + l.maxLinesLimit-- + lc += lineSepIdx + 1 // skip past the EOF + } +} + +// GetTotalLinesRead return the total number of line separators already read by the underlying reader. +// This includes lines that have been truncated by the `Read` calls after exceeding the line limit. +func (l *LineLimitedReader) GetTotalLinesRead() int { return l.totalLinesRead } + +// countLines reads the remainder of the reader and counts the number of lines. +// +// Inspired by: https://stackoverflow.com/a/24563853/ +func countLines(r io.Reader) (int, error) { + buf := make([]byte, 0x8000) + // Original implementation started from 0, but a file with no line separator + // still contains a single line, so I would say that was an off-by-1 error. + lineCount := 1 + for { + c, err := r.Read(buf) + lineCount += bytes.Count(buf[:c], lineSep) + if err != nil { + return lineCount, err + } + } +}