Skip to content

Commit

Permalink
Fix conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
bertinatto committed May 20, 2021
1 parent f606681 commit 09a536a
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 7 deletions.
9 changes: 9 additions & 0 deletions assets/csi_controller_deployment.yaml
Expand Up @@ -29,6 +29,15 @@ spec:
# TODO: measure on a real cluster
cpu: 10m
memory: 50Mi
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
app: csi-snapshot-controller
topologyKey: kubernetes.io/hostname
priorityClassName: "system-cluster-critical"
nodeSelector:
node-role.kubernetes.io/master: ""
Expand Down
10 changes: 9 additions & 1 deletion assets/webhook_deployment.yaml
Expand Up @@ -35,6 +35,15 @@ spec:
requests:
cpu: 10m
memory: 20Mi
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
app: csi-snapshot-webhook
topologyKey: kubernetes.io/hostname
priorityClassName: "system-cluster-critical"
restartPolicy: Always
nodeSelector:
Expand All @@ -55,4 +64,3 @@ spec:
- key: node-role.kubernetes.io/master
operator: Exists
effect: "NoSchedule"

19 changes: 18 additions & 1 deletion pkg/generated/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/operator/operator.go
Expand Up @@ -15,7 +15,9 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
appsinformersv1 "k8s.io/client-go/informers/apps/v1"
coreinformersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
corelistersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -57,6 +59,7 @@ type csiSnapshotOperator struct {

syncHandler func() error

nodeLister corelistersv1.NodeLister
crdLister apiextlistersv1.CustomResourceDefinitionLister
crdListerSynced cache.InformerSynced
crdClient apiextclient.Interface
Expand All @@ -72,6 +75,7 @@ type csiSnapshotOperator struct {

func NewCSISnapshotControllerOperator(
client operatorclient.OperatorClient,
nodeInformer coreinformersv1.NodeInformer,
crdInformer apiextinformersv1.CustomResourceDefinitionInformer,
crdClient apiextclient.Interface,
deployInformer appsinformersv1.DeploymentInformer,
Expand All @@ -84,6 +88,7 @@ func NewCSISnapshotControllerOperator(
) *csiSnapshotOperator {
csiOperator := &csiSnapshotOperator{
client: client,
nodeLister: nodeInformer.Lister(),
crdClient: crdClient,
kubeClient: kubeClient,
versionGetter: versionGetter,
Expand Down
5 changes: 4 additions & 1 deletion pkg/operator/starter.go
Expand Up @@ -67,6 +67,7 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller

operator := NewCSISnapshotControllerOperator(
*operatorClient,
ctrlctx.KubeNamespacedInformerFactory.Core().V1().Nodes(),
ctrlctx.APIExtInformerFactory.Apiextensions().V1().CustomResourceDefinitions(),
ctrlctx.ClientBuilder.APIExtClientOrDie(targetName),
ctrlctx.KubeNamespacedInformerFactory.Apps().V1().Deployments(),
Expand All @@ -78,7 +79,9 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller
os.Getenv(operandImageEnvName),
)

webhookOperator := webhookdeployment.NewCSISnapshotWebhookController(*operatorClient,
webhookOperator := webhookdeployment.NewCSISnapshotWebhookController(
*operatorClient,
ctrlctx.KubeNamespacedInformerFactory.Core().V1().Nodes(),
ctrlctx.KubeNamespacedInformerFactory.Apps().V1().Deployments(),
ctrlctx.KubeNamespacedInformerFactory.Admissionregistration().V1().ValidatingWebhookConfigurations(),
kubeClient,
Expand Down
21 changes: 17 additions & 4 deletions pkg/operator/sync.go
Expand Up @@ -8,6 +8,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"

operatorv1 "github.com/openshift/api/operator/v1"
Expand Down Expand Up @@ -108,9 +109,12 @@ func (c *csiSnapshotOperator) checkAlphaCRDs() error {
}

func (c *csiSnapshotOperator) syncDeployment(instance *operatorv1.CSISnapshotController) (*appsv1.Deployment, error) {
deploy := c.getExpectedDeployment(instance)
deploy, err := c.getExpectedDeployment(instance)
if err != nil {
return nil, err
}

deploy, _, err := resourceapply.ApplyDeployment(
deploy, _, err = resourceapply.ApplyDeployment(
c.kubeClient.AppsV1(),
c.eventRecorder,
deploy,
Expand All @@ -121,7 +125,7 @@ func (c *csiSnapshotOperator) syncDeployment(instance *operatorv1.CSISnapshotCon
return deploy, nil
}

func (c *csiSnapshotOperator) getExpectedDeployment(instance *operatorv1.CSISnapshotController) *appsv1.Deployment {
func (c *csiSnapshotOperator) getExpectedDeployment(instance *operatorv1.CSISnapshotController) (*appsv1.Deployment, error) {
deployment := resourceread.ReadDeploymentV1OrDie(generated.MustAsset(deployment))
deployment.Spec.Template.Spec.Containers[0].Image = c.csiSnapshotControllerImage

Expand All @@ -132,7 +136,16 @@ func (c *csiSnapshotOperator) getExpectedDeployment(instance *operatorv1.CSISnap
}
}

return deployment
// Set the number of replicas according to the number of nodes available
nodeSelector := deployment.Spec.Template.Spec.NodeSelector
nodes, err := c.nodeLister.List(labels.SelectorFromSet(nodeSelector))
if err != nil {
return nil, err
}
replicas := int32(len(nodes))
deployment.Spec.Replicas = &replicas

return deployment, nil
}

func getLogLevel(logLevel operatorv1.LogLevel) int {
Expand Down
17 changes: 17 additions & 0 deletions pkg/operator/webhookdeployment/webhook.go
Expand Up @@ -19,17 +19,21 @@ import (
admissionv1 "k8s.io/api/admissionregistration/v1"
appsv1 "k8s.io/api/apps/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
admissionnformersv1 "k8s.io/client-go/informers/admissionregistration/v1"
appsinformersv1 "k8s.io/client-go/informers/apps/v1"
coreinformersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
corelistersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/util/workqueue"
)

type csiSnapshotWebhookController struct {
client operatorclient.OperatorClient
kubeClient kubernetes.Interface
nodeLister corelistersv1.NodeLister
eventRecorder events.Recorder

queue workqueue.RateLimitingInterface
Expand Down Expand Up @@ -59,6 +63,7 @@ func init() {
// NewCSISnapshotWebhookController returns a controller that creates and manages Deployment with CSI snapshot webhook.
func NewCSISnapshotWebhookController(
client operatorclient.OperatorClient,
nodeInformer coreinformersv1.NodeInformer,
deployInformer appsinformersv1.DeploymentInformer,
webhookInformer admissionnformersv1.ValidatingWebhookConfigurationInformer,
kubeClient kubernetes.Interface,
Expand All @@ -68,6 +73,7 @@ func NewCSISnapshotWebhookController(
c := &csiSnapshotWebhookController{
client: client,
kubeClient: kubeClient,
nodeLister: nodeInformer.Lister(),
eventRecorder: eventRecorder,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshot-controller"),
csiSnapshotWebhookImage: csiSnapshotWebhookImage,
Expand Down Expand Up @@ -97,6 +103,17 @@ func (c *csiSnapshotWebhookController) sync(ctx context.Context, syncCtx factory
// This will set Degraded condition
return err
}

// Set the number of replicas according to the number of nodes available
nodeSelector := deployment.Spec.Template.Spec.NodeSelector
nodes, err := c.nodeLister.List(labels.SelectorFromSet(nodeSelector))
if err != nil {
// This will set Degraded condition
return err
}
replicas := int32(len(nodes))
deployment.Spec.Replicas = &replicas

lastGeneration := resourcemerge.ExpectedDeploymentGeneration(deployment, opStatus.Generations)
deployment, _, err = resourceapply.ApplyDeployment(c.kubeClient.AppsV1(), syncCtx.Recorder(), deployment, lastGeneration)
if err != nil {
Expand Down

0 comments on commit 09a536a

Please sign in to comment.