Skip to content

Commit

Permalink
ceph: update PodDisruptionBudget from v1beta1 to v1
Browse files Browse the repository at this point in the history
This commit update the PodDisruptionBudget policy to use version v1
Updated to policy/v1 as policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+

Closes: #7917

Signed-off-by: parth-gr <paarora@redhat.com>
  • Loading branch information
parth-gr committed Jun 29, 2021
1 parent e1a6333 commit c89fd4b
Show file tree
Hide file tree
Showing 8 changed files with 425 additions and 158 deletions.
51 changes: 38 additions & 13 deletions pkg/operator/ceph/cluster/mon/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (

"github.com/pkg/errors"
"github.com/rook/rook/pkg/operator/k8sutil"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/version"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

Expand Down Expand Up @@ -52,22 +54,45 @@ func (c *Cluster) reconcileMonPDB() error {
}

func (c *Cluster) createOrUpdateMonPDB(maxUnavailable int32) (controllerutil.OperationResult, error) {
pdb := &policyv1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: monPDBName,
Namespace: c.Namespace,
}}
k8sVersion, err := k8sutil.GetK8SVersion(c.context.Clientset)
if err != nil {
return controllerutil.OperationResultNone, errors.Wrap(err, "failed to fetch get k8s version")
}
// minimum k8s version required for v1 PodDisruptionBudget is 'v1.21.0'. Apply v1 if k8s version is at least 'v1.21.0', else apply v1beta1 cronPodDisruptionBudgetJob.
minVersionForPDBV1 := "1.21.0"
usePDBV1 := k8sVersion.AtLeast(version.MustParseSemantic(minVersionForPDBV1))
objectMeta := metav1.ObjectMeta{
Name: monPDBName,
Namespace: c.Namespace,
}
selector := &metav1.LabelSelector{
MatchLabels: map[string]string{k8sutil.AppAttr: AppName},
}
if usePDBV1 {
pdb := &policyv1.PodDisruptionBudget{
ObjectMeta: objectMeta}

mutateFunc := func() error {
pdb.Spec = policyv1beta1.PodDisruptionBudgetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{k8sutil.AppAttr: AppName},
},
MaxUnavailable: &intstr.IntOrString{IntVal: maxUnavailable},
mutateFunc := func() error {
pdb.Spec = policyv1.PodDisruptionBudgetSpec{
Selector: selector,
MaxUnavailable: &intstr.IntOrString{IntVal: maxUnavailable},
}
return nil
}
return nil
return controllerutil.CreateOrUpdate(context.TODO(), c.context.Client, pdb, mutateFunc)
} else {
pdb := &policyv1beta1.PodDisruptionBudget{
ObjectMeta: objectMeta}

mutateFunc := func() error {
pdb.Spec = policyv1beta1.PodDisruptionBudgetSpec{
Selector: selector,
MaxUnavailable: &intstr.IntOrString{IntVal: maxUnavailable},
}
return nil
}
return controllerutil.CreateOrUpdate(context.TODO(), c.context.Client, pdb, mutateFunc)
}
return controllerutil.CreateOrUpdate(context.TODO(), c.context.Client, pdb, mutateFunc)
}

// blockMonDrain makes MaxUnavailable in mon PDB to 0 to block any voluntary mon drains
Expand Down
10 changes: 5 additions & 5 deletions pkg/operator/ceph/cluster/mon/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/rook/rook/pkg/clusterd"
cephclient "github.com/rook/rook/pkg/daemon/ceph/client"
"github.com/stretchr/testify/assert"
policyv1beta1 "k8s.io/api/policy/v1beta1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand All @@ -39,7 +39,7 @@ const (
func createFakeCluster(t *testing.T, cephClusterObj *cephv1.CephCluster) *Cluster {
ownerInfo := cephclient.NewMinimumOwnerInfoWithOwnerRef()
scheme := scheme.Scheme
err := policyv1beta1.AddToScheme(scheme)
err := policyv1.AddToScheme(scheme)
if err != nil {
assert.Fail(t, "failed to build scheme")
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestReconcileMonPDB(t *testing.T) {
c := createFakeCluster(t, tc.cephCluster)
err := c.reconcileMonPDB()
assert.NoError(t, err)
existingPDB := &policyv1beta1.PodDisruptionBudget{}
existingPDB := &policyv1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), types.NamespacedName{Name: monPDBName, Namespace: mockNamespace}, existingPDB)
if tc.errorExpected {
assert.Error(t, err)
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestAllowMonDrain(t *testing.T) {
// change MaxUnavailable mon PDB to 1
err := c.allowMonDrain(fakeNamespaceName)
assert.NoError(t, err)
existingPDB := &policyv1beta1.PodDisruptionBudget{}
existingPDB := &policyv1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), fakeNamespaceName, existingPDB)
assert.NoError(t, err)
assert.Equal(t, 1, int(existingPDB.Spec.MaxUnavailable.IntValue()))
Expand All @@ -156,7 +156,7 @@ func TestBlockMonDrain(t *testing.T) {
// change MaxUnavailable mon PDB to 0
err := c.blockMonDrain(fakeNamespaceName)
assert.NoError(t, err)
existingPDB := &policyv1beta1.PodDisruptionBudget{}
existingPDB := &policyv1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), fakeNamespaceName, existingPDB)
assert.NoError(t, err)
assert.Equal(t, 0, int(existingPDB.Spec.MaxUnavailable.IntValue()))
Expand Down
81 changes: 58 additions & 23 deletions pkg/operator/ceph/disruption/clusterdisruption/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/pkg/errors"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/types"
)
Expand Down Expand Up @@ -87,6 +88,11 @@ func Add(mgr manager.Manager, context *controllerconfig.Context) error {
return err
}

