Skip to content

Commit

Permalink
WIP: Initial support for CEL
Browse files Browse the repository at this point in the history
This adds initial support for queries using the common expression
language (CEL).

Signed-off-by: Manuel Rüger <manuel@rueg.eu>
  • Loading branch information
mrueg committed Nov 16, 2023
1 parent 15d4535 commit 5a26a4a
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 15 deletions.
13 changes: 12 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
// Metric contains values that define a metric
type Metric struct {
Name string
Engine EngineType
Path string
Labels map[string]string
Type ScrapeType
Expand All @@ -44,7 +45,14 @@ type ValueType string
const (
ValueTypeGauge ValueType = "gauge"
ValueTypeCounter ValueType = "counter"
ValueTypeUntyped ValueType = "untyped"
ValueTypeUntyped ValueType = "untyped" // default
)

type EngineType string

const (
EngineTypeJSONPath EngineType = "jsonpath" // default
EngineTypeCEL EngineType = "cel"
)

// Config contains multiple modules.
Expand Down Expand Up @@ -89,6 +97,9 @@ func LoadConfig(configPath string) (Config, error) {
if module.Metrics[i].ValueType == "" {
module.Metrics[i].ValueType = ValueTypeUntyped
}
if module.Metrics[i].Engine == "" {
module.Metrics[i].Engine = EngineTypeJSONPath
}
}
}

Expand Down
40 changes: 38 additions & 2 deletions examples/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,17 @@ modules:
help: Example of a top-level global value scrape in the json
labels:
environment: beta # static label
location: 'planet-{.location}' # dynamic label
location: 'planet-{ .location }' # dynamic label

- name: example_cel_global_value
engine: cel
path: '.counter'
help: Example of a top-level global value scrape in the json using cel
valuetype: 'gauge'
labels:
environment: "\"beta\"" # static label. Quotes need to be escaped for CEL
location: "\"planet-\"+.location" # dynamic label. Quotes need to be escaped for CEL

- name: example_timestamped_value
type: object
path: '{ .values[?(@.state == "INACTIVE")] }'
Expand All @@ -18,7 +28,19 @@ modules:
labels:
environment: beta # static label
values:
count: '{.count}' # dynamic value
count: '{ .count }' # dynamic value

- name: example_cel_timestamped_value
type: object
engine: cel
path: ".values.filter(i, i.state == \"INACTIVE\")"
epochTimestamp: '.timestamp'
help: Example of a timestamped value scrape in the json
labels:
environment: "\"beta\"" # static label
values:
count: '.count' # dynamic value

- name: example_value
type: object
help: Example of sub-level value scrapes from a json
Expand All @@ -31,6 +53,20 @@ modules:
count: '{.count}' # dynamic value
boolean: '{.some_boolean}'

- name: example_cel_value
type: object
engine: cel
help: Example of sub-level value scrapes from a json
path: ".values.filter(i, i.state == \"ACTIVE\")"
labels:
environment: "\"beta\"" # static label
id: '.id' # dynamic label
values:
active: 1 # static value
count: '.count' # dynamic value
boolean: '.some_boolean'


animals:
metrics:
- name: animal
Expand Down
116 changes: 106 additions & 10 deletions exporter/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,19 @@ package exporter
import (
"bytes"
"encoding/json"
"fmt"
"reflect"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/cel-go/cel"
"github.com/google/cel-go/common/types/ref"
"github.com/prometheus-community/json_exporter/config"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
structpb "google.golang.org/protobuf/types/known/structpb"
"k8s.io/client-go/util/jsonpath"
)

