Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new metrics indexing behavior #2

Merged
merged 1 commit on Mar 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions Makefile
Expand Up @@ -10,6 +10,17 @@ deploy: build
--trigger-resource origin-ci-test --trigger-event google.storage.object.finalize
.PHONY: deploy-functions

deploy-service-account:
gcloud iam service-accounts create search-index-gcs-writer \
--display-name search-index-gcs-writer \
--description 'Allows ci-search-functions to update elements in the origin-ci-bucket that they own' \
--project openshift-gce-devel
gsutil -m iam ch \
serviceAccount:search-index-gcs-writer@openshift-gce-devel.iam.gserviceaccount.com:objectCreator \
serviceAccount:search-index-gcs-writer@openshift-gce-devel.iam.gserviceaccount.com:objectViewer \
gs://origin-ci-test
.PHONY: deploy-service-account

delete:
gcloud functions delete IndexJobs \
--project openshift-gce-devel
Expand Down
280 changes: 280 additions & 0 deletions functions.go
Expand Up @@ -2,9 +2,11 @@
package cisearch

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net/url"
Expand Down Expand Up @@ -143,6 +145,139 @@ func IndexJobs(ctx context.Context, e GCSEvent) error {
return fmt.Errorf("failed to link %s to %s: %v", indexPath, u, err)
}
log.Printf("Indexed job %s with state %s to gs://%s/%s", u, state, e.Bucket, indexPath)

case "job_metrics.json":
// only process job metrics that appear to be in a smaller set of logs
parts := strings.Split(e.Name, "/")
if len(parts) < 4 {
return nil
}
var u, build, job string
switch {
case parts[0] == "logs":
u = (&url.URL{
Scheme: "gs",
Host: e.Bucket,
Path: path.Join(parts[:3]...),
}).String()
job = parts[1]
build = parts[2]
switch {
case strings.HasPrefix(job, "periodic-ci-openshift-release-"),
strings.HasPrefix(job, "release-openshift-"):
default:
// log.Printf("Skip job that is not a release job: %s", e.Name)
return nil
}
default:
//log.Printf("Skip job that is not postsubmit/periodic: %s", e.Name)
return nil
}

client, err := storage.NewClient(ctx, option.WithScopes(storage.ScopeReadWrite))
if err != nil {
return err
}

// read the raw output and transform into the consolidated form
// {
// "<name>[{<label>="<value>"]": {"timestamp":<int64>,"value":"<float64 string>"},
// ...
// }
r, err := client.Bucket(e.Bucket).Object(e.Name).NewReader(ctx)
if err != nil {
return err
}
metrics := make(map[string]PrometheusResult)
d := json.NewDecoder(r)
var rows int
for err = d.Decode(&metrics); err == nil; err = d.Decode(&metrics) {
rows++
}
if err != nil && err != io.EOF {
return fmt.Errorf("failed to decode metric on line %d: %v", rows+1, err)
}

outputMetrics := make(map[string]OutputMetric, len(metrics))
for name, v := range metrics {
if v.Status != "success" {
continue
}
if v.Data.ResultType != "vector" {
continue
}
if len(v.Data.Result) == 0 {
continue
}
if len(v.Data.Result) == 1 && len(v.Data.Result[0].Metric) == 0 {
outputMetrics[name] = OutputMetric{
Value: v.Data.Result[0].Value.Value,
Timestamp: v.Data.Result[0].Value.Timestamp,
}
//log.Printf("%s %s @ %d", name, v.Data.Result[0].Value.Value, v.Data.Result[0].Value.Timestamp)
continue
}
var label string
for i, result := range v.Data.Result {
if len(result.Metric) != 1 {
log.Printf("warn: Dropped result %d from %s because metric labels were must have length 1 instead of %d", i, name, len(result.Metric))
continue
}
if len(label) == 0 {
for k := range result.Metric {
label = k
break
}
if len(label) == 0 {
continue
}
}
value, ok := result.Metric[label]
if !ok {
log.Printf("warn: Dropped result %d from %s because no value for metric %s", i, name, label)
continue
}
outputMetrics[fmt.Sprintf("%s{%s=%q}", name, label, value)] = OutputMetric{
Value: v.Data.Result[0].Value.Value,
Timestamp: v.Data.Result[0].Value.Timestamp,
}
//log.Printf("%s{%s=%q} %s @ %d", name, label, value, result.Value.Value, result.Value.Timestamp)
}
}

duration, ok := outputMetrics["job:duration:total:seconds"]
if !ok {
return fmt.Errorf("job not indexed, does not have metric %q", "job:duration:total:seconds")
}

data, err := json.Marshal(outputMetrics)
if err != nil {
return fmt.Errorf("unable to marshal output metrics: %v", err)
}

// build index components
finishedAt := time.Unix(duration.Timestamp, 0)
key := finishedAt.UTC().Format(time.RFC3339)
indexPath := path.Join("index", "job-metrics", key, job, build)

// write the link with the metadata contents
w := client.
Bucket(e.Bucket).
Object(indexPath).
If(storage.Conditions{DoesNotExist: true}).
NewWriter(ctx)
w.ObjectAttrs.Metadata = map[string]string{
"link": u,
}
if _, err := w.Write(data); err != nil {
defer w.Close()
return fmt.Errorf("failed to write metrics %s to %s: %v", indexPath, u, err)
}
if err := w.Close(); err != nil {
return fmt.Errorf("failed to write metrics %s to %s: %v", indexPath, u, err)
}

