Skip to content

Commit

Permalink
Prefer etcd endpoints configmap for storage URL discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
ironcladlou committed May 19, 2020
1 parent 924ea05 commit f419b9c
Show file tree
Hide file tree
Showing 4 changed files with 325 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/openshift/library-go/pkg/operator/v1helpers"

"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/etcdobserver"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/etcdendpoints"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/images"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/ingresses"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/project"
Expand Down Expand Up @@ -44,6 +44,7 @@ func NewConfigObserver(
ProxyLister_: configInformers.Config().V1().Proxies().Lister(),
IngressConfigLister: configInformers.Config().V1().Ingresses().Lister(),
EndpointsLister: kubeInformersForEtcdNamespace.Core().V1().Endpoints().Lister(),
ConfigmapLister: kubeInformersForEtcdNamespace.Core().V1().ConfigMaps().Lister(),
SecretLister_: kubeInformers.Core().V1().Secrets().Lister(),
PreRunCachesSynced: []cache.InformerSynced{
operatorConfigInformers.Operator().V1().OpenShiftAPIServers().Informer().HasSynced,
Expand All @@ -54,14 +55,18 @@ func NewConfigObserver(
configInformers.Config().V1().Ingresses().Informer().HasSynced,
kubeInformersForEtcdNamespace.Core().V1().Endpoints().Informer().HasSynced,
kubeInformers.Core().V1().Secrets().Informer().HasSynced,
kubeInformersForEtcdNamespace.Core().V1().ConfigMaps().Informer().HasSynced,
},
},
[]factory.Informer{operatorConfigInformers.Operator().V1().OpenShiftAPIServers().Informer()},
[]factory.Informer{
operatorConfigInformers.Operator().V1().OpenShiftAPIServers().Informer(),
kubeInformersForEtcdNamespace.Core().V1().ConfigMaps().Informer(),
},
images.ObserveInternalRegistryHostname,
images.ObserveExternalRegistryHostnames,
images.ObserveAllowedRegistriesForImport,
ingresses.ObserveIngressDomain,
etcdobserver.ObserveStorageURLs,
etcdendpoints.BackwardsCompatibleObserveStorageURLs,
libgoapiserver.ObserveTLSSecurityProfile,
project.ObserveProjectRequestMessage,
project.ObserveProjectRequestTemplateName,
Expand Down
109 changes: 109 additions & 0 deletions pkg/operator/configobservation/etcdendpoints/observe_etcd_endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package etcdendpoints

import (
"fmt"
"net"
"reflect"
"strings"

"github.com/openshift/library-go/pkg/operator/configobserver"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation"
endpointsobserver "github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/etcdobserver"
)

const (
etcdEndpointNamespace = "openshift-etcd"
etcdEndpointName = "etcd-endpoints"
)

// BackwardsCompatibleObserveStorageURLs will prefer to observe the new
// etcd-endpoints configmap, and will fall back to reading the endpoint
// resource only if the configmap doesn't yet exist.
func BackwardsCompatibleObserveStorageURLs(genericListers configobserver.Listers, recorder events.Recorder, currentConfig map[string]interface{}) (map[string]interface{}, []error) {
listers := genericListers.(configobservation.Listers)
if _, err := listers.ConfigmapLister.ConfigMaps(etcdEndpointNamespace).Get(etcdEndpointName); err == nil {
return ObserveStorageURLs(genericListers, recorder, currentConfig)
} else if !errors.IsNotFound(err) {
return currentConfig, []error{err}
}
return endpointsobserver.ObserveStorageURLs(genericListers, recorder, currentConfig)
}

// ObserveStorageURLs observes the storage config URLs. If there is a problem observing the current storage config URLs,
// then the previously observed storage config URLs will be re-used.
func ObserveStorageURLs(genericListers configobserver.Listers, recorder events.Recorder, currentConfig map[string]interface{}) (map[string]interface{}, []error) {
listers := genericListers.(configobservation.Listers)
storageConfigURLsPath := []string{"storageConfig", "urls"}
var errs []error

previouslyObservedConfig := map[string]interface{}{}
currentEtcdURLs, found, err := unstructured.NestedStringSlice(currentConfig, storageConfigURLsPath...)
if err != nil {
errs = append(errs, err)
}
if found {
if err := unstructured.SetNestedStringSlice(previouslyObservedConfig, currentEtcdURLs, storageConfigURLsPath...); err != nil {
errs = append(errs, err)
}
}

var etcdURLs []string
etcdEndpoints, err := listers.ConfigmapLister.ConfigMaps(etcdEndpointNamespace).Get(etcdEndpointName)
if errors.IsNotFound(err) {
recorder.Warningf("ObserveStorageFailed", "Required %s/%s configmap not found", etcdEndpointNamespace, etcdEndpointName)
return previouslyObservedConfig, append(errs, fmt.Errorf("configmaps/%s.%s: not found", etcdEndpointName, etcdEndpointNamespace))
}
if err != nil {
recorder.Warningf("ObserveStorageFailed", "Error getting %s/%s configmap: %v", etcdEndpointNamespace, etcdEndpointName, err)
return previouslyObservedConfig, append(errs, err)
}

// note: etcd bootstrap should never be added to the in-cluster kube-apiserver
// this can result in some early pods crashlooping, but ensures that we never contact the bootstrap machine from
// the in-cluster kube-apiserver so we can safely teardown out of order.

for k := range etcdEndpoints.Data {
address := etcdEndpoints.Data[k]
ip := net.ParseIP(address)
if ip == nil {
ipErr := fmt.Errorf("configmaps/%s in the %s namespace: %v is not a valid IP address", etcdEndpointName, etcdEndpointNamespace, address)
errs = append(errs, ipErr)
continue
}
// skip placeholder ip addresses used in previous versions where the hostname was used instead
if strings.HasPrefix(ip.String(), "192.0.2.") || strings.HasPrefix(ip.String(), "2001:db8:") {
// not considered an error
continue
}
// use the canonical representation of the ip address (not original input) when constructing the url
if ip.To4() != nil {
etcdURLs = append(etcdURLs, fmt.Sprintf("https://%s:2379", ip))
} else {
etcdURLs = append(etcdURLs, fmt.Sprintf("https://[%s]:2379", ip))
}
}

if len(etcdURLs) == 0 {
emptyURLErr := fmt.Errorf("configmaps %s/%s: no etcd endpoint addresses found", etcdEndpointNamespace, etcdEndpointName)
recorder.Warning("ObserveStorageFailed", emptyURLErr.Error())
return previouslyObservedConfig, append(errs, emptyURLErr)
}

// always append `localhost` url
etcdURLs = append(etcdURLs, "https://localhost:2379")

observedConfig := map[string]interface{}{}
if err := unstructured.SetNestedStringSlice(observedConfig, etcdURLs, storageConfigURLsPath...); err != nil {
return previouslyObservedConfig, append(errs, err)
}

if !reflect.DeepEqual(currentEtcdURLs, etcdURLs) {
recorder.Eventf("ObserveStorageUpdated", "Updated storage urls to %s", strings.Join(etcdURLs, ","))
}

return observedConfig, errs
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package etcdendpoints

import (
"encoding/base64"
"reflect"
"testing"

"github.com/openshift/library-go/pkg/operator/events"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/mergepatch"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"

"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation"
)

func TestObserveStorageURLs(t *testing.T) {
tests := []struct {
name string
currentConfig map[string]interface{}
expected map[string]interface{}
expectErrors bool
endpoint *v1.ConfigMap
}{
{
name: "NoEtcdHosts",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
expected: observedConfig(withStorageURL("https://previous.url:2379")),
expectErrors: true,
},
{
name: "ValidIPv4",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(withAddress("10.0.0.1")),
expected: observedConfig(withStorageURL("https://10.0.0.1:2379"), withLocalhostStorageURLs()),
},
{
name: "InvalidIPv4",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("10.0.0.1"),
withAddress("192.192.0.2.1"),
),
expected: observedConfig(withStorageURL("https://10.0.0.1:2379"), withLocalhostStorageURLs()),
expectErrors: true,
},
{
name: "ValidIPv6",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(withAddress("FE80:CD00:0000:0CDE:1257:0000:211E:729C")),
expected: observedConfig(withStorageURL("https://[fe80:cd00:0:cde:1257:0:211e:729c]:2379"), withLocalhostStorageURLs()),
},
{
name: "InvalidIPv6",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("FE80:CD00:0000:0CDE:1257:0000:211E:729C"),
withAddress("FE80:CD00:0000:0CDE:1257:0000:211E:729C:invalid"),
),
expected: observedConfig(withStorageURL("https://[fe80:cd00:0:cde:1257:0:211e:729c]:2379"), withLocalhostStorageURLs()),
expectErrors: true,
},
{
name: "FakeIPv4",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("10.0.0.1"),
withAddress("192.0.2.1"),
),
expected: observedConfig(withStorageURL("https://10.0.0.1:2379"), withLocalhostStorageURLs()),
},
{
name: "FakeIPv6",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("FE80:CD00:0000:0CDE:1257:0000:211E:729C"),
withAddress("2001:0DB8:0000:0CDE:1257:0000:211E:729C"),
),
expected: observedConfig(withStorageURL("https://[fe80:cd00:0:cde:1257:0:211e:729c]:2379"), withLocalhostStorageURLs()),
},
{
name: "ValidIPv4AsIPv6Literal",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(withAddress("::ffff:a00:1")),
expected: observedConfig(withStorageURL("https://10.0.0.1:2379"), withLocalhostStorageURLs()),
},
{
name: "FakeIPv4AsIPv6Literal",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("FE80:CD00:0000:0CDE:1257:0000:211E:729C"),
withAddress("::ffff:c000:201"),
),
expected: observedConfig(withStorageURL("https://[fe80:cd00:0:cde:1257:0:211e:729c]:2379"), withLocalhostStorageURLs()),
},
{
name: "NoAddressesFound",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(),
expected: observedConfig(withLocalhostStorageURLs()),
expectErrors: true,
},
{
name: "OnlyFakeAddressesFound",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("192.0.2.1"),
withAddress("::ffff:c000:201"),
),
expected: observedConfig(withLocalhostStorageURLs()),
expectErrors: true,
},
{
name: "IgnoreBootstrap",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withBootstrap("10.0.0.2"),
withAddress("10.0.0.1"),
),
expected: observedConfig(withStorageURL("https://10.0.0.1:2379"), withLocalhostStorageURLs()),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
lister := configobservation.Listers{
ConfigmapLister: corev1listers.NewConfigMapLister(indexer),
}
if tt.endpoint != nil {
if err := indexer.Add(tt.endpoint); err != nil {
t.Fatalf("error adding endpoint to store: %#v", err)
}
}
actual, errs := ObserveStorageURLs(lister, events.NewInMemoryRecorder("test"), tt.currentConfig)
if tt.expectErrors && len(errs) == 0 {
t.Errorf("errors expected")
}
if !tt.expectErrors && len(errs) != 0 {
t.Errorf("unexpected errors: %v", errs)
}
if !reflect.DeepEqual(actual, tt.expected) {
t.Errorf("ObserveStorageURLs() gotObservedConfig = %v, want %v", actual, tt.expected)
}
if t.Failed() {
t.Log("\n" + mergepatch.ToYAMLOrError(actual))
for _, err := range errs {
t.Log(err)
}
}
})
}
}

