Skip to content

Commit

Permalink
Replace etcd endpoint representation with configmap
Browse files Browse the repository at this point in the history
Kube service design asserts `endpoint` resources cannot exist without a
corresponding `service` resource, and Kube will actively delete the endpoint
when the service is deleted or if Kube detects the endpoint is a "stray".

The operator needs to:

1. Manage etcd endpoint state atomically.
2. Maintain exclusive ownership of the etcd endpoint state resource.

Altogether this makes the `endpoint` resource inappropriate for the task. The
competition between the operator and the Kube endpoints controller to manage the
endpoint has led to instability.

To resolve the problems, persist etcd endpoint state in a `configmap`.

Maintain compatibility by continuing to write the `endpoint`, and update
consuming components to prefer the `configmap` over the `endpoint`.

Also requires:
openshift/cluster-kube-apiserver-operator#859
openshift/cluster-openshift-apiserver-operator#364
  • Loading branch information
ironcladlou committed May 18, 2020
1 parent ec550f4 commit c52f9e7
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 22 deletions.
7 changes: 7 additions & 0 deletions bindata/bootkube/manifests/00_etcd-endpoints-cm.yaml
@@ -0,0 +1,7 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: etcd-endpoints
namespace: openshift-etcd
annotations:
alpha.installer.openshift.io/etcd-bootstrap: {{ .BootstrapIP }}
32 changes: 16 additions & 16 deletions pkg/etcdcli/etcdcli.go
Expand Up @@ -32,13 +32,13 @@ import (
const BootstrapIPAnnotationKey = "alpha.installer.openshift.io/etcd-bootstrap"

type etcdClientGetter struct {
nodeLister corev1listers.NodeLister
endpointsLister corev1listers.EndpointsLister
networkLister configv1listers.NetworkLister
nodeLister corev1listers.NodeLister
configmapsLister corev1listers.ConfigMapLister
networkLister configv1listers.NetworkLister

nodeListerSynced cache.InformerSynced
endpointsListerSynced cache.InformerSynced
networkListerSynced cache.InformerSynced
nodeListerSynced cache.InformerSynced
configmapsListerSynced cache.InformerSynced
networkListerSynced cache.InformerSynced

eventRecorder events.Recorder

Expand All @@ -49,13 +49,13 @@ type etcdClientGetter struct {

func NewEtcdClient(kubeInformers v1helpers.KubeInformersForNamespaces, networkInformer configv1informers.NetworkInformer, eventRecorder events.Recorder) EtcdClient {
return &etcdClientGetter{
nodeLister: kubeInformers.InformersFor("").Core().V1().Nodes().Lister(),
endpointsLister: kubeInformers.InformersFor(operatorclient.TargetNamespace).Core().V1().Endpoints().Lister(),
networkLister: networkInformer.Lister(),
nodeListerSynced: kubeInformers.InformersFor("").Core().V1().Nodes().Informer().HasSynced,
endpointsListerSynced: kubeInformers.InformersFor(operatorclient.TargetNamespace).Core().V1().Endpoints().Informer().HasSynced,
networkListerSynced: networkInformer.Informer().HasSynced,
eventRecorder: eventRecorder.WithComponentSuffix("etcd-client"),
nodeLister: kubeInformers.InformersFor("").Core().V1().Nodes().Lister(),
configmapsLister: kubeInformers.InformersFor(operatorclient.TargetNamespace).Core().V1().ConfigMaps().Lister(),
networkLister: networkInformer.Lister(),
nodeListerSynced: kubeInformers.InformersFor("").Core().V1().Nodes().Informer().HasSynced,
configmapsListerSynced: kubeInformers.InformersFor(operatorclient.TargetNamespace).Core().V1().ConfigMaps().Informer().HasSynced,
networkListerSynced: networkInformer.Informer().HasSynced,
eventRecorder: eventRecorder.WithComponentSuffix("etcd-client"),
}
}

Expand All @@ -65,7 +65,7 @@ func (g *etcdClientGetter) getEtcdClient() (*clientv3.Client, error) {
if !g.nodeListerSynced() {
return nil, fmt.Errorf("node lister not synced")
}
if !g.endpointsListerSynced() {
if !g.configmapsListerSynced() {
return nil, fmt.Errorf("node lister not synced")
}
if !g.networkListerSynced() {
Expand All @@ -87,13 +87,13 @@ func (g *etcdClientGetter) getEtcdClient() (*clientv3.Client, error) {
etcdEndpoints = append(etcdEndpoints, fmt.Sprintf("https://%s:2379", internalIP))
}

hostEtcd, err := g.endpointsLister.Endpoints(operatorclient.TargetNamespace).Get("host-etcd-2")
hostEtcd, err := g.configmapsLister.ConfigMaps(operatorclient.TargetNamespace).Get("etcd-endpoints")
if err != nil {
return nil, err
}
bootstrapIP, ok := hostEtcd.Annotations[BootstrapIPAnnotationKey]
if !ok {
klog.V(2).Infof("service/host-etcd-2 is missing annotation %s", BootstrapIPAnnotationKey)
klog.V(2).Infof("configmaps/etcd-endpoints is missing annotation %s", BootstrapIPAnnotationKey)
}
if bootstrapIP != "" {
// escape if IPv6
Expand Down
6 changes: 3 additions & 3 deletions pkg/etcdenvvar/envvarcontroller.go
Expand Up @@ -32,7 +32,7 @@ type EnvVarController struct {

infrastructureLister configv1listers.InfrastructureLister
networkLister configv1listers.NetworkLister
endpointLister corev1listers.EndpointsLister
configmapLister corev1listers.ConfigMapLister
nodeLister corev1listers.NodeLister

// queue only ever has one item, but it has nice error handling backoff/retry semantics
Expand All @@ -57,7 +57,7 @@ func NewEnvVarController(
operatorClient: operatorClient,
infrastructureLister: infrastructureInformer.Lister(),
networkLister: networkInformer.Lister(),
endpointLister: kubeInformersForNamespaces.InformersFor(operatorclient.TargetNamespace).Core().V1().Endpoints().Lister(),
configmapLister: kubeInformersForNamespaces.InformersFor(operatorclient.TargetNamespace).Core().V1().ConfigMaps().Lister(),
nodeLister: kubeInformersForNamespaces.InformersFor("").Core().V1().Nodes().Lister(),
targetImagePullSpec: targetImagePullSpec,

Expand Down Expand Up @@ -132,7 +132,7 @@ func (c *EnvVarController) checkEnvVars() error {
targetImagePullSpec: c.targetImagePullSpec,
spec: *operatorSpec,
status: *operatorStatus,
endpointLister: c.endpointLister,
configmapLister: c.configmapLister,
nodeLister: c.nodeLister,
infrastructureLister: c.infrastructureLister,
networkLister: c.networkLister,
Expand Down
6 changes: 3 additions & 3 deletions pkg/etcdenvvar/etcd_env.go
Expand Up @@ -21,7 +21,7 @@ type envVarContext struct {
nodeLister corev1listers.NodeLister
infrastructureLister configv1listers.InfrastructureLister
networkLister configv1listers.NetworkLister
endpointLister corev1listers.EndpointsLister
configmapLister corev1listers.ConfigMapLister
targetImagePullSpec string
}

Expand Down Expand Up @@ -132,11 +132,11 @@ func getEtcdGrpcEndpoints(envVarContext envVarContext) (string, error) {
endpoints = append(endpoints, fmt.Sprintf("https://%s:2379", endpointIP))
}

hostEtcdEndpoints, err := envVarContext.endpointLister.Endpoints(operatorclient.TargetNamespace).Get("host-etcd-2")
etcdEndpoints, err := envVarContext.configmapLister.ConfigMaps(operatorclient.TargetNamespace).Get("etcd-endpoints")
if err != nil {
return "", err
}
if bootstrapIP := hostEtcdEndpoints.Annotations["alpha.installer.openshift.io/etcd-bootstrap"]; len(bootstrapIP) > 0 {
if bootstrapIP := etcdEndpoints.Annotations["alpha.installer.openshift.io/etcd-bootstrap"]; len(bootstrapIP) > 0 {
urlHost, err := dnshelpers.GetURLHostForIP(bootstrapIP)
if err != nil {
return "", err
Expand Down
126 changes: 126 additions & 0 deletions pkg/operator/etcdendpointscontroller/etcdendpointscontroller.go
@@ -0,0 +1,126 @@
package etcdendpointscontroller

import (
"context"
"encoding/base64"
"fmt"
"time"

operatorv1 "github.com/openshift/api/operator/v1"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
"github.com/openshift/library-go/pkg/operator/v1helpers"
operatorv1helpers "github.com/openshift/library-go/pkg/operator/v1helpers"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"

"github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient"
)

// EtcdEndpointsController maintains a configmap resource with
// IP addresses for etcd. It should never depend on DNS directly or transitively.
type EtcdEndpointsController struct {
operatorClient v1helpers.OperatorClient
nodeLister corev1listers.NodeLister
configmapLister corev1listers.ConfigMapLister
configmapClient corev1client.ConfigMapsGetter
}

func NewEtcdEndpointsController(
operatorClient v1helpers.OperatorClient,
eventRecorder events.Recorder,
kubeClient kubernetes.Interface,
kubeInformers operatorv1helpers.KubeInformersForNamespaces,
) factory.Controller {
kubeInformersForTargetNamespace := kubeInformers.InformersFor(operatorclient.TargetNamespace)
configmapsInformer := kubeInformersForTargetNamespace.Core().V1().ConfigMaps()
kubeInformersForCluster := kubeInformers.InformersFor("")
nodeInformer := kubeInformersForCluster.Core().V1().Nodes()

c := &EtcdEndpointsController{
operatorClient: operatorClient,
nodeLister: nodeInformer.Lister(),
configmapLister: configmapsInformer.Lister(),
configmapClient: kubeClient.CoreV1(),
}
return factory.New().ResyncEvery(time.Minute).WithInformers(
operatorClient.Informer(),
configmapsInformer.Informer(),
nodeInformer.Informer(),
).WithSync(c.sync).ToController("EtcdEndpointsController", eventRecorder.WithComponentSuffix("etcd-endpoints-controller"))
}

func (c *EtcdEndpointsController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
err := c.syncConfigMap(ctx, syncCtx.Recorder())

if err != nil {
_, _, updateErr := v1helpers.UpdateStatus(c.operatorClient, v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
Type: "EtcdEndpointsDegraded",
Status: operatorv1.ConditionTrue,
Reason: "ErrorUpdatingEtcdEndpoints",
Message: err.Error(),
}))
if updateErr != nil {
syncCtx.Recorder().Warning("EtcdEndpointsErrorUpdatingStatus", updateErr.Error())
}
return err
}

_, _, updateErr := v1helpers.UpdateStatus(c.operatorClient, v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
Type: "EtcdEndpointsDegraded",
Status: operatorv1.ConditionFalse,
Reason: "EtcdEndpointsUpdated",
}))
if updateErr != nil {
syncCtx.Recorder().Warning("EtcdEndpointsErrorUpdatingStatus", updateErr.Error())
return updateErr
}
return nil
}

func (c *EtcdEndpointsController) syncConfigMap(ctx context.Context, recorder events.Recorder) error {
required := configMapAsset()

// create endpoint addresses for each node
nodes, err := c.nodeLister.List(labels.Set{"node-role.kubernetes.io/master": ""}.AsSelector())
if err != nil {
return fmt.Errorf("unable to list expected etcd member nodes: %v", err)
}
endpointAddresses := map[string]string{}
for _, node := range nodes {
var nodeInternalIP string
for _, nodeAddress := range node.Status.Addresses {
if nodeAddress.Type == corev1.NodeInternalIP {
nodeInternalIP = nodeAddress.Address
break
}
}
if len(nodeInternalIP) == 0 {
return fmt.Errorf("unable to determine internal ip address for node %s", node.Name)
}
endpointAddresses[base64.StdEncoding.WithPadding(base64.NoPadding).EncodeToString([]byte(nodeInternalIP))] = nodeInternalIP
}

if len(endpointAddresses) == 0 {
return fmt.Errorf("no master nodes are present")
}

required.Data = endpointAddresses

_, _, err = resourceapply.ApplyConfigMap(c.configmapClient, recorder, required)
return err
}

func configMapAsset() *corev1.ConfigMap {
return &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "etcd-endpoints",
Namespace: operatorclient.TargetNamespace,
},
}
}
8 changes: 8 additions & 0 deletions pkg/operator/starter.go
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/openshift/cluster-etcd-operator/pkg/operator/configobservation/configobservercontroller"
"github.com/openshift/cluster-etcd-operator/pkg/operator/etcd_assets"
"github.com/openshift/cluster-etcd-operator/pkg/operator/etcdcertsigner"
"github.com/openshift/cluster-etcd-operator/pkg/operator/etcdendpointscontroller"
"github.com/openshift/cluster-etcd-operator/pkg/operator/etcdmemberipmigrator"
"github.com/openshift/cluster-etcd-operator/pkg/operator/etcdmemberscontroller"
"github.com/openshift/cluster-etcd-operator/pkg/operator/hostendpointscontroller2"
Expand Down Expand Up @@ -191,6 +192,12 @@ func RunOperator(ctx context.Context, controllerContext *controllercmd.Controlle
coreClient,
kubeInformersForNamespaces,
)
etcdEndpointsController := etcdendpointscontroller.NewEtcdEndpointsController(
operatorClient,
controllerContext.EventRecorder,
coreClient,
kubeInformersForNamespaces,
)

clusterMemberController := clustermembercontroller.NewClusterMemberController(
operatorClient,
Expand Down Expand Up @@ -242,6 +249,7 @@ func RunOperator(ctx context.Context, controllerContext *controllercmd.Controlle
go targetConfigReconciler.Run(ctx, 1)
go etcdCertSignerController.Run(ctx, 1)
go hostEtcdEndpointController2.Run(ctx, 1)
go etcdEndpointsController.Run(ctx, 1)
go resourceSyncController.Run(ctx, 1)
go statusController.Run(ctx, 1)
go configObserver.Run(ctx, 1)
Expand Down

0 comments on commit c52f9e7

Please sign in to comment.