Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(parsers.openmetrics): Add parser for OpenMetrics format #15298

Merged
merged 17 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Protocol, JSON format, or Apache Avro format.
- [JSON v2](/plugins/parsers/json_v2)
- [Logfmt](/plugins/parsers/logfmt)
- [Nagios](/plugins/parsers/nagios)
- [OpenMetrics](/plugins/parsers/openmetrics)
- [OpenTSDB](/plugins/parsers/opentsdb)
- [Parquet](/plugins/parsers/parquet)
- [Prometheus](/plugins/parsers/prometheus)
Expand Down
8 changes: 8 additions & 0 deletions plugins/inputs/prometheus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## If set to true, the gather time will be used.
# ignore_timestamp = false

## Override content-type of the returned message
## Available options are for prometheus:
## text, protobuf-delimiter, protobuf-compact, protobuf-text,
## and for openmetrics:
## openmetrics-text, openmetrics-protobuf
## By default the content-type of the response is used.
# content_type_override = ""

## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]

Expand Down
173 changes: 95 additions & 78 deletions plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"
"time"

"github.com/prometheus/common/expfmt"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/influxdata/telegraf/models"
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers/openmetrics"
parser "github.com/influxdata/telegraf/plugins/parsers/prometheus"
)

Expand All @@ -46,94 +48,71 @@ const (
type PodID string

type Prometheus struct {
// An array of urls to scrape metrics from.
URLs []string `toml:"urls"`

// An array of Kubernetes services to scrape metrics from.
KubernetesServices []string

// Location of kubernetes config file
KubeConfig string

// Label Selector/s for Kubernetes
KubernetesLabelSelector string `toml:"kubernetes_label_selector"`

// Field Selector/s for Kubernetes
KubernetesFieldSelector string `toml:"kubernetes_field_selector"`

// Consul SD configuration
URLs []string `toml:"urls"`
BearerToken string `toml:"bearer_token"`
BearerTokenString string `toml:"bearer_token_string"`
Username string `toml:"username"`
Password string `toml:"password"`
HTTPHeaders map[string]string `toml:"http_headers"`
ContentLengthLimit config.Size `toml:"content_length_limit"`
ContentTypeOverride string `toml:"content_type_override"`
EnableRequestMetrics bool `toml:"enable_request_metrics"`
MetricVersion int `toml:"metric_version"`
URLTag string `toml:"url_tag"`
IgnoreTimestamp bool `toml:"ignore_timestamp"`

// Kubernetes service discovery
MonitorPods bool `toml:"monitor_kubernetes_pods"`
PodScrapeScope string `toml:"pod_scrape_scope"`
NodeIP string `toml:"node_ip"`
PodScrapeInterval int `toml:"pod_scrape_interval"`
PodNamespace string `toml:"monitor_kubernetes_pods_namespace"`
PodNamespaceLabelName string `toml:"pod_namespace_label_name"`
KubernetesServices []string `toml:"kubernetes_services"`
KubeConfig string `toml:"kube_config"`
KubernetesLabelSelector string `toml:"kubernetes_label_selector"`
KubernetesFieldSelector string `toml:"kubernetes_field_selector"`
MonitorKubernetesPodsMethod MonitorMethod `toml:"monitor_kubernetes_pods_method"`
MonitorKubernetesPodsScheme string `toml:"monitor_kubernetes_pods_scheme"`
MonitorKubernetesPodsPath string `toml:"monitor_kubernetes_pods_path"`
MonitorKubernetesPodsPort int `toml:"monitor_kubernetes_pods_port"`
NamespaceAnnotationPass map[string][]string `toml:"namespace_annotation_pass"`
NamespaceAnnotationDrop map[string][]string `toml:"namespace_annotation_drop"`
PodAnnotationInclude []string `toml:"pod_annotation_include"`
PodAnnotationExclude []string `toml:"pod_annotation_exclude"`
PodLabelInclude []string `toml:"pod_label_include"`
PodLabelExclude []string `toml:"pod_label_exclude"`
CacheRefreshInterval int `toml:"cache_refresh_interval"`

// Consul discovery
ConsulConfig ConsulConfig `toml:"consul"`

// Bearer Token authorization file path
BearerToken string `toml:"bearer_token"`
BearerTokenString string `toml:"bearer_token_string"`

// Basic authentication credentials
Username string `toml:"username"`
Password string `toml:"password"`

HTTPHeaders map[string]string `toml:"http_headers"`

ContentLengthLimit config.Size `toml:"content_length_limit"`
EnableRequestMetrics bool `toml:"enable_request_metrics"`

MetricVersion int `toml:"metric_version"`

URLTag string `toml:"url_tag"`

IgnoreTimestamp bool `toml:"ignore_timestamp"`

Log telegraf.Logger

Log telegraf.Logger `toml:"-"`
httpconfig.HTTPClientConfig

client *http.Client
headers map[string]string

nsStore cache.Store
client *http.Client
headers map[string]string
contentType string

nsStore cache.Store
nsAnnotationPass []models.TagFilter
nsAnnotationDrop []models.TagFilter

// Should we scrape Kubernetes services for prometheus annotations
MonitorPods bool `toml:"monitor_kubernetes_pods"`
PodScrapeScope string `toml:"pod_scrape_scope"`
NodeIP string `toml:"node_ip"`
PodScrapeInterval int `toml:"pod_scrape_interval"`
PodNamespace string `toml:"monitor_kubernetes_pods_namespace"`
PodNamespaceLabelName string `toml:"pod_namespace_label_name"`
lock sync.Mutex
kubernetesPods map[PodID]URLAndAddress
cancel context.CancelFunc
wg sync.WaitGroup
lock sync.Mutex
kubernetesPods map[PodID]URLAndAddress
cancel context.CancelFunc
wg sync.WaitGroup

// Only for monitor_kubernetes_pods=true and pod_scrape_scope="node"
podLabelSelector labels.Selector
podFieldSelector fields.Selector
isNodeScrapeScope bool

MonitorKubernetesPodsMethod MonitorMethod `toml:"monitor_kubernetes_pods_method"`
MonitorKubernetesPodsScheme string `toml:"monitor_kubernetes_pods_scheme"`
MonitorKubernetesPodsPath string `toml:"monitor_kubernetes_pods_path"`
MonitorKubernetesPodsPort int `toml:"monitor_kubernetes_pods_port"`

NamespaceAnnotationPass map[string][]string `toml:"namespace_annotation_pass"`
NamespaceAnnotationDrop map[string][]string `toml:"namespace_annotation_drop"`

PodAnnotationInclude []string `toml:"pod_annotation_include"`
PodAnnotationExclude []string `toml:"pod_annotation_exclude"`

PodLabelInclude []string `toml:"pod_label_include"`
PodLabelExclude []string `toml:"pod_label_exclude"`

podLabelSelector labels.Selector
podFieldSelector fields.Selector
isNodeScrapeScope bool
podAnnotationIncludeFilter filter.Filter
podAnnotationExcludeFilter filter.Filter
podLabelIncludeFilter filter.Filter
podLabelExcludeFilter filter.Filter

// Only for monitor_kubernetes_pods=true
CacheRefreshInterval int `toml:"cache_refresh_interval"`

// List of consul services to scrape
consulServices map[string]URLAndAddress
}
Expand All @@ -143,6 +122,29 @@ func (*Prometheus) SampleConfig() string {
}

func (p *Prometheus) Init() error {
// Setup content-type override if requested
switch p.ContentTypeOverride {
case "": // No override
case "text":
p.contentType = string(expfmt.NewFormat(expfmt.TypeTextPlain))
case "protobuf-delimiter":
p.contentType = string(expfmt.NewFormat(expfmt.TypeProtoDelim))
case "protobuf-compact":
p.contentType = string(expfmt.NewFormat(expfmt.TypeProtoCompact))
case "protobuf-text":
p.contentType = string(expfmt.NewFormat(expfmt.TypeProtoText))
case "openmetrics-text":
f, err := expfmt.NewOpenMetricsFormat(expfmt.OpenMetricsVersion_1_0_0)
if err != nil {
return err
}
p.contentType = string(f)
case "openmetrics-protobuf":
p.contentType = "application/openmetrics-protobuf;version=1.0.0"
default:
return fmt.Errorf("invalid 'content_type_override' setting %q", p.ContentTypeOverride)
}

// Config processing for node scrape scope for monitor_kubernetes_pods
p.isNodeScrapeScope = strings.EqualFold(p.PodScrapeScope, "node")
if p.isNodeScrapeScope {
Expand Down Expand Up @@ -491,14 +493,29 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) (map[s
return requestFields, tags, fmt.Errorf("error reading body: %w", err)
}
}

requestFields["content_length"] = len(body)

// Override the response format if the user requested it
if p.contentType != "" {
resp.Header.Set("Content-Type", p.contentType)
}

// Parse the metrics
metricParser := parser.Parser{
Header: resp.Header,
MetricVersion: p.MetricVersion,
IgnoreTimestamp: p.IgnoreTimestamp,
Log: p.Log,
var metricParser telegraf.Parser
if openmetrics.AcceptsContent(resp.Header) {
metricParser = &openmetrics.Parser{
Header: resp.Header,
MetricVersion: p.MetricVersion,
IgnoreTimestamp: p.IgnoreTimestamp,
Log: p.Log,
}
} else {
metricParser = &parser.Parser{
Header: resp.Header,
MetricVersion: p.MetricVersion,
IgnoreTimestamp: p.IgnoreTimestamp,
Log: p.Log,
}
}
metrics, err := metricParser.Parse(body)
if err != nil {
Expand Down
Loading
Loading