Expand All @@ -34,6 +41,7 @@ type JSONMetricCollector struct {
type JSONMetric struct {
Desc *prometheus.Desc
Type config.ScrapeType
EngineType config.EngineType
KeyJSONPath string
ValueJSONPath string
LabelsJSONPaths []string
Expand All @@ -51,7 +59,8 @@ func (mc JSONMetricCollector) Collect(ch chan<- prometheus.Metric) {
for _, m := range mc.JSONMetrics {
switch m.Type {
case config.ValueScrape:
value, err := extractValue(mc.Logger, mc.Data, m.KeyJSONPath, false)
level.Debug(mc.Logger).Log("msg", "Extracting value for metric", "path", m.KeyJSONPath, "metric", m.Desc)
value, err := extractValue(mc.Logger, m.EngineType, mc.Data, m.KeyJSONPath, false)
if err != nil {
level.Error(mc.Logger).Log("msg", "Failed to extract value for metric", "path", m.KeyJSONPath, "err", err, "metric", m.Desc)
continue
Expand All @@ -62,7 +71,7 @@ func (mc JSONMetricCollector) Collect(ch chan<- prometheus.Metric) {
m.Desc,
m.ValueType,
floatValue,
extractLabels(mc.Logger, mc.Data, m.LabelsJSONPaths)...,
extractLabels(mc.Logger, m.EngineType, mc.Data, m.LabelsJSONPaths)...,
)
ch <- timestampMetric(mc.Logger, m, mc.Data, metric)
} else {
Expand All @@ -71,7 +80,8 @@ func (mc JSONMetricCollector) Collect(ch chan<- prometheus.Metric) {
}

case config.ObjectScrape:
values, err := extractValue(mc.Logger, mc.Data, m.KeyJSONPath, true)
level.Debug(mc.Logger).Log("msg", "Extracting object for metric", "path", m.KeyJSONPath, "metric", m.Desc)
values, err := extractValue(mc.Logger, m.EngineType, mc.Data, m.KeyJSONPath, true)
if err != nil {
level.Error(mc.Logger).Log("msg", "Failed to extract json objects for metric", "err", err, "metric", m.Desc)
continue
Expand All @@ -85,7 +95,7 @@ func (mc JSONMetricCollector) Collect(ch chan<- prometheus.Metric) {
level.Error(mc.Logger).Log("msg", "Failed to marshal data to json", "path", m.ValueJSONPath, "err", err, "metric", m.Desc, "data", data)
continue
}
value, err := extractValue(mc.Logger, jdata, m.ValueJSONPath, false)
value, err := extractValue(mc.Logger, m.EngineType, jdata, m.ValueJSONPath, false)
if err != nil {
level.Error(mc.Logger).Log("msg", "Failed to extract value for metric", "path", m.ValueJSONPath, "err", err, "metric", m.Desc)
continue
Expand All @@ -96,7 +106,7 @@ func (mc JSONMetricCollector) Collect(ch chan<- prometheus.Metric) {
m.Desc,
m.ValueType,
floatValue,
extractLabels(mc.Logger, jdata, m.LabelsJSONPaths)...,
extractLabels(mc.Logger, m.EngineType, jdata, m.LabelsJSONPaths)...,
)
ch <- timestampMetric(mc.Logger, m, jdata, metric)
} else {
Expand All @@ -105,7 +115,7 @@ func (mc JSONMetricCollector) Collect(ch chan<- prometheus.Metric) {
}
}
} else {
level.Error(mc.Logger).Log("msg", "Failed to convert extracted objects to json", "err", err, "metric", m.Desc)
level.Error(mc.Logger).Log("msg", "Failed to convert extracted objects to json", "value", values, "err", err, "metric", m.Desc)
continue
}
default:
Expand All @@ -115,8 +125,19 @@ func (mc JSONMetricCollector) Collect(ch chan<- prometheus.Metric) {
}
}

func extractValue(logger log.Logger, engine config.EngineType, data []byte, path string, enableJSONOutput bool) (string, error) {
switch engine {
case config.EngineTypeJSONPath:
return extractValueJSONPath(logger, data, path, enableJSONOutput)
case config.EngineTypeCEL:
return extractValueCEL(logger, data, path, enableJSONOutput)
default:
return "", fmt.Errorf("Unknown engine type: %s", engine)
}
}

// Returns the last matching value at the given json path
func extractValue(logger log.Logger, data []byte, path string, enableJSONOutput bool) (string, error) {
func extractValueJSONPath(logger log.Logger, data []byte, path string, enableJSONOutput bool) (string, error) {
var jsonData interface{}
buf := new(bytes.Buffer)

Expand Down Expand Up @@ -148,11 +169,71 @@ func extractValue(logger log.Logger, data []byte, path string, enableJSONOutput
return buf.String(), nil
}

// Returns the last matching value at the given json path
func extractValueCEL(logger log.Logger, data []byte, expression string, enableJSONOutput bool) (string, error) {

var jsonData map[string]any

err := json.Unmarshal(data, &jsonData)
if err != nil {
level.Error(logger).Log("msg", "Failed to unmarshal data to json", "err", err, "data", data)
return "", err
}

inputVars := make([]cel.EnvOption, 0, len(jsonData))
for k := range jsonData {
inputVars = append(inputVars, cel.Variable(k, cel.DynType))
}

env, err := cel.NewEnv(inputVars...)

if err != nil {
level.Error(logger).Log("msg", "Failed to set up CEL environment", "err", err, "data", data)
return "", err
}

ast, issues := env.Compile(expression)
if issues != nil && issues.Err() != nil {
level.Error(logger).Log("CEL type-check error", issues.Err(), "expression", expression)
return "", err
}
prg, err := env.Program(ast)
if err != nil {
level.Error(logger).Log("CEL program construction error", err)
return "", err
}

out, _, err := prg.Eval(jsonData)
if err != nil {
level.Error(logger).Log("msg", "Failed to evaluate cel query", "err", err, "expression", expression, "data", jsonData)
return "", err
}

// Since we are finally going to extract only float64, unquote if necessary

//res, err := jsonpath.UnquoteExtend(fmt.Sprintf("%g", out))
//if err == nil {
// level.Error(logger).Log("msg","Triggered")
// return res, nil
//}
level.Error(logger).Log("msg", "Triggered later", "val", out)
if enableJSONOutput {
res, err := valueToJSON(out)
if err != nil {
return "", err
} else {

Check warning on line 224 in exporter/collector.go

View workflow job for this annotation

GitHub Actions / lint

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (revive)
return res, nil
}
}

return fmt.Sprintf("%v", out), nil
}

// Returns the list of labels created from the list of provided json paths
func extractLabels(logger log.Logger, data []byte, paths []string) []string {
func extractLabels(logger log.Logger, engine config.EngineType, data []byte, paths []string) []string {
labels := make([]string, len(paths))
for i, path := range paths {
if result, err := extractValue(logger, data, path, false); err == nil {
if result, err := extractValue(logger, engine, data, path, false); err == nil {
labels[i] = result
} else {
level.Error(logger).Log("msg", "Failed to extract label value", "err", err, "path", path, "data", data)
Expand All @@ -165,7 +246,7 @@ func timestampMetric(logger log.Logger, m JSONMetric, data []byte, pm prometheus
if m.EpochTimestampJSONPath == "" {
return pm
}
ts, err := extractValue(logger, data, m.EpochTimestampJSONPath, false)
ts, err := extractValue(logger, m.EngineType, data, m.EpochTimestampJSONPath, false)
if err != nil {
level.Error(logger).Log("msg", "Failed to extract timestamp for metric", "path", m.KeyJSONPath, "err", err, "metric", m.Desc)
return pm
Expand All @@ -178,3 +259,18 @@ func timestampMetric(logger log.Logger, m JSONMetric, data []byte, pm prometheus
timestamp := time.UnixMilli(epochTime)
return prometheus.NewMetricWithTimestamp(timestamp, pm)
}

// valueToJSON converts the CEL type to a protobuf JSON representation and
// marshals the result to a string.
func valueToJSON(val ref.Val) (string, error) {
v, err := val.ConvertToNative(reflect.TypeOf(&structpb.Value{}))
if err != nil {
return "", err
}
marshaller := protojson.MarshalOptions{Indent: " "}
bytes, err := marshaller.Marshal(v.(proto.Message))
if err != nil {
return "", err
}
return string(bytes), err
}
4 changes: 3 additions & 1 deletion exporter/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func CreateMetricsList(c config.Module) ([]JSONMetric, error) {
variableLabels,
nil,
),
EngineType: metric.Engine,
KeyJSONPath: metric.Path,
LabelsJSONPaths: variableLabelsValues,
ValueType: valueType,
Expand All @@ -125,6 +126,7 @@ func CreateMetricsList(c config.Module) ([]JSONMetric, error) {
variableLabels,
nil,
),
EngineType: metric.Engine,
KeyJSONPath: metric.Path,
ValueJSONPath: valuePath,
LabelsJSONPaths: variableLabelsValues,
Expand All @@ -134,7 +136,7 @@ func CreateMetricsList(c config.Module) ([]JSONMetric, error) {
metrics = append(metrics, jsonMetric)
}
default:
return nil, fmt.Errorf("Unknown metric type: '%s', for metric: '%s'", metric.Type, metric.Name)
return nil, fmt.Errorf("unknown metric type: '%s', for metric: '%s'", metric.Type, metric.Name)
}
}
return metrics, nil
Expand Down
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ require (
github.com/Masterminds/sprig/v3 v3.2.3
github.com/alecthomas/kingpin/v2 v2.3.2
github.com/go-kit/log v0.2.1
github.com/google/cel-go v0.18.2
github.com/prometheus/client_golang v1.17.0
github.com/prometheus/common v0.45.0
github.com/prometheus/exporter-toolkit v0.10.0
google.golang.org/protobuf v1.31.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/client-go v0.28.3
)
Expand All @@ -17,6 +19,7 @@ require (
github.com/Masterminds/goutils v1.1.1 // indirect
github.com/Masterminds/semver/v3 v3.2.0 // indirect
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
Expand All @@ -35,13 +38,16 @@ require (
github.com/prometheus/procfs v0.11.1 // indirect
github.com/shopspring/decimal v1.2.0 // indirect
github.com/spf13/cast v1.3.1 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.31.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230803162519-f966b187b2e5 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5 // indirect
)

0 comments on commit 5a26a4a

Please sign in to comment.