Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 1805254: openshift-apiserver should directly use the host-etcd IPs #312

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/openshift/library-go/pkg/operator/v1helpers"

"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/etcdobserver"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/images"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/ingresses"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/project"
Expand All @@ -28,6 +29,7 @@ type ConfigObserver struct {
// NewConfigObserver initializes a new configuration observer.
func NewConfigObserver(
kubeInformers kubeinformers.SharedInformerFactory,
kubeInformersForEtcdNamespace kubeinformers.SharedInformerFactory,
operatorClient v1helpers.OperatorClient,
resourceSyncer resourcesynccontroller.ResourceSyncer,
operatorConfigInformers operatorv1informers.SharedInformerFactory,
Expand All @@ -45,6 +47,7 @@ func NewConfigObserver(
ProjectConfigLister: configInformers.Config().V1().Projects().Lister(),
ProxyLister_: configInformers.Config().V1().Proxies().Lister(),
IngressConfigLister: configInformers.Config().V1().Ingresses().Lister(),
EndpointsLister: kubeInformersForEtcdNamespace.Core().V1().Endpoints().Lister(),
SecretLister_: kubeInformers.Core().V1().Secrets().Lister(),
PreRunCachesSynced: []cache.InformerSynced{
operatorConfigInformers.Operator().V1().OpenShiftAPIServers().Informer().HasSynced,
Expand All @@ -58,6 +61,7 @@ func NewConfigObserver(
images.ObserveExternalRegistryHostnames,
images.ObserveAllowedRegistriesForImport,
ingresses.ObserveIngressDomain,
etcdobserver.ObserveStorageURLs,
libgoapiserver.ObserveTLSSecurityProfile,
project.ObserveProjectRequestMessage,
project.ObserveProjectRequestTemplateName,
Expand All @@ -70,5 +74,6 @@ func NewConfigObserver(
configInformers.Config().V1().Ingresses().Informer().AddEventHandler(c.EventHandler())
configInformers.Config().V1().Projects().Informer().AddEventHandler(c.EventHandler())
configInformers.Config().V1().Proxies().Informer().AddEventHandler(c.EventHandler())
kubeInformersForEtcdNamespace.Core().V1().Endpoints().Informer().AddEventHandler(c.EventHandler())
return c
}
96 changes: 96 additions & 0 deletions pkg/operator/configobservation/etcdobserver/observe_etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package etcdobserver

import (
"fmt"
"net"
"reflect"
"sort"
"strings"

"github.com/openshift/library-go/pkg/operator/configobserver"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/operatorclient"
)

const (
etcdEndpointNamespace = operatorclient.EtcdEndpointNamespace
etcdEndpointName = operatorclient.EtcdEndpointName
)

// ObserveStorageURLs observes the storage URL config.
func ObserveStorageURLs(genericListers configobserver.Listers, recorder events.Recorder, currentConfig map[string]interface{}) (map[string]interface{}, []error) {
listers := genericListers.(configobservation.Listers)
storageConfigURLsPath := []string{"storageConfig", "urls"}
var errs []error

previouslyObservedConfig := map[string]interface{}{}
currentEtcdURLs, _, err := unstructured.NestedStringSlice(currentConfig, storageConfigURLsPath...)
if err != nil {
errs = append(errs, err)
}
if len(currentEtcdURLs) > 0 {
if err := unstructured.SetNestedStringSlice(previouslyObservedConfig, currentEtcdURLs, storageConfigURLsPath...); err != nil {
errs = append(errs, err)
}
}

var etcdURLs sort.StringSlice
etcdEndpoints, err := listers.EndpointsLister.Endpoints(etcdEndpointNamespace).Get(etcdEndpointName)
if errors.IsNotFound(err) {
recorder.Warningf("ObserveStorageFailed", "Required endpoints/%s in the %s namespace not found", etcdEndpointName, etcdEndpointNamespace)
errs = append(errs, fmt.Errorf("endpoints/%s in the %s namespace: not found", etcdEndpointName, etcdEndpointNamespace))
return previouslyObservedConfig, errs
}
if err != nil {
recorder.Warningf("ObserveStorageFailed", "Error getting endpoints/%s in the %s namespace: %v", etcdEndpointName, etcdEndpointNamespace, err)
errs = append(errs, err)
return previouslyObservedConfig, errs
}

for subsetIndex, subset := range etcdEndpoints.Subsets {
for addressIndex, address := range subset.Addresses {
ip := net.ParseIP(address.IP)
if ip == nil {
ipErr := fmt.Errorf("endpoints/%s in the %s namespace: subsets[%v]addresses[%v].IP is not a valid IP address", etcdEndpointName, etcdEndpointNamespace, subsetIndex, addressIndex)
errs = append(errs, ipErr)
continue
}
// skip placeholder ip addresses used in previous versions where the hostname was used instead
if strings.HasPrefix(ip.String(), "192.0.2.") || strings.HasPrefix(ip.String(), "2001:db8:") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are these? Needs comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

standard documentation cidrs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done
Comment added.

// not considered an error
continue
}
// use the canonical representation of the ip address (not original input) when constructing the url
if ip.To4() != nil {
etcdURLs = append(etcdURLs, fmt.Sprintf("https://%s:2379", ip))
} else {
etcdURLs = append(etcdURLs, fmt.Sprintf("https://[%s]:2379", ip))
}
}
}

observedConfig := map[string]interface{}{}

// do not add empty storage urls slice to observed config, we don't want override defaults with an empty slice
if len(etcdURLs) > 0 {
etcdURLs.Sort()
if err := unstructured.SetNestedStringSlice(observedConfig, etcdURLs, storageConfigURLsPath...); err != nil {
errs = append(errs, err)
return previouslyObservedConfig, errs
}
} else {
err := fmt.Errorf("endpoints/%s in the %s namespace: no etcd endpoint addresses found, falling back to default etcd service", etcdEndpointName, etcdEndpointNamespace)
recorder.Warningf("ObserveStorageFallback", err.Error())
errs = append(errs, err)
}

if !reflect.DeepEqual(currentEtcdURLs, etcdURLs) {
recorder.Eventf("ObserveStorageUpdated", "Updated storage urls to %s", strings.Join(etcdURLs, ","))
}

return observedConfig, errs
}
188 changes: 188 additions & 0 deletions pkg/operator/configobservation/etcdobserver/observe_etcd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package etcdobserver

import (
"reflect"
"testing"

"github.com/openshift/library-go/pkg/operator/events"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/mergepatch"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"

"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation"
)

const clusterFQDN = "foo.bar"

func TestObserveStorageURLs(t *testing.T) {
tests := []struct {
name string
currentConfig map[string]interface{}
expected map[string]interface{}
expectErrors bool
endpoint *v1.Endpoints
}{
{
name: "NoEtcdHosts",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
expected: observedConfig(withStorageURL("https://previous.url:2379")),
expectErrors: true,
},
{
name: "ValidIPv4",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(withAddress("test", "10.0.0.1")),
expected: observedConfig(withStorageURL("https://10.0.0.1:2379")),
},
{
name: "InvalidIPv4",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("test-0", "10.0.0.1"),
withAddress("test-1", "192.192.0.2.1"),
),
expected: observedConfig(withStorageURL("https://10.0.0.1:2379")),
expectErrors: true,
},
{
name: "ValidIPv6",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(withAddress("test", "FE80:CD00:0000:0CDE:1257:0000:211E:729C")),
expected: observedConfig(withStorageURL("https://[fe80:cd00:0:cde:1257:0:211e:729c]:2379")),
},
{
name: "InvalidIPv6",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("test-0", "FE80:CD00:0000:0CDE:1257:0000:211E:729C"),
withAddress("test-1", "FE80:CD00:0000:0CDE:1257:0000:211E:729C:invalid"),
),
expected: observedConfig(withStorageURL("https://[fe80:cd00:0:cde:1257:0:211e:729c]:2379")),
expectErrors: true,
},
{
name: "FakeIPv4",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("test-0", "10.0.0.1"),
withAddress("test-1", "192.0.2.1"),
),
expected: observedConfig(withStorageURL("https://10.0.0.1:2379")),
},
{
name: "FakeIPv6",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("test-0", "FE80:CD00:0000:0CDE:1257:0000:211E:729C"),
withAddress("test-1", "2001:0DB8:0000:0CDE:1257:0000:211E:729C"),
),
expected: observedConfig(withStorageURL("https://[fe80:cd00:0:cde:1257:0:211e:729c]:2379")),
},
{
name: "ValidIPv4AsIPv6Literal",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(withAddress("test", "::ffff:a00:1")),
expected: observedConfig(withStorageURL("https://10.0.0.1:2379")),
},
{
name: "FakeIPv4AsIPv6Literal",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("test-0", "FE80:CD00:0000:0CDE:1257:0000:211E:729C"),
withAddress("test-1", "::ffff:c000:201"),
),
expected: observedConfig(withStorageURL("https://[fe80:cd00:0:cde:1257:0:211e:729c]:2379")),
},
{
name: "NoAddressesFound",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(),
expected: observedConfig(),
expectErrors: true,
},
{
name: "OnlyFakeAddressesFound",
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")),
endpoint: endpoints(
withAddress("test-0", "192.0.2.1"),
withAddress("test-1", "::ffff:c000:201"),
),
expected: observedConfig(),
expectErrors: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
lister := configobservation.Listers{
EndpointsLister: corev1listers.NewEndpointsLister(indexer),
}
if tt.endpoint != nil {
if err := indexer.Add(tt.endpoint); err != nil {
t.Fatalf("error adding endpoint to store: %#v", err)
}
}
actual, errs := ObserveStorageURLs(lister, events.NewInMemoryRecorder("test"), tt.currentConfig)
if tt.expectErrors && len(errs) == 0 {
t.Errorf("errors expected")
}
if !tt.expectErrors && len(errs) != 0 {
t.Errorf("unexpected errors: %v", errs)
}
if !reflect.DeepEqual(actual, tt.expected) {
t.Errorf("ObserveStorageURLs() gotObservedConfig = %v, want %v", actual, tt.expected)
}
if t.Failed() {
t.Log("\n" + mergepatch.ToYAMLOrError(actual))
for _, err := range errs {
t.Log(err)
}
}
})
}
}

