Skip to content

Commit

Permalink
Add scraping for Prometheus endpoint in Kubernetes (influxdata#4920)
Browse files Browse the repository at this point in the history
  • Loading branch information
glinton authored and otherpirate committed Mar 15, 2019
1 parent a6ef138 commit a721984
Show file tree
Hide file tree
Showing 5 changed files with 381 additions and 4 deletions.
25 changes: 23 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions plugins/inputs/prometheus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ in Prometheus format.
## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]

## Kubernetes config file to create client from.
# kube_config = "/path/to/kubernetes.config"

## Scrape Kubernetes pods for the following prometheus annotations:
## - prometheus.io/scrape: Enable scraping for this pod
## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to
## set this to `https` & most likely set the tls config.
## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
## - prometheus.io/port: If port is not 9102 use this annotation
# monitor_kubernetes_pods = true

## Use bearer token for authorization
# bearer_token = /path/to/bearer/token

Expand All @@ -39,6 +50,18 @@ by looking up all A records assigned to the hostname as described in
This method can be used to locate all
[Kubernetes headless services](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services).

#### Kubernetes scraping

Enabling this option will allow the plugin to scrape for prometheus annotation on Kubernetes
pods. Currently, you can run this plugin in your kubernetes cluster, or we use the kubeconfig
file to determine where to monitor.
Currently the following annotation are supported:

* `prometheus.io/scrape` Enable scraping for this pod.
* `prometheus.io/scheme` If the metrics endpoint is secured then you will need to set this to `https` & most likely set the tls config. (default 'http')
* `prometheus.io/path` Override the path for the metrics endpoint on the service. (default '/metrics')
* `prometheus.io/port` Used to override the port. (default 9102)

#### Bearer Token

If set, the file specified by the `bearer_token` parameter will be read on
Expand Down
198 changes: 198 additions & 0 deletions plugins/inputs/prometheus/kubernetes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package prometheus

import (
"context"
"fmt"
"io/ioutil"
"log"
"net"
"net/url"
"os/user"
"path/filepath"
"sync"
"time"

"github.com/ericchiang/k8s"
corev1 "github.com/ericchiang/k8s/apis/core/v1"
"gopkg.in/yaml.v2"
)

type payload struct {
eventype string
pod *corev1.Pod
}

// loadClient parses a kubeconfig from a file and returns a Kubernetes
// client. It does not support extensions or client auth providers.
func loadClient(kubeconfigPath string) (*k8s.Client, error) {
data, err := ioutil.ReadFile(kubeconfigPath)
if err != nil {
return nil, fmt.Errorf("failed reading '%s': %v", kubeconfigPath, err)
}

// Unmarshal YAML into a Kubernetes config object.
var config k8s.Config
if err := yaml.Unmarshal(data, &config); err != nil {
return nil, err
}
return k8s.NewClient(&config)
}

func (p *Prometheus) start(ctx context.Context) error {
client, err := k8s.NewInClusterClient()
if err != nil {
u, err := user.Current()
if err != nil {
return fmt.Errorf("Failed to get current user - %v", err)
}
configLocation := filepath.Join(u.HomeDir, ".kube/config")
if p.KubeConfig != "" {
configLocation = p.KubeConfig
}
client, err = loadClient(configLocation)
if err != nil {
return err
}
}

p.wg = sync.WaitGroup{}

p.wg.Add(1)
go func() {
defer p.wg.Done()
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
err := p.watch(ctx, client)
if err != nil {
log.Printf("E! [inputs.prometheus] unable to watch resources: %v", err)
}
}
}
}()

return nil
}

func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error {
pod := &corev1.Pod{}
watcher, err := client.Watch(ctx, "", &corev1.Pod{})
if err != nil {
return err
}
defer watcher.Close()

for {
select {
case <-ctx.Done():
return nil
default:
pod = &corev1.Pod{}
// An error here means we need to reconnect the watcher.
eventType, err := watcher.Next(pod)
if err != nil {
return err
}

switch eventType {
case k8s.EventAdded:
registerPod(pod, p)
case k8s.EventDeleted:
unregisterPod(pod, p)
case k8s.EventModified:
}
}
}
}

