Skip to content

Commit

Permalink
observe storage urls from host-etcd endpoint address ips
Browse files Browse the repository at this point in the history
  • Loading branch information
sanchezl committed Feb 19, 2020
1 parent 57d49d7 commit 3d281f3
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/openshift/library-go/pkg/operator/v1helpers"

"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/etcd"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/images"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/ingresses"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/project"
Expand All @@ -28,6 +29,7 @@ type ConfigObserver struct {
// NewConfigObserver initializes a new configuration observer.
func NewConfigObserver(
kubeInformers kubeinformers.SharedInformerFactory,
kubeInformersForEtcdNamespace kubeinformers.SharedInformerFactory,
operatorClient v1helpers.OperatorClient,
resourceSyncer resourcesynccontroller.ResourceSyncer,
operatorConfigInformers operatorv1informers.SharedInformerFactory,
Expand All @@ -45,6 +47,7 @@ func NewConfigObserver(
ProjectConfigLister: configInformers.Config().V1().Projects().Lister(),
ProxyLister_: configInformers.Config().V1().Proxies().Lister(),
IngressConfigLister: configInformers.Config().V1().Ingresses().Lister(),
EndpointsLister: kubeInformersForEtcdNamespace.Core().V1().Endpoints().Lister(),
SecretLister_: kubeInformers.Core().V1().Secrets().Lister(),
PreRunCachesSynced: []cache.InformerSynced{
operatorConfigInformers.Operator().V1().OpenShiftAPIServers().Informer().HasSynced,
Expand All @@ -58,6 +61,7 @@ func NewConfigObserver(
images.ObserveExternalRegistryHostnames,
images.ObserveAllowedRegistriesForImport,
ingresses.ObserveIngressDomain,
etcd.ObserveStorageURLs,
libgoapiserver.ObserveTLSSecurityProfile,
project.ObserveProjectRequestMessage,
project.ObserveProjectRequestTemplateName,
Expand All @@ -70,5 +74,6 @@ func NewConfigObserver(
configInformers.Config().V1().Ingresses().Informer().AddEventHandler(c.EventHandler())
configInformers.Config().V1().Projects().Informer().AddEventHandler(c.EventHandler())
configInformers.Config().V1().Proxies().Informer().AddEventHandler(c.EventHandler())
kubeInformersForEtcdNamespace.Core().V1().Endpoints().Informer().AddEventHandler(c.EventHandler())
return c
}
84 changes: 84 additions & 0 deletions pkg/operator/configobservation/etcd/observe_etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package etcd

import (
"fmt"
"net"
"reflect"
"strings"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/openshift/library-go/pkg/operator/configobserver"
"github.com/openshift/library-go/pkg/operator/events"

"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation"
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/operatorclient"
)

const (
etcdEndpointNamespace = operatorclient.EtcdEndpointNamespace
etcdEndpointName = operatorclient.EtcdEndpointName
)

// ObserveStorageURLs observes the storage config URLs. If there is a problem observing the current storage config URLs,
// then the previously observed storage config URLs will be re-used.
func ObserveStorageURLs(genericListers configobserver.Listers, recorder events.Recorder, currentConfig map[string]interface{}) (observedConfig map[string]interface{}, errs []error) {
listers := genericListers.(configobservation.Listers)
observedConfig = map[string]interface{}{}
storageConfigURLsPath := []string{"storageConfig", "urls"}

currentEtcdURLs, found, err := unstructured.NestedStringSlice(currentConfig, storageConfigURLsPath...)
if err != nil {
errs = append(errs, err)
}
if found {
if err := unstructured.SetNestedStringSlice(observedConfig, currentEtcdURLs, storageConfigURLsPath...); err != nil {
errs = append(errs, err)
}
}

var etcdURLs []string
etcdEndpoints, err := listers.EndpointsLister.Endpoints(etcdEndpointNamespace).Get(etcdEndpointName)
if errors.IsNotFound(err) {
recorder.Warningf("ObserveStorageFailed", "Required %s/%s endpoint not found", etcdEndpointNamespace, etcdEndpointName)
errs = append(errs, fmt.Errorf("endpoints/host-etcd.kube-system: not found"))
return
}
if err != nil {
recorder.Warningf("ObserveStorageFailed", "Error getting %s/%s endpoint: %v", etcdEndpointNamespace, etcdEndpointName, err)
errs = append(errs, err)
return
}
for subsetIndex, subset := range etcdEndpoints.Subsets {
for addressIndex, address := range subset.Addresses {
if ip := net.ParseIP(address.IP); ip == nil {
ipErr := fmt.Errorf("endpoints %s/%s: subsets[%v]addresses[%v].IP is not a valid IP address", etcdEndpointName, etcdEndpointNamespace, subsetIndex, addressIndex)
errs = append(errs, ipErr)
continue
}
etcdURLs = append(etcdURLs, "https://"+address.IP+":2379")
}
}

if len(etcdURLs) == 0 {
emptyURLErr := fmt.Errorf("endpoints %s/%s: no etcd endpoint addresses found", etcdEndpointNamespace, etcdEndpointName)
recorder.Warning("ObserveStorageFailed", emptyURLErr.Error())
errs = append(errs, emptyURLErr)
}

if len(errs) > 0 {
return
}

if err := unstructured.SetNestedStringSlice(observedConfig, etcdURLs, storageConfigURLsPath...); err != nil {
errs = append(errs, err)
return
}

if !reflect.DeepEqual(currentEtcdURLs, etcdURLs) {
recorder.Eventf("ObserveStorageUpdated", "Updated storage urls to %s", strings.Join(etcdURLs, ","))
}

return
}
152 changes: 152 additions & 0 deletions pkg/operator/configobservation/etcd/observe_etcd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package etcd

import (
"fmt"
"github.com/openshift/library-go/pkg/operator/events"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/kubernetes/fake"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"reflect"
"testing"

"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation"
)

const clusterFQDN = "foo.bar"

func fakeObjectReference(ep *v1.Endpoints) *v1.ObjectReference {
return &v1.ObjectReference{
Kind: ep.Kind,
Namespace: ep.Namespace,
Name: ep.Name,
UID: ep.UID,
APIVersion: ep.APIVersion,
ResourceVersion: ep.ResourceVersion,
}
}

func getWantObserverConfig(etcdURLs []string) (map[string]interface{}, error) {
wantObserverConfig := map[string]interface{}{}
if len(etcdURLs) == 0 {
return wantObserverConfig, nil
}
storageConfigURLsPath := []string{"storageConfig", "urls"}
if err := unstructured.SetNestedStringSlice(wantObserverConfig, etcdURLs, storageConfigURLsPath...); err != nil {
return nil, err
}
return wantObserverConfig, nil
}

func getEndpoint(hostname, ip string) *v1.Endpoints {
return &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "host-etcd",
Namespace: "openshift-etcd",
Annotations: map[string]string{
"alpha.installer.openshift.io/dns-suffix": clusterFQDN,
},
},
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: ip,
Hostname: hostname,
},
},
},
},
}
}