func observedConfig(configs ...func(map[string]interface{})) map[string]interface{} {
observedConfig := map[string]interface{}{}
for _, config := range configs {
config(observedConfig)
}
return observedConfig
}

func withStorageURL(url string) func(map[string]interface{}) {
return func(observedConfig map[string]interface{}) {
urls, _, _ := unstructured.NestedStringSlice(observedConfig, "storageConfig", "urls")
urls = append(urls, url)
_ = unstructured.SetNestedStringSlice(observedConfig, urls, "storageConfig", "urls")
}
}

func endpoints(configs ...func(endpoints *v1.Endpoints)) *v1.Endpoints {
endpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "host-etcd",
Namespace: "openshift-etcd",
Annotations: map[string]string{
"alpha.installer.openshift.io/dns-suffix": clusterFQDN,
},
},
Subsets: []v1.EndpointSubset{{Addresses: []v1.EndpointAddress{}}},
}
for _, config := range configs {
config(endpoints)
}
return endpoints
}

func withAddress(hostname, ip string) func(*v1.Endpoints) {
return func(endpoints *v1.Endpoints) {
endpoints.Subsets[0].Addresses = append(endpoints.Subsets[0].Addresses, v1.EndpointAddress{
Hostname: hostname,
IP: ip,
})
}
}
2 changes: 2 additions & 0 deletions pkg/operator/operatorclient/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ const (
GlobalMachineSpecifiedConfigNamespace = "openshift-config-managed"
OperatorNamespace = "openshift-apiserver-operator"
TargetNamespace = "openshift-apiserver"
EtcdEndpointNamespace = "openshift-etcd"
EtcdEndpointName = "host-etcd"
)
3 changes: 3 additions & 0 deletions pkg/operator/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller
operatorclient.GlobalMachineSpecifiedConfigNamespace,
operatorclient.OperatorNamespace,
operatorclient.TargetNamespace,
operatorclient.EtcdEndpointNamespace,
)
apiregistrationInformers := apiregistrationinformers.NewSharedInformerFactory(apiregistrationv1Client, 10*time.Minute)
configInformers := configinformers.NewSharedInformerFactory(configClient, 10*time.Minute)
Expand Down Expand Up @@ -176,6 +177,7 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller
{Resource: "namespaces", Name: operatorclient.GlobalMachineSpecifiedConfigNamespace},
{Resource: "namespaces", Name: operatorclient.OperatorNamespace},
{Resource: "namespaces", Name: operatorclient.TargetNamespace},
{Resource: "endpoints", Name: operatorclient.EtcdEndpointName, Namespace: operatorclient.EtcdEndpointNamespace},
},
apiServicesReferences()...,
),
Expand All @@ -192,6 +194,7 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller

configObserver := configobservercontroller.NewConfigObserver(
kubeInformersForNamespaces.InformersFor(operatorclient.TargetNamespace),
kubeInformersForNamespaces.InformersFor(operatorclient.EtcdEndpointNamespace),
operatorClient,
resourceSyncController,
operatorConfigInformers,
Expand Down