Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

k8s: Support discovery of ingresses #3111

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 6 additions & 5 deletions config/config.go
Expand Up @@ -1005,10 +1005,11 @@ type KubernetesRole string

// The valid options for KubernetesRole.
const (
KubernetesRoleNode = "node"
KubernetesRolePod = "pod"
KubernetesRoleService = "service"
KubernetesRoleEndpoint = "endpoints"
KubernetesRoleNode KubernetesRole = "node"
KubernetesRolePod KubernetesRole = "pod"
KubernetesRoleService KubernetesRole = "service"
KubernetesRoleEndpoint KubernetesRole = "endpoints"
KubernetesRoleIngress KubernetesRole = "ingress"
)

// UnmarshalYAML implements the yaml.Unmarshaler interface.
Expand All @@ -1017,7 +1018,7 @@ func (c *KubernetesRole) UnmarshalYAML(unmarshal func(interface{}) error) error
return err
}
switch *c {
case KubernetesRoleNode, KubernetesRolePod, KubernetesRoleService, KubernetesRoleEndpoint:
case KubernetesRoleNode, KubernetesRolePod, KubernetesRoleService, KubernetesRoleEndpoint, KubernetesRoleIngress:
return nil
default:
return fmt.Errorf("Unknown Kubernetes SD role %q", *c)
Expand Down
21 changes: 11 additions & 10 deletions discovery/kubernetes/endpoints.go
Expand Up @@ -154,18 +154,19 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
}

func convertToEndpoints(o interface{}) (*apiv1.Endpoints, error) {
endpoints, isEndpoints := o.(*apiv1.Endpoints)
if !isEndpoints {
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
endpoints, ok = deletedState.Obj.(*apiv1.Endpoints)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Endpoints object: %v", deletedState.Obj)
}
endpoints, ok := o.(*apiv1.Endpoints)
if ok {
return endpoints, nil
}

deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
endpoints, ok = deletedState.Obj.(*apiv1.Endpoints)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Endpoints object: %v", deletedState.Obj)
}
return endpoints, nil
}

Expand Down
184 changes: 184 additions & 0 deletions discovery/kubernetes/ingress.go
@@ -0,0 +1,184 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kubernetes

import (
"fmt"

"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil"
"golang.org/x/net/context"
"k8s.io/client-go/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/tools/cache"
)

// Ingress implements discovery of Kubernetes ingresss.
type Ingress struct {
logger log.Logger
informer cache.SharedInformer
store cache.Store
}

// NewIngress returns a new ingress discovery.
func NewIngress(l log.Logger, inf cache.SharedInformer) *Ingress {
return &Ingress{logger: l, informer: inf, store: inf.GetStore()}
}

// Run implements the TargetProvider interface.
func (s *Ingress) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Send full initial set of pod targets.
var initial []*config.TargetGroup
for _, o := range s.store.List() {
tg := s.buildIngress(o.(*v1beta1.Ingress))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this interface conversion always safe? Shall we guard against a panic?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an odd design, but the cache actually makes sure that it's the right type before inserting it into cache, so yes it's actually safe here to do this, otherwise something went utterly wrong and all of Kubernetes is broken 🙂.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, also the same in other parts of the k8s discovery.

initial = append(initial, tg)
}
select {
case <-ctx.Done():
return
case ch <- initial:
}

// Send target groups for ingress updates.
send := func(tg *config.TargetGroup) {
select {
case <-ctx.Done():
case ch <- []*config.TargetGroup{tg}:
}
}
s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
eventCount.WithLabelValues("ingress", "add").Inc()

ingress, err := convertToIngress(o)
if err != nil {
s.logger.With("err", err).Errorln("converting to Ingress object failed")
return
}
send(s.buildIngress(ingress))
},
DeleteFunc: func(o interface{}) {
eventCount.WithLabelValues("ingress", "delete").Inc()

ingress, err := convertToIngress(o)
if err != nil {
s.logger.With("err", err).Errorln("converting to Ingress object failed")
return
}
send(&config.TargetGroup{Source: ingressSource(ingress)})
},
UpdateFunc: func(_, o interface{}) {
eventCount.WithLabelValues("ingress", "update").Inc()

ingress, err := convertToIngress(o)
if err != nil {
s.logger.With("err", err).Errorln("converting to Ingress object failed")
return
}
send(s.buildIngress(ingress))
},
})

// Block until the target provider is explicitly canceled.
<-ctx.Done()
}

func convertToIngress(o interface{}) (*v1beta1.Ingress, error) {
ingress, ok := o.(*v1beta1.Ingress)
if ok {
return ingress, nil
}

deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
ingress, ok = deletedState.Obj.(*v1beta1.Ingress)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Ingress object: %v", deletedState.Obj)
}
return ingress, nil
}

func ingressSource(s *v1beta1.Ingress) string {
return "ingress/" + s.Namespace + "/" + s.Name
}

const (
ingressNameLabel = metaLabelPrefix + "ingress_name"
ingressLabelPrefix = metaLabelPrefix + "ingress_label_"
ingressAnnotationPrefix = metaLabelPrefix + "ingress_annotation_"
ingressSchemeLabel = metaLabelPrefix + "ingress_scheme"
ingressHostLabel = metaLabelPrefix + "ingress_host"
ingressPathLabel = metaLabelPrefix + "ingress_path"
)

func ingressLabels(ingress *v1beta1.Ingress) model.LabelSet {
ls := make(model.LabelSet, len(ingress.Labels)+len(ingress.Annotations)+2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why +2?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, that should have a comment.. But I don't know, I've just copied that over from services.go. Maybe to allocate capacity already for the address label? I can also just get rid of the +2.
@fabxc Maybe you know why it's in here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it should be +1, as you only set one additional label besides the labels+annotations. I guess you copied it from a discovery which sets two additional labels.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, looks to me like all roles only set the namespace as additional labels. Thought maybe something outside discovery attaches another label. Should I convert it to +1 everywhere?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One could check the original commit, probably a second label was set there? Anyway, changing to +1 everywhere makes sense to me.

Copy link
Member

@grobie grobie Aug 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. So below in the caller, the additional namespace label is set. I'd say, move it up here and keep the +2?

ls[ingressNameLabel] = lv(ingress.Name)
ls[namespaceLabel] = lv(ingress.Namespace)

for k, v := range ingress.Labels {
ln := strutil.SanitizeLabelName(ingressLabelPrefix + k)
ls[model.LabelName(ln)] = lv(v)
}

for k, v := range ingress.Annotations {
ln := strutil.SanitizeLabelName(ingressAnnotationPrefix + k)
ls[model.LabelName(ln)] = lv(v)
}
return ls
}

func pathsFromIngressRule(rv *v1beta1.IngressRuleValue) []string {
if rv.HTTP == nil {
return []string{"/"}
}
paths := make([]string, len(rv.HTTP.Paths))
for n, p := range rv.HTTP.Paths {
path := p.Path
if path == "" {
path = "/"
}
paths[n] = path
}
return paths
}

func (s *Ingress) buildIngress(ingress *v1beta1.Ingress) *config.TargetGroup {
tg := &config.TargetGroup{
Source: ingressSource(ingress),
}
tg.Labels = ingressLabels(ingress)

schema := "http"
if ingress.Spec.TLS != nil {
schema = "https"
}
for _, rule := range ingress.Spec.Rules {
paths := pathsFromIngressRule(&rule.IngressRuleValue)

for _, path := range paths {
tg.Targets = append(tg.Targets, model.LabelSet{
model.AddressLabel: lv(rule.Host),
ingressSchemeLabel: lv(schema),
ingressHostLabel: lv(rule.Host),
ingressPathLabel: lv(path),
})
}
}

return tg
}
137 changes: 137 additions & 0 deletions discovery/kubernetes/ingress_test.go
@@ -0,0 +1,137 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kubernetes

import (
"testing"

"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/apis/extensions/v1beta1"
)

func ingressStoreKeyFunc(obj interface{}) (string, error) {
return obj.(*v1beta1.Ingress).ObjectMeta.Name, nil
}

func newFakeIngressInformer() *fakeInformer {
return newFakeInformer(ingressStoreKeyFunc)
}

func makeTestIngressDiscovery() (*Ingress, *fakeInformer) {
i := newFakeIngressInformer()
return NewIngress(log.Base(), i), i
}

func makeIngress(tls []v1beta1.IngressTLS) *v1beta1.Ingress {
return &v1beta1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "testingress",
Namespace: "default",
Labels: map[string]string{"testlabel": "testvalue"},
Annotations: map[string]string{"testannotation": "testannotationvalue"},
},
Spec: v1beta1.IngressSpec{
TLS: tls,
Rules: []v1beta1.IngressRule{
{
Host: "example.com",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{Path: "/"},
{Path: "/foo"},
},
},
},
},
{
// No backend config, ignored
Host: "nobackend.example.com",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{},
},
},
{
Host: "test.example.com",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{{}},
},
},
},
},
},
}
}

func expectedTargetGroups(tls bool) []*config.TargetGroup {
scheme := "http"
if tls {
scheme = "https"
}
return []*config.TargetGroup{
{
Targets: []model.LabelSet{
{
"__meta_kubernetes_ingress_scheme": lv(scheme),
"__meta_kubernetes_ingress_host": "example.com",
"__meta_kubernetes_ingress_path": "/",
"__address__": "example.com",
},
{
"__meta_kubernetes_ingress_scheme": lv(scheme),
"__meta_kubernetes_ingress_host": "example.com",
"__meta_kubernetes_ingress_path": "/foo",
"__address__": "example.com",
},
{
"__meta_kubernetes_ingress_scheme": lv(scheme),
"__meta_kubernetes_ingress_host": "test.example.com",
"__address__": "test.example.com",
"__meta_kubernetes_ingress_path": "/",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_ingress_name": "testingress",
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_ingress_label_testlabel": "testvalue",
"__meta_kubernetes_ingress_annotation_testannotation": "testannotationvalue",
},
Source: "ingress/default/testingress",
},
}
}

func TestIngressDiscoveryInitial(t *testing.T) {
n, i := makeTestIngressDiscovery()
i.GetStore().Add(makeIngress(nil))

k8sDiscoveryTest{
discovery: n,
expectedInitial: expectedTargetGroups(false),
}.Run(t)
}

func TestIngressDiscoveryInitialTLS(t *testing.T) {
n, i := makeTestIngressDiscovery()
i.GetStore().Add(makeIngress([]v1beta1.IngressTLS{{}}))

k8sDiscoveryTest{
discovery: n,
expectedInitial: expectedTargetGroups(true),
}.Run(t)
}