Skip to content

Commit

Permalink
Fixing how EndpointSlice Mirroring handles Service selector transitions
Browse files Browse the repository at this point in the history
  • Loading branch information
robscott committed Nov 4, 2021
1 parent be2af70 commit 30f791a
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,11 @@ func (c *Controller) syncEndpoints(key string) error {
return err
}

// This means that if a Service transitions away from a nil selector, any
// mirrored EndpointSlices will not be cleaned up. #91072 tracks this issue
// for this controller along with the Endpoints and EndpointSlice
// controllers.
// If a selector is specified, clean up any mirrored slices.
if svc.Spec.Selector != nil {
return nil
klog.V(4).Infof("%s/%s Service now has selector, cleaning up any mirrored EndpointSlices", namespace, name)
c.endpointSliceTracker.DeleteService(namespace, name)
return c.deleteMirroredSlices(namespace, name)
}

endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name)
Expand Down Expand Up @@ -371,7 +370,7 @@ func (c *Controller) onServiceUpdate(prevObj, obj interface{}) {
service := obj.(*v1.Service)
prevService := prevObj.(*v1.Service)
if service == nil || prevService == nil {
utilruntime.HandleError(fmt.Errorf("onServiceUpdate() expected type v1.Endpoints, got %T, %T", prevObj, obj))
utilruntime.HandleError(fmt.Errorf("onServiceUpdate() expected type v1.Service, got %T, %T", prevObj, obj))
return
}
if (service.Spec.Selector == nil) != (prevService.Spec.Selector == nil) {
Expand Down
145 changes: 145 additions & 0 deletions test/integration/endpointslice/endpointslicemirroring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/controller/endpoint"
Expand Down Expand Up @@ -408,6 +409,150 @@ func TestEndpointSliceMirroringUpdates(t *testing.T) {
}
}

func TestEndpointSliceMirroringSelectorTransition(t *testing.T) {
masterConfig := framework.NewIntegrationTestMasterConfig()
_, server, closeFn := framework.RunAMaster(masterConfig)
defer closeFn()

config := restclient.Config{Host: server.URL}
client, err := clientset.NewForConfig(&config)
if err != nil {
t.Fatalf("Error creating clientset: %v", err)
}

resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(client, resyncPeriod)

epsmController := endpointslicemirroring.NewController(
informers.Core().V1().Endpoints(),
informers.Discovery().V1beta1().EndpointSlices(),
informers.Core().V1().Services(),
int32(100),
client,
1*time.Second)

// Start informer and controllers
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)
go epsmController.Run(1, stopCh)

testCases := []struct {
testName string
startingSelector map[string]string
startingMirroredSlices int
endingSelector map[string]string
endingMirroredSlices int
}{
{
testName: "nil -> {foo: bar} selector",
startingSelector: nil,
startingMirroredSlices: 1,
endingSelector: map[string]string{"foo": "bar"},
endingMirroredSlices: 0,
},
{
testName: "{foo: bar} -> nil selector",
startingSelector: map[string]string{"foo": "bar"},
startingMirroredSlices: 0,
endingSelector: nil,
endingMirroredSlices: 1,
},
{
testName: "{} -> {foo: bar} selector",
startingSelector: map[string]string{},
startingMirroredSlices: 1,
endingSelector: map[string]string{"foo": "bar"},
endingMirroredSlices: 0,
},
{
testName: "{foo: bar} -> {} selector",
startingSelector: map[string]string{"foo": "bar"},
startingMirroredSlices: 0,
endingSelector: map[string]string{},
endingMirroredSlices: 1,
},
}

for i, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
ns := framework.CreateTestingNamespace(fmt.Sprintf("test-endpointslice-mirroring-%d", i), server, t)
defer framework.DeleteTestingNamespace(ns, server, t)
meta := metav1.ObjectMeta{Name: "test-123", Namespace: ns.Name}

service := &corev1.Service{
ObjectMeta: meta,
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{
Port: int32(80),
}},
Selector: tc.startingSelector,
},
}

customEndpoints := &corev1.Endpoints{
ObjectMeta: meta,
Subsets: []corev1.EndpointSubset{{
Ports: []corev1.EndpointPort{{
Port: 80,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
}},
}},
}

svc, err := client.CoreV1().Services(ns.Name).Create(context.TODO(), service, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating service: %v", err)
}

_, err = client.CoreV1().Endpoints(ns.Name).Create(context.TODO(), customEndpoints, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating endpoints: %v", err)
}

// verify the expected number of mirrored slices exist
err = waitForMirroredSlices(t, client, ns.Name, service.Name, tc.startingMirroredSlices)
if err != nil {
t.Fatalf("Timed out waiting for initial mirrored slices to match expectations: %v", err)
}

svc.Spec.Selector = tc.endingSelector
_, err = client.CoreV1().Services(ns.Name).Update(context.TODO(), svc, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Error updating service: %v", err)
}

// verify the expected number of mirrored slices exist
err = waitForMirroredSlices(t, client, ns.Name, service.Name, tc.endingMirroredSlices)
if err != nil {
t.Fatalf("Timed out waiting for final mirrored slices to match expectations: %v", err)
}
})
}
}

func waitForMirroredSlices(t *testing.T, client *kubernetes.Clientset, nsName, svcName string, num int) error {
t.Helper()
return wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
lSelector := discovery.LabelServiceName + "=" + svcName
lSelector += "," + discovery.LabelManagedBy + "=endpointslicemirroring-controller.k8s.io"
esList, err := client.DiscoveryV1beta1().EndpointSlices(nsName).List(context.TODO(), metav1.ListOptions{LabelSelector: lSelector})
if err != nil {
t.Logf("Error listing EndpointSlices: %v", err)
return false, err
}

if len(esList.Items) != num {
t.Logf("Expected %d slices to be mirrored, got %d", num, len(esList.Items))
return false, nil
}

return true, nil
})
}

// isSubset check if all the elements in a exist in b
func isSubset(a, b map[string]string) bool {
if len(a) > len(b) {
Expand Down

0 comments on commit 30f791a

Please sign in to comment.