Skip to content

Commit

Permalink
endpoinslices must mirror services labels
Browse files Browse the repository at this point in the history
Implement, in the endpoint slice controller, the same logic
used for labels in the legacy endpoints controller.

The labels in the endpoint and in the parent must be equivalent.
Headless services add the well-known IsHeadlessService label.
Slices must have two well known labels: LabelServiceName and
LabelManagedBy.
  • Loading branch information
Antonio Ojea committed Sep 21, 2020
1 parent 5f79e91 commit b7d8045
Show file tree
Hide file tree
Showing 5 changed files with 684 additions and 24 deletions.
1 change: 1 addition & 0 deletions pkg/controller/endpointslice/BUILD
Expand Up @@ -14,6 +14,7 @@ go_library(
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/apis/discovery/validation:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/endpointslice/metrics:go_default_library",
Expand Down
16 changes: 14 additions & 2 deletions pkg/controller/endpointslice/reconciler.go
Expand Up @@ -246,9 +246,11 @@ func (r *reconciler) finalize(

// reconcileByPortMapping compares the endpoints found in existing slices with
// the list of desired endpoints and returns lists of slices to create, update,
// and delete. The logic is split up into several main steps:
// and delete. It also checks that the slices mirror the parent services labels.
// The logic is split up into several main steps:
// 1. Iterate through existing slices, delete endpoints that are no longer
// desired and update matching endpoints that have changed.
// desired and update matching endpoints that have changed. It also checks
// if the slices have the labels of the parent services, and updates them if not.
// 2. Iterate through slices that have been modified in 1 and fill them up with
// any remaining desired endpoints.
// 3. If there still desired endpoints left, try to fit them into a previously
Expand Down Expand Up @@ -287,6 +289,9 @@ func (r *reconciler) reconcileByPortMapping(
}
}

// generate the slice labels and check if parent labels have changed
labels, labelsChanged := setEndpointSliceLabels(existingSlice, service)

// If an endpoint was updated or removed, mark for update or delete
if endpointUpdated || len(existingSlice.Endpoints) != len(newEndpoints) {
if len(existingSlice.Endpoints) > len(newEndpoints) {
Expand All @@ -299,9 +304,16 @@ func (r *reconciler) reconcileByPortMapping(
// otherwise, copy and mark for update
epSlice := existingSlice.DeepCopy()
epSlice.Endpoints = newEndpoints
epSlice.Labels = labels
slicesByName[existingSlice.Name] = epSlice
sliceNamesToUpdate.Insert(epSlice.Name)
}
} else if labelsChanged {
// if labels have changed, copy and mark for update
epSlice := existingSlice.DeepCopy()
epSlice.Labels = labels
slicesByName[existingSlice.Name] = epSlice
sliceNamesToUpdate.Insert(epSlice.Name)
} else {
// slices with no changes will be useful if there are leftover endpoints
sliceNamesUnchanged.Insert(existingSlice.Name)
Expand Down
170 changes: 168 additions & 2 deletions pkg/controller/endpointslice/reconciler_test.go
Expand Up @@ -71,6 +71,13 @@ func TestReconcile1Pod(t *testing.T) {
namespace := "test"
ipv6Family := corev1.IPv6Protocol
svcv4, _ := newServiceAndEndpointMeta("foo", namespace)
svcv4ClusterIP, _ := newServiceAndEndpointMeta("foo", namespace)
svcv4ClusterIP.Spec.ClusterIP = "1.1.1.1"
svcv4Labels, _ := newServiceAndEndpointMeta("foo", namespace)
svcv4Labels.Labels = map[string]string{"foo": "bar"}
svcv4BadLabels, _ := newServiceAndEndpointMeta("foo", namespace)
svcv4BadLabels.Labels = map[string]string{discovery.LabelServiceName: "bad",
discovery.LabelManagedBy: "actor", corev1.IsHeadlessService: "invalid"}
svcv6, _ := newServiceAndEndpointMeta("foo", namespace)
svcv6.Spec.IPFamily = &ipv6Family
svcv6ClusterIP, _ := newServiceAndEndpointMeta("foo", namespace)
Expand All @@ -93,6 +100,7 @@ func TestReconcile1Pod(t *testing.T) {
service corev1.Service
expectedAddressType discovery.AddressType
expectedEndpoint discovery.Endpoint
expectedLabels map[string]string
}{
"ipv4": {
service: svcv4,
Expand All @@ -111,6 +119,80 @@ func TestReconcile1Pod(t *testing.T) {
Name: "pod1",
},
},
expectedLabels: map[string]string{
discovery.LabelManagedBy: controllerName,
discovery.LabelServiceName: "foo",
corev1.IsHeadlessService: "",
},
},
"ipv4-clusterip": {
service: svcv4ClusterIP,
expectedAddressType: discovery.AddressTypeIPv4,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.4"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{
"kubernetes.io/hostname": "node-1",
"topology.kubernetes.io/zone": "us-central1-a",
"topology.kubernetes.io/region": "us-central1",
},
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: namespace,
Name: "pod1",
},
},
expectedLabels: map[string]string{
discovery.LabelManagedBy: controllerName,
discovery.LabelServiceName: "foo",
},
},
"ipv4-labels": {
service: svcv4Labels,
expectedAddressType: discovery.AddressTypeIPv4,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.4"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{
"kubernetes.io/hostname": "node-1",
"topology.kubernetes.io/zone": "us-central1-a",
"topology.kubernetes.io/region": "us-central1",
},
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: namespace,
Name: "pod1",
},
},
expectedLabels: map[string]string{
discovery.LabelManagedBy: controllerName,
discovery.LabelServiceName: "foo",
"foo": "bar",
corev1.IsHeadlessService: "",
},
},
"ipv4-bad-labels": {
service: svcv4BadLabels,
expectedAddressType: discovery.AddressTypeIPv4,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.4"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{
"kubernetes.io/hostname": "node-1",
"topology.kubernetes.io/zone": "us-central1-a",
"topology.kubernetes.io/region": "us-central1",
},
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: namespace,
Name: "pod1",
},
},
expectedLabels: map[string]string{
discovery.LabelManagedBy: controllerName,
discovery.LabelServiceName: "foo",
corev1.IsHeadlessService: "",
},
},
"ipv6": {
service: svcv6,
Expand All @@ -129,6 +211,11 @@ func TestReconcile1Pod(t *testing.T) {
Name: "pod1",
},
},
expectedLabels: map[string]string{
discovery.LabelManagedBy: controllerName,
discovery.LabelServiceName: "foo",
corev1.IsHeadlessService: "",
},
},
"ipv6-clusterip": {
service: svcv6ClusterIP,
Expand All @@ -147,6 +234,10 @@ func TestReconcile1Pod(t *testing.T) {
Name: "pod1",
},
},
expectedLabels: map[string]string{
discovery.LabelManagedBy: controllerName,
discovery.LabelServiceName: "foo",
},
},
}

