Skip to content

Commit

Permalink
ceph: update PodDisruptionBudget from v1beta1 to v1
Browse files Browse the repository at this point in the history
This commit update the PodDisruptionBudget policy to use version v1
Updated to policy/v1 as policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+

Closes: #7917
Signed-off-by: parth-gr <paarora@redhat.com>
  • Loading branch information
parth-gr committed Jul 22, 2021
1 parent 5f30d53 commit 2333fdb
Show file tree
Hide file tree
Showing 10 changed files with 449 additions and 150 deletions.
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/crash/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (r *ReconcileNode) deleteCrashCollector(deployment appsv1.Deployment) error
func (r *ReconcileNode) reconcileCrashRetention(namespace string, cephCluster cephv1.CephCluster, cephVersion *cephver.CephVersion) error {
k8sVersion, err := k8sutil.GetK8SVersion(r.context.Clientset)
if err != nil {
return errors.Wrap(err, "failed to fetch get k8s version")
return errors.Wrap(err, "failed to fetch k8s version")
}
useCronJobV1 := k8sVersion.AtLeast(version.MustParseSemantic(minVersionForCronV1))

Expand Down
38 changes: 29 additions & 9 deletions pkg/operator/ceph/cluster/mon/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pkg/errors"
"github.com/rook/rook/pkg/operator/k8sutil"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -52,17 +53,36 @@ func (c *Cluster) reconcileMonPDB() error {
}

func (c *Cluster) createOrUpdateMonPDB(maxUnavailable int32) (controllerutil.OperationResult, error) {
pdb := &policyv1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: monPDBName,
Namespace: c.Namespace,
}}
usePDBV1Beta1, err := k8sutil.UsePDBV1Beta1Version(c.context.Clientset)
if err != nil {
return controllerutil.OperationResultNone, errors.Wrapf(err, "failed to fetch pdb version")
}
objectMeta := metav1.ObjectMeta{
Name: monPDBName,
Namespace: c.Namespace,
}
selector := &metav1.LabelSelector{
MatchLabels: map[string]string{k8sutil.AppAttr: AppName},
}
if usePDBV1Beta1 {
pdb := &policyv1beta1.PodDisruptionBudget{
ObjectMeta: objectMeta}

mutateFunc := func() error {
pdb.Spec = policyv1beta1.PodDisruptionBudgetSpec{
Selector: selector,
MaxUnavailable: &intstr.IntOrString{IntVal: maxUnavailable},
}
return nil
}
return controllerutil.CreateOrUpdate(context.TODO(), c.context.Client, pdb, mutateFunc)
}
pdb := &policyv1.PodDisruptionBudget{
ObjectMeta: objectMeta}

mutateFunc := func() error {
pdb.Spec = policyv1beta1.PodDisruptionBudgetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{k8sutil.AppAttr: AppName},
},
pdb.Spec = policyv1.PodDisruptionBudgetSpec{
Selector: selector,
MaxUnavailable: &intstr.IntOrString{IntVal: maxUnavailable},
}
return nil
Expand Down
97 changes: 75 additions & 22 deletions pkg/operator/ceph/cluster/mon/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
"github.com/rook/rook/pkg/clusterd"
cephclient "github.com/rook/rook/pkg/daemon/ceph/client"
"github.com/rook/rook/pkg/operator/test"
"github.com/stretchr/testify/assert"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -36,17 +38,18 @@ const (
mockNamespace = "test-ns"
)

func createFakeCluster(t *testing.T, cephClusterObj *cephv1.CephCluster) *Cluster {
func createFakeCluster(t *testing.T, cephClusterObj *cephv1.CephCluster, k8sVersion string) *Cluster {
ownerInfo := cephclient.NewMinimumOwnerInfoWithOwnerRef()
scheme := scheme.Scheme
err := policyv1beta1.AddToScheme(scheme)
if err != nil {
assert.Fail(t, "failed to build scheme")
}
cl := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects().Build()

c := New(&clusterd.Context{Client: cl}, mockNamespace, cephClusterObj.Spec, ownerInfo, &sync.Mutex{})
err := policyv1.AddToScheme(scheme)
assert.NoError(t, err)
err = policyv1beta1.AddToScheme(scheme)
assert.NoError(t, err)

cl := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects().Build()
clientset := test.New(t, 3)
c := New(&clusterd.Context{Client: cl, Clientset: clientset}, mockNamespace, cephClusterObj.Spec, ownerInfo, &sync.Mutex{})
test.SetFakeKubernetesVersion(clientset, k8sVersion)
return c
}

Expand Down Expand Up @@ -105,17 +108,31 @@ func TestReconcileMonPDB(t *testing.T) {
}

for _, tc := range testCases {
c := createFakeCluster(t, tc.cephCluster)
// check for PDBV1Beta1 version
c := createFakeCluster(t, tc.cephCluster, "v1.20.0")
err := c.reconcileMonPDB()
assert.NoError(t, err)
existingPDB := &policyv1beta1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), types.NamespacedName{Name: monPDBName, Namespace: mockNamespace}, existingPDB)
existingPDBV1Beta1 := &policyv1beta1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), types.NamespacedName{Name: monPDBName, Namespace: mockNamespace}, existingPDBV1Beta1)
if tc.errorExpected {
assert.Error(t, err)
continue
}
assert.NoError(t, err)
assert.Equalf(t, tc.expectedMaxUnAvailable, int32(existingPDB.Spec.MaxUnavailable.IntValue()), "[%s]: incorrect minAvailable count in pdb", tc.name)
assert.Equalf(t, tc.expectedMaxUnAvailable, int32(existingPDBV1Beta1.Spec.MaxUnavailable.IntValue()), "[%s]: incorrect minAvailable count in pdb", tc.name)

// check for PDBV1 version
c = createFakeCluster(t, tc.cephCluster, "v1.21.0")
err = c.reconcileMonPDB()
assert.NoError(t, err)
existingPDBV1 := &policyv1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), types.NamespacedName{Name: monPDBName, Namespace: mockNamespace}, existingPDBV1)
if tc.errorExpected {
assert.Error(t, err)
continue
}
assert.NoError(t, err)
assert.Equalf(t, tc.expectedMaxUnAvailable, int32(existingPDBV1.Spec.MaxUnavailable.IntValue()), "[%s]: incorrect minAvailable count in pdb", tc.name)

// reconcile mon PDB again to test update
err = c.reconcileMonPDB()
Expand All @@ -124,41 +141,77 @@ func TestReconcileMonPDB(t *testing.T) {
}

func TestAllowMonDrain(t *testing.T) {
fakeNamespaceName := types.NamespacedName{Namespace: mockNamespace, Name: monPDBName}
// check for PDBV1 version
c := createFakeCluster(t, &cephv1.CephCluster{
Spec: cephv1.ClusterSpec{
DisruptionManagement: cephv1.DisruptionManagementSpec{
ManagePodBudgets: true,
},
},
}, "v1.21.0")
t.Run("allow mon drain for K8s version v1.21.0", func(t *testing.T) {
// change MaxUnavailable mon PDB to 1
err := c.allowMonDrain(fakeNamespaceName)
assert.NoError(t, err)
existingPDBV1 := &policyv1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), fakeNamespaceName, existingPDBV1)
assert.NoError(t, err)
assert.Equal(t, 1, int(existingPDBV1.Spec.MaxUnavailable.IntValue()))
})
fakeNamespaceName := types.NamespacedName{Namespace: mockNamespace, Name: monPDBName}
t.Run("allow mon drain", func(t *testing.T) {
// check for PDBV1Beta1 version
c = createFakeCluster(t, &cephv1.CephCluster{
Spec: cephv1.ClusterSpec{
DisruptionManagement: cephv1.DisruptionManagementSpec{
ManagePodBudgets: true,
},
},
}, "v1.20.0")
t.Run("allow mon drain for K8s version v1.20.0", func(t *testing.T) {
// change MaxUnavailable mon PDB to 1
err := c.allowMonDrain(fakeNamespaceName)
assert.NoError(t, err)
existingPDB := &policyv1beta1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), fakeNamespaceName, existingPDB)
existingPDBV1Beta1 := &policyv1beta1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), fakeNamespaceName, existingPDBV1Beta1)
assert.NoError(t, err)
assert.Equal(t, 1, int(existingPDB.Spec.MaxUnavailable.IntValue()))
assert.Equal(t, 1, int(existingPDBV1Beta1.Spec.MaxUnavailable.IntValue()))
})
}

func TestBlockMonDrain(t *testing.T) {
fakeNamespaceName := types.NamespacedName{Namespace: mockNamespace, Name: monPDBName}
// check for PDBV1 version
c := createFakeCluster(t, &cephv1.CephCluster{
Spec: cephv1.ClusterSpec{
DisruptionManagement: cephv1.DisruptionManagementSpec{
ManagePodBudgets: true,
},
},
}, "v1.21.0")
t.Run("block mon drain for K8s version v1.21.0", func(t *testing.T) {
// change MaxUnavailable mon PDB to 0
err := c.blockMonDrain(fakeNamespaceName)
assert.NoError(t, err)
existingPDBV1 := &policyv1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), fakeNamespaceName, existingPDBV1)
assert.NoError(t, err)
assert.Equal(t, 0, int(existingPDBV1.Spec.MaxUnavailable.IntValue()))
})
fakeNamespaceName := types.NamespacedName{Namespace: mockNamespace, Name: monPDBName}
t.Run("block mon drain", func(t *testing.T) {
// check for PDBV1Beta1 version
c = createFakeCluster(t, &cephv1.CephCluster{
Spec: cephv1.ClusterSpec{
DisruptionManagement: cephv1.DisruptionManagementSpec{
ManagePodBudgets: true,
},
},
}, "v1.20.0")
t.Run("block mon drain for K8s version v1.20.0", func(t *testing.T) {
// change MaxUnavailable mon PDB to 0
err := c.blockMonDrain(fakeNamespaceName)
assert.NoError(t, err)
existingPDB := &policyv1beta1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), fakeNamespaceName, existingPDB)
existingPDBV1Beta1 := &policyv1beta1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), fakeNamespaceName, existingPDBV1Beta1)
assert.NoError(t, err)
assert.Equal(t, 0, int(existingPDB.Spec.MaxUnavailable.IntValue()))
assert.Equal(t, 0, int(existingPDBV1Beta1.Spec.MaxUnavailable.IntValue()))
})
}
74 changes: 55 additions & 19 deletions pkg/operator/ceph/disruption/clusterdisruption/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (

"github.com/pkg/errors"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/operator/k8sutil"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/types"
)
Expand Down Expand Up @@ -87,6 +89,11 @@ func Add(mgr manager.Manager, context *controllerconfig.Context) error {
return err
}

usePDBV1Beta1, err := k8sutil.UsePDBV1Beta1Version(reconcileClusterDisruption.context.ClusterdContext.Clientset)
if err != nil {
return errors.Wrapf(err, "failed to fetch pdb version")
}

// Only reconcile for PDB update event when allowed disruptions for the main OSD PDB is 0.
// This means that one of the OSD is down due to node drain or any other reason
pdbPredicate := predicate.Funcs{
Expand All @@ -95,7 +102,14 @@ func Add(mgr manager.Manager, context *controllerconfig.Context) error {
return false
},
UpdateFunc: func(e event.UpdateEvent) bool {
pdb, ok := e.ObjectNew.DeepCopyObject().(*policyv1beta1.PodDisruptionBudget)
if usePDBV1Beta1 {
pdb, ok := e.ObjectNew.DeepCopyObject().(*policyv1beta1.PodDisruptionBudget)
if !ok {
return false
}
return pdb.Name == osdPDBAppName && pdb.Status.DisruptionsAllowed == 0
}
pdb, ok := e.ObjectNew.DeepCopyObject().(*policyv1.PodDisruptionBudget)
if !ok {
return false
}
Expand All @@ -108,24 +122,46 @@ func Add(mgr manager.Manager, context *controllerconfig.Context) error {
}

// Watch for main PodDisruptionBudget and enqueue the CephCluster in the namespace
err = c.Watch(
&source.Kind{Type: &policyv1beta1.PodDisruptionBudget{}},
handler.EnqueueRequestsFromMapFunc(handler.MapFunc(func(obj client.Object) []reconcile.Request {
pdb, ok := obj.(*policyv1beta1.PodDisruptionBudget)
if !ok {
// Not a pdb, returning empty
logger.Errorf("PDB handler received non-PDB")
return []reconcile.Request{}
}
namespace := pdb.GetNamespace()
req := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: namespace}}
return []reconcile.Request{req}
}),
),
pdbPredicate,
)
if err != nil {
return err
if usePDBV1Beta1 {
err = c.Watch(
&source.Kind{Type: &policyv1beta1.PodDisruptionBudget{}},
handler.EnqueueRequestsFromMapFunc(handler.MapFunc(func(obj client.Object) []reconcile.Request {
pdb, ok := obj.(*policyv1beta1.PodDisruptionBudget)
if !ok {
// Not a pdb, returning empty
logger.Error("PDB handler received non-PDB")
return []reconcile.Request{}
}
namespace := pdb.GetNamespace()
req := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: namespace}}
return []reconcile.Request{req}
}),
),
pdbPredicate,
)
if err != nil {
return err
}
} else {
err = c.Watch(
&source.Kind{Type: &policyv1.PodDisruptionBudget{}},
handler.EnqueueRequestsFromMapFunc(handler.MapFunc(func(obj client.Object) []reconcile.Request {
pdb, ok := obj.(*policyv1.PodDisruptionBudget)
if !ok {
// Not a pdb, returning empty
logger.Error("PDB handler received non-PDB")
return []reconcile.Request{}
}
namespace := pdb.GetNamespace()
req := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: namespace}}
return []reconcile.Request{req}
}),
),
pdbPredicate,
)
if err != nil {
return err
}
}

// enqueues with an empty name that is populated by the reconciler.
Expand Down

0 comments on commit 2333fdb

Please sign in to comment.