Skip to content

Commit

Permalink
Reconcile KAS endpoints and endpoint slice
Browse files Browse the repository at this point in the history
  • Loading branch information
rtheis committed May 9, 2024
1 parent 983b05b commit 225b286
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ func generateConfig(p KubeAPIServerConfigParams) *kcpv1.KubeAPIServerConfig {
}
args.Set("enable-aggregator-routing", "true")
args.Set("enable-logs-handler", "false")
// TODO(rtheis): Move to "none" once the hosted cluster config operator is reconciling
// the Kubernetes API server endpoints and endpointslice. This will ensure that there
// is a smooth highly available transition off the lease reconciler.
args.Set("endpoint-reconciler-type", "lease")
args.Set("etcd-cafile", cpath(kasVolumeEtcdCA().Name, certs.CASignerCertMapKey))
args.Set("etcd-certfile", cpath(kasVolumeEtcdClientCert().Name, pki.EtcdClientCrtKey))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package kas

import (
"github.com/openshift/hypershift/support/util"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/utils/ptr"
)

func ReconcileKASEndpoints(endpoints *corev1.Endpoints, address string, port int32) {
endpoints.Labels = map[string]string{
discoveryv1.LabelSkipMirror: "true",
}
endpoints.Subsets = []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{
IP: address,
}},
Ports: []corev1.EndpointPort{{
Name: "https",
Port: port,
Protocol: corev1.ProtocolTCP,
}},
}}
}

func ReconcileKASEndpointSlice(endpointSlice *discoveryv1.EndpointSlice, address string, port int32) {
endpointSlice.Labels = map[string]string{
discoveryv1.LabelServiceName: "kubernetes",
}
ipv4, err := util.IsIPv4(address)
if err != nil || ipv4 {
endpointSlice.AddressType = discoveryv1.AddressTypeIPv4
} else {
endpointSlice.AddressType = discoveryv1.AddressTypeIPv6
}
endpointSlice.Endpoints = []discoveryv1.Endpoint{{
Addresses: []string{
address,
},
Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
}}
endpointSlice.Ports = []discoveryv1.EndpointPort{{
Name: ptr.To("https"),
Port: ptr.To(port),
Protocol: ptr.To(corev1.ProtocolTCP),
}}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,3 @@ func InstallConfigConfigMap() *corev1.ConfigMap {
},
}
}

func APIServerEndpoints() *corev1.Endpoints {
return &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "kubernetes",
},
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package manifests

import (
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func KASEndpoints() *corev1.Endpoints {
return &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "kubernetes",
Namespace: corev1.NamespaceDefault,
},
}
}

