Skip to content

Commit

Permalink
Merge pull request kubernetes#118953 from mskrocki/escLib
Browse files Browse the repository at this point in the history
Convert EndpointSlice Reconciler to a library in staging.
  • Loading branch information
k8s-ci-robot committed Jul 14, 2023
2 parents b635f2a + 43b509d commit 5c72df7
Show file tree
Hide file tree
Showing 47 changed files with 1,258 additions and 326 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ require (
k8s.io/cri-api v0.0.0
k8s.io/csi-translation-lib v0.0.0
k8s.io/dynamic-resource-allocation v0.0.0
k8s.io/endpointslice v0.0.0
k8s.io/gengo v0.0.0-20220902162205-c0856e24416d
k8s.io/klog/v2 v2.100.1
k8s.io/kms v0.0.0
Expand Down Expand Up @@ -262,6 +263,7 @@ replace (
k8s.io/cri-api => ./staging/src/k8s.io/cri-api
k8s.io/csi-translation-lib => ./staging/src/k8s.io/csi-translation-lib
k8s.io/dynamic-resource-allocation => ./staging/src/k8s.io/dynamic-resource-allocation
k8s.io/endpointslice => ./staging/src/k8s.io/endpointslice
k8s.io/kms => ./staging/src/k8s.io/kms
k8s.io/kube-aggregator => ./staging/src/k8s.io/kube-aggregator
k8s.io/kube-controller-manager => ./staging/src/k8s.io/kube-controller-manager
Expand Down
37 changes: 28 additions & 9 deletions pkg/controller/endpoint/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -38,13 +39,13 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
endpointsliceutil "k8s.io/endpointslice/util"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/api/v1/endpoints"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
api "k8s.io/kubernetes/pkg/apis/core"
helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
utillabels "k8s.io/kubernetes/pkg/util/labels"
utilnet "k8s.io/utils/net"
)
Expand Down Expand Up @@ -104,7 +105,7 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
e.endpointsLister = endpointsInformer.Lister()
e.endpointsSynced = endpointsInformer.Informer().HasSynced

e.triggerTimeTracker = endpointutil.NewTriggerTimeTracker()
e.triggerTimeTracker = endpointsliceutil.NewTriggerTimeTracker()
e.eventBroadcaster = broadcaster
e.eventRecorder = recorder

Expand Down Expand Up @@ -152,7 +153,7 @@ type Controller struct {

// triggerTimeTracker is an util used to compute and export the EndpointsLastChangeTriggerTime
// annotation.
triggerTimeTracker *endpointutil.TriggerTimeTracker
triggerTimeTracker *endpointsliceutil.TriggerTimeTracker

endpointUpdatesBatchPeriod time.Duration
}
Expand Down Expand Up @@ -193,7 +194,7 @@ func (e *Controller) Run(ctx context.Context, workers int) {
// enqueue them. obj must have *v1.Pod type.
func (e *Controller) addPod(obj interface{}) {
pod := obj.(*v1.Pod)
services, err := endpointutil.GetPodServiceMemberships(e.serviceLister, pod)
services, err := endpointsliceutil.GetPodServiceMemberships(e.serviceLister, pod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
return
Expand Down Expand Up @@ -262,7 +263,7 @@ func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointA
// and what services it will be a member of, and enqueue the union of these.
// old and cur must be *v1.Pod types.
func (e *Controller) updatePod(old, cur interface{}) {
services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, old, cur)
services := endpointsliceutil.GetServicesToUpdateOnPodChange(e.serviceLister, old, cur)
for key := range services {
e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
}
Expand All @@ -271,7 +272,7 @@ func (e *Controller) updatePod(old, cur interface{}) {
// When a pod is deleted, enqueue the services the pod used to be a member of.
// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
func (e *Controller) deletePod(obj interface{}) {
pod := endpointutil.GetPodFromDeleteAction(obj)
pod := endpointsliceutil.GetPodFromDeleteAction(obj)
if pod != nil {
e.addPod(pod)
}
Expand Down Expand Up @@ -412,7 +413,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
var totalNotReadyEps int

for _, pod := range pods {
if !endpointutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
if !endpointsliceutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
logger.V(5).Info("Pod is not included on endpoints for Service", "pod", klog.KObj(pod), "service", klog.KObj(service))
continue
}
Expand All @@ -426,7 +427,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
}

epa := *ep
if endpointutil.ShouldSetHostname(pod, service) {
if endpointsliceutil.ShouldSetHostname(pod, service) {
epa.Hostname = pod.Spec.Hostname
}

Expand Down Expand Up @@ -483,7 +484,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
// When comparing the subsets, we ignore the difference in ResourceVersion of Pod to avoid unnecessary Endpoints
// updates caused by Pod updates that we don't care, e.g. annotation update.
if !createEndpoints &&
endpointutil.EndpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) &&
endpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) &&
apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
logger.V(5).Info("endpoints are equal, skipping update", "service", klog.KObj(service))
Expand Down Expand Up @@ -702,3 +703,21 @@ func addressSubset(addresses []v1.EndpointAddress, maxNum int) []v1.EndpointAddr
}
return addresses[0:maxNum]
}

// semanticIgnoreResourceVersion does semantic deep equality checks for objects
// but excludes ResourceVersion of ObjectReference. They are used when comparing
// endpoints in Endpoints and EndpointSlice objects to avoid unnecessary updates
// caused by Pod resourceVersion change.
var semanticIgnoreResourceVersion = conversion.EqualitiesOrDie(
func(a, b v1.ObjectReference) bool {
a.ResourceVersion = ""
b.ResourceVersion = ""
return a == b
},
)

// endpointSubsetsEqualIgnoreResourceVersion returns true if EndpointSubsets
// have equal attributes but excludes ResourceVersion of Pod.
func endpointSubsetsEqualIgnoreResourceVersion(subsets1, subsets2 []v1.EndpointSubset) bool {
return semanticIgnoreResourceVersion.DeepEqual(subsets1, subsets2)
}
98 changes: 98 additions & 0 deletions pkg/controller/endpoint/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2559,3 +2559,101 @@ func waitForChanReceive(t *testing.T, timeout time.Duration, receivingChan chan
case <-receivingChan:
}
}

func TestEndpointSubsetsEqualIgnoreResourceVersion(t *testing.T) {
copyAndMutateEndpointSubset := func(orig *v1.EndpointSubset, mutator func(*v1.EndpointSubset)) *v1.EndpointSubset {
newSubSet := orig.DeepCopy()
mutator(newSubSet)
return newSubSet
}
es1 := &v1.EndpointSubset{
Addresses: []v1.EndpointAddress{
{
IP: "1.1.1.1",
TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-1", Namespace: "ns", ResourceVersion: "1"},
},
},
NotReadyAddresses: []v1.EndpointAddress{
{
IP: "1.1.1.2",
TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-2", Namespace: "ns2", ResourceVersion: "2"},
},
},
Ports: []v1.EndpointPort{{Port: 8081, Protocol: "TCP"}},
}
es2 := &v1.EndpointSubset{
Addresses: []v1.EndpointAddress{
{
IP: "2.2.2.1",
TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-1", Namespace: "ns", ResourceVersion: "3"},
},
},
NotReadyAddresses: []v1.EndpointAddress{
{
IP: "2.2.2.2",
TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-2", Namespace: "ns2", ResourceVersion: "4"},
},
},
Ports: []v1.EndpointPort{{Port: 8082, Protocol: "TCP"}},
}
tests := []struct {
name string
subsets1 []v1.EndpointSubset
subsets2 []v1.EndpointSubset
expected bool
}{
{
name: "Subsets removed",
subsets1: []v1.EndpointSubset{*es1, *es2},
subsets2: []v1.EndpointSubset{*es1},
expected: false,
},
{
name: "Ready Pod IP changed",
subsets1: []v1.EndpointSubset{*es1, *es2},
subsets2: []v1.EndpointSubset{*copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) {
es.Addresses[0].IP = "1.1.1.10"
}), *es2},
expected: false,
},
{
name: "NotReady Pod IP changed",
subsets1: []v1.EndpointSubset{*es1, *es2},
subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) {
es.NotReadyAddresses[0].IP = "2.2.2.10"
})},
expected: false,
},
{
name: "Pod ResourceVersion changed",
subsets1: []v1.EndpointSubset{*es1, *es2},
subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) {
es.Addresses[0].TargetRef.ResourceVersion = "100"
})},
expected: true,
},
{
name: "Pod ResourceVersion removed",
subsets1: []v1.EndpointSubset{*es1, *es2},
subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) {
es.Addresses[0].TargetRef.ResourceVersion = ""
})},
expected: true,
},
{
name: "Ports changed",
subsets1: []v1.EndpointSubset{*es1, *es2},
subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) {
es.Ports[0].Port = 8082
})},
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := endpointSubsetsEqualIgnoreResourceVersion(tt.subsets1, tt.subsets2); got != tt.expected {
t.Errorf("semanticIgnoreResourceVersion.DeepEqual() = %v, expected %v", got, tt.expected)
}
})
}
}

0 comments on commit 5c72df7

Please sign in to comment.