func observedConfig(configs ...func(map[string]interface{})) map[string]interface{} {
observedConfig := map[string]interface{}{}
for _, config := range configs {
config(observedConfig)
}
return observedConfig
}

func withStorageURL(url string) func(map[string]interface{}) {
return func(observedConfig map[string]interface{}) {
urls, _, _ := unstructured.NestedStringSlice(observedConfig, "storageConfig", "urls")
urls = append(urls, url)
_ = unstructured.SetNestedStringSlice(observedConfig, urls, "storageConfig", "urls")
}
}

func withLocalhostStorageURLs() func(map[string]interface{}) {
return func(observedConfig map[string]interface{}) {
urls, _, _ := unstructured.NestedStringSlice(observedConfig, "storageConfig", "urls")
urls = append(urls, "https://localhost:2379")
_ = unstructured.SetNestedStringSlice(observedConfig, urls, "storageConfig", "urls")
}
}

func endpoints(configs ...func(endpoints *v1.ConfigMap)) *v1.ConfigMap {
endpoints := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "etcd-endpoints",
Namespace: "openshift-etcd",
},
Data: map[string]string{},
}
for _, config := range configs {
config(endpoints)
}
return endpoints
}

func withBootstrap(ip string) func(*v1.ConfigMap) {
return func(endpoints *v1.ConfigMap) {
if endpoints.Annotations == nil {
endpoints.Annotations = map[string]string{}
}
endpoints.Annotations["alpha.installer.openshift.io/etcd-bootstrap"] = ip
}
}

func withAddress(ip string) func(*v1.ConfigMap) {
return func(endpoints *v1.ConfigMap) {
endpoints.Data[base64.StdEncoding.WithPadding(base64.NoPadding).EncodeToString([]byte(ip))] = ip
}
}
1 change: 1 addition & 0 deletions pkg/operator/configobservation/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Listers struct {
EndpointsLister corelistersv1.EndpointsLister
PreRunCachesSynced []cache.InformerSynced
SecretLister_ corelistersv1.SecretLister
ConfigmapLister corelistersv1.ConfigMapLister
}

func (l Listers) ResourceSyncer() resourcesynccontroller.ResourceSyncer {
Expand Down

0 comments on commit f419b9c

Please sign in to comment.