Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics for number of events received from etcd #116128

Merged
merged 1 commit into from Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go
Expand Up @@ -74,6 +74,17 @@ var (
[]string{"resource"},
)

EventsReceivedCounter = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "events_received_total",
Help: "Counter of events received in watch cache broken by resource type.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)

EventsCounter = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
Expand Down Expand Up @@ -147,6 +158,7 @@ func Register() {
legacyregistry.MustRegister(listCacheNumFetched)
legacyregistry.MustRegister(listCacheNumReturned)
legacyregistry.MustRegister(InitCounter)
legacyregistry.MustRegister(EventsReceivedCounter)
legacyregistry.MustRegister(EventsCounter)
legacyregistry.MustRegister(TerminatedWatchersCounter)
legacyregistry.MustRegister(watchCacheCapacityIncreaseTotal)
Expand Down
Expand Up @@ -280,6 +280,8 @@ func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Ob
// processEvent is safe as long as there is at most one call to it in flight
// at any point in time.
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
metrics.EventsReceivedCounter.WithLabelValues(w.groupResource.String()).Inc()

key, err := w.keyFunc(event.Object)
if err != nil {
return fmt.Errorf("couldn't compute key: %v", err)
Expand Down
14 changes: 14 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go
Expand Up @@ -64,6 +64,15 @@ var (
},
[]string{"endpoint"},
)
etcdEventsReceivedCounts = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Subsystem: "apiserver",
Name: "storage_events_received_total",
Help: "Number of etcd events received split by kind.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
etcdBookmarkCounts = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Name: "etcd_bookmark_counts",
Expand Down Expand Up @@ -153,6 +162,11 @@ func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
etcdRequestLatency.WithLabelValues(verb, resource).Observe(sinceInSeconds(startTime))
}

// RecordEtcdEvent updated the etcd_events_received_total metric.
func RecordEtcdEvent(resource string) {
etcdEventsReceivedCounts.WithLabelValues(resource).Inc()
}

// RecordEtcdBookmark updates the etcd_bookmark_counts metric.
func RecordEtcdBookmark(resource string) {
etcdBookmarkCounts.WithLabelValues(resource).Inc()
Expand Down
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go
Expand Up @@ -256,6 +256,7 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
}

for _, e := range wres.Events {
metrics.RecordEtcdEvent(wc.watcher.groupResource.String())
parsedEvent, err := parseEvent(e)
if err != nil {
logWatchChannelErr(err)
Expand Down