Skip to content

Commit

Permalink
feat(parsers.openmetrics): Add parser for OpenMetrics format (#15298)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed May 16, 2024
1 parent dcb6177 commit 4896384
Show file tree
Hide file tree
Showing 64 changed files with 4,132 additions and 78 deletions.
1 change: 1 addition & 0 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Protocol, JSON format, or Apache Avro format.
- [JSON v2](/plugins/parsers/json_v2)
- [Logfmt](/plugins/parsers/logfmt)
- [Nagios](/plugins/parsers/nagios)
- [OpenMetrics](/plugins/parsers/openmetrics)
- [OpenTSDB](/plugins/parsers/opentsdb)
- [Parquet](/plugins/parsers/parquet)
- [Prometheus](/plugins/parsers/prometheus)
Expand Down
8 changes: 8 additions & 0 deletions plugins/inputs/prometheus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## If set to true, the gather time will be used.
# ignore_timestamp = false

## Override content-type of the returned message
## Available options are for prometheus:
## text, protobuf-delimiter, protobuf-compact, protobuf-text,
## and for openmetrics:
## openmetrics-text, openmetrics-protobuf
## By default the content-type of the response is used.
# content_type_override = ""

## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]

Expand Down
173 changes: 95 additions & 78 deletions plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"
"time"

"github.com/prometheus/common/expfmt"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/influxdata/telegraf/models"
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers/openmetrics"
parser "github.com/influxdata/telegraf/plugins/parsers/prometheus"
)

