Skip to content

Commit

Permalink
refactor: update NamespaceFilterer interface to be more generic
Browse files Browse the repository at this point in the history
  • Loading branch information
marcsanmi committed Jun 6, 2022
1 parent 7158e36 commit e57ba69
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 49 deletions.
105 changes: 70 additions & 35 deletions internal/discovery/namespace_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/newrelic/nri-kubernetes/v3/internal/config"
"github.com/newrelic/nri-kubernetes/v3/internal/storer"
"github.com/newrelic/nri-kubernetes/v3/src/definition"

log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
Expand All @@ -17,9 +18,9 @@ import (

const defaultNamespaceResyncDuration = 10 * time.Minute

// NamespaceFilterer provides an interface to filter namespaces.
type NamespaceFilterer interface {
IsAllowed(namespace string) bool
// Filterer provider a interface to match from a given RawMetrics.
type Filterer interface {
Match(metrics definition.RawMetrics) bool
}

// NamespaceFilter is a struct holding pointers to the config and the namespace lister.
Expand Down Expand Up @@ -49,43 +50,66 @@ func NewNamespaceFilter(c *config.NamespaceSelector, client kubernetes.Interface
}
}

// IsAllowed checks given any namespace, if it's allowed to be scraped by using the NamespaceLister
func (nf *NamespaceFilter) IsAllowed(namespace string) bool {
// By default, we scrape every namespace.
// Match checks given any metrics, if a namespace it's allowed to be scraped given a certain match labels or
// expressions configuration.
func (nf *NamespaceFilter) Match(metrics definition.RawMetrics) bool {
namespaceRaw, ok := metrics["namespace"]
if !ok {
return true
}

namespace, ok := namespaceRaw.(string)
if !ok {
log.Tracef("Allowing %q as namespace as invalid type provided %t", namespaceRaw, namespaceRaw)
return true
}

if nf.c == nil {
log.Tracef("Allowing %q namespace as selector is nil", namespace)
return true
}

// Scrape namespaces by honoring the matchLabels values.
if nf.c.MatchLabels != nil {
namespaceList, err := nf.lister.List(labels.SelectorFromSet(nf.c.MatchLabels))
log.Tracef("Filtering %q namespace by MatchLabels", namespace)
return nf.matchNamespaceByLabels(namespace)
}

if nf.c.MatchExpressions != nil {
log.Tracef("Filtering %q namespace by MatchExpressions", namespace)
return nf.matchNamespaceByExpressions(namespace)
}

return true
}

// matchNamespaceByLabels filters a namespace using the selector from the MatchLabels config.
func (nf *NamespaceFilter) matchNamespaceByLabels(namespace string) bool {
namespaceList, err := nf.lister.List(labels.SelectorFromSet(nf.c.MatchLabels))
if err != nil {
nf.logger.Errorf("listing namespaces with MatchLabels: %v", err)
return true
}

return containsNamespace(namespace, namespaceList)
}

// matchNamespaceByExpressions filters a namespace using the selector from the MatchExpressions config.
func (nf *NamespaceFilter) matchNamespaceByExpressions(namespace string) bool {
for _, expression := range nf.c.MatchExpressions {
selector, err := labels.Parse(expression.String())
if err != nil {
nf.logger.Errorf("listing namespaces with MatchLabels: %v", err)
nf.logger.Errorf("parsing labels: %v", err)
return true
}

return containsNamespace(namespace, namespaceList)
}
namespaceList, err := nf.lister.List(selector)
if err != nil {
nf.logger.Errorf("listing namespaces with MatchExpressions: %v", err)
return true
}

// Scrape namespaces by honoring the matchExpressions values.
// Multiple expressions are evaluated with a logical AND between them.
if nf.c.MatchExpressions != nil {
for _, expression := range nf.c.MatchExpressions {
selector, err := labels.Parse(expression.String())
if err != nil {
nf.logger.Errorf("parsing labels: %v", err)
return true
}

namespaceList, err := nf.lister.List(selector)
if err != nil {
nf.logger.Errorf("listing namespaces with MatchExpressions: %v", err)
return true
}

if !containsNamespace(namespace, namespaceList) {
return false
}
if !containsNamespace(namespace, namespaceList) {
return false
}
}

Expand All @@ -105,27 +129,38 @@ func (nf *NamespaceFilter) Close() error {

// CachedNamespaceFilter is a wrapper of the NamespaceFilterer and the cache.
type CachedNamespaceFilter struct {
NsFilter NamespaceFilterer
NsFilter Filterer
cache storer.Storer
}

// NewCachedNamespaceFilter create a new CachedNamespaceFilter, wrapping the cache and the NamespaceFilterer.
func NewCachedNamespaceFilter(ns NamespaceFilterer, storer storer.Storer) *CachedNamespaceFilter {
func NewCachedNamespaceFilter(ns Filterer, storer storer.Storer) *CachedNamespaceFilter {
return &CachedNamespaceFilter{
NsFilter: ns,
cache: storer,
}
}

// IsAllowed check the cache and calls the underlying NamespaceFilter if the result is not found.
func (cnf *CachedNamespaceFilter) IsAllowed(namespace string) bool {
// Match check the cache and calls the underlying NamespaceFilter if the result is not found.
func (cnf *CachedNamespaceFilter) Match(metrics definition.RawMetrics) bool {
namespaceRaw, ok := metrics["namespace"]
if !ok {
return true
}

namespace, ok := namespaceRaw.(string)
if !ok {
log.Tracef("Allowing %q from cache as namespace invalid type provided %t", namespaceRaw, namespaceRaw)
return true
}

// Check if the namespace is already in the cache.
var allowed bool
if _, err := cnf.cache.Get(namespace, &allowed); err == nil {
return allowed
}

allowed = cnf.NsFilter.IsAllowed(namespace)
allowed = cnf.NsFilter.Match(metrics)

// Save the namespace in the cache.
_ = cnf.cache.Set(namespace, allowed)
Expand Down
33 changes: 19 additions & 14 deletions internal/discovery/namespace_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/newrelic/nri-kubernetes/v3/internal/config"
"github.com/newrelic/nri-kubernetes/v3/internal/discovery"
"github.com/newrelic/nri-kubernetes/v3/internal/storer"
"github.com/newrelic/nri-kubernetes/v3/src/definition"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

Expand All @@ -23,6 +24,8 @@ const namespaceName = "test_namespace"
func TestNamespaceFilterer_IsAllowed(t *testing.T) {
t.Parallel()

metrics := definition.RawMetrics{"namespace": namespaceName}

type testData struct {
namespaceLabels labels.Set
namespaceSelector config.NamespaceSelector
Expand Down Expand Up @@ -154,14 +157,16 @@ func TestNamespaceFilterer_IsAllowed(t *testing.T) {
ns.Close()
})

require.Equal(t, testData.expected, ns.IsAllowed(namespaceName))
require.Equal(t, testData.expected, ns.Match(metrics))
})
}
}

func TestNamespaceFilterer_Cache(t *testing.T) {
t.Parallel()

metrics := definition.RawMetrics{"namespace": namespaceName}

type testData struct {
warmCache func(cache *storer.InMemoryStore)
prepare func(nsFilterMock *NamespaceFilterMock)
Expand All @@ -173,10 +178,10 @@ func TestNamespaceFilterer_Cache(t *testing.T) {
"namespace_cache_miss_fallback_to_call_informer": {
warmCache: func(cache *storer.InMemoryStore) {},
prepare: func(nsFilterMock *NamespaceFilterMock) {
nsFilterMock.On("IsAllowed", namespaceName).Return(true).Once()
nsFilterMock.On("Match", metrics).Return(true).Once()
},
assert: func(expected bool, cnsf *discovery.CachedNamespaceFilter) {
require.Equal(t, expected, cnsf.IsAllowed(namespaceName))
require.Equal(t, expected, cnsf.Match(metrics))
},
expected: true,
},
Expand All @@ -185,10 +190,10 @@ func TestNamespaceFilterer_Cache(t *testing.T) {
cache.Set(namespaceName, true)
},
prepare: func(nsFilterMock *NamespaceFilterMock) {
nsFilterMock.AssertNotCalled(t, "IsAllowed")
nsFilterMock.AssertNotCalled(t, "Match")
},
assert: func(expected bool, cnsf *discovery.CachedNamespaceFilter) {
require.Equal(t, expected, cnsf.IsAllowed(namespaceName))
require.Equal(t, expected, cnsf.Match(metrics))
},
expected: true,
},
Expand All @@ -197,21 +202,21 @@ func TestNamespaceFilterer_Cache(t *testing.T) {
cache.Set(namespaceName, false)
},
prepare: func(nsFilterMock *NamespaceFilterMock) {
nsFilterMock.AssertNotCalled(t, "IsAllowed")
nsFilterMock.AssertNotCalled(t, "Match")
},
assert: func(expected bool, cnsf *discovery.CachedNamespaceFilter) {
require.Equal(t, expected, cnsf.IsAllowed(namespaceName))
require.Equal(t, expected, cnsf.Match(metrics))
},
expected: false,
},
"namespace_cache_miss_subsequent_call_uses_cache": {
warmCache: func(cache *storer.InMemoryStore) {},
prepare: func(nsFilterMock *NamespaceFilterMock) {
nsFilterMock.On("IsAllowed", namespaceName).Return(true).Once()
nsFilterMock.On("Match", metrics).Return(true).Once()
},
assert: func(expected bool, cnsf *discovery.CachedNamespaceFilter) {
require.Equal(t, expected, cnsf.IsAllowed(namespaceName))
require.Equal(t, expected, cnsf.IsAllowed(namespaceName))
require.Equal(t, expected, cnsf.Match(metrics))
require.Equal(t, expected, cnsf.Match(metrics))
},
expected: true,
},
Expand Down Expand Up @@ -267,7 +272,7 @@ func TestNamespaceFilter_InformerCacheSync(t *testing.T) {
nil,
)
// Check that recently created namespace is not allowed.
require.Equal(t, false, ns.IsAllowed(namespaceName))
require.Equal(t, false, ns.Match(definition.RawMetrics{"namespace": namespaceName}))

t.Cleanup(func() {
cancel()
Expand All @@ -284,7 +289,7 @@ func TestNamespaceFilter_InformerCacheSync(t *testing.T) {

// Give some room to the informer to sync, and check that the new namespace is filtered properly.
err = wait.PollImmediateUntilWithContext(ctx, 1*time.Second, func(context.Context) (bool, error) {
return ns.IsAllowed(anotherNamespaceName), nil
return ns.Match(definition.RawMetrics{"namespace": anotherNamespaceName}), nil
})
require.NoError(t, err, "Timed out waiting for the informer to sync")
}
Expand All @@ -297,8 +302,8 @@ func newNamespaceFilterMock() *NamespaceFilterMock {
return &NamespaceFilterMock{}
}

func (ns *NamespaceFilterMock) IsAllowed(namespace string) bool {
args := ns.Called(namespace)
func (ns *NamespaceFilterMock) Match(metrics definition.RawMetrics) bool {
args := ns.Called(metrics)
return args.Bool(0)
}

Expand Down

0 comments on commit e57ba69

Please sign in to comment.