Skip to content

Commit

Permalink
Add new metric servicesCountByTrafficDistribution
Browse files Browse the repository at this point in the history
  • Loading branch information
gauravkghildiyal committed Mar 4, 2024
1 parent 51f86b9 commit 606cae9
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 18 deletions.
58 changes: 54 additions & 4 deletions staging/src/k8s.io/endpointslice/metrics/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ import (
"math"
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
endpointsliceutil "k8s.io/endpointslice/util"
)

// NewCache returns a new Cache with the specified endpointsPerSlice.
func NewCache(endpointsPerSlice int32) *Cache {
	return &Cache{
		maxEndpointsPerSlice:          endpointsPerSlice,
		cache:                         map[types.NamespacedName]*ServicePortCache{},
		servicesByTrafficDistribution: make(map[string]map[types.NamespacedName]bool),
	}
}

Expand All @@ -40,7 +42,7 @@ type Cache struct {
maxEndpointsPerSlice int32

// lock protects changes to numEndpoints, numSlicesActual, numSlicesDesired,
// and cache.
// cache and servicesByTrafficDistribution
lock sync.Mutex
// numEndpoints represents the total number of endpoints stored in
// EndpointSlices.
Expand All @@ -52,8 +54,18 @@ type Cache struct {
// cache stores a ServicePortCache grouped by NamespacedNames representing
// Services.
cache map[types.NamespacedName]*ServicePortCache
// Tracks all services partitioned by their trafficDistribution field.
//
// The type should be read as map[trafficDistribution]setOfServices
servicesByTrafficDistribution map[string]map[types.NamespacedName]bool
}

const (
	// trafficDistributionImplementationSpecific is the metric label value used
	// when service.spec.trafficDistribution is set to a value this
	// implementation does not explicitly recognize. Collapsing all such values
	// into one label keeps the cardinality of the metric bounded.
	trafficDistributionImplementationSpecific = "ImplementationSpecific"
)

// ServicePortCache tracks values for total numbers of desired endpoints as well
// as the efficiency of EndpointSlice endpoints distribution for each unique
// Service Port combination.
Expand Down Expand Up @@ -124,20 +136,53 @@ func (c *Cache) UpdateServicePortCache(serviceNN types.NamespacedName, spCache *
c.updateMetrics()
}

// UpdateTrafficDistributionForService records the trafficDistribution value in
// use by the given Service and refreshes the corresponding metrics. Passing a
// nil trafficDistributionPtr clears any previously recorded value for the
// Service. Values other than the explicitly recognized ones are bucketed under
// the single label value "ImplementationSpecific" so that the metric's label
// cardinality stays bounded.
func (c *Cache) UpdateTrafficDistributionForService(serviceNN types.NamespacedName, trafficDistributionPtr *string) {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Refresh the gauge after the maps below have been mutated (LIFO defer
	// order runs this before the unlock).
	defer c.updateMetrics()

	// Drop any existing association for this Service before re-adding it.
	for _, serviceSet := range c.servicesByTrafficDistribution {
		delete(serviceSet, serviceNN)
	}

	if trafficDistributionPtr == nil {
		return
	}

	// Unrecognized values all map to the implementation-specific bucket.
	bucket := *trafficDistributionPtr
	if bucket != corev1.ServiceTrafficDistributionPreferClose {
		bucket = trafficDistributionImplementationSpecific
	}

	if c.servicesByTrafficDistribution[bucket] == nil {
		c.servicesByTrafficDistribution[bucket] = make(map[types.NamespacedName]bool)
	}
	c.servicesByTrafficDistribution[bucket][serviceNN] = true
}

// DeleteService removes references of a Service from the global cache and
// updates the corresponding metrics.
func (c *Cache) DeleteService(serviceNN types.NamespacedName) {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Refresh metrics on the way out: even when the Service has no entry in
	// c.cache, removing it from servicesByTrafficDistribution below changes
	// the ServicesCountByTrafficDistribution gauge, so the update must not be
	// conditional on the c.cache hit.
	defer c.updateMetrics()

	for _, serviceSet := range c.servicesByTrafficDistribution {
		delete(serviceSet, serviceNN)
	}

	if spCache, ok := c.cache[serviceNN]; ok {
		actualSlices, desiredSlices, endpoints := spCache.totals(int(c.maxEndpointsPerSlice))
		c.numEndpoints -= endpoints
		c.numSlicesDesired -= desiredSlices
		c.numSlicesActual -= actualSlices
		delete(c.cache, serviceNN)
	}
}

Expand All @@ -147,6 +192,11 @@ func (c *Cache) updateMetrics() {
NumEndpointSlices.WithLabelValues().Set(float64(c.numSlicesActual))
DesiredEndpointSlices.WithLabelValues().Set(float64(c.numSlicesDesired))
EndpointsDesired.WithLabelValues().Set(float64(c.numEndpoints))

ServicesCountByTrafficDistribution.Reset()
for trafficDistribution, services := range c.servicesByTrafficDistribution {
ServicesCountByTrafficDistribution.WithLabelValues(trafficDistribution).Set(float64(len(services)))
}
}

// numDesiredSlices calculates the number of EndpointSlices that would exist
Expand Down
96 changes: 96 additions & 0 deletions staging/src/k8s.io/endpointslice/metrics/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
endpointsliceutil "k8s.io/endpointslice/util"
Expand Down Expand Up @@ -89,6 +91,96 @@ func expectNumEndpointsAndSlices(t *testing.T, c *Cache, desired int, actual int
}
}

// Tests the mutations to the servicesByTrafficDistribution field within the
// Cache object: first-time additions, idempotent retries, transitions between
// values (including to an unrecognized value, to "", and to nil), and removal
// via DeleteService.
func TestCache_ServicesByTrafficDistribution(t *testing.T) {
	cache := NewCache(0)

	service1 := types.NamespacedName{Namespace: "ns1", Name: "service1"}
	service2 := types.NamespacedName{Namespace: "ns1", Name: "service2"}
	service3 := types.NamespacedName{Namespace: "ns2", Name: "service3"}
	service4 := types.NamespacedName{Namespace: "ns3", Name: "service4"}

	// Define helper function for assertion; fails the test immediately if the
	// cache's partitioning differs from the expected map.
	mustHaveServicesByTrafficDistribution := func(wantServicesByTrafficDistribution map[string]map[types.NamespacedName]bool, desc string) {
		t.Helper()
		gotServicesByTrafficDistribution := cache.servicesByTrafficDistribution
		if diff := cmp.Diff(wantServicesByTrafficDistribution, gotServicesByTrafficDistribution); diff != "" {
			t.Fatalf("UpdateTrafficDistributionForService(%v) resulted in unexpected diff for cache.servicesByTrafficDistribution; (-want, +got)\n%v", desc, diff)
		}
	}

	// Mutate and make assertions; each step builds on the state left by the
	// previous one.

	desc := "service1 starts using trafficDistribution=PreferClose"
	cache.UpdateTrafficDistributionForService(service1, ptrTo(corev1.ServiceTrafficDistributionPreferClose))
	mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
		corev1.ServiceTrafficDistributionPreferClose: {service1: true},
	}, desc)

	desc = "service1 starts using trafficDistribution=PreferClose, retries of similar mutation should be idempotent"
	cache.UpdateTrafficDistributionForService(service1, ptrTo(corev1.ServiceTrafficDistributionPreferClose))
	mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{ // No delta
		corev1.ServiceTrafficDistributionPreferClose: {service1: true},
	}, desc)

	desc = "service2 starts using trafficDistribution=PreferClose"
	cache.UpdateTrafficDistributionForService(service2, ptrTo(corev1.ServiceTrafficDistributionPreferClose))
	mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
		corev1.ServiceTrafficDistributionPreferClose: {service1: true, service2: true}, // Delta
	}, desc)

	desc = "service3 starts using trafficDistribution=InvalidValue"
	cache.UpdateTrafficDistributionForService(service3, ptrTo("InvalidValue"))
	mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
		corev1.ServiceTrafficDistributionPreferClose: {service1: true, service2: true},
		trafficDistributionImplementationSpecific:    {service3: true}, // Delta
	}, desc)

	desc = "service4 starts using trafficDistribution=nil"
	cache.UpdateTrafficDistributionForService(service4, nil)
	mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{ // No delta
		corev1.ServiceTrafficDistributionPreferClose: {service1: true, service2: true},
		trafficDistributionImplementationSpecific:    {service3: true},
	}, desc)

	desc = "service2 transitions trafficDistribution: PreferClose -> InvalidValue"
	cache.UpdateTrafficDistributionForService(service2, ptrTo("InvalidValue"))
	mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
		corev1.ServiceTrafficDistributionPreferClose: {service1: true},                 // Delta
		trafficDistributionImplementationSpecific:    {service3: true, service2: true}, // Delta
	}, desc)

	desc = "service3 gets deleted"
	cache.DeleteService(service3)
	mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
		corev1.ServiceTrafficDistributionPreferClose: {service1: true},
		trafficDistributionImplementationSpecific:    {service2: true}, // Delta
	}, desc)

	desc = "service1 transitions trafficDistribution: PreferClose -> empty"
	cache.UpdateTrafficDistributionForService(service1, ptrTo(""))
	mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
		corev1.ServiceTrafficDistributionPreferClose: {},                               // Delta
		trafficDistributionImplementationSpecific:    {service1: true, service2: true}, // Delta
	}, desc)

	desc = "service1 transitions trafficDistribution: InvalidValue -> nil"
	cache.UpdateTrafficDistributionForService(service1, nil)
	mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
		corev1.ServiceTrafficDistributionPreferClose: {},
		trafficDistributionImplementationSpecific:    {service2: true}, // Delta
	}, desc)

	desc = "service2 transitions trafficDistribution: InvalidValue -> nil"
	cache.UpdateTrafficDistributionForService(service2, nil)
	mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
		corev1.ServiceTrafficDistributionPreferClose: {},
		trafficDistributionImplementationSpecific:    {}, // Delta
	}, desc)

}

