Skip to content

Commit

Permalink
support CRDs at v1 throughout the apply stack
Browse files Browse the repository at this point in the history
  • Loading branch information
deads2k committed Jan 23, 2020
1 parent 0c97ab8 commit cade8eb
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 31 deletions.
45 changes: 37 additions & 8 deletions pkg/operator/resource/resourceapply/apiextensions.go
@@ -1,19 +1,48 @@
package resourceapply

import (
"k8s.io/klog"

"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourcemerge"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextclientv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
apiextclientv1beta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourcemerge"
"k8s.io/klog"
)

// ApplyCustomResourceDefinition applies the required CustomResourceDefinition to the cluster.
func ApplyCustomResourceDefinition(client apiextclientv1beta1.CustomResourceDefinitionsGetter, recorder events.Recorder, required *apiextv1beta1.CustomResourceDefinition) (*apiextv1beta1.CustomResourceDefinition, bool, error) {
// ApplyCustomResourceDefinitionV1Beta1 applies the required CustomResourceDefinition to the cluster.
func ApplyCustomResourceDefinitionV1Beta1(client apiextclientv1beta1.CustomResourceDefinitionsGetter, recorder events.Recorder, required *apiextv1beta1.CustomResourceDefinition) (*apiextv1beta1.CustomResourceDefinition, bool, error) {
existing, err := client.CustomResourceDefinitions().Get(required.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
actual, err := client.CustomResourceDefinitions().Create(required)
reportCreateEvent(recorder, required, err)
return actual, true, err
}
if err != nil {
return nil, false, err
}

modified := resourcemerge.BoolPtr(false)
existingCopy := existing.DeepCopy()
resourcemerge.EnsureCustomResourceDefinitionV1Beta1(modified, existingCopy, *required)
if !*modified {
return existing, false, nil
}

if klog.V(4) {
klog.Infof("CustomResourceDefinition %q changes: %s", existing.Name, JSONPatchNoError(existing, existingCopy))
}

actual, err := client.CustomResourceDefinitions().Update(existingCopy)
reportUpdateEvent(recorder, required, err)

return actual, true, err
}

// ApplyCustomResourceDefinitionV1 applies the required CustomResourceDefinition to the cluster.
func ApplyCustomResourceDefinitionV1(client apiextclientv1.CustomResourceDefinitionsGetter, recorder events.Recorder, required *apiextensionsv1.CustomResourceDefinition) (*apiextensionsv1.CustomResourceDefinition, bool, error) {
existing, err := client.CustomResourceDefinitions().Get(required.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
actual, err := client.CustomResourceDefinitions().Create(required)
Expand All @@ -26,7 +55,7 @@ func ApplyCustomResourceDefinition(client apiextclientv1beta1.CustomResourceDefi

modified := resourcemerge.BoolPtr(false)
existingCopy := existing.DeepCopy()
resourcemerge.EnsureCustomResourceDefinition(modified, existingCopy, *required)
resourcemerge.EnsureCustomResourceDefinitionV1(modified, existingCopy, *required)
if !*modified {
return existing, false, nil
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/operator/resource/resourceapply/generic.go
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/openshift/library-go/pkg/operator/events"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -125,11 +125,16 @@ func ApplyDirectly(clients *ClientHolder, recorder events.Recorder, manifests As
result.Error = fmt.Errorf("missing kubeClient")
}
result.Result, result.Changed, result.Error = ApplyRoleBinding(clients.kubeClient.RbacV1(), recorder, t)
case *v1beta1.CustomResourceDefinition:
case *apiextensionsv1beta1.CustomResourceDefinition:
if clients.apiExtensionsClient == nil {
result.Error = fmt.Errorf("missing apiExtensionsClient")
}
result.Result, result.Changed, result.Error = ApplyCustomResourceDefinition(clients.apiExtensionsClient.ApiextensionsV1beta1(), recorder, t)
result.Result, result.Changed, result.Error = ApplyCustomResourceDefinitionV1Beta1(clients.apiExtensionsClient.ApiextensionsV1beta1(), recorder, t)
case *apiextensionsv1.CustomResourceDefinition:
if clients.apiExtensionsClient == nil {
result.Error = fmt.Errorf("missing apiExtensionsClient")
}
result.Result, result.Changed, result.Error = ApplyCustomResourceDefinitionV1(clients.apiExtensionsClient.ApiextensionsV1(), recorder, t)
default:
result.Error = fmt.Errorf("unhandled type %T", requiredObj)
}
Expand Down
19 changes: 16 additions & 3 deletions pkg/operator/resource/resourcemerge/apiextensions.go
@@ -1,13 +1,26 @@
package resourcemerge

import (
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/api/equality"
)

// EnsureCustomResourceDefinition ensures that the existing matches the required.
// EnsureCustomResourceDefinitionV1Beta1 ensures that the existing matches the required.
// modified is set to true when existing had to be updated with required.
func EnsureCustomResourceDefinition(modified *bool, existing *apiextv1beta1.CustomResourceDefinition, required apiextv1beta1.CustomResourceDefinition) {
func EnsureCustomResourceDefinitionV1Beta1(modified *bool, existing *apiextensionsv1beta1.CustomResourceDefinition, required apiextensionsv1beta1.CustomResourceDefinition) {
EnsureObjectMeta(modified, &existing.ObjectMeta, required.ObjectMeta)

// we stomp everything
if !equality.Semantic.DeepEqual(existing.Spec, required.Spec) {
*modified = true
existing.Spec = required.Spec
}
}

// EnsureCustomResourceDefinitionV1 ensures that the existing matches the required.
// modified is set to true when existing had to be updated with required.
func EnsureCustomResourceDefinitionV1(modified *bool, existing *apiextensionsv1.CustomResourceDefinition, required apiextensionsv1.CustomResourceDefinition) {
EnsureObjectMeta(modified, &existing.ObjectMeta, required.ObjectMeta)

// we stomp everything
Expand Down
15 changes: 12 additions & 3 deletions pkg/operator/resource/resourceread/apiextensions.go
@@ -1,9 +1,11 @@
package resourceread

import (
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)

var (
Expand All @@ -12,9 +14,8 @@ var (
)

func init() {
if err := apiextensionsv1beta1.AddToScheme(apiExtensionsScheme); err != nil {
panic(err)
}
utilruntime.Must(apiextensionsv1beta1.AddToScheme(apiExtensionsScheme))
utilruntime.Must(apiextensionsv1.AddToScheme(apiExtensionsScheme))
}

func ReadCustomResourceDefinitionV1Beta1OrDie(objBytes []byte) *apiextensionsv1beta1.CustomResourceDefinition {
Expand All @@ -24,3 +25,11 @@ func ReadCustomResourceDefinitionV1Beta1OrDie(objBytes []byte) *apiextensionsv1b
}
return requiredObj.(*apiextensionsv1beta1.CustomResourceDefinition)
}

func ReadCustomResourceDefinitionV1OrDie(objBytes []byte) *apiextensionsv1.CustomResourceDefinition {
requiredObj, err := runtime.Decode(apiExtensionsCodecs.UniversalDecoder(apiextensionsv1beta1.SchemeGroupVersion), objBytes)
if err != nil {
panic(err)
}
return requiredObj.(*apiextensionsv1.CustomResourceDefinition)
}
30 changes: 16 additions & 14 deletions pkg/operator/staticresourcecontroller/static_resource_controller.go
Expand Up @@ -31,6 +31,16 @@ const (
workQueueKey = "key"
)

var (
genericScheme = runtime.NewScheme()
genericCodecs = serializer.NewCodecFactory(genericScheme)
genericCodec = genericCodecs.UniversalDeserializer()
)

func init() {
utilruntime.Must(api.InstallKube(genericScheme))
}

type StaticResourceController struct {
name string
manifests resourceapply.AssetFunc
Expand Down Expand Up @@ -129,6 +139,8 @@ func (c *StaticResourceController) AddKubeInformers(kubeInformersByNamespace v1h
case *rbacv1.RoleBinding:
ret = ret.AddInformer(informer.Rbac().V1().RoleBindings().Informer())
default:
// if there's a missing case, the caller can add an informer or count on a time based trigger.
// if the controller doesn't handle it, then there will be failure from the underlying apply.
klog.V(4).Infof("unhandled type %T", requiredObj)
}
}
Expand Down Expand Up @@ -253,10 +265,10 @@ func (c *StaticResourceController) Run(ctx context.Context, workers int) {
go wait.Until(c.runWorker, time.Second, ctx.Done())

// add time based trigger
go wait.Until(func() { c.queue.Add(workQueueKey) }, time.Minute, ctx.Done())

// trigger once quickly at least once
c.queue.Add(workQueueKey)
go wait.PollImmediateUntil(time.Minute, func() (bool, error) {
c.queue.Add(workQueueKey)
return false, nil
}, ctx.Done())

<-ctx.Done()
}
Expand Down Expand Up @@ -293,13 +305,3 @@ func (c *StaticResourceController) eventHandler() cache.ResourceEventHandler {
DeleteFunc: func(obj interface{}) { c.queue.Add(workQueueKey) },
}
}

var (
genericScheme = runtime.NewScheme()
genericCodecs = serializer.NewCodecFactory(genericScheme)
genericCodec = genericCodecs.UniversalDeserializer()
)

func init() {
utilruntime.Must(api.InstallKube(genericScheme))
}

0 comments on commit cade8eb

Please sign in to comment.