usePDBV1, err := usePDBV1Version(reconcileClusterDisruption)
if err != nil {
return err
}

// Only reconcile for PDB update event when allowed disruptions for the main OSD PDB is 0.
// This means that one of the OSD is down due to node drain or any other reason
pdbPredicate := predicate.Funcs{
Expand All @@ -95,11 +101,19 @@ func Add(mgr manager.Manager, context *controllerconfig.Context) error {
return false
},
UpdateFunc: func(e event.UpdateEvent) bool {
pdb, ok := e.ObjectNew.DeepCopyObject().(*policyv1beta1.PodDisruptionBudget)
if !ok {
return false
if usePDBV1 {
pdb, ok := e.ObjectNew.DeepCopyObject().(*policyv1.PodDisruptionBudget)
if !ok {
return false
}
return pdb.Name == osdPDBAppName && pdb.Status.DisruptionsAllowed == 0
} else {
pdb, ok := e.ObjectNew.DeepCopyObject().(*policyv1beta1.PodDisruptionBudget)
if !ok {
return false
}
return pdb.Name == osdPDBAppName && pdb.Status.DisruptionsAllowed == 0
}
return pdb.Name == osdPDBAppName && pdb.Status.DisruptionsAllowed == 0
},
DeleteFunc: func(e event.DeleteEvent) bool {
// Do not reconcile when PDB is deleted
Expand All @@ -108,26 +122,47 @@ func Add(mgr manager.Manager, context *controllerconfig.Context) error {
}

// Watch for main PodDisruptionBudget and enqueue the CephCluster in the namespace
err = c.Watch(
&source.Kind{Type: &policyv1beta1.PodDisruptionBudget{}},
handler.EnqueueRequestsFromMapFunc(handler.MapFunc(func(obj client.Object) []reconcile.Request {
pdb, ok := obj.(*policyv1beta1.PodDisruptionBudget)
if !ok {
// Not a pdb, returning empty
logger.Errorf("PDB handler received non-PDB")
return []reconcile.Request{}
}
namespace := pdb.GetNamespace()
req := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: namespace}}
return []reconcile.Request{req}
}),
),
pdbPredicate,
)
if err != nil {
return err
if usePDBV1 {
err = c.Watch(
&source.Kind{Type: &policyv1.PodDisruptionBudget{}},
handler.EnqueueRequestsFromMapFunc(handler.MapFunc(func(obj client.Object) []reconcile.Request {
pdb, ok := obj.(*policyv1.PodDisruptionBudget)
if !ok {
// Not a pdb, returning empty
logger.Errorf("PDB handler received non-PDB")
return []reconcile.Request{}
}
namespace := pdb.GetNamespace()
req := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: namespace}}
return []reconcile.Request{req}
}),
),
pdbPredicate,
)
if err != nil {
return err
}
} else {
err = c.Watch(
&source.Kind{Type: &policyv1beta1.PodDisruptionBudget{}},
handler.EnqueueRequestsFromMapFunc(handler.MapFunc(func(obj client.Object) []reconcile.Request {
pdb, ok := obj.(*policyv1beta1.PodDisruptionBudget)
if !ok {
// Not a pdb, returning empty
logger.Errorf("PDB handler received non-PDB")
return []reconcile.Request{}
}
namespace := pdb.GetNamespace()
req := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: namespace}}
return []reconcile.Request{req}
}),
),
pdbPredicate,
)
if err != nil {
return err
}
}

// enqueues with an empty name that is populated by the reconciler.
// There is a one-per-namespace limit on CephClusters
enqueueByNamespace := handler.EnqueueRequestsFromMapFunc(handler.MapFunc(func(obj client.Object) []reconcile.Request {
Expand Down

0 comments on commit c89fd4b

Please sign in to comment.