Deploy one replica per master node
Currently, we deploy only one replica of each operand
(webhook and controller). This apparently leads to the
operator becoming unavailable when the node the operand
is running on is drained.

To overcome this limitation, we set the number of
replicas to the number of available master nodes. In
addition, we set pod anti-affinity rules so that no two
replicas run on the same master node.

Also, we set maxSurge=0 and maxUnavailable=0 to make sure
nodes are drained one at a time and to prevent more than
one pod from running on the same node when nodePorts or
hostNetwork are used.
bertinatto committed May 21, 2021
1 parent b5b65e3 commit a54dfec
Showing 9 changed files with 209 additions and 18 deletions.
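
In short, the operator now derives the operand Deployments' replica count from the number of nodes matching the operands' node selector (the master nodes). The sketch below condenses that logic; the helper name setReplicasFromNodeCount is illustrative only, not part of this commit, while the lister and label-selector calls mirror the actual change in pkg/operator/sync.go further down.

package operator

import (
	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/labels"
	corelistersv1 "k8s.io/client-go/listers/core/v1"
)

// setReplicasFromNodeCount is a hypothetical helper that mirrors the change
// to getExpectedDeployment: list the nodes selected by the Deployment's
// nodeSelector (node-role.kubernetes.io/master: "") and use that count as
// the desired replica count, i.e. one replica per master node.
func setReplicasFromNodeCount(deployment *appsv1.Deployment, nodeLister corelistersv1.NodeLister) error {
	nodeSelector := deployment.Spec.Template.Spec.NodeSelector
	nodes, err := nodeLister.List(labels.SelectorFromSet(nodeSelector))
	if err != nil {
		return err
	}
	replicas := int32(len(nodes))
	deployment.Spec.Replicas = &replicas
	return nil
}

The pod anti-affinity stanzas added to both manifests (topologyKey: kubernetes.io/hostname) then steer those replicas onto distinct masters, and maxSurge: 0 keeps a rollout from placing a second pod on a node that already runs one.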
15 changes: 14 additions & 1 deletion assets/csi_controller_deployment.yaml
@@ -5,10 +5,14 @@ metadata:
namespace: openshift-cluster-storage-operator
spec:
serviceName: "csi-snapshot-controller"
replicas: 1
selector:
matchLabels:
app: csi-snapshot-controller
strategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 1
maxSurge: 0
template:
metadata:
annotations:
@@ -29,6 +33,15 @@ spec:
# TODO: measure on a real cluster
cpu: 10m
memory: 50Mi
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
app: csi-snapshot-controller
topologyKey: kubernetes.io/hostname
priorityClassName: "system-cluster-critical"
nodeSelector:
node-role.kubernetes.io/master: ""
16 changes: 14 additions & 2 deletions assets/webhook_deployment.yaml
@@ -5,10 +5,14 @@ metadata:
namespace: openshift-cluster-storage-operator
spec:
serviceName: "csi-snapshot-webhook"
replicas: 1
selector:
matchLabels:
app: csi-snapshot-webhook
strategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 1
maxSurge: 0
template:
metadata:
annotations:
@@ -35,6 +39,15 @@ spec:
requests:
cpu: 10m
memory: 20Mi
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
app: csi-snapshot-webhook
topologyKey: kubernetes.io/hostname
priorityClassName: "system-cluster-critical"
restartPolicy: Always
nodeSelector:
@@ -55,4 +68,3 @@ spec:
- key: node-role.kubernetes.io/master
operator: Exists
effect: "NoSchedule"

31 changes: 28 additions & 3 deletions pkg/generated/bindata.go

(Generated file; diff not rendered.)

6 changes: 6 additions & 0 deletions pkg/operator/operator.go
@@ -15,7 +15,9 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
appsinformersv1 "k8s.io/client-go/informers/apps/v1"
coreinformersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
corelistersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/util/workqueue"
@@ -57,6 +59,7 @@ type csiSnapshotOperator struct {

syncHandler func() error

nodeLister corelistersv1.NodeLister
crdLister apiextlistersv1.CustomResourceDefinitionLister
crdListerSynced cache.InformerSynced
crdClient apiextclient.Interface
@@ -72,6 +75,7 @@ type csiSnapshotOperator struct {

func NewCSISnapshotControllerOperator(
client operatorclient.OperatorClient,
nodeInformer coreinformersv1.NodeInformer,
crdInformer apiextinformersv1.CustomResourceDefinitionInformer,
crdClient apiextclient.Interface,
deployInformer appsinformersv1.DeploymentInformer,
@@ -84,6 +88,7 @@ func NewCSISnapshotControllerOperator(
) *csiSnapshotOperator {
csiOperator := &csiSnapshotOperator{
client: client,
nodeLister: nodeInformer.Lister(),
crdClient: crdClient,
kubeClient: kubeClient,
versionGetter: versionGetter,
@@ -94,6 +99,7 @@ func NewCSISnapshotControllerOperator(
csiSnapshotControllerImage: csiSnapshotControllerImage,
}

nodeInformer.Informer().AddEventHandler(csiOperator.eventHandler("node"))
crdInformer.Informer().AddEventHandler(csiOperator.eventHandler("crd"))
deployInformer.Informer().AddEventHandler(csiOperator.eventHandler("deployment"))
client.Informer().AddEventHandler(csiOperator.eventHandler("csisnapshotcontroller"))
60 changes: 55 additions & 5 deletions pkg/operator/operator_test.go
@@ -2,6 +2,7 @@ package operator

import (
"context"
"fmt"
"sort"
"testing"
"time"
@@ -16,7 +17,9 @@ import (
"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
"github.com/openshift/library-go/pkg/operator/resource/resourceread"
"github.com/openshift/library-go/pkg/operator/status"

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
fakeextapi "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
apiextinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
@@ -48,6 +51,7 @@ type testContext struct {
}

type testObjects struct {
nodes []*v1.Node
deployment *appsv1.Deployment
crds []*apiextv1.CustomResourceDefinition
csiSnapshotController *opv1.CSISnapshotController
@@ -65,14 +69,25 @@ type testReactors struct {

const testVersion = "0.0.1" // Version of the operator for testing purposes (instead of getenv)

var masterNodeLabels = map[string]string{"node-role.kubernetes.io/master": ""}

func newOperator(test operatorTest) *testContext {
// Convert to []runtime.Object
var initialDeployments []runtime.Object
var initialObjects []runtime.Object
if len(test.initialObjects.nodes) == 0 {
test.initialObjects.nodes = []*v1.Node{makeNode("A", masterNodeLabels)}
}
for _, node := range test.initialObjects.nodes {
initialObjects = append(initialObjects, node)
}
if test.initialObjects.deployment != nil {
initialDeployments = []runtime.Object{test.initialObjects.deployment}
initialObjects = append(initialObjects, test.initialObjects.deployment)
}
coreClient := fakecore.NewSimpleClientset(initialDeployments...)
coreClient := fakecore.NewSimpleClientset(initialObjects...)
coreInformerFactory := coreinformers.NewSharedInformerFactory(coreClient, 0 /*no resync */)
for _, node := range test.initialObjects.nodes {
coreInformerFactory.Core().V1().Nodes().Informer().GetIndexer().Add(node)
}
// Fill the informer
if test.initialObjects.deployment != nil {
coreInformerFactory.Apps().V1().Deployments().Informer().GetIndexer().Add(test.initialObjects.deployment)
@@ -125,6 +140,7 @@ func newOperator(test operatorTest) *testContext {

recorder := events.NewInMemoryRecorder("operator")
op := NewCSISnapshotControllerOperator(client,
coreInformerFactory.Core().V1().Nodes(),
extAPIInformerFactory.Apiextensions().V1().CustomResourceDefinitions(),
extAPIClient,
coreInformerFactory.Apps().V1().Deployments(),
@@ -670,7 +686,31 @@ func TestSync(t *testing.T) {
crds: addCRDEstablishedRector,
},
},

{
// Deployment replica count is adjusted to the number of nodes matching the node selector
name: "number of replicas is set accordingly",
image: defaultImage,
initialObjects: testObjects{
nodes: []*v1.Node{ // 3 master nodes
makeNode("A", masterNodeLabels),
makeNode("B", masterNodeLabels),
makeNode("C", masterNodeLabels),
},
crds: getCRDs(withEstablishedConditions),
deployment: getDeployment(argsLevel2, defaultImage,
withDeploymentReplicas(1), // just 1 replica
withDeploymentGeneration(1, 1),
withDeploymentStatus(replica1, replica1, replica1)),
csiSnapshotController: csiSnapshotController(withGenerations(1)),
},
expectedObjects: testObjects{
crds: getCRDs(withEstablishedConditions),
deployment: getDeployment(argsLevel2, defaultImage,
withDeploymentReplicas(3), // The operator fixed replica count
withDeploymentGeneration(2, 1), // ... which bumps generation again
withDeploymentStatus(replica1, replica1, replica1)),
},
},
// TODO: more error cases? Deployment creation fails and things like that?
}

@@ -729,7 +769,8 @@ func TestSync(t *testing.T) {
sanitizeDeployment(actualDeployment)
sanitizeDeployment(test.expectedObjects.deployment)
if !equality.Semantic.DeepEqual(test.expectedObjects.deployment, actualDeployment) {
t.Errorf("Unexpected Deployment %+v content:\n%s", targetName, cmp.Diff(test.expectedObjects.deployment, actualDeployment))
t.Fatalf("Unexpected Deployment %+v content:\n%s", targetName, cmp.Diff(test.expectedObjects.deployment, actualDeployment))
}
}
// Check expectedObjects.csiSnapshotController
@@ -774,3 +815,12 @@ func sanitizeCSISnapshotController(instance *opv1.CSISnapshotController) {
return instance.Status.Conditions[i].Type < instance.Status.Conditions[j].Type
})
}

func makeNode(suffix string, labels map[string]string) *v1.Node {
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("node-%s", suffix),
Labels: labels,
},
}
}
5 changes: 4 additions & 1 deletion pkg/operator/starter.go
@@ -67,6 +67,7 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller

operator := NewCSISnapshotControllerOperator(
*operatorClient,
ctrlctx.KubeNamespacedInformerFactory.Core().V1().Nodes(),
ctrlctx.APIExtInformerFactory.Apiextensions().V1().CustomResourceDefinitions(),
ctrlctx.ClientBuilder.APIExtClientOrDie(targetName),
ctrlctx.KubeNamespacedInformerFactory.Apps().V1().Deployments(),
@@ -78,7 +79,9 @@ func RunOperator(ctx context.Context, controllerConfig *controllercmd.Controller
os.Getenv(operandImageEnvName),
)

webhookOperator := webhookdeployment.NewCSISnapshotWebhookController(*operatorClient,
webhookOperator := webhookdeployment.NewCSISnapshotWebhookController(
*operatorClient,
ctrlctx.KubeNamespacedInformerFactory.Core().V1().Nodes(),
ctrlctx.KubeNamespacedInformerFactory.Apps().V1().Deployments(),
ctrlctx.KubeNamespacedInformerFactory.Admissionregistration().V1().ValidatingWebhookConfigurations(),
kubeClient,
21 changes: 17 additions & 4 deletions pkg/operator/sync.go
@@ -8,6 +8,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"

operatorv1 "github.com/openshift/api/operator/v1"
@@ -108,9 +109,12 @@ func (c *csiSnapshotOperator) checkAlphaCRDs() error {
}

func (c *csiSnapshotOperator) syncDeployment(instance *operatorv1.CSISnapshotController) (*appsv1.Deployment, error) {
deploy := c.getExpectedDeployment(instance)
deploy, err := c.getExpectedDeployment(instance)
if err != nil {
return nil, err
}

deploy, _, err := resourceapply.ApplyDeployment(
deploy, _, err = resourceapply.ApplyDeployment(
c.kubeClient.AppsV1(),
c.eventRecorder,
deploy,
@@ -121,7 +125,7 @@ func (c *csiSnapshotOperator) syncDeployment(instance *operatorv1.CSISnapshotCon
return deploy, nil
}

func (c *csiSnapshotOperator) getExpectedDeployment(instance *operatorv1.CSISnapshotController) *appsv1.Deployment {
func (c *csiSnapshotOperator) getExpectedDeployment(instance *operatorv1.CSISnapshotController) (*appsv1.Deployment, error) {
deployment := resourceread.ReadDeploymentV1OrDie(generated.MustAsset(deployment))
deployment.Spec.Template.Spec.Containers[0].Image = c.csiSnapshotControllerImage

@@ -132,7 +136,16 @@ func (c *csiSnapshotOperator) getExpectedDeployment(instance *operatorv1.CSISnap
}
}

return deployment
// Set the number of replicas according to the number of nodes available
nodeSelector := deployment.Spec.Template.Spec.NodeSelector
nodes, err := c.nodeLister.List(labels.SelectorFromSet(nodeSelector))
if err != nil {
return nil, err
}
replicas := int32(len(nodes))
deployment.Spec.Replicas = &replicas

return deployment, nil
}

func getLogLevel(logLevel operatorv1.LogLevel) int {
