-
Notifications
You must be signed in to change notification settings - Fork 85
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Prefer etcd endpoints configmap for storage URL discovery
- Loading branch information
1 parent
924ea05
commit f5b4988
Showing
6 changed files
with
343 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
reviewers: | ||
- ironcladlou | ||
- hexfusion | ||
- retroflexer | ||
approvers: | ||
- ironcladlou | ||
- hexfusion | ||
- retroflexer |
103 changes: 103 additions & 0 deletions
103
pkg/operator/configobservation/etcdendpoints/observe_etcd_endpoints.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package etcdendpoints | ||
|
||
import ( | ||
"fmt" | ||
"net" | ||
"reflect" | ||
"strings" | ||
|
||
"github.com/openshift/library-go/pkg/operator/configobserver" | ||
"github.com/openshift/library-go/pkg/operator/events" | ||
"k8s.io/apimachinery/pkg/api/errors" | ||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||
|
||
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation" | ||
endpointsobserver "github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/etcdobserver" | ||
) | ||
|
||
const ( | ||
etcdEndpointNamespace = "openshift-etcd" | ||
etcdEndpointName = "etcd-endpoints" | ||
) | ||
|
||
var fallbackObserver configobserver.ObserveConfigFunc = endpointsobserver.ObserveStorageURLs | ||
|
||
// ObserveStorageURLs observes the storage config URLs. If there is a problem observing the current storage config URLs, | ||
// then the previously observed storage config URLs will be re-used. | ||
func ObserveStorageURLs(genericListers configobserver.Listers, recorder events.Recorder, currentConfig map[string]interface{}) (map[string]interface{}, []error) { | ||
listers := genericListers.(configobservation.Listers) | ||
storageConfigURLsPath := []string{"storageConfig", "urls"} | ||
var errs []error | ||
|
||
previouslyObservedConfig := map[string]interface{}{} | ||
currentEtcdURLs, found, err := unstructured.NestedStringSlice(currentConfig, storageConfigURLsPath...) | ||
if err != nil { | ||
errs = append(errs, err) | ||
} | ||
if found { | ||
if err := unstructured.SetNestedStringSlice(previouslyObservedConfig, currentEtcdURLs, storageConfigURLsPath...); err != nil { | ||
errs = append(errs, err) | ||
} | ||
} | ||
|
||
var etcdURLs []string | ||
etcdEndpoints, err := listers.ConfigmapLister.ConfigMaps(etcdEndpointNamespace).Get(etcdEndpointName) | ||
if errors.IsNotFound(err) { | ||
// In clusters prior to 4.5, fall back to reading the old host-etcd-2 endpoint | ||
// resource, if possible. In 4.6 we can assume consumers have been updated to | ||
// use the configmap, delete the fallback code, and throw an error if the | ||
// configmap doesn't exist. | ||
observedConfig, fallbackErrors := fallbackObserver(listers, recorder, currentConfig) | ||
if len(fallbackErrors) > 0 { | ||
errs = append(errs, fallbackErrors...) | ||
return previouslyObservedConfig, append(errs, fmt.Errorf("configmap %s/%s not found, and fallback observer failed", etcdEndpointNamespace, etcdEndpointName)) | ||
} | ||
return observedConfig, errs | ||
} | ||
if err != nil { | ||
recorder.Warningf("ObserveStorageFailed", "Error getting %s/%s configmap: %v", etcdEndpointNamespace, etcdEndpointName, err) | ||
return previouslyObservedConfig, append(errs, err) | ||
} | ||
|
||
// note: etcd bootstrap should never be added to the in-cluster kube-apiserver | ||
// this can result in some early pods crashlooping, but ensures that we never contact the bootstrap machine from | ||
// the in-cluster kube-apiserver so we can safely teardown out of order. | ||
|
||
for k := range etcdEndpoints.Data { | ||
address := etcdEndpoints.Data[k] | ||
ip := net.ParseIP(address) | ||
if ip == nil { | ||
ipErr := fmt.Errorf("configmaps/%s in the %s namespace: %v is not a valid IP address", etcdEndpointName, etcdEndpointNamespace, address) | ||
errs = append(errs, ipErr) | ||
continue | ||
} | ||
// skip placeholder ip addresses used in previous versions where the hostname was used instead | ||
if strings.HasPrefix(ip.String(), "192.0.2.") || strings.HasPrefix(ip.String(), "2001:db8:") { | ||
// not considered an error | ||
continue | ||
} | ||
// use the canonical representation of the ip address (not original input) when constructing the url | ||
if ip.To4() != nil { | ||
etcdURLs = append(etcdURLs, fmt.Sprintf("https://%s:2379", ip)) | ||
} else { | ||
etcdURLs = append(etcdURLs, fmt.Sprintf("https://[%s]:2379", ip)) | ||
} | ||
} | ||
|
||
if len(etcdURLs) == 0 { | ||
emptyURLErr := fmt.Errorf("configmaps %s/%s: no etcd endpoint addresses found", etcdEndpointNamespace, etcdEndpointName) | ||
recorder.Warning("ObserveStorageFailed", emptyURLErr.Error()) | ||
return previouslyObservedConfig, append(errs, emptyURLErr) | ||
} | ||
|
||
observedConfig := map[string]interface{}{} | ||
if err := unstructured.SetNestedStringSlice(observedConfig, etcdURLs, storageConfigURLsPath...); err != nil { | ||
return previouslyObservedConfig, append(errs, err) | ||
} | ||
|
||
if !reflect.DeepEqual(currentEtcdURLs, etcdURLs) { | ||
recorder.Eventf("ObserveStorageUpdated", "Updated storage urls to %s", strings.Join(etcdURLs, ",")) | ||
} | ||
|
||
return observedConfig, errs | ||
} |
219 changes: 219 additions & 0 deletions
219
pkg/operator/configobservation/etcdendpoints/observe_etcd_endpoints_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,219 @@ | ||
package etcdendpoints | ||
|
||
import ( | ||
"encoding/base64" | ||
"fmt" | ||
"reflect" | ||
"testing" | ||
|
||
"github.com/openshift/library-go/pkg/operator/configobserver" | ||
"github.com/openshift/library-go/pkg/operator/events" | ||
v1 "k8s.io/api/core/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||
"k8s.io/apimachinery/pkg/util/mergepatch" | ||
corev1listers "k8s.io/client-go/listers/core/v1" | ||
"k8s.io/client-go/tools/cache" | ||
|
||
"github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation" | ||
endpointsobserver "github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/configobservation/etcdobserver" | ||
) | ||
|
||
func TestObserveStorageURLs(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
currentConfig map[string]interface{} | ||
fallback configobserver.ObserveConfigFunc | ||
expected map[string]interface{} | ||
expectErrors bool | ||
endpoint *v1.ConfigMap | ||
}{ | ||
{ | ||
name: "NoConfigMapSuccessfulFallback", | ||
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")), | ||
fallback: fallback(observedConfig(withStorageURL("https://10.0.0.1:2379"))), | ||
expected: observedConfig(withStorageURL("https://10.0.0.1:2379")), | ||
expectErrors: false, | ||
}, | ||
{ | ||
name: "NoConfigMapFailedFallback", | ||
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")), | ||
fallback: fallback(observedConfig(withStorageURL("https://10.0.0.1:2379")), fmt.Errorf("endpoint not found")), | ||
expected: observedConfig(withStorageURL("https://previous.url:2379")), | ||
expectErrors: true, | ||
}, | ||
{ | ||
name: "ValidIPv4", | ||
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")), | ||
endpoint: endpoints(withAddress("10.0.0.1")), | ||
expected: observedConfig(withStorageURL("https://10.0.0.1:2379")), | ||
}, | ||
{ | ||
name: "InvalidIPv4", | ||
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")), | ||
endpoint: endpoints( | ||
withAddress("10.0.0.1"), | ||
withAddress("192.192.0.2.1"), | ||
), | ||
expected: observedConfig(withStorageURL("https://10.0.0.1:2379")), | ||
expectErrors: true, | ||
}, | ||
{ | ||
name: "ValidIPv6", | ||
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")), | ||
endpoint: endpoints(withAddress("FE80:CD00:0000:0CDE:1257:0000:211E:729C")), | ||
expected: observedConfig(withStorageURL("https://[fe80:cd00:0:cde:1257:0:211e:729c]:2379")), | ||
}, | ||
{ | ||
name: "InvalidIPv6", | ||
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")), | ||
endpoint: endpoints( | ||
withAddress("FE80:CD00:0000:0CDE:1257:0000:211E:729C"), | ||
withAddress("FE80:CD00:0000:0CDE:1257:0000:211E:729C:invalid"), | ||
), | ||
expected: observedConfig(withStorageURL("https://[fe80:cd00:0:cde:1257:0:211e:729c]:2379")), | ||
expectErrors: true, | ||
}, | ||
{ | ||
name: "FakeIPv4", | ||
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")), | ||
endpoint: endpoints( | ||
withAddress("10.0.0.1"), | ||
withAddress("192.0.2.1"), | ||
), | ||
expected: observedConfig(withStorageURL("https://10.0.0.1:2379")), | ||
}, | ||
{ | ||
name: "FakeIPv6", | ||
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")), | ||
endpoint: endpoints( | ||
withAddress("FE80:CD00:0000:0CDE:1257:0000:211E:729C"), | ||
withAddress("2001:0DB8:0000:0CDE:1257:0000:211E:729C"), | ||
), | ||
expected: observedConfig(withStorageURL("https://[fe80:cd00:0:cde:1257:0:211e:729c]:2379")), | ||
}, | ||
{ | ||
name: "ValidIPv4AsIPv6Literal", | ||
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")), | ||
endpoint: endpoints(withAddress("::ffff:a00:1")), | ||
expected: observedConfig(withStorageURL("https://10.0.0.1:2379")), | ||
}, | ||
{ | ||
name: "FakeIPv4AsIPv6Literal", | ||
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")), | ||
endpoint: endpoints( | ||
withAddress("FE80:CD00:0000:0CDE:1257:0000:211E:729C"), | ||
withAddress("::ffff:c000:201"), | ||
), | ||
expected: observedConfig(withStorageURL("https://[fe80:cd00:0:cde:1257:0:211e:729c]:2379")), | ||
}, | ||
{ | ||
name: "NoAddressesFound", | ||
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")), | ||
endpoint: endpoints(), | ||
expected: observedConfig(withStorageURL("https://previous.url:2379")), | ||
expectErrors: true, | ||
}, | ||
{ | ||
name: "OnlyFakeAddressesFound", | ||
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")), | ||
endpoint: endpoints( | ||
withAddress("192.0.2.1"), | ||
withAddress("::ffff:c000:201"), | ||
), | ||
expected: observedConfig(withStorageURL("https://previous.url:2379")), | ||
expectErrors: true, | ||
}, | ||
{ | ||
name: "IgnoreBootstrap", | ||
currentConfig: observedConfig(withStorageURL("https://previous.url:2379")), | ||
endpoint: endpoints( | ||
withBootstrap("10.0.0.2"), | ||
withAddress("10.0.0.1"), | ||
), | ||
expected: observedConfig(withStorageURL("https://10.0.0.1:2379")), | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) | ||
lister := configobservation.Listers{ | ||
ConfigmapLister: corev1listers.NewConfigMapLister(indexer), | ||
} | ||
if tt.endpoint != nil { | ||
if err := indexer.Add(tt.endpoint); err != nil { | ||
t.Fatalf("error adding endpoint to store: %#v", err) | ||
} | ||
} | ||
fallbackObserver = tt.fallback | ||
actual, errs := ObserveStorageURLs(lister, events.NewInMemoryRecorder("test"), tt.currentConfig) | ||
fallbackObserver = endpointsobserver.ObserveStorageURLs | ||
if tt.expectErrors && len(errs) == 0 { | ||
t.Errorf("errors expected") | ||
} | ||
if !tt.expectErrors && len(errs) != 0 { | ||
t.Errorf("unexpected errors: %v", errs) | ||
} | ||
if !reflect.DeepEqual(actual, tt.expected) { | ||
t.Errorf("ObserveStorageURLs() gotObservedConfig = %v, want %v", actual, tt.expected) | ||
} | ||
if t.Failed() { | ||
t.Log("\n" + mergepatch.ToYAMLOrError(actual)) | ||
for _, err := range errs { | ||
t.Log(err) | ||
} | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func observedConfig(configs ...func(map[string]interface{})) map[string]interface{} { | ||
observedConfig := map[string]interface{}{} | ||
for _, config := range configs { | ||
config(observedConfig) | ||
} | ||
return observedConfig | ||
} | ||
|
||
func withStorageURL(url string) func(map[string]interface{}) { | ||
return func(observedConfig map[string]interface{}) { | ||
urls, _, _ := unstructured.NestedStringSlice(observedConfig, "storageConfig", "urls") | ||
urls = append(urls, url) | ||
_ = unstructured.SetNestedStringSlice(observedConfig, urls, "storageConfig", "urls") | ||
} | ||
} | ||
|
||
func endpoints(configs ...func(endpoints *v1.ConfigMap)) *v1.ConfigMap { | ||
endpoints := &v1.ConfigMap{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: "etcd-endpoints", | ||
Namespace: "openshift-etcd", | ||
}, | ||
Data: map[string]string{}, | ||
} | ||
for _, config := range configs { | ||
config(endpoints) | ||
} | ||
return endpoints | ||
} | ||
|
||
func withBootstrap(ip string) func(*v1.ConfigMap) { | ||
return func(endpoints *v1.ConfigMap) { | ||
if endpoints.Annotations == nil { | ||
endpoints.Annotations = map[string]string{} | ||
} | ||
endpoints.Annotations["alpha.installer.openshift.io/etcd-bootstrap"] = ip | ||
} | ||
} | ||
|
||
func withAddress(ip string) func(*v1.ConfigMap) { | ||
return func(endpoints *v1.ConfigMap) { | ||
endpoints.Data[base64.StdEncoding.WithPadding(base64.NoPadding).EncodeToString([]byte(ip))] = ip | ||
} | ||
} | ||
|
||
func fallback(observed map[string]interface{}, errs ...error) configobserver.ObserveConfigFunc { | ||
return func(genericListers configobserver.Listers, recorder events.Recorder, currentConfig map[string]interface{}) (map[string]interface{}, []error) { | ||
return observed, errs | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
reviewers: | ||
- ironcladlou | ||
- hexfusion | ||
- retroflexer | ||
approvers: | ||
- ironcladlou | ||
- hexfusion | ||
- retroflexer |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters