Skip to content

Commit

Permalink
implement field selector for clusterIP on services
Browse files Browse the repository at this point in the history
This will allow components that don't need to watch headless services
(heavily used on ai/ml workloads) to filter them server side.

Specially useful for kubelet and kube-proxy

Co-authored-by: Jianbo Ma <sakuranlbj@gmail.com>

Change-Id: I6434d2c8c77aaf725ec5c07acbcda14311f24bfa

Change-Id: Iba9e25afb90712facfb3dee25c500bbe08ef38fc
  • Loading branch information
aojea committed Mar 5, 2024
1 parent 2a44cb0 commit 0595ec7
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 1 deletion.
17 changes: 17 additions & 0 deletions pkg/apis/core/v1/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func addConversionFuncs(scheme *runtime.Scheme) error {
if err := AddFieldLabelConversionsForSecret(scheme); err != nil {
return err
}
if err := AddFieldLabelConversionsForService(scheme); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -488,6 +491,20 @@ func AddFieldLabelConversionsForSecret(scheme *runtime.Scheme) error {
})
}

func AddFieldLabelConversionsForService(scheme *runtime.Scheme) error {
return scheme.AddFieldLabelConversionFunc(SchemeGroupVersion.WithKind("Service"),
func(label, value string) (string, string, error) {
switch label {
case "metadata.namespace",
"metadata.name",
"spec.clusterIP":
return label, value, nil
default:
return "", "", fmt.Errorf("field label not supported: %s", label)
}
})
}

var initContainerAnnotations = map[string]bool{
"pod.beta.kubernetes.io/init-containers": true,
"pod.alpha.kubernetes.io/init-containers": true,
Expand Down
6 changes: 5 additions & 1 deletion pkg/registry/core/service/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func NewREST(
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &api.Service{} },
NewListFunc: func() runtime.Object { return &api.ServiceList{} },
PredicateFunc: svcreg.Matcher,
DefaultQualifiedResource: api.Resource("services"),
SingularQualifiedResource: api.Resource("service"),
ReturnDeletedObject: true,
Expand All @@ -99,7 +100,10 @@ func NewREST(

TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{RESTOptions: optsGetter}
options := &generic.StoreOptions{
RESTOptions: optsGetter,
AttrFunc: svcreg.GetAttrs,
}
if err := store.CompleteWithOptions(options); err != nil {
return nil, nil, nil, err
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/registry/core/service/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ package service

import (
"context"
"fmt"
"reflect"

"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apiserver/pkg/registry/generic"
pkgstorage "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/names"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/api/legacyscheme"
Expand Down Expand Up @@ -162,6 +167,33 @@ func (serviceStatusStrategy) WarningsOnUpdate(ctx context.Context, obj, old runt
return nil
}

// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
service, ok := obj.(*api.Service)
if !ok {
return nil, nil, fmt.Errorf("not a service")
}
return service.Labels, SelectableFields(service), nil
}

// Matcher returns a selection predicate for a given label and field selector.
func Matcher(label labels.Selector, field fields.Selector) pkgstorage.SelectionPredicate {
return pkgstorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

// SelectableFields returns a field set that can be used for filter selection
func SelectableFields(service *api.Service) fields.Set {
objectMetaFieldsSet := generic.ObjectMetaFieldsSet(&service.ObjectMeta, false)
serviceSpecificFieldsSet := fields.Set{
"spec.clusterIP": service.Spec.ClusterIP,
}
return generic.MergeFieldsSets(objectMetaFieldsSet, serviceSpecificFieldsSet)
}

// dropServiceStatusDisabledFields drops fields that are not used if their associated feature gates
// are not enabled. The typical pattern is:
//
Expand Down
94 changes: 94 additions & 0 deletions pkg/registry/core/service/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
Expand Down Expand Up @@ -782,3 +784,95 @@ func TestDropTypeDependentFields(t *testing.T) {
})
}
}

func TestMatchService(t *testing.T) {
testCases := []struct {
name string
in *api.Service
fieldSelector fields.Selector
expectMatch bool
}{
{
name: "match on headless service",
in: &api.Service{
Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"),
expectMatch: true,
},
{
name: "no match on clusterIP service",
in: &api.Service{
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"),
expectMatch: false,
},
{
name: "match on clusterIP service",
in: &api.Service{
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=192.168.1.1"),
expectMatch: true,
},
{
name: "match on non-headless service",
in: &api.Service{
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP!=None"),
expectMatch: true,
},
{
name: "match on any ClusterIP set service",
in: &api.Service{
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP!=\"\""),
expectMatch: true,
},
{
name: "match on clusterIP IPv6 service",
in: &api.Service{
Spec: api.ServiceSpec{ClusterIP: "2001:db2::1"},
},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=2001:db2::1"),
expectMatch: true,
},
{
name: "no match on headless service",
in: &api.Service{
Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=192.168.1.1"),
expectMatch: false,
},
{
name: "no match on headless service",
in: &api.Service{
Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=2001:db2::1"),
expectMatch: false,
},
{
name: "no match on empty service",
in: &api.Service{},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"),
expectMatch: false,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
m := Matcher(labels.Everything(), testCase.fieldSelector)
result, err := m.Matches(testCase.in)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
if result != testCase.expectMatch {
t.Errorf("Result %v, Expected %v, Selector: %v, Service: %v", result, testCase.expectMatch, testCase.fieldSelector.String(), testCase.in)
}
})
}
}
142 changes: 142 additions & 0 deletions test/integration/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@ package service
import (
"context"
"testing"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework"
)
Expand Down Expand Up @@ -264,3 +270,139 @@ func Test_RemovingExternalIPsFromClusterIPServiceDropsExternalTrafficPolicy(t *t
t.Error("service externalTrafficPolicy was not set for clusterIP Service with externalIPs")
}
}

func Test_ServiceClusterIPSelector(t *testing.T) {
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer server.TearDownFn()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client, err := clientset.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("Error creating clientset: %v", err)
}

ns := framework.CreateNamespaceOrDie(client, "test-external-name-drops-internal-traffic-policy", t)
defer framework.DeleteNamespaceOrDie(client, ns, t)

// create headless service
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-headless",
Namespace: ns.Name,
},
Spec: corev1.ServiceSpec{
ClusterIP: corev1.ClusterIPNone,
Type: corev1.ServiceTypeClusterIP,
Ports: []corev1.ServicePort{{
Port: int32(80),
}},
Selector: map[string]string{
"foo": "bar",
},
},
}

_, err = client.CoreV1().Services(ns.Name).Create(ctx, service, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating test service: %v", err)
}

// informer to watch only non-headless services
kubeInformers := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermNotEqualSelector("spec.clusterIP", corev1.ClusterIPNone).String()
}))

