Skip to content

Commit

Permalink
Upgrade script for patching broker annotation (#2910)
Browse files Browse the repository at this point in the history
* upgrade script for patching broker annotation

* Add job spec + readme

* add to release artifacts

* refactor + tests

* appease the robot overlords

* last robot complaint addressed

* remove the gcp import that I left while testing

* address pr feedback
  • Loading branch information
vaikas committed Apr 6, 2020
1 parent 8edb67b commit 9bf9bb5
Show file tree
Hide file tree
Showing 6 changed files with 409 additions and 0 deletions.
46 changes: 46 additions & 0 deletions cmd/upgrade/v0.14.0/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

"context"
"fmt"
"os"

"k8s.io/client-go/kubernetes"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/signals"

versioned "knative.dev/eventing/pkg/client/clientset/versioned"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
broker "knative.dev/eventing/pkg/upgrader/broker/v0.14.0"
)

func main() {
ctx := signals.NewContext()
cfg := sharedmain.ParseAndGetConfigOrDie()
ctx = context.WithValue(ctx, kubeclient.Key{}, kubernetes.NewForConfigOrDie(cfg))
ctx = context.WithValue(ctx, eventingclient.Key{}, versioned.NewForConfigOrDie(cfg))
if err := broker.Upgrade(ctx); err != nil {
fmt.Printf("Broker Upgrade failed with: %v\n", err)
os.Exit(1)
}
}
26 changes: 26 additions & 0 deletions config/upgrade/v0.14.0/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Upgrade script required to upgrade to v0.14.0 of Eventing

This directory contains a job that upgrades (patches) all the Brokers to have an
annotation that specifies which BrokerClass should be reconciling it. Prior to
v0.13.0 there was only the ChannelBasedBroker and it was not necessary. However
in v0.14.0 we introduced BrokerClass and in order for existing Brokers to
continue to be reconciled, they need to be patched to include the BrokerClass
annotation like so.

```
annotations:
eventing.knative.dev/broker.class: ChannelBasedBroker
```

To run the upgrade script:

```shell
ko apply -f ./upgrade.yaml
```

It will create a job called v0.14.0-upgrade in the knative-eventing namespace.
If you installed to different namespace, you need to modify the upgrade.yaml
appropriately. Also the job by default runs as `eventing-controller` service
account, you can also modify that but the service account will need to have
permissions to list `Namespace`s, list and patch `Broker`s.
16 changes: 16 additions & 0 deletions config/upgrade/v0.14.0/upgrade.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
apiVersion: batch/v1
kind: Job
metadata:
name: v0.14.0-upgrade
namespace: knative-eventing
labels:
eventing.knative.dev/release: devel
spec:
template:
spec:
serviceAccountName: eventing-controller
restartPolicy: Never
containers:
- name: upgrade-brokers
image: ko://knative.dev/eventing/cmd/upgrade/v0.14.0/

1 change: 1 addition & 0 deletions hack/release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ COMPONENTS=(
["channel-broker.yaml"]="config/brokers/channel-broker"
["mt-channel-broker.yaml"]="config/brokers/mt-channel-broker"
["in-memory-channel.yaml"]="config/channels/in-memory-channel"
["upgrade-to-v0.14.0.yaml"]="config/upgrade/v0.14.0"
)
readonly COMPONENTS

Expand Down
118 changes: 118 additions & 0 deletions pkg/upgrader/broker/v0.14.0/upgrader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package broker

import (
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
types "k8s.io/apimachinery/pkg/types"
"knative.dev/eventing/pkg/apis/eventing"
"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/pkg/apis/duck"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"
)

// Upgrade upgrades all the brokers by applying the Broker Class to all the
// ones that do not have them. This is necessary to ensure that existing Brokers
// that do not have the eventing.knative.dev/broker.class will continue to be
// reconciled by the existing ChannelBasedBroker
func Upgrade(ctx context.Context) error {
logger := logging.FromContext(ctx)

nsClient := kubeclient.Get(ctx).CoreV1().Namespaces()
namespaces, err := nsClient.List(metav1.ListOptions{})
if err != nil {
logger.Warnf("Failed to list namespaces: %v", err)
return err
}
for _, ns := range namespaces.Items {
err = processNamespace(ctx, ns.Name)
if err != nil {
return err
}
}
return nil
}

func processNamespace(ctx context.Context, ns string) error {
logger := logging.FromContext(ctx)
logger.Infof("Processing Brokers in namespace: %q", ns)

eventingClient := eventingclient.Get(ctx)
brokerClient := eventingClient.EventingV1alpha1().Brokers(ns)
brokers, err := brokerClient.List(metav1.ListOptions{})
if err != nil {
logger.Warnf("Failed to list brokers for namespace %q: %v", ns, err)
return err
}
for _, broker := range brokers.Items {
patch, err := processBroker(ctx, broker)
if err != nil {
logger.Warnf("Failed to process a Broker \"%s/%s\" : %v", broker.Namespace, broker.Name, err)
return err
}
if len(patch) == 0 {
logger.Infof("Broker \"%s/%s\" has annotation already", broker.Namespace, broker.Name)
continue
}

// Ok, there are differences, apply the patch
logger.Infof("Patching Broker \"%s/%s\" with %q", broker.Namespace, broker.Name, string(patch))
patched, err := brokerClient.Patch(broker.Name, types.MergePatchType, patch)
if err != nil {
logger.Warnf("Failed to patch \"%s/%s\" : %v", broker.Namespace, broker.Name, err)
return err
}
logger.Infof("Patched \"%s/%s\" successfully new Annotations: %+v", broker.Namespace, broker.Name, patched.ObjectMeta.GetAnnotations())
}
return nil
}

// Process a single Broker to see if it needs a patch applied to it or not.
// Returns non-empty patch bytes if a patch is necessary.
func processBroker(ctx context.Context, broker v1alpha1.Broker) ([]byte, error) {
logger := logging.FromContext(ctx)

modified := broker.DeepCopy()
annotations := modified.ObjectMeta.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string, 1)
}
if brokerClass, present := annotations[eventing.BrokerClassKey]; present {
logger.Infof("Annotation found \"%s/%s\" => %q", broker.Namespace, broker.Name, brokerClass)
return []byte{}, nil
}
if _, present := annotations[eventing.BrokerClassKey]; !present {
annotations[eventing.BrokerClassKey] = eventing.ChannelBrokerClassValue
modified.ObjectMeta.SetAnnotations(annotations)
}
patch, err := duck.CreateMergePatch(broker, modified)
if err != nil {
logger.Warnf("Failed to create patch for \"%s/%s\" : %v", broker.Namespace, broker.Name, err)
return []byte{}, err
}
logger.Infof("Patched \"%s/%s\": %q", broker.Namespace, broker.Name, string(patch))
// If there is nothing to patch, we are good, just return.
// Empty patch is {}, hence we check for that.
if len(patch) <= 2 {
return []byte{}, nil
}
return patch, nil
}
Loading

0 comments on commit 9bf9bb5

Please sign in to comment.