Skip to content

Commit

Permalink
pkg/cvo: update to new updatepayload structure
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinavdahiya committed Sep 19, 2018
1 parent 08c865b commit 847f71b
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 231 deletions.
47 changes: 30 additions & 17 deletions pkg/cvo/cvo.go
Expand Up @@ -6,18 +6,11 @@ import (

"github.com/golang/glog"
"github.com/google/uuid"
"github.com/openshift/cluster-version-operator/lib/resourceapply"
cvv1 "github.com/openshift/cluster-version-operator/pkg/apis/clusterversion.openshift.io/v1"
osv1 "github.com/openshift/cluster-version-operator/pkg/apis/operatorstatus.openshift.io/v1"
clientset "github.com/openshift/cluster-version-operator/pkg/generated/clientset/versioned"
cvinformersv1 "github.com/openshift/cluster-version-operator/pkg/generated/informers/externalversions/clusterversion.openshift.io/v1"
osinformersv1 "github.com/openshift/cluster-version-operator/pkg/generated/informers/externalversions/operatorstatus.openshift.io/v1"
cvlistersv1 "github.com/openshift/cluster-version-operator/pkg/generated/listers/clusterversion.openshift.io/v1"
oslistersv1 "github.com/openshift/cluster-version-operator/pkg/generated/listers/operatorstatus.openshift.io/v1"
corev1 "k8s.io/api/core/v1"
apiextclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiextinformersv1beta1 "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1beta1"
apiextlistersv1beta1 "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -30,6 +23,15 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

"github.com/openshift/cluster-version-operator/lib/resourceapply"
cvv1 "github.com/openshift/cluster-version-operator/pkg/apis/clusterversion.openshift.io/v1"
osv1 "github.com/openshift/cluster-version-operator/pkg/apis/operatorstatus.openshift.io/v1"
clientset "github.com/openshift/cluster-version-operator/pkg/generated/clientset/versioned"
cvinformersv1 "github.com/openshift/cluster-version-operator/pkg/generated/informers/externalversions/clusterversion.openshift.io/v1"
osinformersv1 "github.com/openshift/cluster-version-operator/pkg/generated/informers/externalversions/operatorstatus.openshift.io/v1"
cvlistersv1 "github.com/openshift/cluster-version-operator/pkg/generated/listers/clusterversion.openshift.io/v1"
oslistersv1 "github.com/openshift/cluster-version-operator/pkg/generated/listers/operatorstatus.openshift.io/v1"
)

