Merge pull request #169 from natiiix/alerts-limit-backport-4.3
[release-4.3] Bug 1874018: Limit collection of ALERTS metric to 1000 lines (~500KiB) to avoid unbearably large archives
openshift-merge-robot committed Sep 26, 2020
2 parents 01ee789 + 38dc475 commit 9210ed0
Showing 1 changed file with 103 additions and 3 deletions.
106 changes: 103 additions & 3 deletions pkg/gather/clusterconfig/clusterconfig.go
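
The new limit and the header it produces can be illustrated with a small standalone sketch (not part of the commit; it only works through the arithmetic and the `# ALERTS <total>/<limit>` format described in the comments of the diff below, with illustrative values):

package main

import "fmt"

func main() {
	// Derivation of metricsAlertsLinesLimit as described in the diff:
	// a ~500 KiB budget for the ALERTS metric at roughly 450 bytes per line.
	const budgetBytes = 500 * 1024
	const typicalLineBytes = 450
	fmt.Println(budgetBytes / typicalLineBytes) // 1137, rounded down to 1000 for simplicity

	// The gatherer prepends a header of this shape to the truncated output,
	// so the archive records how many alert lines existed versus how many were kept.
	totalAlertCount, metricsAlertsLinesLimit := 1137, 1000 // illustrative values
	fmt.Printf("# ALERTS %d/%d\n", totalAlertCount, metricsAlertsLinesLimit)
}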
@@ -1,8 +1,11 @@
package clusterconfig

import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"regexp"
"sort"
"strings"
@@ -31,13 +34,24 @@ import (
"github.com/openshift/insights-operator/pkg/record"
)

const (
// metricsAlertsLinesLimit is the maximum number of lines read from the monitoring Prometheus.
// The limit is 500 KiB of alerts; one alert line typically takes about 450 bytes => 1137 lines.
// This number has been rounded down to 1000 for simplicity.
// Formerly, the `500 * 1024 / 450` expression was used instead.
metricsAlertsLinesLimit = 1000
)

var (
serializer = scheme.Codecs.LegacyCodec(configv1.SchemeGroupVersion)
kubeSerializer = kubescheme.Codecs.LegacyCodec(corev1.SchemeGroupVersion)

// maxEventTimeInterval represents the "only keep events that are at most 1h old" rule
// TODO: make this dynamic like the reporting window based on configured interval
maxEventTimeInterval = 1 * time.Hour

// lineSep is the line separator used by the alerts metric
lineSep = []byte{'\n'}
)

// Gatherer is a driving instance invoking collection of data
@@ -70,7 +84,6 @@ func (i *Gatherer) Gather(ctx context.Context, recorder record.Interface) error
return nil, nil
}
data, err := i.metricsClient.Get().AbsPath("federate").
Param("match[]", "ALERTS").
Param("match[]", "etcd_object_counts").
Param("match[]", "cluster_installer").
DoRaw()
@@ -79,9 +92,39 @@ func (i *Gatherer) Gather(ctx context.Context, recorder record.Interface) error
klog.Errorf("Unable to retrieve most recent metrics: %v", err)
return []record.Record{{Name: "config/metrics", Item: RawByte(fmt.Sprintf("# error: %v\n", err))}}, nil
}
return []record.Record{

rsp, err := i.metricsClient.Get().AbsPath("federate").
Param("match[]", "ALERTS").
Stream()
if err != nil {
// write metrics errors to the file format as a comment
klog.Errorf("Unable to retrieve most recent alerts from metrics: %v", err)
return []record.Record{{Name: "config/metrics", Item: RawByte(fmt.Sprintf("# error: %v\n", err))}}, nil
}
r := NewLineLimitReader(rsp, metricsAlertsLinesLimit)
alerts, err := ioutil.ReadAll(r)
if err != nil && err != io.EOF {
klog.Errorf("Unable to read most recent alerts from metrics: %v", err)
return nil, []error{err}
}

remainingAlertLines, err := countLines(rsp)
if err != nil && err != io.EOF {
klog.Errorf("Unable to count truncated lines of alerts metric: %v", err)
return nil, []error{err}
}
totalAlertCount := r.GetTotalLinesRead() + remainingAlertLines

// # ALERTS <Total Alerts Lines>/<Alerts Line Limit>
// The total number of alert lines will typically be greater than the true number of alerts by 2
// because the `# TYPE ALERTS untyped` header and the final empty line are counted in.
data = append(data, []byte(fmt.Sprintf("# ALERTS %d/%d\n", totalAlertCount, metricsAlertsLinesLimit))...)
data = append(data, alerts...)
records := []record.Record{
{Name: "config/metrics", Item: RawByte(data)},
}, nil
}

return records, nil
},
func() ([]record.Record, []error) {
config, err := i.client.ClusterOperators().List(metav1.ListOptions{})
@@ -603,3 +646,60 @@ ANONYMIZED
}
return sb.String()
}

// NewLineLimitReader returns a Reader that reads from `r` but stops with EOF after `n` lines.
func NewLineLimitReader(r io.Reader, n int) *LineLimitedReader { return &LineLimitedReader{r, n, 0} }

// A LineLimitedReader reads from an underlying reader but limits the amount of
// data returned to at most maxLinesLimit lines. Each call to Read
// decreases maxLinesLimit to reflect the number of lines remaining.
// Read returns EOF when no lines remain or when the underlying reader returns EOF.
type LineLimitedReader struct {
reader io.Reader // underlying reader
maxLinesLimit int // max lines remaining
// totalLinesRead is the total number of line separators already read by the underlying reader.
totalLinesRead int
}

func (l *LineLimitedReader) Read(p []byte) (int, error) {
if l.maxLinesLimit <= 0 {
return 0, io.EOF
}

rc, err := l.reader.Read(p)
l.totalLinesRead += bytes.Count(p[:rc], lineSep)

lc := 0
for {
lineSepIdx := bytes.Index(p[lc:rc], lineSep)
if lineSepIdx == -1 {
return rc, err
}
if l.maxLinesLimit <= 0 {
return lc, io.EOF
}
l.maxLinesLimit--
lc += lineSepIdx + 1 // skip past the line separator
}
}

// GetTotalLinesRead returns the total number of line separators already read by the underlying reader.
// This includes lines that were truncated by `Read` calls after the line limit was exceeded.
func (l *LineLimitedReader) GetTotalLinesRead() int { return l.totalLinesRead }

// countLines reads the remainder of the reader and counts the number of lines.
//
// Inspired by: https://stackoverflow.com/a/24563853/
func countLines(r io.Reader) (int, error) {
buf := make([]byte, 0x8000)
// The original implementation started from 0, but a reader with no line separator
// still contains a single line, so starting the count from 1 avoids an off-by-one error.
lineCount := 1
for {
c, err := r.Read(buf)
lineCount += bytes.Count(buf[:c], lineSep)
if err != nil {
return lineCount, err
}
}
}
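
Taken together, NewLineLimitReader, GetTotalLinesRead, and countLines implement the truncation scheme used in Gather. A minimal sketch of how they could be exercised from a test file in the same package (the payload, test name, and kept-line count are illustrative assumptions, not part of the commit):

package clusterconfig

import (
	"io"
	"io/ioutil"
	"strings"
	"testing"
)

// A sketch of how the pieces above fit together (not part of the commit):
// wrap a small in-memory payload the way Gather wraps the federate stream,
// read up to the line limit, then count what was left behind.
func TestLineLimitedReaderSketch(t *testing.T) {
	payload := "# TYPE ALERTS untyped\nALERTS{alertname=\"First\"} 1\nALERTS{alertname=\"Second\"} 1\n"
	src := strings.NewReader(payload)

	// Keep at most 2 lines (here: the TYPE header and the first alert).
	r := NewLineLimitReader(src, 2)
	kept, err := ioutil.ReadAll(r)
	if err != nil {
		t.Fatal(err)
	}

	// Count whatever the limited reader did not hand back; together with
	// GetTotalLinesRead this yields the "<total>/<limit>" numbers in the header.
	remaining, err := countLines(src)
	if err != nil && err != io.EOF {
		t.Fatal(err)
	}
	total := r.GetTotalLinesRead() + remaining

	// With this payload total comes out as 4: the TYPE header, two alert lines,
	// and the trailing "empty line" counted by countLines, matching the
	// "greater by 2" note in the Gather comment above.
	t.Logf("kept %d bytes, saw %d lines in total (limit %d)", len(kept), total, 2)
}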