Expand All @@ -173,8 +264,8 @@ func TestReconcile1Pod(t *testing.T) {
t.Errorf("Expected EndpointSlice name to start with %s, got %s", testCase.service.Name, slice.Name)
}

if slice.Labels[discovery.LabelServiceName] != testCase.service.Name {
t.Errorf("Expected EndpointSlice to have label set with %s value, got %s", testCase.service.Name, slice.Labels[discovery.LabelServiceName])
if !reflect.DeepEqual(testCase.expectedLabels, slice.Labels) {
t.Errorf("Expected EndpointSlice to have labels: %v , got %v", testCase.expectedLabels, slice.Labels)
}

if slice.Annotations[corev1.EndpointsLastChangeTriggerTime] != triggerTime.Format(time.RFC3339Nano) {
Expand Down Expand Up @@ -430,6 +521,81 @@ func TestReconcileEndpointSlicesUpdating(t *testing.T) {
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 50})
}

// In some cases, such as service labels updates, all slices for that service will require a change
// This test ensures that we are updating those slices and not calling create + delete for each
func TestReconcileEndpointSlicesServicesLabelsUpdating(t *testing.T) {
client := newClientset()
namespace := "test"
svc, _ := newServiceAndEndpointMeta("foo", namespace)

// start with 250 pods
pods := []*corev1.Pod{}
for i := 0; i < 250; i++ {
ready := !(i%3 == 0)
pods = append(pods, newPod(i, namespace, ready, 1))
}

r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())
numActionsExpected := 3
assert.Len(t, client.Actions(), numActionsExpected, "Expected 3 additional clientset actions")

slices := fetchEndpointSlices(t, client, namespace)
numActionsExpected++
expectUnorderedSlicesWithLengths(t, slices, []int{100, 100, 50})

// update service with new labels
svc.Labels = map[string]string{"foo": "bar"}
reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{&slices[0], &slices[1], &slices[2]}, time.Now())

numActionsExpected += 3
assert.Len(t, client.Actions(), numActionsExpected, "Expected 3 additional clientset actions")
expectActions(t, client.Actions(), 3, "update", "endpointslices")

newSlices := fetchEndpointSlices(t, client, namespace)
expectUnorderedSlicesWithLengths(t, newSlices, []int{100, 100, 50})
// check that the labels were updated
for _, slice := range newSlices {
w, ok := slice.Labels["foo"]
if !ok {
t.Errorf("Expected label \"foo\" from parent service not found")
} else if "bar" != w {
t.Errorf("Expected EndpointSlice to have parent service labels: have %s value, expected bar", w)
}
}
}

