Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Federation] Use federated informer for service controller and annotations to store lb ingress #41258

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions federation/apis/federation/BUILD
Expand Up @@ -19,6 +19,7 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
Expand Down
16 changes: 16 additions & 0 deletions federation/apis/federation/types.go
Expand Up @@ -19,6 +19,7 @@ package federation
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
)

// ServerAddressByClientCIDR helps the client to determine the server address that they should use, depending on the clientCIDR that they match.
Expand Down Expand Up @@ -153,3 +154,18 @@ type ClusterReplicaSetPreferences struct {
// A number expressing the preference to put an additional replica to this LocalReplicaSet. 0 by default.
Weight int64
}

// Annotation for a federated service to keep record of service loadbalancer ingresses in federated cluster
type FederatedServiceIngress struct {
// List of loadbalancer ingress of a service in all federated clusters
// +optional
Items []ClusterServiceIngress `json:"items,omitempty"`
}

// Loadbalancer ingresses of a service within a federated cluster
type ClusterServiceIngress struct {
// Cluster is the name of the federated cluster
Cluster string `json:"cluster"`
// List of loadbalancer ingresses of a federated service within a federated cluster
Items []v1.LoadBalancerIngress `json:"items"`
}
35 changes: 35 additions & 0 deletions federation/apis/federation/zz_generated.deepcopy.go
Expand Up @@ -25,6 +25,7 @@ import (
conversion "k8s.io/apimachinery/pkg/conversion"
runtime "k8s.io/apimachinery/pkg/runtime"
api "k8s.io/kubernetes/pkg/api"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
reflect "reflect"
)

