Bug 1805872: 4.4 cherry-pick of switch from daemonset to deployment #325

Merged
@@ -1,23 +1,32 @@
 apiVersion: apps/v1
-kind: DaemonSet
+kind: Deployment
 metadata:
   namespace: openshift-apiserver
   name: apiserver
   labels:
     app: openshift-apiserver
     apiserver: "true"
 spec:
-  updateStrategy:
-    type: RollingUpdate
+  # The number of replicas will be set in code to the number of master nodes.
   selector:
     matchLabels:
-      app: openshift-apiserver
+      # Need to vary the app label from that used by the legacy
+      # daemonset ('openshift-apiserver') to avoid the legacy
+      # daemonset and its replacement deployment trying to
+      # manage the same pods.
+      #
+      # It's also necessary to use different labeling to ensure, via
+      # anti-affinity, at most one deployment-managed pod on each
+      # master node. Without label differentiation, anti-affinity
+      # would prevent a deployment-managed pod from running on a node
+      # that was already running a daemonset-managed pod.
+      app: openshift-apiserver-a
       apiserver: "true"
   template:
     metadata:
       name: openshift-apiserver
       labels:
-        app: openshift-apiserver
+        app: openshift-apiserver-a
         apiserver: "true"
     spec:
       serviceAccountName: openshift-apiserver-sa
@@ -119,4 +128,20 @@ spec:
       nodeSelector:
         node-role.kubernetes.io/master: ""
       tolerations:
-      - operator: Exists
+      # Ensure pod can be scheduled on master nodes
+      - key: "node-role.kubernetes.io/master"
+        operator: "Exists"
+        effect: "NoSchedule"
+      # Ensure pod can be evicted if the node is unreachable
+      - key: "node.kubernetes.io/unreachable"
+        operator: "Exists"
+        effect: "NoExecute"
+        tolerationSeconds: 120
+      # Ensure scheduling is delayed until node readiness
+      # (i.e. network operator configures CNI on the node)
+      - key: "node.kubernetes.io/not-ready"
+        operator: "Exists"
+        effect: "NoExecute"
+        tolerationSeconds: 120
+      # Anti-affinity is configured in code due to the need to scope
+      # selection to the computed pod template.
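
The manifest comment above notes that anti-affinity is configured in code, scoped to the computed pod template, rather than in this file. As a rough illustration only (not the operator's actual code; the helper name and wiring are assumed), a required anti-affinity rule keyed on the template's own labels could be attached like this:

package example

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// withPodAntiAffinity adds a required anti-affinity rule so that no two pods
// carrying the template's labels (e.g. app: openshift-apiserver-a) can land
// on the same node.
func withPodAntiAffinity(template *corev1.PodTemplateSpec) {
	template.Spec.Affinity = &corev1.Affinity{
		PodAntiAffinity: &corev1.PodAntiAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
				{
					TopologyKey: "kubernetes.io/hostname",
					// Scope the selector to the computed template's labels so only
					// deployment-managed pods repel each other; daemonset-managed
					// pods carrying the old label are not matched.
					LabelSelector: &metav1.LabelSelector{
						MatchLabels: template.Labels,
					},
				},
			},
		},
	}
}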
10 changes: 5 additions & 5 deletions pkg/operator/apiservicecontroller/apiservice_controller.go
@@ -220,20 +220,20 @@ func (c *APIServiceController) runWorker() {
 }

 func (c *APIServiceController) processNextWorkItem() bool {
-    dsKey, quit := c.queue.Get()
+    key, quit := c.queue.Get()
     if quit {
         return false
     }
-    defer c.queue.Done(dsKey)
+    defer c.queue.Done(key)

     err := c.sync()
     if err == nil {
-        c.queue.Forget(dsKey)
+        c.queue.Forget(key)
         return true
     }

-    utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
-    c.queue.AddRateLimited(dsKey)
+    utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
+    c.queue.AddRateLimited(key)

     return true
 }
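
The change above is a pure rename (dsKey becomes key, since the queue key no longer refers to a daemonset) inside the standard client-go workqueue worker loop. For readers unfamiliar with that loop, here is a minimal, self-contained sketch of the pattern; the queue name and the sync stub are illustrative, not taken from this repository:

package main

import (
	"fmt"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/util/workqueue"
)

// sync stands in for the controller's real reconcile logic.
func sync() error { return nil }

// processNextWorkItem drains one key from the queue, reconciles, and decides
// whether to forget the key or requeue it with rate limiting.
func processNextWorkItem(queue workqueue.RateLimitingInterface) bool {
	key, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(key)

	err := sync()
	if err == nil {
		queue.Forget(key)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
	queue.AddRateLimited(key)

	return true
}

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example")
	queue.Add("example-key")
	processNextWorkItem(queue) // processes the single queued key
	queue.ShutDown()
}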
@@ -359,7 +359,6 @@ func TestAvailableStatus(t *testing.T) {
         expectedMessages    []string
         existingAPIServices []runtime.Object
         apiServiceReactor   kubetesting.ReactionFunc
-        daemonReactor       kubetesting.ReactionFunc
     }{
         {
             name: "Default",
@@ -12,26 +12,26 @@ import (
     "github.com/openshift/cluster-openshift-apiserver-operator/pkg/operator/operatorclient"
 )

-// DaemonSetNodeProvider returns the node list from nodes matching the node selector of a DaemonSet
-type DaemonSetNodeProvider struct {
-    TargetNamespaceDaemonSetInformer appsv1informers.DaemonSetInformer
-    NodeInformer                     corev1informers.NodeInformer
+// DeploymentNodeProvider returns the node list from nodes matching the node selector of a Deployment
+type DeploymentNodeProvider struct {
+    TargetNamespaceDeploymentInformer appsv1informers.DeploymentInformer
+    NodeInformer                      corev1informers.NodeInformer
 }

 var (
-    _ encryptiondeployer.MasterNodeProvider = &DaemonSetNodeProvider{}
+    _ encryptiondeployer.MasterNodeProvider = &DeploymentNodeProvider{}
 )

-func (p DaemonSetNodeProvider) MasterNodeNames() ([]string, error) {
-    ds, err := p.TargetNamespaceDaemonSetInformer.Lister().DaemonSets(operatorclient.TargetNamespace).Get("apiserver")
+func (p DeploymentNodeProvider) MasterNodeNames() ([]string, error) {
+    deploy, err := p.TargetNamespaceDeploymentInformer.Lister().Deployments(operatorclient.TargetNamespace).Get("apiserver")
     if err != nil && errors.IsNotFound(err) {
         return nil, nil
     }
     if err != nil {
         return nil, err
     }

-    nodes, err := p.NodeInformer.Lister().List(labels.SelectorFromSet(ds.Spec.Template.Spec.NodeSelector))
+    nodes, err := p.NodeInformer.Lister().List(labels.SelectorFromSet(deploy.Spec.Template.Spec.NodeSelector))
     if err != nil {
         return nil, err
     }
@@ -44,12 +44,12 @@ func (p DaemonSetNodeProvider) MasterNodeNames() ([]string, error) {
     return ret, nil
 }

-func (p DaemonSetNodeProvider) AddEventHandler(handler cache.ResourceEventHandler) []cache.InformerSynced {
-    p.TargetNamespaceDaemonSetInformer.Informer().AddEventHandler(handler)
+func (p DeploymentNodeProvider) AddEventHandler(handler cache.ResourceEventHandler) []cache.InformerSynced {
+    p.TargetNamespaceDeploymentInformer.Informer().AddEventHandler(handler)
     p.NodeInformer.Informer().AddEventHandler(handler)

     return []cache.InformerSynced{
-        p.TargetNamespaceDaemonSetInformer.Informer().HasSynced,
+        p.TargetNamespaceDeploymentInformer.Informer().HasSynced,
         p.NodeInformer.Informer().HasSynced,
     }
 }
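
MasterNodeNames above builds a node selector from the deployment's pod template via labels.SelectorFromSet. As a small standalone illustration of that conversion (the selector value mirrors the manifest's node-role.kubernetes.io/master: "" nodeSelector; the surrounding program is not from this repository):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Example nodeSelector, mirroring the manifest's master node selector.
	nodeSelector := map[string]string{"node-role.kubernetes.io/master": ""}

	// SelectorFromSet builds a selector requiring every key/value pair in the set.
	selector := labels.SelectorFromSet(labels.Set(nodeSelector))
	fmt.Println(selector.String()) // node-role.kubernetes.io/master=

	// The node lister then returns only nodes whose labels match the selector.
	masterNodeLabels := labels.Set{
		"node-role.kubernetes.io/master": "",
		"kubernetes.io/hostname":         "master-0",
	}
	fmt.Println(selector.Matches(masterNodeLabels)) // true
}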
24 changes: 12 additions & 12 deletions pkg/operator/nsfinalizercontroller/finalizer_controller.go
@@ -28,7 +28,7 @@ type finalizerController struct {

     namespaceGetter v1.NamespacesGetter
     podLister       corev1listers.PodLister
-    dsLister        appsv1lister.DaemonSetLister
+    deployLister    appsv1lister.DeploymentLister
     eventRecorder   events.Recorder

     preRunHasSynced []cache.InformerSynced
@@ -43,7 +43,7 @@ type finalizerController struct {
 // so it deletes the rest and requeues. The ns controller starts again and does a complete discovery and.... fails. The
 // failure means it refuses to complete the cleanup. Now, we don't actually want to delete the resources from our
 // aggregated API, only the server plus config if we remove the apiservices to unstick it, GC will start cleaning
-// everything. For now, we can unbork 4.0, by clearing the finalizer after the pod and daemonset we created are gone.
+// everything. For now, we can unbork 4.0, by clearing the finalizer after the pod and deployment we created are gone.
 func NewFinalizerController(
     namespaceName string,
     kubeInformersForTargetNamespace kubeinformers.SharedInformerFactory,
@@ -57,18 +57,18 @@ func NewFinalizerController(

         namespaceGetter: namespaceGetter,
         podLister:       kubeInformersForTargetNamespace.Core().V1().Pods().Lister(),
-        dsLister:        kubeInformersForTargetNamespace.Apps().V1().DaemonSets().Lister(),
+        deployLister:    kubeInformersForTargetNamespace.Apps().V1().Deployments().Lister(),
         eventRecorder:   eventRecorder.WithComponentSuffix("finalizer-controller"),

         preRunHasSynced: []cache.InformerSynced{
             kubeInformersForTargetNamespace.Core().V1().Pods().Informer().HasSynced,
-            kubeInformersForTargetNamespace.Apps().V1().DaemonSets().Informer().HasSynced,
+            kubeInformersForTargetNamespace.Apps().V1().Deployments().Informer().HasSynced,
         },
         queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fullname),
     }

     kubeInformersForTargetNamespace.Core().V1().Pods().Informer().AddEventHandler(c.eventHandler())
-    kubeInformersForTargetNamespace.Apps().V1().DaemonSets().Informer().AddEventHandler(c.eventHandler())
+    kubeInformersForTargetNamespace.Apps().V1().Deployments().Informer().AddEventHandler(c.eventHandler())

     return c
 }
@@ -100,11 +100,11 @@ func (c finalizerController) sync() error {
     if len(pods) > 0 {
         return nil
     }
-    dses, err := c.dsLister.DaemonSets(c.namespaceName).List(labels.Everything())
+    deployments, err := c.deployLister.Deployments(c.namespaceName).List(labels.Everything())
     if err != nil {
         return err
     }
-    if len(dses) > 0 {
+    if len(deployments) > 0 {
         return nil
     }

@@ -153,20 +153,20 @@ func (c *finalizerController) runWorker() {
 }

 func (c *finalizerController) processNextWorkItem() bool {
-    dsKey, quit := c.queue.Get()
+    key, quit := c.queue.Get()
     if quit {
         return false
     }
-    defer c.queue.Done(dsKey)
+    defer c.queue.Done(key)

     err := c.sync()
     if err == nil {
-        c.queue.Forget(dsKey)
+        c.queue.Forget(key)
         return true
     }

-    utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
-    c.queue.AddRateLimited(dsKey)
+    utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
+    c.queue.AddRateLimited(key)

     return true
 }
10 changes: 5 additions & 5 deletions pkg/operator/prunecontroller/prune_controller.go
@@ -237,20 +237,20 @@ func (c *PruneController) runWorker() {
 }

 func (c *PruneController) processNextWorkItem() bool {
-    dsKey, quit := c.queue.Get()
+    key, quit := c.queue.Get()
     if quit {
         return false
     }
-    defer c.queue.Done(dsKey)
+    defer c.queue.Done(key)

     err := c.sync()
     if err == nil {
-        c.queue.Forget(dsKey)
+        c.queue.Forget(key)
         return true
     }

-    utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
-    c.queue.AddRateLimited(dsKey)
+    utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
+    c.queue.AddRateLimited(key)

     return true
 }
74 changes: 71 additions & 3 deletions pkg/operator/starter.go
@@ -18,6 +18,7 @@ import (
     "github.com/openshift/library-go/pkg/operator/encryption"
     "github.com/openshift/library-go/pkg/operator/encryption/controllers/migrators"
     encryptiondeployer "github.com/openshift/library-go/pkg/operator/encryption/deployer"
+    "github.com/openshift/library-go/pkg/operator/events"
     "github.com/openshift/library-go/pkg/operator/genericoperatorclient"
     "github.com/openshift/library-go/pkg/operator/resource/resourceapply"
     "github.com/openshift/library-go/pkg/operator/revisioncontroller"
@@ -30,7 +31,9 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime/schema"
     "k8s.io/apimachinery/pkg/util/sets"
+    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/kubernetes"
+    "k8s.io/klog"
     apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
     apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
     apiregistrationinformers "k8s.io/kube-aggregator/pkg/client/informers/externalversions"
@@ -133,6 +136,8 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller
     }
     versionRecorder.SetVersion("operator", os.Getenv("OPERATOR_IMAGE_VERSION"))

+    nodeInformer := kubeInformersForNamespaces.InformersFor("").Core().V1().Nodes()
+
     workloadController := workloadcontroller.NewWorkloadController(
         os.Getenv("IMAGE"), os.Getenv("OPERATOR_IMAGE_VERSION"), os.Getenv("OPERATOR_IMAGE"),
         operatorClient,
@@ -143,6 +148,7 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller
         kubeInformersForNamespaces.InformersFor(operatorclient.GlobalUserSpecifiedConfigNamespace),
         apiregistrationInformers,
         configInformers,
+        nodeInformer,
         operatorConfigClient.OperatorV1(),
         configClient.ConfigV1(),
         kubeClient,
@@ -204,9 +210,9 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller
         controllerConfig.EventRecorder,
     )

-    nodeProvider := DaemonSetNodeProvider{
-        TargetNamespaceDaemonSetInformer: kubeInformersForNamespaces.InformersFor(operatorclient.TargetNamespace).Apps().V1().DaemonSets(),
-        NodeInformer:                     kubeInformersForNamespaces.InformersFor("").Core().V1().Nodes(),
+    nodeProvider := DeploymentNodeProvider{
+        TargetNamespaceDeploymentInformer: kubeInformersForNamespaces.InformersFor(operatorclient.TargetNamespace).Apps().V1().Deployments(),
+        NodeInformer:                      nodeInformer,
     }

     deployer, err := encryptiondeployer.NewRevisionLabelPodDeployer("revision", operatorclient.TargetNamespace, kubeInformersForNamespaces, resourceSyncController, kubeClient.CoreV1(), kubeClient.CoreV1(), nodeProvider)
@@ -251,6 +257,10 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller
             "Progressing",
             // in 4.1.0-4.3.0 this was used for indicating the apiserver daemonset was available
             "Available",
+            // in 4.1.0-4.3.z this was used for indicating the conditions of the apiserver daemonset.
+            "APIServerDaemonSetAvailable",
+            "APIServerDaemonSetProgressing",
+            "APIServerDaemonSetDegraded",
         },
         operatorClient,
         controllerConfig.EventRecorder,
@@ -260,6 +270,8 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller
         controllerConfig.Server.Handler.NonGoRestfulMux.Handle("/debug/controllers/resourcesync", debugHandler)
     }

+    ensureDaemonSetCleanup(kubeClient, ctx, controllerConfig.EventRecorder)
+
     operatorConfigInformers.Start(ctx.Done())
     kubeInformersForNamespaces.Start(ctx.Done())
     apiregistrationInformers.Start(ctx.Done())
@@ -331,3 +343,59 @@ func apiServicesReferences() []configv1.ObjectReference {
     }
     return ret
 }
+
+// ensureDaemonSetCleanup continually ensures the removal of the daemonset
+// used to manage apiserver pods in releases prior to 4.5. The daemonset is
+// removed once the deployment now managing apiserver pods reports at least
+// one pod available.
+func ensureDaemonSetCleanup(kubeClient *kubernetes.Clientset, ctx context.Context, eventRecorder events.Recorder) {
+    // daemonset and deployment both use the same name
+    resourceName := "apiserver"
+
+    dsClient := kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace)
+    deployClient := kubeClient.AppsV1().Deployments(operatorclient.TargetNamespace)
+
+    go wait.UntilWithContext(ctx, func(_ context.Context) {
+        // This function isn't expected to take long enough to suggest
+        // checking that the context is done. The wait method will do that
+        // checking.
+
+        // Check whether the legacy daemonset exists and is not marked for deletion
+        ds, err := dsClient.Get(resourceName, metav1.GetOptions{})
+        if errors.IsNotFound(err) {
+            // Done - daemonset does not exist
+            return
+        }
+        if err != nil {
+            klog.Warningf("Error retrieving legacy daemonset: %v", err)
+            return
+        }
+        if ds.ObjectMeta.DeletionTimestamp != nil {
+            // Done - daemonset has been marked for deletion
+            return
+        }
+
+        // Check that the deployment managing the apiserver pods has at least one available replica
+        deploy, err := deployClient.Get(resourceName, metav1.GetOptions{})
+        if errors.IsNotFound(err) {
+            // No available replicas if the deployment doesn't exist
+            return
+        }
+        if err != nil {
+            klog.Warningf("Error retrieving the deployment that manages apiserver pods: %v", err)
+            return
+        }
+        if deploy.Status.AvailableReplicas == 0 {
+            eventRecorder.Warning("LegacyDaemonSetCleanup", "the deployment replacing the daemonset does not have available replicas yet")
+            return
+        }
+
+        // Safe to remove legacy daemonset since the deployment has at least one available replica
+        err = dsClient.Delete(resourceName, &metav1.DeleteOptions{})
+        if err != nil && !errors.IsNotFound(err) {
+            klog.Warningf("Failed to delete legacy daemonset: %v", err)
+            return
+        }
+        eventRecorder.Event("LegacyDaemonSetCleanup", "legacy daemonset has been removed")
+    }, time.Minute)
+}
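
ensureDaemonSetCleanup relies on wait.UntilWithContext to re-run its check every minute for the lifetime of the operator's context; each run returns early once the daemonset is gone. A minimal standalone illustration of that looping primitive follows; the interval and the closure body are placeholders, not the operator's values:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// A timeout stands in for operator shutdown; cancelling the context stops the loop.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// UntilWithContext blocks, invoking the closure once per period until ctx is done.
	wait.UntilWithContext(ctx, func(_ context.Context) {
		// In the operator this closure checks the daemonset and deployment; here it only logs.
		fmt.Println("periodic cleanup check")
	}, time.Second)

	fmt.Println("context cancelled, loop exited")
}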