// In some cases, such as service labels updates, all slices for that service will require a change
// However, this should not happen for reserved labels
func TestReconcileEndpointSlicesServicesReservedLabels(t *testing.T) {
client := newClientset()
namespace := "test"
svc, _ := newServiceAndEndpointMeta("foo", namespace)

// start with 250 pods
pods := []*corev1.Pod{}
for i := 0; i < 250; i++ {
ready := !(i%3 == 0)
pods = append(pods, newPod(i, namespace, ready, 1))
}

r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())
numActionsExpected := 3
assert.Len(t, client.Actions(), numActionsExpected, "Expected 3 additional clientset actions")
slices := fetchEndpointSlices(t, client, namespace)
numActionsExpected++
expectUnorderedSlicesWithLengths(t, slices, []int{100, 100, 50})

// update service with new labels
svc.Labels = map[string]string{discovery.LabelServiceName: "bad", discovery.LabelManagedBy: "actor", corev1.IsHeadlessService: "invalid"}
reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{&slices[0], &slices[1], &slices[2]}, time.Now())
assert.Len(t, client.Actions(), numActionsExpected, "Expected no additional clientset actions")

newSlices := fetchEndpointSlices(t, client, namespace)
expectUnorderedSlicesWithLengths(t, newSlices, []int{100, 100, 50})
}

// In this test, we start with 10 slices that only have 30 endpoints each
// An initial reconcile makes no changes (as desired to limit writes)
// When we change a service port, all slices will need to be updated in some way
Expand Down
69 changes: 64 additions & 5 deletions pkg/controller/endpointslice/utils.go
Expand Up @@ -21,6 +21,7 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -30,6 +31,7 @@ import (
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
api "k8s.io/kubernetes/pkg/apis/core"
helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/apis/discovery/validation"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
utilnet "k8s.io/utils/net"
Expand Down Expand Up @@ -152,12 +154,9 @@ func endpointsEqualBeyondHash(ep1, ep2 *discovery.Endpoint) bool {
func newEndpointSlice(service *corev1.Service, endpointMeta *endpointMeta) *discovery.EndpointSlice {
gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"}
ownerRef := metav1.NewControllerRef(service, gvk)
return &discovery.EndpointSlice{
epSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
discovery.LabelServiceName: service.Name,
discovery.LabelManagedBy: controllerName,
},
Labels: map[string]string{},
GenerateName: getEndpointSlicePrefix(service.Name),
OwnerReferences: []metav1.OwnerReference{*ownerRef},
Namespace: service.Namespace,
Expand All @@ -166,6 +165,10 @@ func newEndpointSlice(service *corev1.Service, endpointMeta *endpointMeta) *disc
AddressType: endpointMeta.AddressType,
Endpoints: []discovery.Endpoint{},
}
// add parent service labels
epSlice.Labels, _ = setEndpointSliceLabels(epSlice, service)

return epSlice
}

// getEndpointSlicePrefix returns a suitable prefix for an EndpointSlice name.
Expand Down Expand Up @@ -267,6 +270,62 @@ func serviceControllerKey(endpointSlice *discovery.EndpointSlice) (string, error
return fmt.Sprintf("%s/%s", endpointSlice.Namespace, serviceName), nil
}

// setEndpointSliceLabels returns a map with the new endpoint slices labels and true if there was an update.
// Slices labels must be equivalent to the Service labels except for the reserved IsHeadlessService, LabelServiceName and LabelManagedBy labels
// Changes to IsHeadlessService, LabelServiceName and LabelManagedBy labels on the Service do not result in updates to EndpointSlice labels.
func setEndpointSliceLabels(epSlice *discovery.EndpointSlice, service *corev1.Service) (map[string]string, bool) {
updated := false
epLabels := make(map[string]string)
svcLabels := make(map[string]string)

// check if the endpoint slice and the service have the same labels
// clone current slice labels except the reserved labels
for key, value := range epSlice.Labels {
if IsReservedLabelKey(key) {
continue
}
// copy endpoint slice labels
epLabels[key] = value
}

for key, value := range service.Labels {
if IsReservedLabelKey(key) {
klog.Warningf("Service %s/%s using reserved endpoint slices label, skipping label %s: %s", service.Namespace, service.Name, key, value)
continue
}
// copy service labels
svcLabels[key] = value
}

// if the labels are not identical update the slice with the corresponding service labels
if !apiequality.Semantic.DeepEqual(epLabels, svcLabels) {
updated = true
}

// add or remove headless label depending on the service Type
if !helper.IsServiceIPSet(service) {
svcLabels[v1.IsHeadlessService] = ""
} else {
delete(svcLabels, v1.IsHeadlessService)
}

// override endpoint slices reserved labels
svcLabels[discovery.LabelServiceName] = service.Name
svcLabels[discovery.LabelManagedBy] = controllerName

return svcLabels, updated
}

// IsReservedLabelKey return true if the label is one of the reserved label for slices
func IsReservedLabelKey(label string) bool {
if label == discovery.LabelServiceName ||
label == discovery.LabelManagedBy ||
label == v1.IsHeadlessService {
return true
}
return false
}

// endpointSliceEndpointLen helps sort endpoint slices by the number of
// endpoints they contain.
type endpointSliceEndpointLen []*discovery.EndpointSlice
Expand Down

0 comments on commit b7d8045

Please sign in to comment.