func registerPod(pod *corev1.Pod, p *Prometheus) {
targetURL := getScrapeURL(pod)
if targetURL == nil {
return
}

log.Printf("D! [inputs.prometheus] will scrape metrics from %s", *targetURL)
// add annotation as metrics tags
tags := pod.GetMetadata().GetAnnotations()
tags["pod_name"] = pod.GetMetadata().GetName()
tags["namespace"] = pod.GetMetadata().GetNamespace()
// add labels as metrics tags
for k, v := range pod.GetMetadata().GetLabels() {
tags[k] = v
}
URL, err := url.Parse(*targetURL)
if err != nil {
log.Printf("E! [inputs.prometheus] could not parse URL %s: %v", *targetURL, err)
return
}
podURL := p.AddressToURL(URL, URL.Hostname())
p.lock.Lock()
p.kubernetesPods = append(p.kubernetesPods,
URLAndAddress{
URL: podURL,
Address: URL.Hostname(),
OriginalURL: URL,
Tags: tags})
p.lock.Unlock()
}

func getScrapeURL(pod *corev1.Pod) *string {
scrape := pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"]
if scrape != "true" {
return nil
}
ip := pod.Status.GetPodIP()
if ip == "" {
// return as if scrape was disabled, we will be notified again once the pod
// has an IP
return nil
}

scheme := pod.GetMetadata().GetAnnotations()["prometheus.io/scheme"]
path := pod.GetMetadata().GetAnnotations()["prometheus.io/path"]
port := pod.GetMetadata().GetAnnotations()["prometheus.io/port"]

if scheme == "" {
scheme = "http"
}
if port == "" {
port = "9102"
}
if path == "" {
path = "/metrics"
}

u := &url.URL{
Scheme: scheme,
Host: net.JoinHostPort(ip, port),
Path: path,
}

x := u.String()

return &x
}

func unregisterPod(pod *corev1.Pod, p *Prometheus) {
url := getScrapeURL(pod)
if url == nil {
return
}

p.lock.Lock()
defer p.lock.Unlock()
log.Printf("D! [inputs.prometheus] registered a delete request for %s in namespace %s",
pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace())
var result []URLAndAddress
for _, v := range p.kubernetesPods {
if v.URL.String() != *url {
result = append(result, v)
} else {
log.Printf("D! [inputs.prometheus] will stop scraping for %s", *url)
}

}
p.kubernetesPods = result
}
88 changes: 88 additions & 0 deletions plugins/inputs/prometheus/kubernetes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package prometheus

import (
"testing"

"github.com/stretchr/testify/assert"

v1 "github.com/ericchiang/k8s/apis/core/v1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
)

func TestScrapeURLNoAnnotations(t *testing.T) {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}}
p.GetMetadata().Annotations = map[string]string{}
url := getScrapeURL(p)
assert.Nil(t, url)
}
func TestScrapeURLAnnotationsNoScrape(t *testing.T) {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}}
p.Metadata.Name = str("myPod")
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "false"}
url := getScrapeURL(p)
assert.Nil(t, url)
}
func TestScrapeURLAnnotations(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/metrics", *url)
}
func TestScrapeURLAnnotationsCustomPort(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9000/metrics", *url)
}
func TestScrapeURLAnnotationsCustomPath(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url)
}

func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url)
}

func TestAddPod(t *testing.T) {
prom := &Prometheus{}
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
assert.Equal(t, 1, len(prom.kubernetesPods))
}
func TestAddMultiplePods(t *testing.T) {
prom := &Prometheus{}

p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
p.Metadata.Name = str("Pod2")
registerPod(p, prom)
assert.Equal(t, 2, len(prom.kubernetesPods))
}
func TestDeletePods(t *testing.T) {
prom := &Prometheus{}

p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
unregisterPod(p, prom)
assert.Equal(t, 0, len(prom.kubernetesPods))
}

func pod() *v1.Pod {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}, Status: &v1.PodStatus{}}
p.Status.PodIP = str("127.0.0.1")
p.Metadata.Name = str("myPod")
p.Metadata.Namespace = str("default")
return p
}

func str(x string) *string {
return &x
}
Loading

0 comments on commit a721984

Please sign in to comment.