serviceInformer := kubeInformers.Core().V1().Services().Informer()
serviceLister := kubeInformers.Core().V1().Services().Lister()
serviceHasSynced := serviceInformer.HasSynced
if _, err = serviceInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
svc := obj.(*corev1.Service)
t.Logf("Added Service %#v", svc)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldSvc := oldObj.(*corev1.Service)
newSvc := newObj.(*corev1.Service)
t.Logf("Updated Service %#v to %#v", oldSvc, newSvc)
},
DeleteFunc: func(obj interface{}) {
svc := obj.(*corev1.Service)
t.Logf("Deleted Service %#v", svc)
},
},
); err != nil {
t.Fatalf("Error adding service informer handler: %v", err)
}
kubeInformers.Start(ctx.Done())
cache.WaitForCacheSync(ctx.Done(), serviceHasSynced)
svcs, err := serviceLister.List(labels.Everything())
if err != nil {
t.Fatalf("Error listing services: %v", err)
}
// only the kubernetes.default service expected
if len(svcs) != 1 || svcs[0].Name != "kubernetes" {
t.Fatalf("expected 1 services, got %d", len(svcs))
}

// create a new service with ClusterIP
service2 := service.DeepCopy()
service2.Spec.ClusterIP = ""
service2.Name = "test-clusterip"
_, err = client.CoreV1().Services(ns.Name).Create(ctx, service2, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating test service: %v", err)
}

err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
svc, err := serviceLister.Services(service2.Namespace).Get(service2.Name)
if svc == nil || err != nil {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatalf("Error waiting for test service test-clusterip: %v", err)
}

// mutate the Service to drop the ClusterIP, theoretically ClusterIP is inmutable but ...
service.Spec.ExternalName = "test"
service.Spec.Type = corev1.ServiceTypeExternalName
_, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Error creating test service: %v", err)
}

err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
svc, err := serviceLister.Services(service.Namespace).Get(service.Name)
if svc == nil || err != nil {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatalf("Error waiting for test service without ClusterIP: %v", err)
}

// mutate the Service to get the ClusterIP again
service.Spec.ExternalName = ""
service.Spec.ClusterIP = ""
service.Spec.Type = corev1.ServiceTypeClusterIP
_, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Error creating test service: %v", err)
}

err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
svc, err := serviceLister.Services(service.Namespace).Get(service.Name)
if svc == nil || err != nil {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatalf("Error waiting for test service with ClusterIP: %v", err)
}
}

0 comments on commit 0595ec7

Please sign in to comment.