diff --git a/manifests/07-operator-ibm-cloud-managed.yaml b/manifests/07-operator-ibm-cloud-managed.yaml
index fcfb2e438..0750c694a 100644
--- a/manifests/07-operator-ibm-cloud-managed.yaml
+++ b/manifests/07-operator-ibm-cloud-managed.yaml
@@ -40,6 +40,8 @@ spec:
             fieldPath: metadata.name
       - name: OPERATOR_NAME
         value: cluster-image-registry-operator
+      - name: OPERATOR_IMAGE
+        value: docker.io/openshift/origin-cluster-image-registry-operator:latest
       - name: IMAGE
         value: docker.io/openshift/origin-docker-registry:latest
       - name: IMAGE_PRUNER
diff --git a/manifests/07-operator.yaml b/manifests/07-operator.yaml
index af3f5986c..811a03b68 100644
--- a/manifests/07-operator.yaml
+++ b/manifests/07-operator.yaml
@@ -68,6 +68,8 @@ spec:
             fieldPath: metadata.name
       - name: OPERATOR_NAME
         value: "cluster-image-registry-operator"
+      - name: OPERATOR_IMAGE
+        value: docker.io/openshift/origin-cluster-image-registry-operator:latest
       - name: IMAGE
         value: docker.io/openshift/origin-docker-registry:latest
       - name: IMAGE_PRUNER
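
Both operator Deployment variants now receive OPERATOR_IMAGE so the operator can schedule its own image as the container of the azure-path-fix Job; pkg/resource/azurepathfixjob.go below reads it with os.Getenv("OPERATOR_IMAGE"). A minimal sketch of an optional startup guard, assuming one wanted to fail fast when the manifest does not inject the variable; the helper name and error text are illustrative and not defined anywhere in this patch:

    package main

    import (
        "fmt"
        "os"
    )

    // requireOperatorImage is a hypothetical helper: it returns the operator's own
    // image reference, failing when the Deployment manifest did not set OPERATOR_IMAGE.
    func requireOperatorImage() (string, error) {
        image := os.Getenv("OPERATOR_IMAGE")
        if image == "" {
            return "", fmt.Errorf("OPERATOR_IMAGE is not set; the azure-path-fix job cannot be created")
        }
        return image, nil
    }

    func main() {
        image, err := requireOperatorImage()
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Println("azure-path-fix job will run image:", image)
    }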
diff --git a/pkg/operator/azurepathfixcontroller.go b/pkg/operator/azurepathfixcontroller.go
new file mode 100644
index 000000000..48b29e8cd
--- /dev/null
+++ b/pkg/operator/azurepathfixcontroller.go
@@ -0,0 +1,310 @@
+package operator
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	batchv1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/selection"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	batchv1informers "k8s.io/client-go/informers/batch/v1"
+	corev1informers "k8s.io/client-go/informers/core/v1"
+	batchv1client "k8s.io/client-go/kubernetes/typed/batch/v1"
+	batchv1listers "k8s.io/client-go/listers/batch/v1"
+	corev1listers "k8s.io/client-go/listers/core/v1"
+	restclient "k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+
+	configapiv1 "github.com/openshift/api/config/v1"
+	operatorv1 "github.com/openshift/api/operator/v1"
+	configv1informers "github.com/openshift/client-go/config/informers/externalversions/config/v1"
+	configlisters "github.com/openshift/client-go/config/listers/config/v1"
+	imageregistryv1informers "github.com/openshift/client-go/imageregistry/informers/externalversions/imageregistry/v1"
+	imageregistryv1listers "github.com/openshift/client-go/imageregistry/listers/imageregistry/v1"
+	"github.com/openshift/library-go/pkg/operator/v1helpers"
+
+	"github.com/openshift/cluster-image-registry-operator/pkg/defaults"
+	"github.com/openshift/cluster-image-registry-operator/pkg/resource"
+	"github.com/openshift/cluster-image-registry-operator/pkg/storage/util"
+)
+
+type AzurePathFixController struct {
+	batchClient               batchv1client.BatchV1Interface
+	operatorClient            v1helpers.OperatorClient
+	jobLister                 batchv1listers.JobNamespaceLister
+	imageRegistryConfigLister imageregistryv1listers.ConfigLister
+	secretLister              corev1listers.SecretNamespaceLister
+	podLister                 corev1listers.PodNamespaceLister
+	infrastructureLister      configlisters.InfrastructureLister
+	proxyLister               configlisters.ProxyLister
+	kubeconfig                *restclient.Config
+
+	cachesToSync []cache.InformerSynced
+	queue        workqueue.RateLimitingInterface
+}
+
+func NewAzurePathFixController(
+	kubeconfig *restclient.Config,
+	batchClient batchv1client.BatchV1Interface,
+	operatorClient v1helpers.OperatorClient,
+	jobInformer batchv1informers.JobInformer,
+	imageRegistryConfigInformer imageregistryv1informers.ConfigInformer,
+	infrastructureInformer configv1informers.InfrastructureInformer,
+	secretInformer corev1informers.SecretInformer,
+	proxyInformer configv1informers.ProxyInformer,
+	podInformer corev1informers.PodInformer,
+) (*AzurePathFixController, error) {
+	c := &AzurePathFixController{
+		batchClient:               batchClient,
+		operatorClient:            operatorClient,
+		jobLister:                 jobInformer.Lister().Jobs(defaults.ImageRegistryOperatorNamespace),
+		imageRegistryConfigLister: imageRegistryConfigInformer.Lister(),
+		infrastructureLister:      infrastructureInformer.Lister(),
+		secretLister:              secretInformer.Lister().Secrets(defaults.ImageRegistryOperatorNamespace),
+		podLister:                 podInformer.Lister().Pods(defaults.ImageRegistryOperatorNamespace),
+		proxyLister:               proxyInformer.Lister(),
+		kubeconfig:                kubeconfig,
+		queue:                     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AzurePathFixController"),
+	}
+
+	if _, err := jobInformer.Informer().AddEventHandler(c.eventHandler()); err != nil {
+		return nil, err
+	}
+	c.cachesToSync = append(c.cachesToSync, jobInformer.Informer().HasSynced)
+
+	if _, err := imageRegistryConfigInformer.Informer().AddEventHandler(c.eventHandler()); err != nil {
+		return nil, err
+	}
+	c.cachesToSync = append(c.cachesToSync, imageRegistryConfigInformer.Informer().HasSynced)
+
+	if _, err := infrastructureInformer.Informer().AddEventHandler(c.eventHandler()); err != nil {
+		return nil, err
+	}
+	c.cachesToSync = append(c.cachesToSync, infrastructureInformer.Informer().HasSynced)
+
+	if _, err := secretInformer.Informer().AddEventHandler(c.eventHandler()); err != nil {
+		return nil, err
+	}
+	c.cachesToSync = append(c.cachesToSync, secretInformer.Informer().HasSynced)
+
+	if _, err := podInformer.Informer().AddEventHandler(c.eventHandler()); err != nil {
+		return nil, err
+	}
+	c.cachesToSync = append(c.cachesToSync, podInformer.Informer().HasSynced)
+
+	if _, err := proxyInformer.Informer().AddEventHandler(c.eventHandler()); err != nil {
+		return nil, err
+	}
+	c.cachesToSync = append(c.cachesToSync, proxyInformer.Informer().HasSynced)
+
+	// bootstrap the job if it doesn't exist
+	c.queue.Add("instance")
+
+	return c, nil
+}
+
+func (c *AzurePathFixController) eventHandler() cache.ResourceEventHandler {
+	const workQueueKey = "instance"
+	return cache.ResourceEventHandlerFuncs{
+		AddFunc:    func(obj interface{}) { c.queue.Add(workQueueKey) },
+		UpdateFunc: func(old, new interface{}) { c.queue.Add(workQueueKey) },
+		DeleteFunc: func(obj interface{}) { c.queue.Add(workQueueKey) },
+	}
+}
+
+func (c *AzurePathFixController) runWorker() {
+	for c.processNextWorkItem() {
+	}
+}
+
+func (c *AzurePathFixController) processNextWorkItem() bool {
+	obj, shutdown := c.queue.Get()
+	if shutdown {
+		return false
+	}
+	defer c.queue.Done(obj)
+
+	klog.V(4).Infof("get event from workqueue: %s", obj)
+
+	// every event is collapsed into the single "instance" key, so each sync
+	// reconciles the whole state; on failure the same item is rate-limited back in.
+	if err := c.sync(); err != nil {
+		c.queue.AddRateLimited(obj)
+		klog.Errorf("AzurePathFixController: unable to sync: %s, requeuing", err)
+	} else {
+		c.queue.Forget(obj)
+		klog.V(4).Infof("AzurePathFixController: event from workqueue successfully processed")
+	}
+	return true
+}
workqueue successfully processed") + } + return true +} + +func (c *AzurePathFixController) sync() error { + // this controller was made to run specifically on Azure, + // so if we detect a different cloud, skip it. + infra, err := util.GetInfrastructure(c.infrastructureLister) + if err != nil { + return fmt.Errorf("unable to get infrastructure object: %s", err) + } + if infra.Status.PlatformStatus.Type != configapiv1.AzurePlatformType { + return nil + } + + ctx := context.TODO() + imageRegistryConfig, err := c.imageRegistryConfigLister.Get("cluster") + if err != nil { + return err + } + azureStorage := imageRegistryConfig.Status.Storage.Azure + if azureStorage == nil || len(azureStorage.AccountName) == 0 { + return fmt.Errorf("storage account not yet provisioned") + } + if azureStorage == nil || len(azureStorage.Container) == 0 { + return fmt.Errorf("storage container not yet provisioned") + } + + gen := resource.NewGeneratorAzurePathFixJob( + c.jobLister, + c.batchClient, + c.secretLister, + c.infrastructureLister, + c.proxyLister, + imageRegistryConfig, + c.kubeconfig, + ) + + progressingCondition := operatorv1.OperatorCondition{ + Type: "AzurePathFixProgressing", + Status: operatorv1.ConditionUnknown, + } + degradedCondition := operatorv1.OperatorCondition{ + Type: "AzurePathFixControllerDegraded", + Status: operatorv1.ConditionFalse, + Reason: "AsExpected", + } + + jobObj, err := gen.Get() + if errors.IsNotFound(err) { + progressingCondition.Status = operatorv1.ConditionTrue + progressingCondition.Reason = "NotFound" + progressingCondition.Message = "The job does not exist" + } else if err != nil { + progressingCondition.Reason = "Unknown" + progressingCondition.Message = fmt.Sprintf("Unable to check job progress: %s", err) + } else { + job := jobObj.(*batchv1.Job) + jobProgressing := true + var jobCondition batchv1.JobConditionType + for _, cond := range job.Status.Conditions { + if (cond.Type == batchv1.JobComplete || cond.Type == batchv1.JobFailed) && cond.Status == corev1.ConditionTrue { + jobProgressing = false + jobCondition = cond.Type + break + } + } + + if jobProgressing { + progressingCondition.Reason = "Migrating" + progressingCondition.Message = fmt.Sprintf("Azure path fix job is progressing: %d pods active; %d pods failed", job.Status.Active, job.Status.Failed) + progressingCondition.Status = operatorv1.ConditionTrue + } + + if jobCondition == batchv1.JobComplete { + progressingCondition.Reason = "AsExpected" + progressingCondition.Status = operatorv1.ConditionFalse + } + + if jobCondition == batchv1.JobFailed { + progressingCondition.Reason = "Failed" + progressingCondition.Status = operatorv1.ConditionFalse + degradedCondition.Reason = "Failed" + degradedCondition.Status = operatorv1.ConditionTrue + + // if the job still executing (i.e there are attempts left before backoff), + // we don't want to report degraded, but we let users know that some attempt(s) + // failed, and the job is still progressing. + + requirement, err := labels.NewRequirement("batch.kubernetes.io/job-name", selection.Equals, []string{gen.GetName()}) + if err != nil { + // this is extremely unlikely to happen + return err + } + pods, err := c.podLister.List(labels.NewSelector().Add(*requirement)) + if err != nil { + // there's not much that can be done about an error here, + // the next reconciliation(s) are likely to succeed. 
diff --git a/pkg/operator/starter.go b/pkg/operator/starter.go
index c12412160..2b1d6d778 100644
--- a/pkg/operator/starter.go
+++ b/pkg/operator/starter.go
@@ -168,6 +168,21 @@ func RunOperator(ctx context.Context, kubeconfig *restclient.Config) error {
 		return err
 	}
 
+	azurePathFixController, err := NewAzurePathFixController(
+		kubeconfig,
+		kubeClient.BatchV1(),
+		configOperatorClient,
+		kubeInformers.Batch().V1().Jobs(),
+		imageregistryInformers.Imageregistry().V1().Configs(),
+		configInformers.Config().V1().Infrastructures(),
+		kubeInformers.Core().V1().Secrets(),
+		configInformers.Config().V1().Proxies(),
+		kubeInformers.Core().V1().Pods(),
+	)
+	if err != nil {
+		return err
+	}
+
 	metricsController := NewMetricsController(imageInformers.Image().V1().ImageStreams())
 
 	kubeInformers.Start(ctx.Done())
@@ -187,6 +202,7 @@ func RunOperator(ctx context.Context, kubeconfig *restclient.Config) error {
 	go imagePrunerController.Run(ctx.Done())
 	go loggingController.Run(ctx, 1)
 	go azureStackCloudController.Run(ctx)
+	go azurePathFixController.Run(ctx.Done())
 	go metricsController.Run(ctx)
 
 	<-ctx.Done()
diff --git a/pkg/resource/azurepathfixjob.go b/pkg/resource/azurepathfixjob.go
new file mode 100644
index 000000000..78e5e6e15
--- /dev/null
+++ b/pkg/resource/azurepathfixjob.go
@@ -0,0 +1,241 @@
+package resource
+
+import (
+	"context"
+	"fmt"
+	"os"
+
+	batchv1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
+	kcorev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	batchset "k8s.io/client-go/kubernetes/typed/batch/v1"
+	batchlisters "k8s.io/client-go/listers/batch/v1"
+	corev1listers "k8s.io/client-go/listers/core/v1"
+	restclient "k8s.io/client-go/rest"
+
+	configapiv1 "github.com/openshift/api/config/v1"
+	imageregistryv1 "github.com/openshift/api/imageregistry/v1"
+	configlisters "github.com/openshift/client-go/config/listers/config/v1"
"github.com/openshift/cluster-image-registry-operator/pkg/defaults" + "github.com/openshift/cluster-image-registry-operator/pkg/storage/azure" +) + +var _ Mutator = &generatorAzurePathFixJob{} + +type generatorAzurePathFixJob struct { + lister batchlisters.JobNamespaceLister + secretLister corev1listers.SecretNamespaceLister + infrastructureLister configlisters.InfrastructureLister + proxyLister configlisters.ProxyLister + client batchset.BatchV1Interface + cr *imageregistryv1.Config + kubeconfig *restclient.Config +} + +func NewGeneratorAzurePathFixJob( + lister batchlisters.JobNamespaceLister, + client batchset.BatchV1Interface, + secretLister corev1listers.SecretNamespaceLister, + infrastructureLister configlisters.InfrastructureLister, + proxyLister configlisters.ProxyLister, + cr *imageregistryv1.Config, + kubeconfig *restclient.Config, +) *generatorAzurePathFixJob { + return &generatorAzurePathFixJob{ + lister: lister, + client: client, + cr: cr, + infrastructureLister: infrastructureLister, + secretLister: secretLister, + proxyLister: proxyLister, + kubeconfig: kubeconfig, + } +} + +func (gapfj *generatorAzurePathFixJob) Type() runtime.Object { + return &batchv1.Job{} +} + +func (gapfj *generatorAzurePathFixJob) GetNamespace() string { + return defaults.ImageRegistryOperatorNamespace +} + +func (gapfj *generatorAzurePathFixJob) GetName() string { + return "azure-path-fix" +} + +func (gapfj *generatorAzurePathFixJob) expected() (runtime.Object, error) { + azureCfg, err := azure.GetConfig(gapfj.secretLister, gapfj.infrastructureLister) + if err != nil { + return nil, err + } + clusterProxy, err := gapfj.proxyLister.Get(defaults.ClusterProxyResourceName) + if errors.IsNotFound(err) { + clusterProxy = &configapiv1.Proxy{} + } else if err != nil { + // TODO: should we report Degraded? 
+
+	envs := []corev1.EnvVar{
+		{Name: "AZURE_STORAGE_ACCOUNT_NAME", Value: azureStorage.AccountName},
+		{Name: "AZURE_CONTAINER_NAME", Value: azureStorage.Container},
+		{Name: "AZURE_CLIENT_ID", Value: azureCfg.ClientID},
+		{Name: "AZURE_TENANT_ID", Value: azureCfg.TenantID},
+		{Name: "AZURE_CLIENT_SECRET", Value: azureCfg.ClientSecret},
+		{Name: "AZURE_FEDERATED_TOKEN_FILE", Value: azureCfg.FederatedTokenFile},
+		{Name: "AZURE_ACCOUNTKEY", ValueFrom: &corev1.EnvVarSource{
+			SecretKeyRef: &corev1.SecretKeySelector{
+				LocalObjectReference: corev1.LocalObjectReference{
+					Name: defaults.ImageRegistryPrivateConfiguration,
+				},
+				Key: "REGISTRY_STORAGE_AZURE_ACCOUNTKEY",
+			},
+		}},
+	}
+
+	if len(azureStorage.CloudName) > 0 {
+		envs = append(envs, corev1.EnvVar{Name: "AZURE_ENVIRONMENT", Value: azureStorage.CloudName})
+	}
+
+	if gapfj.cr.Spec.Proxy.HTTP != "" {
+		envs = append(envs, corev1.EnvVar{Name: "HTTP_PROXY", Value: gapfj.cr.Spec.Proxy.HTTP})
+	} else if clusterProxy.Status.HTTPProxy != "" {
+		envs = append(envs, corev1.EnvVar{Name: "HTTP_PROXY", Value: clusterProxy.Status.HTTPProxy})
+	}
+
+	if gapfj.cr.Spec.Proxy.HTTPS != "" {
+		envs = append(envs, corev1.EnvVar{Name: "HTTPS_PROXY", Value: gapfj.cr.Spec.Proxy.HTTPS})
+	} else if clusterProxy.Status.HTTPSProxy != "" {
+		envs = append(envs, corev1.EnvVar{Name: "HTTPS_PROXY", Value: clusterProxy.Status.HTTPSProxy})
+	}
+
+	if gapfj.cr.Spec.Proxy.NoProxy != "" {
+		envs = append(envs, corev1.EnvVar{Name: "NO_PROXY", Value: gapfj.cr.Spec.Proxy.NoProxy})
+	} else if clusterProxy.Status.NoProxy != "" {
+		envs = append(envs, corev1.EnvVar{Name: "NO_PROXY", Value: clusterProxy.Status.NoProxy})
+	}
+
+	// Cluster trusted certificate authorities - mount to /usr/share/pki/ca-trust-source/ to add
+	// CAs as low-priority trust sources. The job container runs update-ca-trust extract on
+	// startup, which merges them with the system's trusted CAs into a single CA bundle.
+	//
+	// See man update-ca-trust for more information.
+	optional := true
+	trustedCAVolume := corev1.Volume{
+		Name: "trusted-ca",
+		VolumeSource: corev1.VolumeSource{
+			ConfigMap: &corev1.ConfigMapVolumeSource{
+				LocalObjectReference: corev1.LocalObjectReference{
+					Name: defaults.TrustedCAName,
+				},
+				// Trust bundle is in PEM format - needs to be mounted to /anchors so that
+				// update-ca-trust extract knows that these CAs should always be trusted.
+				// This also ensures that no other low-priority trust is present in the container.
+				//
+				// See man update-ca-trust for more information.
+				Items: []corev1.KeyToPath{
+					{
+						Key:  "ca-bundle.crt",
+						Path: "anchors/ca-bundle.crt",
+					},
+				},
+				Optional: &optional,
+			},
+		},
+	}
+	trustedCAMount := corev1.VolumeMount{
+		Name:      trustedCAVolume.Name,
+		MountPath: "/usr/share/pki/ca-trust-source",
+	}
+	caTrustExtractedVolume := corev1.Volume{
+		Name: "ca-trust-extracted",
+		VolumeSource: corev1.VolumeSource{
+			EmptyDir: &corev1.EmptyDirVolumeSource{},
+		},
+	}
+	caTrustExtractedMount := corev1.VolumeMount{
+		Name:      "ca-trust-extracted",
+		MountPath: "/etc/pki/ca-trust/extracted",
+	}
+
+	backoffLimit := int32(0)
+	job := &batchv1.Job{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      gapfj.GetName(),
+			Namespace: gapfj.GetNamespace(),
+		},
+		Spec: batchv1.JobSpec{
+			BackoffLimit: &backoffLimit,
+			Template: kcorev1.PodTemplateSpec{
+				Spec: kcorev1.PodSpec{
+					RestartPolicy:      kcorev1.RestartPolicyNever,
+					ServiceAccountName: defaults.ServiceAccountName,
+					PriorityClassName:  "system-cluster-critical",
+					Containers: []kcorev1.Container{
+						{
+							Image: os.Getenv("OPERATOR_IMAGE"),
+							Resources: kcorev1.ResourceRequirements{
+								Requests: kcorev1.ResourceList{
+									kcorev1.ResourceCPU:    resource.MustParse("100m"),
+									kcorev1.ResourceMemory: resource.MustParse("256Mi"),
+								},
+							},
+							TerminationMessagePolicy: kcorev1.TerminationMessageFallbackToLogsOnError,
+							Env:                      envs,
+							VolumeMounts:             []corev1.VolumeMount{trustedCAMount, caTrustExtractedMount},
+							Name:                     gapfj.GetName(),
+							Command:                  []string{"/bin/sh"},
+							Args: []string{
+								"-c",
+								"mkdir -p /etc/pki/ca-trust/extracted/edk2 /etc/pki/ca-trust/extracted/java /etc/pki/ca-trust/extracted/openssl /etc/pki/ca-trust/extracted/pem && update-ca-trust extract && /usr/bin/move-blobs",
+							},
+						},
+					},
+					Volumes: []corev1.Volume{trustedCAVolume, caTrustExtractedVolume},
+				},
+			},
+		},
+	}
+
+	return job, nil
+}
+
+func (gapfj *generatorAzurePathFixJob) Get() (runtime.Object, error) {
+	return gapfj.lister.Get(gapfj.GetName())
+}
+
+func (gapfj *generatorAzurePathFixJob) Create() (runtime.Object, error) {
+	return commonCreate(gapfj, func(obj runtime.Object) (runtime.Object, error) {
+		return gapfj.client.Jobs(gapfj.GetNamespace()).Create(
+			context.TODO(), obj.(*batchv1.Job), metav1.CreateOptions{},
+		)
+	})
+}
+
+func (gapfj *generatorAzurePathFixJob) Update(o runtime.Object) (runtime.Object, bool, error) {
+	return commonUpdate(gapfj, o, func(obj runtime.Object) (runtime.Object, error) {
+		return gapfj.client.Jobs(gapfj.GetNamespace()).Update(
+			context.TODO(), obj.(*batchv1.Job), metav1.UpdateOptions{},
+		)
+	})
+}
+
+func (gapfj *generatorAzurePathFixJob) Delete(opts metav1.DeleteOptions) error {
+	return gapfj.client.Jobs(gapfj.GetNamespace()).Delete(
+		context.TODO(), gapfj.GetName(), opts,
+	)
+}
+
+func (gapfj *generatorAzurePathFixJob) Owned() bool {
+	return true
+}
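
expected() repeats the same precedence rule for HTTP_PROXY, HTTPS_PROXY and NO_PROXY: an explicit value in the image registry config's spec.proxy wins over the cluster-wide proxy status. A compact sketch of that rule with a hypothetical helper, shown only to make the precedence explicit; the generator itself keeps the three if/else blocks above, and the URLs are placeholders:

    package main

    import "fmt"

    // proxyValue (hypothetical helper) returns the registry-level override when set,
    // otherwise the cluster-wide value; this is the precedence used for HTTP_PROXY,
    // HTTPS_PROXY and NO_PROXY in generatorAzurePathFixJob.expected().
    func proxyValue(fromSpec, fromCluster string) string {
        if fromSpec != "" {
            return fromSpec
        }
        return fromCluster
    }

    func main() {
        // spec.proxy.http unset, cluster proxy set: the cluster value is used.
        fmt.Println(proxyValue("", "http://proxy.example.com:3128"))
        // spec.proxy.http set: it wins over the cluster value.
        fmt.Println(proxyValue("http://registry-proxy.example.com:3128", "http://proxy.example.com:3128"))
    }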