func benchmarkUpdateServicePortCache(b *testing.B, num int) {
c := NewCache(int32(100))
ns := "benchmark"
Expand Down Expand Up @@ -132,3 +224,7 @@ func BenchmarkUpdateServicePortCache10000(b *testing.B) {
func BenchmarkUpdateServicePortCache100000(b *testing.B) {
benchmarkUpdateServicePortCache(b, 100000)
}

// ptrTo returns a pointer to a freshly allocated copy of obj.
func ptrTo[T any](obj T) *T {
	p := new(T)
	*p = obj
	return p
}
13 changes: 13 additions & 0 deletions staging/src/k8s.io/endpointslice/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@ var (
},
[]string{"result"}, // either "success", "stale", or "error"
)

// ServicesCountByTrafficDistribution tracks the number of Services using some
// specific trafficDistribution.
//
// Unrecognized trafficDistribution values are all reported under the single
// label value "ImplementationSpecific" so the label cardinality stays bounded.
ServicesCountByTrafficDistribution = metrics.NewGaugeVec(
	&metrics.GaugeOpts{
		Subsystem:      EndpointSliceSubsystem,
		Name:           "services_count_by_traffic_distribution",
		Help:           "Number of Services using some specific trafficDistribution",
		StabilityLevel: metrics.ALPHA,
	},
	[]string{"traffic_distribution"}, // One of ["PreferClose", "ImplementationSpecific"]
)
)

