Skip to content

Commit

Permalink
observe storage urls from host-etcd endpoint address ips
Browse files Browse the repository at this point in the history
  • Loading branch information
sanchezl committed Feb 20, 2020
1 parent 57d49d7 commit ab08c50
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/openshift/library-go/pkg/operator/v1helpers"

"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/etcdobserver"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/images"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/ingresses"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/project"
Expand All @@ -28,6 +29,7 @@ type ConfigObserver struct {
// NewConfigObserver initializes a new configuration observer.
func NewConfigObserver(
kubeInformers kubeinformers.SharedInformerFactory,
kubeInformersForEtcdNamespace kubeinformers.SharedInformerFactory,
operatorClient v1helpers.OperatorClient,
resourceSyncer resourcesynccontroller.ResourceSyncer,
operatorConfigInformers operatorv1informers.SharedInformerFactory,
Expand All @@ -45,6 +47,7 @@ func NewConfigObserver(
ProjectConfigLister: configInformers.Config().V1().Projects().Lister(),
ProxyLister_: configInformers.Config().V1().Proxies().Lister(),
IngressConfigLister: configInformers.Config().V1().Ingresses().Lister(),
EndpointsLister: kubeInformersForEtcdNamespace.Core().V1().Endpoints().Lister(),
SecretLister_: kubeInformers.Core().V1().Secrets().Lister(),
PreRunCachesSynced: []cache.InformerSynced{
operatorConfigInformers.Operator().V1().OpenShiftAPIServers().Informer().HasSynced,
Expand All @@ -58,6 +61,7 @@ func NewConfigObserver(
images.ObserveExternalRegistryHostnames,
images.ObserveAllowedRegistriesForImport,
ingresses.ObserveIngressDomain,
etcdobserver.ObserveStorageURLs,
libgoapiserver.ObserveTLSSecurityProfile,
project.ObserveProjectRequestMessage,
project.ObserveProjectRequestTemplateName,
Expand All @@ -70,5 +74,6 @@ func NewConfigObserver(
configInformers.Config().V1().Ingresses().Informer().AddEventHandler(c.EventHandler())
configInformers.Config().V1().Projects().Informer().AddEventHandler(c.EventHandler())
configInformers.Config().V1().Proxies().Informer().AddEventHandler(c.EventHandler())
kubeInformersForEtcdNamespace.Core().V1().Endpoints().Informer().AddEventHandler(c.EventHandler())
return c
}
96 changes: 96 additions & 0 deletions pkg/operator/configobservation/etcdobserver/observe_etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package etcdobserver

import (
"fmt"
"net"
"reflect"
"sort"
"strings"

"github.com/openshift/library-go/pkg/operator/configobserver"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/operatorclient"
)

const (
etcdEndpointNamespace = operatorclient.EtcdEndpointNamespace
etcdEndpointName = operatorclient.EtcdEndpointName
)

// ObserveStorageURLs observes the storage URL config.
func ObserveStorageURLs(genericListers configobserver.Listers, recorder events.Recorder, currentConfig map[string]interface{}) (map[string]interface{}, []error) {
listers := genericListers.(configobservation.Listers)
storageConfigURLsPath := []string{"storageConfig", "urls"}
var errs []error

previouslyObservedConfig := map[string]interface{}{}
currentEtcdURLs, _, err := unstructured.NestedStringSlice(currentConfig, storageConfigURLsPath...)
if err != nil {
errs = append(errs, err)
}
if len(currentEtcdURLs) > 0 {
if err := unstructured.SetNestedStringSlice(previouslyObservedConfig, currentEtcdURLs, storageConfigURLsPath...); err != nil {
errs = append(errs, err)
}
}

var etcdURLs sort.StringSlice
etcdEndpoints, err := listers.EndpointsLister.Endpoints(etcdEndpointNamespace).Get(etcdEndpointName)
if errors.IsNotFound(err) {
recorder.Warningf("ObserveStorageFailed", "Required endpoints/%s in the %s namespace not found", etcdEndpointName, etcdEndpointNamespace)
errs = append(errs, fmt.Errorf("endpoints/%s in the %s namespace: not found", etcdEndpointName, etcdEndpointNamespace))
return previouslyObservedConfig, errs
}
if err != nil {
recorder.Warningf("ObserveStorageFailed", "Error getting endpoints/%s in the %s namespace: %v", etcdEndpointName, etcdEndpointNamespace, err)
errs = append(errs, err)
return previouslyObservedConfig, errs
}

for subsetIndex, subset := range etcdEndpoints.Subsets {
for addressIndex, address := range subset.Addresses {
ip := net.ParseIP(address.IP)
if ip == nil {
ipErr := fmt.Errorf("endpoints/%s in the %s namespace: subsets[%v]addresses[%v].IP is not a valid IP address", etcdEndpointName, etcdEndpointNamespace, subsetIndex, addressIndex)
errs = append(errs, ipErr)
continue
}
// skip placeholder ip addresses used in previous versions where the hostname was used instead
if strings.HasPrefix(ip.String(), "192.0.2.") || strings.HasPrefix(ip.String(), "2001:db8:") {
// not considered an error
continue
}
// use the canonical representation of the ip address (not original input) when constructing the url
if ip.To4() != nil {
etcdURLs = append(etcdURLs, fmt.Sprintf("https://%s:2379", ip))
} else {
etcdURLs = append(etcdURLs, fmt.Sprintf("https://[%s]:2379", ip))
}
}
}

observedConfig := map[string]interface{}{}

// do not add empty storage urls slice to observed config, we don't want override defaults with an empty slice
if len(etcdURLs) > 0 {
etcdURLs.Sort()
if err := unstructured.SetNestedStringSlice(observedConfig, etcdURLs, storageConfigURLsPath...); err != nil {
errs = append(errs, err)
return previouslyObservedConfig, errs
}
} else {
err := fmt.Errorf("endpoints/%s in the %s namespace: no etcd endpoint addresses found, falling back to default etcd service", etcdEndpointName, etcdEndpointNamespace)
recorder.Warningf("ObserveStorageFallback", err.Error())
errs = append(errs, err)
}

if !reflect.DeepEqual(currentEtcdURLs, etcdURLs) {
recorder.Eventf("ObserveStorageUpdated", "Updated storage urls to %s", strings.Join(etcdURLs, ","))
}

return observedConfig, errs
}
188 changes: 188 additions & 0 deletions pkg/operator/configobservation/etcdobserver/observe_etcd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package etcdobserver

import (
"reflect"
"testing"

"github.com/openshift/library-go/pkg/operator/events"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/mergepatch"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"

"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation"
)

const clusterFQDN = "foo.bar"

func TestObserveStorageURLs(t *testing.T) {
tests := []struct {
name string
currentConfig map[string]interface{}
expected map[string]interface{}
expectErrors bool
endpoint *v1.Endpoints
}{
{
name: "NoEtcdHosts",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
expected: observedConfig(withStorageURL("https://previous.url:2379")),
expectErrors: true,
},
{
name: "ValidIPv4",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(withAddress("test", "10.0.0.1")),
expected: observedConfig(withStorageURL("https://10.0.0.1:2379")),
},
{
name: "InvalidIPv4",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("test-0", "10.0.0.1"),
withAddress("test-1", "192.192.0.2.1"),
),
expected: observedConfig(withStorageURL("https://10.0.0.1:2379")),
expectErrors: true,
},
{
name: "ValidIPv6",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(withAddress("test", "FE80:CD00:0000:0CDE:1257:0000:211E:729C")),
expected: observedConfig(withStorageURL("https://[fe80:cd00:0:cde:1257:0:211e:729c]:2379")),
},
{
name: "InvalidIPv6",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("test-0", "FE80:CD00:0000:0CDE:1257:0000:211E:729C"),
withAddress("test-1", "FE80:CD00:0000:0CDE:1257:0000:211E:729C:invalid"),
),
expected: observedConfig(withStorageURL("https://[fe80:cd00:0:cde:1257:0:211e:729c]:2379")),
expectErrors: true,
},
{
name: "FakeIPv4",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("test-0", "10.0.0.1"),
withAddress("test-1", "192.0.2.1"),
),
expected: observedConfig(withStorageURL("https://10.0.0.1:2379")),
},
{
name: "FakeIPv6",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("test-0", "FE80:CD00:0000:0CDE:1257:0000:211E:729C"),
withAddress("test-1", "2001:0DB8:0000:0CDE:1257:0000:211E:729C"),
),
expected: observedConfig(withStorageURL("https://[fe80:cd00:0:cde:1257:0:211e:729c]:2379")),
},
{
name: "ValidIPv4AsIPv6Literal",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(withAddress("test", "::ffff:a00:1")),
expected: observedConfig(withStorageURL("https://10.0.0.1:2379")),
},
{
name: "FakeIPv4AsIPv6Literal",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("test-0", "FE80:CD00:0000:0CDE:1257:0000:211E:729C"),
withAddress("test-1", "::ffff:c000:201"),
),
expected: observedConfig(withStorageURL("https://[fe80:cd00:0:cde:1257:0:211e:729c]:2379")),
},
{
name: "NoAddressesFound",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(),
expected: observedConfig(),
expectErrors: true,
},
{
name: "OnlyFakeAddressesFound",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("test-0", "192.0.2.1"),
withAddress("test-1", "::ffff:c000:201"),
),
expected: observedConfig(),
expectErrors: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
lister := configobservation.Listers{
EndpointsLister: corev1listers.NewEndpointsLister(indexer),
}
if tt.endpoint != nil {
if err := indexer.Add(tt.endpoint); err != nil {
t.Fatalf("error adding endpoint to store: %#v", err)
}
}
actual, errs := ObserveStorageURLs(lister, events.NewInMemoryRecorder("test"), tt.currentConfig)
if tt.expectErrors && len(errs) == 0 {
t.Errorf("errors expected")
}
if !tt.expectErrors && len(errs) != 0 {
t.Errorf("unexpected errors: %v", errs)
}
if !reflect.DeepEqual(actual, tt.expected) {
t.Errorf("ObserveStorageURLs() gotObservedConfig = %v, want %v", actual, tt.expected)
}
if t.Failed() {
t.Log("\n" + mergepatch.ToYAMLOrError(actual))
for _, err := range errs {
t.Log(err)
}
}
})
}
}