const (
Expand All @@ -39,11 +41,6 @@ const (
//
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
maxRetries = 15

// installconfigKey is the key in ConfigMap that stores the InstallConfig.
installconfigKey = "installconfig"

workQueueKey = "kube-system/installconfig"
)

// ownerKind contains the schema.GroupVersionKind for type that owns objects managed by CVO.
Expand Down Expand Up @@ -146,6 +143,7 @@ func (optr *Operator) Run(workers int, stopCh <-chan struct{}) {
}

func (optr *Operator) eventHandler() cache.ResourceEventHandler {
workQueueKey := fmt.Sprintf("%s/%s", optr.namespace, optr.name)
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { optr.queue.Add(workQueueKey) },
UpdateFunc: func(old, new interface{}) { optr.queue.Add(workQueueKey) },
Expand Down Expand Up @@ -197,20 +195,36 @@ func (optr *Operator) sync(key string) error {
}()

// We always run this to make sure CVOConfig can be synced.
if err := optr.syncCVOCRDs(); err != nil {
if err := optr.syncCustomResourceDefinitions(); err != nil {
return err
}

namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}

config, err := optr.getConfig()
var obj *cvv1.CVOConfig
obj, err = optr.cvoConfigLister.CVOConfigs(namespace).Get(name)
if apierrors.IsNotFound(err) {
obj, err = optr.getConfig()
}
if err != nil {
return err
}

config := &cvv1.CVOConfig{}
obj.DeepCopyInto(config)

if err := optr.syncStatus(config, osv1.OperatorStatusCondition{Type: osv1.OperatorStatusConditionTypeWorking, Message: fmt.Sprintf("Working towards %s", config)}); err != nil {
return err
}

payload, err := optr.syncUpdatePayloadContents(updatePayloadsPathPrefix, config)
payloadDir, err := optr.updatePayloadDir(config)
if err != nil {
return err
}
payload, err := optr.loadUpdatePayload(payloadDir)
if err != nil {
return err
}
Expand All @@ -223,7 +237,6 @@ func (optr *Operator) sync(key string) error {
}

func (optr *Operator) getConfig() (*cvv1.CVOConfig, error) {
// XXX: fetch upstream, channel, cluster ID from InstallConfig
upstream := cvv1.URL("http://localhost:8080/graph")
channel := "fast"
id, _ := uuid.NewRandom()
Expand Down
235 changes: 21 additions & 214 deletions pkg/cvo/sync.go
Expand Up @@ -2,190 +2,45 @@ package cvo

import (
"fmt"
"os"
"path/filepath"
"sort"
"time"

"github.com/golang/glog"
"github.com/openshift/cluster-version-operator/lib"
"github.com/openshift/cluster-version-operator/lib/resourceapply"
"github.com/openshift/cluster-version-operator/lib/resourcebuilder"
"github.com/openshift/cluster-version-operator/pkg/apis"
"github.com/openshift/cluster-version-operator/pkg/apis/clusterversion.openshift.io/v1"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
randutil "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/pointer"

"github.com/openshift/cluster-version-operator/lib/resourceapply"
"github.com/openshift/cluster-version-operator/lib/resourcebuilder"
"github.com/openshift/cluster-version-operator/pkg/apis"
cvv1 "github.com/openshift/cluster-version-operator/pkg/apis/clusterversion.openshift.io/v1"
)

func (optr *Operator) syncUpdatePayload(config *v1.CVOConfig, payload [][]lib.Manifest) error {
for _, manifests := range payload {
for _, manifest := range manifests {
glog.V(4).Infof("Running sync for (%s) %s/%s", manifest.GVK.String(), manifest.Object().GetNamespace(), manifest.Object().GetName())
b, err := resourcebuilder.New(optr.restConfig, manifest)
if err != nil {
return err
}
if err := b.Do(ownerRefModifier(config)); err != nil {
return err
}
glog.V(4).Infof("Done syncing for (%s) %s/%s", manifest.GVK.String(), manifest.Object().GetNamespace(), manifest.Object().GetName())
func (optr *Operator) syncUpdatePayload(config *cvv1.CVOConfig, payload *updatePayload) error {
for _, manifest := range payload.manifests {
taskName := fmt.Sprintf("(%s) %s/%s", manifest.GVK.String(), manifest.Object().GetNamespace(), manifest.Object().GetName())
glog.V(4).Infof("Running sync for %s", taskName)
glog.V(4).Infof("Manifest: %s", string(manifest.Raw))
b, err := resourcebuilder.New(resourcebuilder.Mapper, optr.restConfig, manifest)
if err != nil {
return fmt.Errorf("error creating New resourcebuilder for %s: %v", taskName, err)
}
if err := b.WithModifier(ownerRefModifier(config)).Do(); err != nil {
return fmt.Errorf("error running apply for %s: %v", taskName, err)
}
glog.V(4).Infof("Done syncing for %s", taskName)
}
return nil
}

func ownerRefModifier(config *v1.CVOConfig) resourcebuilder.MetaV1ObjectModifierFunc {
func ownerRefModifier(config *cvv1.CVOConfig) resourcebuilder.MetaV1ObjectModifierFunc {
oref := metav1.NewControllerRef(config, ownerKind)
return func(obj metav1.Object) {
obj.SetOwnerReferences([]metav1.OwnerReference{*oref})
}
}

const (
updatePayloadsPathPrefix = "/etc/cvo/update-payloads"
)

func (optr *Operator) syncUpdatePayloadContents(pathprefix string, config *v1.CVOConfig) ([][]lib.Manifest, error) {
if !updatedesired(config.DesiredUpdate) {
return nil, nil
}
dv := config.DesiredUpdate.Version
dp := config.DesiredUpdate.Payload

dirpath := filepath.Join(pathprefix, dv)
_, err := os.Stat(dirpath)
if err != nil && !os.IsNotExist(err) {
return nil, err
}
if os.IsNotExist(err) {
if err := optr.fetchUpdatePayloadToPath(pathprefix, config); err != nil {
return nil, err
}
}

// read dirpath to manifests:
// For each operator in payload the manifests should be ordered as:
// 1. CRDs
// 2. others
mmap, err := lib.LoadManifests(dirpath)
if err != nil {
return nil, err
}

if len(mmap) == 0 {
return nil, fmt.Errorf("empty update payload %s", dp)
}

sortedkeys := []string{}
for k := range mmap {
sortedkeys = append(sortedkeys, k)
}
sort.Strings(sortedkeys)

payload := [][]lib.Manifest{}
for _, k := range sortedkeys {
ordered := orderManifests(mmap[k])
payload = append(payload, ordered)
}
return payload, nil
}

func (optr *Operator) fetchUpdatePayloadToPath(pathprefix string, config *v1.CVOConfig) error {
if !updatedesired(config.DesiredUpdate) {
return fmt.Errorf("error DesiredUpdate is empty")
}
var (
version = config.DesiredUpdate.Version
payload = config.DesiredUpdate.Payload
name = fmt.Sprintf("%s-%s-%s", optr.name, version, randutil.String(5))
namespace = optr.namespace
deadline = pointer.Int64Ptr(2 * 60)
nodeSelectorKey = "node-role.kubernetes.io/master"
nodename = optr.nodename
vpath = filepath.Join(pathprefix, version)
cmd = []string{"/bin/sh"}
args = []string{"-c", fmt.Sprintf("mv /manifests %s", vpath)}
)

job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: batchv1.JobSpec{
ActiveDeadlineSeconds: deadline,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "payload",
Image: payload,
Command: cmd,
Args: args,
VolumeMounts: []corev1.VolumeMount{{
MountPath: pathprefix,
Name: "payload",
}},
}},
Volumes: []corev1.Volume{{
Name: "payload",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: pathprefix,
},
},
}},
NodeName: nodename,
NodeSelector: map[string]string{
nodeSelectorKey: "",
},
Tolerations: []corev1.Toleration{{
Key: nodeSelectorKey,
}},
RestartPolicy: corev1.RestartPolicyOnFailure,
},
},
},
}

_, err := optr.kubeClient.BatchV1().Jobs(job.Namespace).Create(job)
if err != nil {
return err
}
return resourcebuilder.WaitForJobCompletion(optr.kubeClient.BatchV1(), job)
}

