Troubleshooting memory usage for caches #57

Closed · wants to merge 2 commits
47 changes: 36 additions & 11 deletions pkg/cache/cache.go
@@ -25,6 +25,9 @@ type Options struct {
	Resync      time.Duration
	TweakList   TweakListOptionsFunc
	WaitHealthy func(ctx context.Context)

	// isUserContext and isDownstream are plumbed down from the factory's
	// SharedCacheFactoryOptions; they are unexported because callers set them
	// on the factory, not per cache.
	isUserContext bool
	isDownstream  bool
}

func NewCache(obj, listObj runtime.Object, client *client.Client, opts *Options) cache.SharedIndexInformer {
@@ -37,11 +40,13 @@ func NewCache(obj, listObj runtime.Object, client *client.Client, opts *Options)
	opts = applyDefaultCacheOptions(opts)

	lw := &deferredListWatcher{
-		client:      client,
-		tweakList:   opts.TweakList,
-		namespace:   opts.Namespace,
-		listObj:     listObj,
-		waitHealthy: opts.WaitHealthy,
+		client:        client,
+		tweakList:     opts.TweakList,
+		namespace:     opts.Namespace,
+		listObj:       listObj,
+		waitHealthy:   opts.WaitHealthy,
+		isUserContext: opts.isUserContext,
+		isDownstream:  opts.isDownstream,
	}

return &deferredCache{
@@ -88,18 +93,38 @@ type deferredCache struct {
}

type deferredListWatcher struct {
-	lw          cache.ListerWatcher
-	client      *client.Client
-	tweakList   TweakListOptionsFunc
-	namespace   string
-	listObj     runtime.Object
-	waitHealthy func(ctx context.Context)
+	lw            cache.ListerWatcher
+	client        *client.Client
+	tweakList     TweakListOptionsFunc
+	namespace     string
+	listObj       runtime.Object
+	waitHealthy   func(ctx context.Context)
+	isUserContext bool
+	isDownstream  bool
}

func (d *deferredListWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
	if d.lw == nil {
		return nil, fmt.Errorf("cache not started")
	}
	// Fan out by context so each case runs through a distinctly named
	// function while memory usage is being investigated.
	if d.isUserContext {
		if d.isDownstream {
			return d.userContextDownstreamList(options)
		}
		return d.userContextUpstreamList(options)
	}
	return d.managementContextList(options)
}

// The three list paths below are currently identical; keeping them separate
// gives each context its own stack frame, so profiles taken while
// troubleshooting can attribute list allocations per context.

func (d *deferredListWatcher) userContextDownstreamList(options metav1.ListOptions) (runtime.Object, error) {
	return d.lw.List(options)
}

func (d *deferredListWatcher) userContextUpstreamList(options metav1.ListOptions) (runtime.Object, error) {
	return d.lw.List(options)
}

func (d *deferredListWatcher) managementContextList(options metav1.ListOptions) (runtime.Object, error) {
	return d.lw.List(options)
}
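Why three wrappers with identical bodies? Each one puts a distinct frame on the allocation path, so a heap profile can split cache memory by context. The stack below is hypothetical and abbreviated, and assumes the underlying ListerWatcher is client-go's *cache.ListWatch:

runtime.mallocgc
  ...
  k8s.io/client-go/tools/cache.(*ListWatch).List
  github.com/rancher/lasso/pkg/cache.(*deferredListWatcher).userContextDownstreamList
  github.com/rancher/lasso/pkg/cache.(*deferredListWatcher).List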

80 changes: 80 additions & 0 deletions pkg/cache/metrics.go
@@ -0,0 +1,80 @@
package cache

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/rancher/lasso/pkg/metrics"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

const (
	cacheMetricsCollectionPeriod = 1 * time.Minute
	cacheContextLabel            = "context"
	cacheKindLabel               = "kind"
)

var (
cacheStoreCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: "cache",
Name: "store_count",
Help: "Number of items in the cache store",
}, []string{cacheContextLabel, cacheKindLabel},
)
cacheStartedCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: "cache",
Name: "started_count",
Help: "Number of started caches per factory",
}, []string{cacheContextLabel},
)
)

func init() {
	metrics.Register(cacheStoreCount, cacheStartedCount)
}

// startMetricsCollection periodically publishes the factory's cache sizes
// under the given context name until ctx is cancelled.
func startMetricsCollection(ctx context.Context, s *sharedCacheFactory, contextName string) {
	go func() {
		ticker := time.NewTicker(cacheMetricsCollectionPeriod)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}

			factoryMetrics := collectMetrics(s)
			for kind, count := range factoryMetrics.numEntries {
				cacheStoreCount.WithLabelValues(contextName, kind).Set(float64(count))
			}
			cacheStartedCount.WithLabelValues(contextName).Set(float64(factoryMetrics.startedCaches))
			// Restart the period so the next collection fires a full interval
			// after this one finished, even if collection itself was slow.
			ticker.Reset(cacheMetricsCollectionPeriod)
		}
	}()
}

func collectMetrics(s *sharedCacheFactory) sharedCacheFactoryMetrics {
	// caches and startedCaches are mutated while the factory lock is held
	// (see Start), so take it here as well to avoid a data race.
	s.lock.Lock()
	defer s.lock.Unlock()

	numEntries := map[string]int{}
	for gvk, c := range s.caches {
		kindKey := objectKindKey(gvk)
		items := c.GetStore().List()
		numEntries[kindKey] = len(items)
	}
	return sharedCacheFactoryMetrics{
		numEntries:    numEntries,
		startedCaches: len(s.startedCaches),
	}
}

type sharedCacheFactoryMetrics struct {
	numEntries    map[string]int
	startedCaches int
}

// objectKindKey flattens a GVK into the kind label value, e.g. "Pod./v1" for
// core v1 Pods or "Deployment.apps/v1".
func objectKindKey(gvk schema.GroupVersionKind) string {
	return gvk.Kind + "." + gvk.Group + "/" + gvk.Version
}
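With metrics enabled, the two gauges registered above surface in the Prometheus exposition format roughly as follows. The label values here are hypothetical; the kind value follows objectKindKey's Kind.Group/Version form, so core-group kinds read like "Pod./v1":

cache_store_count{context="user_context_c-m-abc123_prod",kind="Pod./v1"} 1042
cache_store_count{context="user_context_c-m-abc123_prod",kind="Deployment.apps/v1"} 87
cache_started_count{context="user_context_c-m-abc123_prod"} 23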
64 changes: 60 additions & 4 deletions pkg/cache/sharedinformerfactory.go
@@ -2,6 +2,7 @@ package cache

import (
	"context"
	"fmt"
	"sync"
	"time"

@@ -13,6 +14,19 @@ import (
	"k8s.io/client-go/tools/cache"
)

// contextKey is a dedicated type for this package's context values, so the
// keys cannot collide with string keys set by other packages.
type contextKey string

const (
	contextIDKey          contextKey = "cache_context_id"
	contextClusterNameKey contextKey = "cache_context_cluster_name"
)

// WithContextID tags ctx with an identifier that getContextName folds into
// the metrics context label.
func WithContextID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, contextIDKey, id)
}

// WithClusterName tags ctx with a cluster name that getContextName folds
// into the metrics context label.
func WithClusterName(ctx context.Context, clusterName string) context.Context {
	return context.WithValue(ctx, contextClusterNameKey, clusterName)
}

type TweakListOptionsFunc func(*v1.ListOptions)

type SharedCacheFactoryOptions struct {
@@ -24,6 +38,9 @@ type SharedCacheFactoryOptions struct {
	KindNamespace  map[schema.GroupVersionKind]string
	KindTweakList  map[schema.GroupVersionKind]TweakListOptionsFunc
	HealthCallback func(healthy bool)

	IsUserContext bool
	IsDownstream  bool
}
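A usage sketch for these options, combining the factory flags with the context helpers above. This is hedged: clientFactory is assumed to be an existing client.SharedClientFactory, the ID and cluster values are made up, and the imports "github.com/rancher/lasso/pkg/cache" and "github.com/rancher/lasso/pkg/client" are assumed:

// startDownstreamUserCaches wires a downstream user-context factory and
// starts it with a tagged context so its metric series are labeled.
func startDownstreamUserCaches(ctx context.Context, clientFactory client.SharedClientFactory) error {
	factory := cache.NewSharedCachedFactory(clientFactory, &cache.SharedCacheFactoryOptions{
		IsUserContext: true,
		IsDownstream:  true,
	})

	ctx = cache.WithContextID(ctx, "c-m-abc123") // hypothetical cluster ID
	ctx = cache.WithClusterName(ctx, "prod")     // hypothetical cluster name
	return factory.Start(ctx)
}

Started this way, the factory's metric series carry the context label derived by getContextName below.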

type sharedCacheFactory struct {
@@ -40,6 +57,10 @@ type sharedCacheFactory struct {

	caches        map[schema.GroupVersionKind]cache.SharedIndexInformer
	startedCaches map[schema.GroupVersionKind]bool

	isUserContext bool
	isDownstream  bool
	started       bool
}

// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
@@ -60,6 +81,8 @@ func NewSharedCachedFactory(sharedClientFactory client.SharedClientFactory, opts
		healthcheck: healthcheck{
			callback: opts.HealthCallback,
		},
		isUserContext: opts.IsUserContext,
		isDownstream:  opts.IsDownstream,
	}

	return factory
@@ -91,6 +114,31 @@ func (f *sharedCacheFactory) StartGVK(ctx context.Context, gvk schema.GroupVersi
	return nil
}

var (
	seenNames     = map[string]int{}
	seenNamesLock sync.Mutex
)

// getUniqueName returns n the first time it is seen and an "_<n>"-suffixed
// variant on later calls; the lock matters because seenNames is shared by
// every factory in the process.
func getUniqueName(n string) string {
	seenNamesLock.Lock()
	defer seenNamesLock.Unlock()
	id := seenNames[n]
	seenNames[n] = id + 1
	if id == 0 {
		return n
	}
	return fmt.Sprintf("%s_%d", n, id)
}

// getContextName derives the metrics context label: "mgmt_context" or
// "user_context", plus any ID and cluster name carried on ctx by
// WithContextID and WithClusterName.
func (f *sharedCacheFactory) getContextName(ctx context.Context) string {
	name := "mgmt_context"
	if f.isUserContext {
		name = "user_context"
	}
	if id, ok := ctx.Value(contextIDKey).(string); ok {
		name += "_" + id
	}
	if clusterName, ok := ctx.Value(contextClusterNameKey).(string); ok {
		name += "_" + clusterName
	}
	return name
}
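A quick trace of the naming pair above, with hypothetical values:

// f.isUserContext == true; ctx was built via WithContextID(ctx, "c-m-abc123")
// and WithClusterName(ctx, "prod").
f.getContextName(ctx) // "user_context_c-m-abc123_prod"

getUniqueName("user_context_c-m-abc123_prod") // first call: "user_context_c-m-abc123_prod"
getUniqueName("user_context_c-m-abc123_prod") // second call: "user_context_c-m-abc123_prod_1"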

func (f *sharedCacheFactory) Start(ctx context.Context) error {
	f.lock.Lock()
	defer f.lock.Unlock()
@@ -106,6 +154,12 @@ func (f *sharedCacheFactory) Start(ctx context.Context) error {
		}
	}

	if !f.started {
		// Resolve the metrics label once, on first start, then kick off the
		// periodic collection loop for this factory.
		contextName := getUniqueName(f.getContextName(ctx))
		startMetricsCollection(ctx, f, contextName)
		f.started = true
	}

	return nil
}

@@ -193,10 +247,12 @@ func (f *sharedCacheFactory) ForResourceKind(gvr schema.GroupVersionResource, ki
	client := f.sharedClientFactory.ForResourceKind(gvr, kind, namespaced)

	cache := NewCache(obj, objList, client, &Options{
-		Namespace:   namespace,
-		Resync:      resyncPeriod,
-		TweakList:   tweakList,
-		WaitHealthy: f.healthcheck.ensureHealthy,
+		Namespace:     namespace,
+		Resync:        resyncPeriod,
+		TweakList:     tweakList,
+		WaitHealthy:   f.healthcheck.ensureHealthy,
+		isUserContext: f.isUserContext,
+		isDownstream:  f.isDownstream,
	})
	f.caches[gvk] = cache

6 changes: 6 additions & 0 deletions pkg/metrics/register.go
@@ -29,3 +29,9 @@ func init() {
		workqueue.SetProvider(workqueueMetricsProvider{})
	}
}

// Register adds the given collectors to the default Prometheus registerer,
// but only when metrics are enabled for this process via the existing
// package-level prometheusMetrics flag.
func Register(cs ...prometheus.Collector) {
	if prometheusMetrics {
		prometheus.DefaultRegisterer.MustRegister(cs...)
	}
}
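A minimal sketch of a consumer registering its own collector through the same gate; myGauge and its names are hypothetical, and the imports "github.com/prometheus/client_golang/prometheus" and "github.com/rancher/lasso/pkg/metrics" are assumed:

var myGauge = prometheus.NewGauge(prometheus.GaugeOpts{
	Subsystem: "cache",
	Name:      "example_items",
	Help:      "Hypothetical gauge registered via metrics.Register",
})

func init() {
	// No-op unless metrics are enabled, mirroring the cache gauges above.
	metrics.Register(myGauge)
}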