Expand All @@ -46,94 +48,71 @@ const (
type PodID string

type Prometheus struct {
// An array of urls to scrape metrics from.
URLs []string `toml:"urls"`

// An array of Kubernetes services to scrape metrics from.
KubernetesServices []string

// Location of kubernetes config file
KubeConfig string

// Label Selector/s for Kubernetes
KubernetesLabelSelector string `toml:"kubernetes_label_selector"`

// Field Selector/s for Kubernetes
KubernetesFieldSelector string `toml:"kubernetes_field_selector"`

// Consul SD configuration
URLs []string `toml:"urls"`
BearerToken string `toml:"bearer_token"`
BearerTokenString string `toml:"bearer_token_string"`
Username string `toml:"username"`
Password string `toml:"password"`
HTTPHeaders map[string]string `toml:"http_headers"`
ContentLengthLimit config.Size `toml:"content_length_limit"`
ContentTypeOverride string `toml:"content_type_override"`
EnableRequestMetrics bool `toml:"enable_request_metrics"`
MetricVersion int `toml:"metric_version"`
URLTag string `toml:"url_tag"`
IgnoreTimestamp bool `toml:"ignore_timestamp"`

// Kubernetes service discovery
MonitorPods bool `toml:"monitor_kubernetes_pods"`
PodScrapeScope string `toml:"pod_scrape_scope"`
NodeIP string `toml:"node_ip"`
PodScrapeInterval int `toml:"pod_scrape_interval"`
PodNamespace string `toml:"monitor_kubernetes_pods_namespace"`
PodNamespaceLabelName string `toml:"pod_namespace_label_name"`
KubernetesServices []string `toml:"kubernetes_services"`
KubeConfig string `toml:"kube_config"`
KubernetesLabelSelector string `toml:"kubernetes_label_selector"`
KubernetesFieldSelector string `toml:"kubernetes_field_selector"`
MonitorKubernetesPodsMethod MonitorMethod `toml:"monitor_kubernetes_pods_method"`
MonitorKubernetesPodsScheme string `toml:"monitor_kubernetes_pods_scheme"`
MonitorKubernetesPodsPath string `toml:"monitor_kubernetes_pods_path"`
MonitorKubernetesPodsPort int `toml:"monitor_kubernetes_pods_port"`
NamespaceAnnotationPass map[string][]string `toml:"namespace_annotation_pass"`
NamespaceAnnotationDrop map[string][]string `toml:"namespace_annotation_drop"`
PodAnnotationInclude []string `toml:"pod_annotation_include"`
PodAnnotationExclude []string `toml:"pod_annotation_exclude"`
PodLabelInclude []string `toml:"pod_label_include"`
PodLabelExclude []string `toml:"pod_label_exclude"`
CacheRefreshInterval int `toml:"cache_refresh_interval"`

// Consul discovery
ConsulConfig ConsulConfig `toml:"consul"`

// Bearer Token authorization file path
BearerToken string `toml:"bearer_token"`
BearerTokenString string `toml:"bearer_token_string"`

// Basic authentication credentials
Username string `toml:"username"`
Password string `toml:"password"`

HTTPHeaders map[string]string `toml:"http_headers"`

ContentLengthLimit config.Size `toml:"content_length_limit"`
EnableRequestMetrics bool `toml:"enable_request_metrics"`

MetricVersion int `toml:"metric_version"`

URLTag string `toml:"url_tag"`

IgnoreTimestamp bool `toml:"ignore_timestamp"`

Log telegraf.Logger

Log telegraf.Logger `toml:"-"`
httpconfig.HTTPClientConfig

client *http.Client
headers map[string]string

nsStore cache.Store
client *http.Client
headers map[string]string
contentType string

nsStore cache.Store
nsAnnotationPass []models.TagFilter
nsAnnotationDrop []models.TagFilter

// Should we scrape Kubernetes services for prometheus annotations
MonitorPods bool `toml:"monitor_kubernetes_pods"`
PodScrapeScope string `toml:"pod_scrape_scope"`
NodeIP string `toml:"node_ip"`
PodScrapeInterval int `toml:"pod_scrape_interval"`
PodNamespace string `toml:"monitor_kubernetes_pods_namespace"`
PodNamespaceLabelName string `toml:"pod_namespace_label_name"`
lock sync.Mutex
kubernetesPods map[PodID]URLAndAddress
cancel context.CancelFunc
wg sync.WaitGroup
lock sync.Mutex
kubernetesPods map[PodID]URLAndAddress
cancel context.CancelFunc
wg sync.WaitGroup

// Only for monitor_kubernetes_pods=true and pod_scrape_scope="node"
podLabelSelector labels.Selector
podFieldSelector fields.Selector
isNodeScrapeScope bool

MonitorKubernetesPodsMethod MonitorMethod `toml:"monitor_kubernetes_pods_method"`
MonitorKubernetesPodsScheme string `toml:"monitor_kubernetes_pods_scheme"`
MonitorKubernetesPodsPath string `toml:"monitor_kubernetes_pods_path"`
MonitorKubernetesPodsPort int `toml:"monitor_kubernetes_pods_port"`

NamespaceAnnotationPass map[string][]string `toml:"namespace_annotation_pass"`
NamespaceAnnotationDrop map[string][]string `toml:"namespace_annotation_drop"`

PodAnnotationInclude []string `toml:"pod_annotation_include"`
PodAnnotationExclude []string `toml:"pod_annotation_exclude"`

PodLabelInclude []string `toml:"pod_label_include"`
PodLabelExclude []string `toml:"pod_label_exclude"`

podLabelSelector labels.Selector
podFieldSelector fields.Selector
isNodeScrapeScope bool
podAnnotationIncludeFilter filter.Filter
podAnnotationExcludeFilter filter.Filter
podLabelIncludeFilter filter.Filter
podLabelExcludeFilter filter.Filter

// Only for monitor_kubernetes_pods=true
CacheRefreshInterval int `toml:"cache_refresh_interval"`

// List of consul services to scrape
consulServices map[string]URLAndAddress
}
Expand All @@ -143,6 +122,29 @@ func (*Prometheus) SampleConfig() string {
}

func (p *Prometheus) Init() error {
// Setup content-type override if requested
switch p.ContentTypeOverride {
case "": // No override
case "text":
p.contentType = string(expfmt.NewFormat(expfmt.TypeTextPlain))
case "protobuf-delimiter":
p.contentType = string(expfmt.NewFormat(expfmt.TypeProtoDelim))
case "protobuf-compact":
p.contentType = string(expfmt.NewFormat(expfmt.TypeProtoCompact))
case "protobuf-text":
p.contentType = string(expfmt.NewFormat(expfmt.TypeProtoText))
case "openmetrics-text":
f, err := expfmt.NewOpenMetricsFormat(expfmt.OpenMetricsVersion_1_0_0)
if err != nil {
return err
}
p.contentType = string(f)
case "openmetrics-protobuf":
p.contentType = "application/openmetrics-protobuf;version=1.0.0"
default:
return fmt.Errorf("invalid 'content_type_override' setting %q", p.ContentTypeOverride)
}

// Config processing for node scrape scope for monitor_kubernetes_pods
p.isNodeScrapeScope = strings.EqualFold(p.PodScrapeScope, "node")
if p.isNodeScrapeScope {
Expand Down Expand Up @@ -491,14 +493,29 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) (map[s
return requestFields, tags, fmt.Errorf("error reading body: %w", err)
}
}

requestFields["content_length"] = len(body)

// Override the response format if the user requested it
if p.contentType != "" {
resp.Header.Set("Content-Type", p.contentType)
}

// Parse the metrics
metricParser := parser.Parser{
Header: resp.Header,
MetricVersion: p.MetricVersion,
IgnoreTimestamp: p.IgnoreTimestamp,
Log: p.Log,
var metricParser telegraf.Parser
if openmetrics.AcceptsContent(resp.Header) {
metricParser = &openmetrics.Parser{
Header: resp.Header,
MetricVersion: p.MetricVersion,
IgnoreTimestamp: p.IgnoreTimestamp,
Log: p.Log,
}
} else {
metricParser = &parser.Parser{
Header: resp.Header,
MetricVersion: p.MetricVersion,
IgnoreTimestamp: p.IgnoreTimestamp,
Log: p.Log,
}
}
metrics, err := metricParser.Parse(body)
if err != nil {
Expand Down
Loading

0 comments on commit 4896384

Please sign in to comment.