func KASEndpointSlice() *discoveryv1.EndpointSlice {
return &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "kubernetes",
Namespace: corev1.NamespaceDefault,
},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -52,6 +53,7 @@ import (
ccm "github.com/openshift/hypershift/control-plane-operator/hostedclusterconfigoperator/controllers/resources/cloudcontrollermanager/azure"
"github.com/openshift/hypershift/control-plane-operator/hostedclusterconfigoperator/controllers/resources/crd"
"github.com/openshift/hypershift/control-plane-operator/hostedclusterconfigoperator/controllers/resources/ingress"
"github.com/openshift/hypershift/control-plane-operator/hostedclusterconfigoperator/controllers/resources/kas"
"github.com/openshift/hypershift/control-plane-operator/hostedclusterconfigoperator/controllers/resources/konnectivity"
"github.com/openshift/hypershift/control-plane-operator/hostedclusterconfigoperator/controllers/resources/kubeadminpassword"
"github.com/openshift/hypershift/control-plane-operator/hostedclusterconfigoperator/controllers/resources/manifests"
Expand Down Expand Up @@ -206,6 +208,7 @@ func Setup(ctx context.Context, opts *operator.HostedClusterConfigOperatorConfig
&admissionregistrationv1.ValidatingWebhookConfiguration{},
&prometheusoperatorv1.PrometheusRule{},
&operatorv1.IngressController{},
&discoveryv1.EndpointSlice{},
}
for _, r := range resourcesToWatch {
if err := c.Watch(source.Kind(opts.Manager.GetCache(), r), eventHandler()); err != nil {
Expand Down Expand Up @@ -260,23 +263,14 @@ func (r *reconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result
errs = append(errs, fmt.Errorf("failed to reconcile crds: %w", err))
}

// We only keep reconciling the endpoint for existing clusters that are relying on this for nodes haproxy to work.
// Otherwise, changing the haproxy config to !=443 would result in a NodePool rollout which want to avoid for existing clusters.
// Existing clusters are given the *hcp.Spec.Networking.APIServer.Port == 443 semantic as we were enforcing this default previously,
// and it's a now a forbidden operation.
if hcp.Spec.Networking.APIServer != nil && hcp.Spec.Networking.APIServer.Port != nil &&
*hcp.Spec.Networking.APIServer.Port == 443 {
log.Info("reconciling kubernetes.default endpoints")
endpoints := manifests.APIServerEndpoints()
if _, err := r.CreateOrUpdate(ctx, r.client, endpoints, func() error {
if len(endpoints.Subsets) == 0 || len(endpoints.Subsets[0].Ports) == 0 {
return nil
}
endpoints.Subsets[0].Ports[0].Port = 443
return nil
}); err != nil {
errs = append(errs, fmt.Errorf("failed to reconcile kubernetes.default endpoints: %w", err))
}
// Clusters with "none" as their Kubernetes API server endpoint reconciliation
// type must manually manage the Kubernetes endpoints and endpointslice resources.
// Due to recent Kubernetes changes, we need to reconcile these resources to avoid
// problems such as [1].
// [1] https://github.com/kubernetes/kubernetes/issues/118777
log.Info("reconciling kubernetes.default endpoints and endpointslice")
if err := r.reconcileKASEndpoints(ctx, hcp); err != nil {
errs = append(errs, fmt.Errorf("failed to reconcile kubernetes.default endpoints and endpointslice: %w", err))
}

log.Info("reconciling install configmap")
Expand Down Expand Up @@ -1163,6 +1157,39 @@ func (r *reconciler) reconcileOpenshiftOAuthAPIServerAPIServices(ctx context.Con
return errors.NewAggregate(errs)
}

func (r *reconciler) reconcileKASEndpoints(ctx context.Context, hcp *hyperv1.HostedControlPlane) error {
var errs []error

kasAdvertiseAddress := util.GetAdvertiseAddress(hcp, config.DefaultAdvertiseIPv4Address, config.DefaultAdvertiseIPv6Address)
kasEndpointsPort := util.KASPodPort(hcp)
kasEndpointSlicePort := kasEndpointsPort

// We only keep reconciling the endpoint for existing clusters that are relying on this for nodes haproxy to work.
// Otherwise, changing the haproxy config to !=443 would result in a NodePool rollout which want to avoid for existing clusters.
// Existing clusters are given the *hcp.Spec.Networking.APIServer.Port == 443 semantic as we were enforcing this default previously,
// and it's a now a forbidden operation.
if hcp.Spec.Networking.APIServer != nil && hcp.Spec.Networking.APIServer.Port != nil &&
*hcp.Spec.Networking.APIServer.Port == 443 {
kasEndpointsPort = 443
}

kasEndpoints := manifests.KASEndpoints()
if _, err := r.CreateOrUpdate(ctx, r.client, kasEndpoints, func() error {
kas.ReconcileKASEndpoints(kasEndpoints, kasAdvertiseAddress, kasEndpointsPort)
return nil
}); err != nil {
errs = append(errs, fmt.Errorf("failed to reconcile kubernetes.default endpoints: %w", err))
}
kasEndpointSlice := manifests.KASEndpointSlice()
if _, err := r.CreateOrUpdate(ctx, r.client, kasEndpointSlice, func() error {
kas.ReconcileKASEndpointSlice(kasEndpointSlice, kasAdvertiseAddress, kasEndpointSlicePort)
return nil
}); err != nil {
errs = append(errs, fmt.Errorf("failed to reconcile kubernetes.default endpoint slice: %w", err))
}
return errors.NewAggregate(errs)
}

func (r *reconciler) reconcileOpenshiftAPIServerEndpoints(ctx context.Context, hcp *hyperv1.HostedControlPlane) error {
cpService := manifests.OpenShiftAPIServerService(hcp.Namespace)
if err := r.cpClient.Get(ctx, client.ObjectKeyFromObject(cpService), cpService); err != nil {
Expand Down
8 changes: 7 additions & 1 deletion support/util/networking.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,14 @@ func AllowedCIDRBlocks(hcp *hyperv1.HostedControlPlane) []hyperv1.CIDRBlock {

func GetAdvertiseAddress(hcp *hyperv1.HostedControlPlane, ipv4DefaultAddress, ipv6DefaultAddress string) string {
var advertiseAddress string
var ipv4 bool
var err error

ipv4, err := IsIPv4(hcp.Spec.Networking.ServiceNetwork[0].CIDR.String())
if len(hcp.Spec.Networking.ServiceNetwork) > 0 {
ipv4, err = IsIPv4(hcp.Spec.Networking.ServiceNetwork[0].CIDR.String())
} else {
ipv4 = true
}
if err != nil || ipv4 {
if address := AdvertiseAddressWithDefault(hcp, ipv4DefaultAddress); len(address) > 0 {
advertiseAddress = address
Expand Down
11 changes: 11 additions & 0 deletions support/util/networking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ func TestGetAdvertiseAddress(t *testing.T) {
},
want: DefaultAdvertiseIPv6Address,
},
{
name: "given no ServiceNetwork CIDR in the HCP, it should return IPv4 based default address",
hcp: &hyperv1.HostedControlPlane{
Spec: hyperv1.HostedControlPlaneSpec{
Networking: hyperv1.ClusterNetworking{
ServiceNetwork: []hyperv1.ServiceNetworkEntry{},
},
},
},
want: DefaultAdvertiseIPv4Address,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down

0 comments on commit 225b286

Please sign in to comment.