Skip to content

Commit

Permalink
Use a delegation model for configmap migration
Browse files Browse the repository at this point in the history
This ensures only one observer impl is ever used at a time for etcd discovery
  • Loading branch information
ironcladlou committed May 18, 2020
1 parent d414073 commit 206d702
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 11 deletions.
Expand Up @@ -115,7 +115,7 @@ func NewConfigObserver(
// static path at which we expect to find the encryption config secret
"/etc/kubernetes/static-pod-resources/secrets/encryption-config/encryption-config",
),
etcdendpoints.ObserveStorageURLs,
etcdendpoints.BackwardsCompatibleObserveStorageURLs,
cloudprovider.NewCloudProviderObserver(
"openshift-kube-apiserver",
[]string{"apiServerArguments", "cloud-provider"},
Expand Down
Expand Up @@ -12,14 +12,24 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/openshift/cluster-kube-apiserver-operator/pkg/operator/configobservation"
endpointobserver "github.com/openshift/cluster-kube-apiserver-operator/pkg/operator/configobservation/etcd"
endpointsobserver "github.com/openshift/cluster-kube-apiserver-operator/pkg/operator/configobservation/etcd"
)

const (
etcdEndpointNamespace = "openshift-etcd"
etcdEndpointName = "etcd-endpoints"
)

func BackwardsCompatibleObserveStorageURLs(genericListers configobserver.Listers, recorder events.Recorder, currentConfig map[string]interface{}) (map[string]interface{}, []error) {
listers := genericListers.(configobservation.Listers)
if _, err := listers.ConfigmapLister.ConfigMaps(etcdEndpointNamespace).Get(etcdEndpointName); err == nil {
return ObserveStorageURLs(genericListers, recorder, currentConfig)
} else if !errors.IsNotFound(err) {
return nil, []error{err}
}
return endpointsobserver.ObserveStorageURLs(genericListers, recorder, currentConfig)
}

// ObserveStorageURLs observes the storage config URLs. If there is a problem observing the current storage config URLs,
// then the previously observed storage config URLs will be re-used.
func ObserveStorageURLs(genericListers configobserver.Listers, recorder events.Recorder, currentConfig map[string]interface{}) (map[string]interface{}, []error) {
Expand All @@ -42,20 +52,19 @@ func ObserveStorageURLs(genericListers configobserver.Listers, recorder events.R
etcdEndpoints, err := listers.ConfigmapLister.ConfigMaps(etcdEndpointNamespace).Get(etcdEndpointName)
if errors.IsNotFound(err) {
recorder.Warningf("ObserveStorageFailed", "Required %s/%s configmap not found", etcdEndpointNamespace, etcdEndpointName)

// Fall back to the old endpoint resource
return endpointobserver.ObserveStorageURLs(genericListers, recorder, currentConfig)
return previouslyObservedConfig, append(errs, fmt.Errorf("configmaps/%s.%s: not found", etcdEndpointName, etcdEndpointNamespace))
}
if err != nil {
recorder.Warningf("ObserveStorageFailed", "Error getting %s/%s endpoint: %v", etcdEndpointNamespace, etcdEndpointName, err)
recorder.Warningf("ObserveStorageFailed", "Error getting %s/%s configmap: %v", etcdEndpointNamespace, etcdEndpointName, err)
return previouslyObservedConfig, append(errs, err)
}

// note: etcd bootstrap should never be added to the in-cluster kube-apiserver
// this can result in some early pods crashlooping, but ensures that we never contact the bootstrap machine from
// the in-cluster kube-apiserver so we can safely teardown out of order.

for address := range etcdEndpoints.Data {
for k := range etcdEndpoints.Data {
address := etcdEndpoints.Data[k]
ip := net.ParseIP(address)
if ip == nil {
ipErr := fmt.Errorf("configmaps/%s in the %s namespace: %v is not a valid IP address", etcdEndpointName, etcdEndpointNamespace, address)
Expand All @@ -76,7 +85,7 @@ func ObserveStorageURLs(genericListers configobserver.Listers, recorder events.R
}

if len(etcdURLs) == 0 {
emptyURLErr := fmt.Errorf("endpoints %s/%s: no etcd endpoint addresses found", etcdEndpointNamespace, etcdEndpointName)
emptyURLErr := fmt.Errorf("configmaps %s/%s: no etcd endpoint addresses found", etcdEndpointNamespace, etcdEndpointName)
recorder.Warning("ObserveStorageFailed", emptyURLErr.Error())
errs = append(errs, emptyURLErr)
}
Expand Down
@@ -1,6 +1,7 @@
package etcdendpoints

import (
"encoding/base64"
"reflect"
"testing"

Expand All @@ -15,8 +16,6 @@ import (
"github.com/openshift/cluster-kube-apiserver-operator/pkg/operator/configobservation"
)

const clusterFQDN = "foo.bar"

func TestObserveStorageURLs(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -203,6 +202,6 @@ func withBootstrap(ip string) func(*v1.ConfigMap) {

func withAddress(ip string) func(*v1.ConfigMap) {
return func(endpoints *v1.ConfigMap) {
endpoints.Data[ip] = ""
endpoints.Data[base64.StdEncoding.WithPadding(base64.NoPadding).EncodeToString([]byte(ip))] = ip
}
}

0 comments on commit 206d702

Please sign in to comment.