Skip to content

Commit

Permalink
Merge 0856746 into 432d069
Browse files Browse the repository at this point in the history
  • Loading branch information
ytsarev committed Jan 28, 2020
2 parents 432d069 + 0856746 commit 971fa0a
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 16 deletions.
57 changes: 48 additions & 9 deletions source/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
Expand All @@ -36,10 +37,11 @@ import (
// crdSource is an implementation of Source that provides endpoints by listing
// specified CRD and fetching Endpoints embedded in Spec.
type crdSource struct {
crdClient rest.Interface
namespace string
crdResource string
codec runtime.ParameterCodec
crdClient rest.Interface
namespace string
crdResource string
codec runtime.ParameterCodec
annotationFilter string
}

func addKnownTypes(scheme *runtime.Scheme, groupVersion schema.GroupVersion) error {
Expand Down Expand Up @@ -99,12 +101,13 @@ func NewCRDClientForAPIVersionKind(client kubernetes.Interface, kubeConfig, kube
}

// NewCRDSource creates a new crdSource with the given config.
func NewCRDSource(crdClient rest.Interface, namespace, kind string, scheme *runtime.Scheme) (Source, error) {
func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFilter string, scheme *runtime.Scheme) (Source, error) {
return &crdSource{
crdResource: strings.ToLower(kind) + "s",
namespace: namespace,
crdClient: crdClient,
codec: runtime.NewParameterCodec(scheme),
crdResource: strings.ToLower(kind) + "s",
namespace: namespace,
annotationFilter: annotationFilter,
crdClient: crdClient,
codec: runtime.NewParameterCodec(scheme),
}, nil
}

Expand All @@ -117,6 +120,11 @@ func (cs *crdSource) Endpoints() ([]*endpoint.Endpoint, error) {
return nil, err
}

result, err = cs.filterByAnnotations(result)
if err != nil {
return nil, err
}

for _, dnsEndpoint := range result.Items {
// Make sure that all endpoints have targets for A or CNAME type
crdEndpoints := []*endpoint.Endpoint{}
Expand Down Expand Up @@ -192,3 +200,34 @@ func (cs *crdSource) UpdateStatus(dnsEndpoint *endpoint.DNSEndpoint) (result *en
Into(result)
return
}

// filterByAnnotations filters a list of dnsendpoints by a given annotation selector.
func (cs *crdSource) filterByAnnotations(dnsendpoints *endpoint.DNSEndpointList) (*endpoint.DNSEndpointList, error) {
labelSelector, err := metav1.ParseToLabelSelector(cs.annotationFilter)
if err != nil {
return nil, err
}
selector, err := metav1.LabelSelectorAsSelector(labelSelector)
if err != nil {
return nil, err
}

// empty filter returns original list
if selector.Empty() {
return dnsendpoints, nil
}

filteredList := endpoint.DNSEndpointList{}

for _, dnsendpoint := range dnsendpoints.Items {
// convert the dnsendpoint' annotations to an equivalent label selector
annotations := labels.Set(dnsendpoint.Annotations)

// include dnsendpoint if its annotations match the selector
if selector.Matches(annotations) {
filteredList.Items = append(filteredList.Items, dnsendpoint)
}
}

return &filteredList, nil
}
55 changes: 49 additions & 6 deletions source/crd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func objBody(codec runtime.Codec, obj runtime.Object) io.ReadCloser {
return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(codec, obj))))
}

func startCRDServerToServeTargets(endpoints []*endpoint.Endpoint, apiVersion, kind, namespace, name string, t *testing.T) rest.Interface {
func startCRDServerToServeTargets(endpoints []*endpoint.Endpoint, apiVersion, kind, namespace, name string, annotations map[string]string, t *testing.T) rest.Interface {
groupVersion, _ := schema.ParseGroupVersion(apiVersion)
scheme := runtime.NewScheme()
addKnownTypes(scheme, groupVersion)
Expand All @@ -68,9 +68,10 @@ func startCRDServerToServeTargets(endpoints []*endpoint.Endpoint, apiVersion, ki
Kind: kind,
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Generation: 1,
Name: name,
Namespace: namespace,
Annotations: annotations,
Generation: 1,
},
Spec: endpoint.DNSEndpointSpec{
Endpoints: endpoints,
Expand Down Expand Up @@ -136,6 +137,8 @@ func testCRDSourceEndpoints(t *testing.T) {
endpoints []*endpoint.Endpoint
expectEndpoints bool
expectError bool
annotationFilter string
annotations map[string]string
}{
{
title: "invalid crd api version",
Expand Down Expand Up @@ -264,16 +267,56 @@ func testCRDSourceEndpoints(t *testing.T) {
expectEndpoints: true,
expectError: false,
},
{
title: "valid crd gvk with annotation and non matching annotation filter",
registeredAPIVersion: "test.k8s.io/v1alpha1",
apiVersion: "test.k8s.io/v1alpha1",
registeredKind: "DNSEndpoint",
kind: "DNSEndpoint",
namespace: "foo",
registeredNamespace: "foo",
annotations: map[string]string{"test": "that"},
annotationFilter: "test=filter_something_else",
endpoints: []*endpoint.Endpoint{
{DNSName: "abc.example.org",
Targets: endpoint.Targets{"1.2.3.4"},
RecordType: endpoint.RecordTypeA,
RecordTTL: 180,
},
},
expectEndpoints: false,
expectError: false,
},
{
title: "valid crd gvk with annotation and matching annotation filter",
registeredAPIVersion: "test.k8s.io/v1alpha1",
apiVersion: "test.k8s.io/v1alpha1",
registeredKind: "DNSEndpoint",
kind: "DNSEndpoint",
namespace: "foo",
registeredNamespace: "foo",
annotations: map[string]string{"test": "that"},
annotationFilter: "test=that",
endpoints: []*endpoint.Endpoint{
{DNSName: "abc.example.org",
Targets: endpoint.Targets{"1.2.3.4"},
RecordType: endpoint.RecordTypeA,
RecordTTL: 180,
},
},
expectEndpoints: true,
expectError: false,
},
} {
t.Run(ti.title, func(t *testing.T) {
restClient := startCRDServerToServeTargets(ti.endpoints, ti.registeredAPIVersion, ti.registeredKind, ti.registeredNamespace, "test", t)
restClient := startCRDServerToServeTargets(ti.endpoints, ti.registeredAPIVersion, ti.registeredKind, ti.registeredNamespace, "test", ti.annotations, t)
groupVersion, err := schema.ParseGroupVersion(ti.apiVersion)
require.NoError(t, err)

scheme := runtime.NewScheme()
addKnownTypes(scheme, groupVersion)

cs, _ := NewCRDSource(restClient, ti.namespace, ti.kind, scheme)
cs, _ := NewCRDSource(restClient, ti.namespace, ti.kind, ti.annotationFilter, scheme)

receivedEndpoints, err := cs.Endpoints()
if ti.expectError {
Expand Down
2 changes: 1 addition & 1 deletion source/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func BuildWithConfig(source string, p ClientGenerator, cfg *Config) (Source, err
if err != nil {
return nil, err
}
return NewCRDSource(crdClient, cfg.Namespace, cfg.CRDSourceKind, scheme)
return NewCRDSource(crdClient, cfg.Namespace, cfg.CRDSourceKind, cfg.AnnotationFilter, scheme)
}
return nil, ErrSourceNotFound
}
Expand Down

0 comments on commit 971fa0a

Please sign in to comment.