Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-4.5] Bug 1866776: Limit collection of ALERTS metric to 1000 lines (~500KiB) to avoid unbearably large archives #153

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
101 changes: 99 additions & 2 deletions pkg/gather/clusterconfig/clusterconfig.go
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/pem"
"fmt"
"io"
"io/ioutil"
"regexp"
"sort"
"strings"
Expand Down Expand Up @@ -36,6 +37,14 @@ import (
"github.com/openshift/insights-operator/pkg/record"
)

const (
// metricsAlertsLinesLimit is the maximal number of lines read from monitoring Prometheus
// 500 KiB of alerts is limit, one alert line has typically 450 bytes => 1137 lines.
// This number has been rounded to 1000 for simplicity.
// Formerly, the `500 * 1024 / 450` expression was used instead.
metricsAlertsLinesLimit = 1000
)

var (
openshiftSerializer = openshiftscheme.Codecs.LegacyCodec(configv1.SchemeGroupVersion)
kubeSerializer = kubescheme.Codecs.LegacyCodec(corev1.SchemeGroupVersion)
Expand All @@ -49,6 +58,9 @@ var (

// logTailLines defines a number lines to keep when fetching pod logs
logTailLines = int64(100)

// lineSep is the line separator used by the alerts metric
lineSep = []byte{'\n'}
)

func init() {
Expand Down Expand Up @@ -107,9 +119,10 @@ func (i *Gatherer) Gather(ctx context.Context, recorder record.Interface) error
//
// The GET REST query to URL /federate
// Gathered metrics:
// ALERTS
// etcd_object_counts
// cluster_installer
// namespace CPU and memory usage
// followed by at most 1000 lines of ALERTS metric
//
// Location in archive: config/metrics/
// See: docs/insights-archive-sample/config/metrics
Expand All @@ -119,7 +132,6 @@ func GatherMostRecentMetrics(i *Gatherer) func() ([]record.Record, []error) {
return nil, nil
}
data, err := i.metricsClient.Get().AbsPath("federate").
Param("match[]", "ALERTS").
Param("match[]", "etcd_object_counts").
Param("match[]", "cluster_installer").
DoRaw()
Expand All @@ -128,6 +140,34 @@ func GatherMostRecentMetrics(i *Gatherer) func() ([]record.Record, []error) {
klog.Errorf("Unable to retrieve most recent metrics: %v", err)
return []record.Record{{Name: "config/metrics", Item: RawByte(fmt.Sprintf("# error: %v\n", err))}}, nil
}

rsp, err := i.metricsClient.Get().AbsPath("federate").
Param("match[]", "ALERTS").
Stream()
if err != nil {
// write metrics errors to the file format as a comment
klog.Errorf("Unable to retrieve most recent alerts from metrics: %v", err)
return []record.Record{{Name: "config/metrics", Item: RawByte(fmt.Sprintf("# error: %v\n", err))}}, nil
}
r := NewLineLimitReader(rsp, metricsAlertsLinesLimit)
alerts, err := ioutil.ReadAll(r)
if err != nil && err != io.EOF {
klog.Errorf("Unable to read most recent alerts from metrics: %v", err)
return nil, []error{err}
}

remainingAlertLines, err := countLines(rsp)
if err != nil && err != io.EOF {
klog.Errorf("Unable to count truncated lines of alerts metric: %v", err)
return nil, []error{err}
}
totalAlertCount := r.GetTotalLinesRead() + remainingAlertLines

// # ALERTS <Total Alerts Lines>/<Alerts Line Limit>
// The total number of alerts will typically be greater than the true number of alerts by 2
// because the `# TYPE ALERTS untyped` header and the final empty line are counter in.
data = append(data, []byte(fmt.Sprintf("# ALERTS %d/%d\n", totalAlertCount, metricsAlertsLinesLimit))...)
data = append(data, alerts...)
records := []record.Record{
{Name: "config/metrics", Item: RawByte(data)},
}
Expand Down Expand Up @@ -956,3 +996,60 @@ ANONYMIZED
}
return sb.String()
}

// NewLineLimitReader returns a Reader that reads from `r` but stops with EOF after `n` lines.
func NewLineLimitReader(r io.Reader, n int) *LineLimitedReader { return &LineLimitedReader{r, n, 0} }

// A LineLimitedReader reads from R but limits the amount of
// data returned to just N lines. Each call to Read
// updates N to reflect the new amount remaining.
// Read returns EOF when N <= 0 or when the underlying R returns EOF.
type LineLimitedReader struct {
reader io.Reader // underlying reader
maxLinesLimit int // max lines remaining
// totalLinesRead is the total number of line separators already read by the underlying reader.
totalLinesRead int
}

func (l *LineLimitedReader) Read(p []byte) (int, error) {
if l.maxLinesLimit <= 0 {
return 0, io.EOF
}

rc, err := l.reader.Read(p)
l.totalLinesRead += bytes.Count(p[:rc], lineSep)

lc := 0
for {
lineSepIdx := bytes.Index(p[lc:rc], lineSep)
if lineSepIdx == -1 {
return rc, err
}
if l.maxLinesLimit <= 0 {
return lc, io.EOF
}
l.maxLinesLimit--
lc += lineSepIdx + 1 // skip past the EOF
}
}

// GetTotalLinesRead return the total number of line separators already read by the underlying reader.
// This includes lines that have been truncated by the `Read` calls after exceeding the line limit.
func (l *LineLimitedReader) GetTotalLinesRead() int { return l.totalLinesRead }

// countLines reads the remainder of the reader and counts the number of lines.
//
// Inspired by: https://stackoverflow.com/a/24563853/
func countLines(r io.Reader) (int, error) {
buf := make([]byte, 0x8000)
// Original implementation started from 0, but a file with no line separator
// still contains a single line, so I would say that was an off-by-1 error.
lineCount := 1
for {
c, err := r.Read(buf)
lineCount += bytes.Count(buf[:c], lineSep)
if err != nil {
return lineCount, err
}
}
}
2 changes: 1 addition & 1 deletion pkg/gather/clusterconfig/clusterconfig_test.go
Expand Up @@ -314,7 +314,7 @@ func ExampleGatherMostRecentMetrics_Test() {
}
fmt.Print(string(b))
// Output:
// [{"Name":"config/metrics","Captured":"0001-01-01T00:00:00Z","Fingerprint":"","Item":"SGVsbG8sIGNsaWVudAo="}]
// [{"Name":"config/metrics","Captured":"0001-01-01T00:00:00Z","Fingerprint":"","Item":"SGVsbG8sIGNsaWVudAojIEFMRVJUUyAyLzEwMDAKSGVsbG8sIGNsaWVudAo="}]
}

func ExampleGatherClusterOperators_Test() {
Expand Down