Expand All @@ -40,9 +41,11 @@ func RegisterDeepCopies(scheme *runtime.Scheme) error {
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterCondition, InType: reflect.TypeOf(&ClusterCondition{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterList, InType: reflect.TypeOf(&ClusterList{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterReplicaSetPreferences, InType: reflect.TypeOf(&ClusterReplicaSetPreferences{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterServiceIngress, InType: reflect.TypeOf(&ClusterServiceIngress{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterSpec, InType: reflect.TypeOf(&ClusterSpec{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterStatus, InType: reflect.TypeOf(&ClusterStatus{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_FederatedReplicaSetPreferences, InType: reflect.TypeOf(&FederatedReplicaSetPreferences{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_FederatedServiceIngress, InType: reflect.TypeOf(&FederatedServiceIngress{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ServerAddressByClientCIDR, InType: reflect.TypeOf(&ServerAddressByClientCIDR{})},
)
}
Expand Down Expand Up @@ -110,6 +113,20 @@ func DeepCopy_federation_ClusterReplicaSetPreferences(in interface{}, out interf
}
}

func DeepCopy_federation_ClusterServiceIngress(in interface{}, out interface{}, c *conversion.Cloner) error {
{
in := in.(*ClusterServiceIngress)
out := out.(*ClusterServiceIngress)
*out = *in
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]api_v1.LoadBalancerIngress, len(*in))
copy(*out, *in)
}
return nil
}
}

func DeepCopy_federation_ClusterSpec(in interface{}, out interface{}, c *conversion.Cloner) error {
{
in := in.(*ClusterSpec)
Expand Down Expand Up @@ -172,6 +189,24 @@ func DeepCopy_federation_FederatedReplicaSetPreferences(in interface{}, out inte
}
}

func DeepCopy_federation_FederatedServiceIngress(in interface{}, out interface{}, c *conversion.Cloner) error {
{
in := in.(*FederatedServiceIngress)
out := out.(*FederatedServiceIngress)
*out = *in
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]ClusterServiceIngress, len(*in))
for i := range *in {
if err := DeepCopy_federation_ClusterServiceIngress(&(*in)[i], &(*out)[i], c); err != nil {
return err
}
}
}
return nil
}
}

func DeepCopy_federation_ServerAddressByClientCIDR(in interface{}, out interface{}, c *conversion.Cloner) error {
{
in := in.(*ServerAddressByClientCIDR)
Expand Down
17 changes: 17 additions & 0 deletions federation/pkg/federation-controller/service/BUILD
Expand Up @@ -15,11 +15,13 @@ go_library(
"dns.go",
"doc.go",
"endpoint_helper.go",
"ingress.go",
"service_helper.go",
"servicecontroller.go",
],
tags = ["automanaged"],
deps = [
"//federation/apis/federation:go_default_library",
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/cache:go_default_library",
"//federation/client/clientset_generated/federation_clientset:go_default_library",
Expand All @@ -36,8 +38,10 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
Expand All @@ -46,6 +50,7 @@ go_library(
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)
Expand All @@ -62,10 +67,22 @@ go_test(
tags = ["automanaged"],
deps = [
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_clientset/fake:go_default_library",
"//federation/pkg/dnsprovider/providers/google/clouddns:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/test:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)

Expand Down
59 changes: 33 additions & 26 deletions federation/pkg/federation-controller/service/dns.go
Expand Up @@ -24,8 +24,10 @@ import (

"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
"k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype"
"k8s.io/kubernetes/pkg/api/v1"
)

const (
Expand All @@ -34,24 +36,32 @@ const (
)

// getHealthyEndpoints returns the hostnames and/or IP addresses of healthy endpoints for the service, at a zone, region and global level (or an error)
func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedService *cachedService) (zoneEndpoints, regionEndpoints, globalEndpoints []string, err error) {
func (s *ServiceController) getHealthyEndpoints(clusterName string, service *v1.Service) (zoneEndpoints, regionEndpoints, globalEndpoints []string, err error) {
var (
zoneNames []string
regionName string
)
if zoneNames, regionName, err = s.getClusterZoneNames(clusterName); err != nil {
return nil, nil, nil, err
}
for lbClusterName, lbStatus := range cachedService.serviceStatusMap {

// If federated service is deleted, return empty endpoints, so that DNS records are removed
if service.DeletionTimestamp != nil {
return zoneEndpoints, regionEndpoints, globalEndpoints, nil
}

serviceIngress, err := ParseFederatedServiceIngress(service)
if err != nil {
return nil, nil, nil, err
}

for _, lbClusterIngress := range serviceIngress.Items {
lbClusterName := lbClusterIngress.Cluster
lbZoneNames, lbRegionName, err := s.getClusterZoneNames(lbClusterName)
if err != nil {
return nil, nil, nil, err
}
for _, ingress := range lbStatus.Ingress {
readyEndpoints, ok := cachedService.endpointMap[lbClusterName]
if !ok || readyEndpoints == 0 {
continue
}
for _, ingress := range lbClusterIngress.Items {
var address string
// We should get either an IP address or a hostname - use whichever one we get
if ingress.IP != "" {
Expand All @@ -61,7 +71,7 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedServic
}
if len(address) <= 0 {
return nil, nil, nil, fmt.Errorf("Service %s/%s in cluster %s has neither LoadBalancerStatus.ingress.ip nor LoadBalancerStatus.ingress.hostname. Cannot use it as endpoint for federated service.",
cachedService.lastState.Name, cachedService.lastState.Namespace, clusterName)
service.Name, service.Namespace, clusterName)
}
for _, lbZoneName := range lbZoneNames {
for _, zoneName := range zoneNames {
Expand All @@ -80,15 +90,12 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedServic
}

// getClusterZoneNames returns the name of the zones (and the region) where the specified cluster exists (e.g. zones "us-east1-c" on GCE, or "us-east-1b" on AWS)
func (s *ServiceController) getClusterZoneNames(clusterName string) (zones []string, region string, err error) {
client, ok := s.clusterCache.clientMap[clusterName]
if !ok {
return nil, "", fmt.Errorf("Cluster cache does not contain entry for cluster %s", clusterName)
}
if client.cluster == nil {
return nil, "", fmt.Errorf("Cluster cache entry for cluster %s is nil", clusterName)
func (s *ServiceController) getClusterZoneNames(clusterName string) ([]string, string, error) {
cluster, err := s.federationClient.Federation().Clusters().Get(clusterName, metav1.GetOptions{})
if err != nil {
return nil, "", err
}
return client.cluster.Status.Zones, client.cluster.Status.Region, nil
return cluster.Status.Zones, cluster.Status.Region, nil
}

// getServiceDnsSuffix returns the DNS suffix to use when creating federated-service DNS records
Expand Down Expand Up @@ -284,7 +291,7 @@ given the current state of that service in that cluster. This should be called
(or vice versa). Only shards of the service which have both a loadbalancer ingress IP address or hostname AND at least one healthy backend endpoint
are included in DNS records for that service (at all of zone, region and global levels). All other addresses are removed. Also, if no shards exist
in the zone or region of the cluster, a CNAME reference to the next higher level is ensured to exist. */
func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *cachedService) error {
func (s *ServiceController) ensureDnsRecords(clusterName string, service *v1.Service) error {
// Quinton: Pseudocode....
// See https://github.com/kubernetes/kubernetes/pull/25107#issuecomment-218026648
// For each service we need the following DNS names:
Expand All @@ -298,21 +305,21 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
// - a set of A records to IP addresses of all healthy shards in all regions, if one or more of these exist.
// - no record (NXRECORD response) if no healthy shards exist in any regions
//
// For each cached service, cachedService.lastState tracks the current known state of the service, while cachedService.appliedState contains
// the state of the service when we last successfully synced its DNS records.
// So this time around we only need to patch that (add new records, remove deleted records, and update changed records).
// Each service has the current known state of loadbalancer ingress for the federated cluster stored in annotations.
// So generate the DNS records based on the current state and ensure those desired DNS records match the
// actual DNS records (add new records, remove deleted records, and update changed records).
//
if s == nil {
return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, service: %v)", clusterName, service)
}
if s.dns == nil {
return nil
}
if cachedService == nil {
return fmt.Errorf("nil cachedService passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
if service == nil {
return fmt.Errorf("nil service passed to ServiceController.ensureDnsRecords(clusterName: %s, service: %v)", clusterName, service)
}
serviceName := cachedService.lastState.Name
namespaceName := cachedService.lastState.Namespace
serviceName := service.Name
namespaceName := service.Namespace
zoneNames, regionName, err := s.getClusterZoneNames(clusterName)
if err != nil {
return err
Expand All @@ -324,7 +331,7 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
if err != nil {
return err
}
zoneEndpoints, regionEndpoints, globalEndpoints, err := s.getHealthyEndpoints(clusterName, cachedService)
zoneEndpoints, regionEndpoints, globalEndpoints, err := s.getHealthyEndpoints(clusterName, service)
if err != nil {
return err
}
Expand Down