func observedConfig(configs ...func(map[string]interface{})) map[string]interface{} {
observedConfig := map[string]interface{}{}
for _, config := range configs {
config(observedConfig)
}
return observedConfig
}

func withStorageURL(url string) func(map[string]interface{}) {
return func(observedConfig map[string]interface{}) {
urls, _, _ := unstructured.NestedStringSlice(observedConfig, "storageConfig", "urls")
urls = append(urls, url)
_ = unstructured.SetNestedStringSlice(observedConfig, urls, "storageConfig", "urls")
}
}

func endpoints(configs ...func(endpoints *v1.Endpoints)) *v1.Endpoints {
endpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "host-etcd",
Namespace: "openshift-etcd",
Annotations: map[string]string{
"alpha.installer.openshift.io/dns-suffix": clusterFQDN,
},
},
Subsets: []v1.EndpointSubset{{Addresses: []v1.EndpointAddress{}}},
}
for _, config := range configs {
config(endpoints)
}
return endpoints
}

func withAddress(hostname, ip string) func(*v1.Endpoints) {
return func(endpoints *v1.Endpoints) {
endpoints.Subsets[0].Addresses = append(endpoints.Subsets[0].Addresses, v1.EndpointAddress{
Hostname: hostname,
IP: ip,
})
}
}
2 changes: 2 additions & 0 deletions pkg/operator/operatorclient/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ const (
GlobalMachineSpecifiedConfigNamespace = "openshift-config-managed"
OperatorNamespace = "openshift-apiserver-operator"
TargetNamespace = "openshift-apiserver"
EtcdEndpointNamespace = "openshift-etcd"
EtcdEndpointName = "host-etcd"
)
3 changes: 3 additions & 0 deletions pkg/operator/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller
operatorclient.GlobalMachineSpecifiedConfigNamespace,
operatorclient.OperatorNamespace,
operatorclient.TargetNamespace,
operatorclient.EtcdEndpointNamespace,
)
apiregistrationInformers := apiregistrationinformers.NewSharedInformerFactory(apiregistrationv1Client, 10*time.Minute)
configInformers := configinformers.NewSharedInformerFactory(configClient, 10*time.Minute)
Expand Down Expand Up @@ -176,6 +177,7 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller
{Resource: "namespaces", Name: operatorclient.GlobalMachineSpecifiedConfigNamespace},
{Resource: "namespaces", Name: operatorclient.OperatorNamespace},
{Resource: "namespaces", Name: operatorclient.TargetNamespace},
{Resource: "endpoints", Name: operatorclient.EtcdEndpointName, Namespace: operatorclient.EtcdEndpointNamespace},
},
apiServicesReferences()...,
),
Expand All @@ -192,6 +194,7 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller

configObserver := configobservercontroller.NewConfigObserver(
kubeInformersForNamespaces.InformersFor(operatorclient.TargetNamespace),
kubeInformersForNamespaces.InformersFor(operatorclient.EtcdEndpointNamespace),
operatorClient,
resourceSyncController,
operatorConfigInformers,
Expand Down

0 comments on commit ab08c50

Please sign in to comment.