func TestObserveStorageURLs(t *testing.T) {
tests := []struct {
name string
indexer cache.Indexer
currentConfig map[string]interface{}
wantStorageURLs []string
wantErrs []error
endpoint *v1.Endpoints
}{
//{
// name: "test etcd-bootstrap with dummy IP",
// indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
// currentConfig: nil,
// wantStorageURLs: []string{"https://etcd-bootstrap." + clusterFQDN + ":2379"},
// wantErrs: nil,
// endpoint: getEndpoint("etcd-bootstrap", "192.0.2.1"),
//},
{
name: "test etcd-bootstrap with real IP",
indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
currentConfig: nil,
wantStorageURLs: []string{"https://10.0.0.1:2379"},
wantErrs: nil,
endpoint: getEndpoint("etcd-bootstrap", "10.0.0.1"),
},
{
name: "test etcd-bootstrap with invalid IPv4",
indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
currentConfig: nil,
wantStorageURLs: []string{},
wantErrs: []error{fmt.Errorf("endpoints %s/%s: subsets[%v]addresses[%v].IP is not a valid IP address", etcdEndpointName, etcdEndpointNamespace, 0, 0),
fmt.Errorf("endpoints %s/%s: no etcd endpoint addresses found", etcdEndpointNamespace, etcdEndpointName)},
endpoint: getEndpoint("etcd-bootstrap", "192.192.0.2.1"),
},
{
name: "test etcd-bootstrap with valid IPv6",
indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
currentConfig: nil,
wantStorageURLs: []string{"https://FE80:CD00:0000:0CDE:1257:0000:211E:729C:2379"},
wantErrs: nil,
endpoint: getEndpoint("etcd-bootstrap", "FE80:CD00:0000:0CDE:1257:0000:211E:729C"),
},
{
name: "test etcd-bootstrap with invalid IPv6",
indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
currentConfig: nil,
wantStorageURLs: []string{},
wantErrs: []error{
fmt.Errorf("endpoints %s/%s: subsets[%v]addresses[%v].IP is not a valid IP address", etcdEndpointName, etcdEndpointNamespace, 0, 0),
fmt.Errorf("endpoints %s/%s: no etcd endpoint addresses found", etcdEndpointNamespace, etcdEndpointName),
},
endpoint: getEndpoint("etcd-bootstrap", "FE80:CD00:0000:0CDE:1257:0000:211E:729C:invalid"),
},
{
name: "test etcd member",
indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
currentConfig: nil,
wantStorageURLs: []string{"https://192.0.2.1:2379"},
wantErrs: nil,
endpoint: getEndpoint("etcd-0", "192.0.2.1"),
},
// TODO: Add more test cases
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client := fake.NewSimpleClientset()
lister := configobservation.Listers{
EndpointsLister: corev1listers.NewEndpointsLister(tt.indexer),
}
r := events.NewRecorder(client.CoreV1().Events("openshift-etcd"), "test-operator",
fakeObjectReference(tt.endpoint))
if err := tt.indexer.Add(tt.endpoint); err != nil {
t.Errorf("error adding endpoint to store: %#v", err)
}
wantObserverConfig, err := getWantObserverConfig(tt.wantStorageURLs)
if err != nil {
t.Errorf("error getting wantObserverConfig: %#v", err)
}
gotObservedConfig, gotErrs := ObserveStorageURLs(lister, r, tt.currentConfig)
if !reflect.DeepEqual(gotObservedConfig, wantObserverConfig) {
t.Errorf("ObserveStorageURLs() gotObservedConfig = %v, want %v", gotObservedConfig, wantObserverConfig)
}
if !reflect.DeepEqual(gotErrs, tt.wantErrs) {
t.Errorf("ObserveStorageURLs() gotErrs = %v, want %v", gotErrs, tt.wantErrs)
}
})
}
}
2 changes: 2 additions & 0 deletions pkg/operator/operatorclient/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ const (
GlobalMachineSpecifiedConfigNamespace = "openshift-config-managed"
OperatorNamespace = "openshift-apiserver-operator"
TargetNamespace = "openshift-apiserver"
EtcdEndpointNamespace = "openshift-etcd"
EtcdEndpointName = "host-etcd"
)
3 changes: 3 additions & 0 deletions pkg/operator/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller
operatorclient.GlobalMachineSpecifiedConfigNamespace,
operatorclient.OperatorNamespace,
operatorclient.TargetNamespace,
operatorclient.EtcdEndpointNamespace,
)
apiregistrationInformers := apiregistrationinformers.NewSharedInformerFactory(apiregistrationv1Client, 10*time.Minute)
configInformers := configinformers.NewSharedInformerFactory(configClient, 10*time.Minute)
Expand Down Expand Up @@ -176,6 +177,7 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller
{Resource: "namespaces", Name: operatorclient.GlobalMachineSpecifiedConfigNamespace},
{Resource: "namespaces", Name: operatorclient.OperatorNamespace},
{Resource: "namespaces", Name: operatorclient.TargetNamespace},
{Resource: "endpoints", Name: operatorclient.EtcdEndpointName, Namespace: operatorclient.EtcdEndpointNamespace},
},
apiServicesReferences()...,
),
Expand All @@ -192,6 +194,7 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller

configObserver := configobservercontroller.NewConfigObserver(
kubeInformersForNamespaces.InformersFor(operatorclient.TargetNamespace),
kubeInformersForNamespaces.InformersFor(operatorclient.EtcdEndpointNamespace),
operatorClient,
resourceSyncController,
operatorConfigInformers,
Expand Down

0 comments on commit 3d281f3

Please sign in to comment.