log.Printf("Indexed %d job metrics %s in %d bytes to gs://%s/%s (link to %s)", len(metrics), e.Name, len(data), e.Bucket, indexPath, u)
}
return nil
}
Expand All @@ -152,3 +287,148 @@ type JobResult struct {
CompletedAt int64 `json:"completed_at"`
Link string `json:"link"`
}

// OutputMetric is the consolidated form a Prometheus sample is reduced to
// before being written to the metrics index object: the sample's unix
// timestamp and its value kept as the raw float string (avoids float64
// round-trip precision loss).
type OutputMetric struct {
	Timestamp int64 `json:"timestamp"`
	Value string `json:"value"`
}

// PrometheusResult is the top-level envelope of a serialized query result:
// a status string (only "success" is processed by the indexer) and the
// result payload. NOTE(review): field names match the Prometheus HTTP API
// query response shape — confirm against the job that writes job_metrics.json.
type PrometheusResult struct {
	Status string `json:"status"`
	Data PrometheusData `json:"data"`
}

// PrometheusData carries the typed result list of a query response; the
// indexer only consumes results whose ResultType is "vector".
type PrometheusData struct {
	ResultType string `json:"resultType"`
	Result []PrometheusMetric `json:"result"`
}

// PrometheusMetric is a single series in a vector result: its label set
// and one sample value.
type PrometheusMetric struct {
	Metric PrometheusLabels `json:"metric"`
	Value PrometheusValue `json:"value"`
}

type PrometheusValue struct {
Timestamp int64
Value string
}

// PrometheusLabels avoids deserialization allocations
type PrometheusLabels map[string]string

var _ json.Marshaler = PrometheusLabels(nil)
var _ json.Unmarshaler = &PrometheusLabels{}

func (l PrometheusLabels) MarshalJSON() ([]byte, error) {
if len(l) == 0 {
return []byte(`{}`), nil
}
return json.Marshal(map[string]string(l))
}

func (l *PrometheusLabels) UnmarshalJSON(data []byte) error {
switch {
case len(data) == 4 && bytes.Equal(data, []byte("null")):
return nil
case len(data) == 2 && bytes.Equal(data, []byte("{}")):
if l == nil {
return nil
}
for k := range *l {
delete(*l, k)
}
return nil
}
if l == nil {
*l = make(map[string]string)
}
var m *map[string]string = (*map[string]string)(l)
return json.Unmarshal(data, m)
}

type parseState int

const (
startState parseState = iota
timestampState
stringNumberState
closeState
doneState
)

func (l *PrometheusValue) UnmarshalJSON(data []byte) error {
switch {
case len(data) == 4 && bytes.Equal(data, []byte("null")):
return nil
case len(data) == 2 && bytes.Equal(data, []byte("[]")):
return fmt.Errorf("unexpected value")
}
var state parseState = startState

data = bytes.TrimSpace(data)
for len(data) > 0 {
switch data[0] {
case '[':
switch state {
case startState:
if l == nil {
*l = PrometheusValue{}
}
data = bytes.TrimSpace(data[1:])
state = timestampState
default:
return fmt.Errorf("unexpected character %c in state %d", data[0], state)
}
case ']':
switch state {
case closeState:
data = bytes.TrimSpace(data[1:])
state = doneState
default:
return fmt.Errorf("unexpected character %c in state %d", data[0], state)
}
default:
switch state {
case timestampState:
pos := bytes.Index(data, []byte(","))
if pos == -1 {
return fmt.Errorf("expected [<timestamp int>, \"<number string>\"], could not find comma")
}
timestampBytes := bytes.TrimSpace(data[:pos])
var err error
l.Timestamp, err = strconv.ParseInt(string(timestampBytes), 10, 64)
if err != nil {
return fmt.Errorf("expected [<timestamp int>, \"<number string>\"], timestamp was not an int64: %v", err)
}
data = data[pos+1:]
state = stringNumberState
case stringNumberState:
pos := bytes.Index(data, []byte("]"))
if pos == -1 {
return fmt.Errorf("expected [<timestamp int>, \"<number string>\"], could not find ending bracket in %q", string(data))
}
numberBytes := bytes.TrimSpace(data[:pos])
if len(numberBytes) < 2 || numberBytes[0] != '"' || numberBytes[len(numberBytes)-1] != '"' {
return fmt.Errorf("expected [<timestamp int>, \"<number string>\"], could not find number string")
}
b := numberBytes[1 : len(numberBytes)-1]
if len(b) != len(bytes.TrimSpace(b)) {
return fmt.Errorf("expected [<timestamp int>, \"<number string>\"], number was not a valid float64: whitespace in string")
}
s := string(b)
if _, err := strconv.ParseFloat(s, 64); err != nil {
return fmt.Errorf("expected [<timestamp int>, \"<number string>\"], number was not a valid float64: %v", err)
}
l.Value = s
data = data[pos:]
state = closeState
default:
return fmt.Errorf("unexpected character %c in state %d", data[0], state)
}
}
}
if state != doneState {
return fmt.Errorf("expected [<timestamp int>, \"<number string>\"]")
}
return nil
}