var registerMetrics sync.Once
Expand All @@ -134,5 +146,6 @@ func RegisterMetrics() {
legacyregistry.MustRegister(EndpointSliceChanges)
legacyregistry.MustRegister(EndpointSlicesChangedPerSync)
legacyregistry.MustRegister(EndpointSliceSyncs)
legacyregistry.MustRegister(ServicesCountByTrafficDistribution)
})
}
3 changes: 3 additions & 0 deletions staging/src/k8s.io/endpointslice/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,10 @@ func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1.
}

if canUseTrafficDistribution {
r.metricsCache.UpdateTrafficDistributionForService(serviceNN, service.Spec.TrafficDistribution)
slicesToCreate, slicesToUpdate, _ = trafficdist.ReconcileHints(service.Spec.TrafficDistribution, slicesToCreate, slicesToUpdate, unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete))
} else {
r.metricsCache.UpdateTrafficDistributionForService(serviceNN, nil)
}

err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
Expand Down
48 changes: 34 additions & 14 deletions staging/src/k8s.io/endpointslice/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2051,6 +2051,9 @@ func TestReconcile_TrafficDistribution(t *testing.T) {
slicesChangedPerSync: 0, // 0 means either topologyAnnotation or trafficDistribution was used.
slicesChangedPerSyncTopology: 0, // 0 means topologyAnnotation was not used.
slicesChangedPerSyncTrafficDist: 1, // 1 EPS configured using trafficDistribution.
servicesCountByTrafficDistribution: map[string]int{
"PreferClose": 1,
},
},
},
{
Expand Down Expand Up @@ -2102,7 +2105,7 @@ func TestReconcile_TrafficDistribution(t *testing.T) {
},
{
name: "trafficDistribution=<empty>, topologyAnnotation=<empty>",
desc: "When trafficDistribution and topologyAnnotation are both disabled, no hints should be added",
desc: "When trafficDistribution and topologyAnnotation are both disabled, no hints should be added, but the servicesCountByTrafficDistribution metric should reflect this",
trafficDistributionFeatureGateEnabled: true,
trafficDistribution: "",
topologyAnnotation: "",
Expand All @@ -2119,6 +2122,9 @@ func TestReconcile_TrafficDistribution(t *testing.T) {
slicesChangedPerSync: 1, // 1 means both topologyAnnotation and trafficDistribution were not used.
slicesChangedPerSyncTopology: 0, // 0 means topologyAnnotation was not used.
slicesChangedPerSyncTrafficDist: 0, // 0 means trafficDistribution was not used.
servicesCountByTrafficDistribution: map[string]int{
"ImplementationSpecific": 1,
},
},
},
}
Expand Down Expand Up @@ -2317,19 +2323,20 @@ func reconcileHelper(t *testing.T, r *Reconciler, service *corev1.Service, pods
// Metrics helpers

// expectedMetrics holds the expected values of the endpointslice metrics for a
// single assertion pass via expectMetrics.
type expectedMetrics struct {
	desiredSlices                   int
	actualSlices                    int
	desiredEndpoints                int
	addedPerSync                    int
	removedPerSync                  int
	numCreated                      int
	numUpdated                      int
	numDeleted                      int
	slicesChangedPerSync            int
	slicesChangedPerSyncTopology    int
	slicesChangedPerSyncTrafficDist int
	syncSuccesses                   int
	syncErrors                      int
	// servicesCountByTrafficDistribution maps a traffic_distribution label
	// value to the expected number of Services using it. A nil map means all
	// per-label counts are expected to be zero.
	servicesCountByTrafficDistribution map[string]int
}

func expectMetrics(t *testing.T, em expectedMetrics) {
Expand Down Expand Up @@ -2412,6 +2419,18 @@ func expectMetrics(t *testing.T, em expectedMetrics) {
if actualSyncErrors != float64(em.syncErrors) {
t.Errorf("Expected endpointSliceSyncErrors to be %d, got %v", em.syncErrors, actualSyncErrors)
}

for _, trafficDistribution := range []string{"PreferClose", "ImplementationSpecific"} {
gotServicesCount, err := testutil.GetGaugeMetricValue(metrics.ServicesCountByTrafficDistribution.WithLabelValues(trafficDistribution))
var wantServicesCount int
if em.servicesCountByTrafficDistribution != nil {
wantServicesCount = em.servicesCountByTrafficDistribution[trafficDistribution]
}
handleErr(t, err, fmt.Sprintf("%v[traffic_distribution=%v]", "services_count_by_traffic_distribution", trafficDistribution))
if int(gotServicesCount) != wantServicesCount {
t.Errorf("Expected servicesCountByTrafficDistribution for traffic_distribution=%v to be %v, got %v", trafficDistribution, wantServicesCount, gotServicesCount)
}
}
}

func handleErr(t *testing.T, err error, metricName string) {
Expand All @@ -2430,4 +2449,5 @@ func setupMetrics() {
metrics.EndpointSliceChanges.Reset()
metrics.EndpointSlicesChangedPerSync.Reset()
metrics.EndpointSliceSyncs.Reset()
metrics.ServicesCountByTrafficDistribution.Reset()
}

0 comments on commit 606cae9

Please sign in to comment.