func orderManifests(manifests []lib.Manifest) []lib.Manifest {
var crds, others []lib.Manifest
for _, m := range manifests {
group, version, kind := m.GVK.Group, m.GVK.Version, m.GVK.Kind
switch {
case group == apiextv1beta1.SchemeGroupVersion.Group && version == apiextv1beta1.SchemeGroupVersion.Version && kind == resourcebuilder.CRDKind:
crds = append(crds, m)
default:
others = append(others, m)
}
}

out := []lib.Manifest{}
out = append(out, crds...)
out = append(out, others...)
return out
}

func updatedesired(desired v1.Update) bool {
return desired.Payload != "" &&
desired.Version != ""
}

func (optr *Operator) syncCVOCRDs() error {
func (optr *Operator) syncCustomResourceDefinitions() error {
crds := []*apiextv1beta1.CustomResourceDefinition{{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("operatorstatuses.%s", apis.OperatorStatusGroupName),
Expand All @@ -202,22 +57,6 @@ func (optr *Operator) syncCVOCRDs() error {
ListKind: "OperatorStatusList",
},
},
}, {
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("cvoconfigs.%s", apis.ClusterVersionGroupName),
Namespace: metav1.NamespaceDefault,
},
Spec: apiextv1beta1.CustomResourceDefinitionSpec{
Group: apis.ClusterVersionGroupName,
Version: "v1",
Scope: "Namespaced",
Names: apiextv1beta1.CustomResourceDefinitionNames{
Plural: "cvoconfigs",
Singular: "cvoconfig",
Kind: "CVOConfig",
ListKind: "CVOConfigList",
},
},
}}

for _, crd := range crds {
Expand All @@ -235,10 +74,8 @@ func (optr *Operator) syncCVOCRDs() error {
}

const (
deploymentRolloutPollInterval = time.Second
deploymentRolloutTimeout = 5 * time.Minute
customResourceReadyInterval = time.Second
customResourceReadyTimeout = 1 * time.Minute
customResourceReadyInterval = time.Second
customResourceReadyTimeout = 1 * time.Minute
)

func (optr *Operator) waitForCustomResourceDefinition(resource *apiextv1beta1.CustomResourceDefinition) error {
Expand All @@ -262,33 +99,3 @@ func (optr *Operator) waitForCustomResourceDefinition(resource *apiextv1beta1.Cu
return false, nil
})
}

func (optr *Operator) waitForDeploymentRollout(resource *appsv1.Deployment) error {
return wait.Poll(deploymentRolloutPollInterval, deploymentRolloutTimeout, func() (bool, error) {
d, err := optr.deployLister.Deployments(resource.Namespace).Get(resource.Name)
if errors.IsNotFound(err) {
// exit early to recreate the deployment.
return false, err
}
if err != nil {
// Do not return error here, as we could be updating the API Server itself, in which case we
// want to continue waiting.
glog.Errorf("error getting Deployment %s during rollout: %v", resource.Name, err)
return false, nil
}

if d.DeletionTimestamp != nil {
return false, fmt.Errorf("Deployment %s is being deleted", resource.Name)
}

if d.Generation <= d.Status.ObservedGeneration && d.Status.UpdatedReplicas == d.Status.Replicas && d.Status.UnavailableReplicas == 0 {
return true, nil
}
glog.V(4).Infof("Deployment %s is not ready. status: (replicas: %d, updated: %d, ready: %d, unavailable: %d)", d.Name, d.Status.Replicas, d.Status.UpdatedReplicas, d.Status.ReadyReplicas, d.Status.UnavailableReplicas)
return false, nil
})
}

func intOrStringPtr(v intstr.IntOrString) *intstr.IntOrString {
return &v
}

0 comments on commit 847f71b

Please sign in to comment.