Skip to content

Commit

Permalink
ceph: update PodDisruptionBudget from v1beta1 to v1
Browse files Browse the repository at this point in the history
This commit update the PodDisruptionBudget policy to use version v1
Updated to policy/v1 as policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+

Closes: #7917
Signed-off-by: parth-gr <paarora@redhat.com>
  • Loading branch information
parth-gr committed Jul 9, 2021
1 parent 319828a commit 6c09e99
Show file tree
Hide file tree
Showing 9 changed files with 419 additions and 117 deletions.
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/crash/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (r *ReconcileNode) deleteCrashCollector(deployment appsv1.Deployment) error
func (r *ReconcileNode) reconcileCrashRetention(namespace string, cephCluster cephv1.CephCluster, cephVersion *cephver.CephVersion) error {
k8sVersion, err := k8sutil.GetK8SVersion(r.context.Clientset)
if err != nil {
return errors.Wrap(err, "failed to fetch get k8s version")
return errors.Wrap(err, "failed to fetch k8s version")
}
useCronJobV1 := k8sVersion.AtLeast(version.MustParseSemantic(minVersionForCronV1))

Expand Down
51 changes: 44 additions & 7 deletions pkg/operator/ceph/cluster/mon/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (

"github.com/pkg/errors"
"github.com/rook/rook/pkg/operator/k8sutil"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

Expand Down Expand Up @@ -52,17 +55,36 @@ func (c *Cluster) reconcileMonPDB() error {
}

func (c *Cluster) createOrUpdateMonPDB(maxUnavailable int32) (controllerutil.OperationResult, error) {
usePDBV1, err := UsePDBV1Version(c.context.Clientset)
if err != nil {
return controllerutil.OperationResultNone, err
}
objectMeta := metav1.ObjectMeta{
Name: monPDBName,
Namespace: c.Namespace,
}
selector := &metav1.LabelSelector{
MatchLabels: map[string]string{k8sutil.AppAttr: AppName},
}
if usePDBV1 {
pdb := &policyv1.PodDisruptionBudget{
ObjectMeta: objectMeta}

mutateFunc := func() error {
pdb.Spec = policyv1.PodDisruptionBudgetSpec{
Selector: selector,
MaxUnavailable: &intstr.IntOrString{IntVal: maxUnavailable},
}
return nil
}
return controllerutil.CreateOrUpdate(context.TODO(), c.context.Client, pdb, mutateFunc)
}
pdb := &policyv1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: monPDBName,
Namespace: c.Namespace,
}}
ObjectMeta: objectMeta}

mutateFunc := func() error {
pdb.Spec = policyv1beta1.PodDisruptionBudgetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{k8sutil.AppAttr: AppName},
},
Selector: selector,
MaxUnavailable: &intstr.IntOrString{IntVal: maxUnavailable},
}
return nil
Expand Down Expand Up @@ -97,3 +119,18 @@ func (c *Cluster) allowMonDrain(request types.NamespacedName) error {
}
return nil
}

func UsePDBV1Version(Clientset kubernetes.Interface) (bool, error) {
k8sVersion, err := k8sutil.GetK8SVersion(Clientset)
if err != nil {
return false, errors.Wrap(err, "failed to fetch k8s version")
}
logger.Debugf("kubernetes version fetched %v", k8sVersion)
// minimum k8s version required for v1 PodDisruptionBudget is 'v1.21.0'. Apply v1 if k8s version is at least 'v1.21.0', else apply v1beta1 PodDisruptionBudget.
minVersionForPDBV1 := "1.21.0"
usePDBV1 := k8sVersion.AtLeast(version.MustParseSemantic(minVersionForPDBV1))
if usePDBV1 {
return true, nil
}
return false, nil
}
97 changes: 75 additions & 22 deletions pkg/operator/ceph/cluster/mon/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
"github.com/rook/rook/pkg/clusterd"
cephclient "github.com/rook/rook/pkg/daemon/ceph/client"
"github.com/rook/rook/pkg/operator/test"
"github.com/stretchr/testify/assert"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -36,17 +38,18 @@ const (
mockNamespace = "test-ns"
)

func createFakeCluster(t *testing.T, cephClusterObj *cephv1.CephCluster) *Cluster {
func createFakeCluster(t *testing.T, cephClusterObj *cephv1.CephCluster, k8sVersion string) *Cluster {
ownerInfo := cephclient.NewMinimumOwnerInfoWithOwnerRef()
scheme := scheme.Scheme
err := policyv1beta1.AddToScheme(scheme)
if err != nil {
assert.Fail(t, "failed to build scheme")
}
cl := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects().Build()

c := New(&clusterd.Context{Client: cl}, mockNamespace, cephClusterObj.Spec, ownerInfo, &sync.Mutex{})
err := policyv1.AddToScheme(scheme)
assert.NoError(t, err)
err = policyv1beta1.AddToScheme(scheme)
assert.NoError(t, err)

cl := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects().Build()
clientset := test.New(t, 3)
c := New(&clusterd.Context{Client: cl, Clientset: clientset}, mockNamespace, cephClusterObj.Spec, ownerInfo, &sync.Mutex{})
test.SetFakeKubernetesVersion(clientset, k8sVersion)
return c
}

Expand Down Expand Up @@ -105,17 +108,31 @@ func TestReconcileMonPDB(t *testing.T) {
}

for _, tc := range testCases {
c := createFakeCluster(t, tc.cephCluster)
// check for PDBV1Beta1 version
c := createFakeCluster(t, tc.cephCluster, "v1.20.0")
err := c.reconcileMonPDB()
assert.NoError(t, err)
existingPDB := &policyv1beta1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), types.NamespacedName{Name: monPDBName, Namespace: mockNamespace}, existingPDB)
existingPDBV1Beta1 := &policyv1beta1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), types.NamespacedName{Name: monPDBName, Namespace: mockNamespace}, existingPDBV1Beta1)
if tc.errorExpected {
assert.Error(t, err)
continue
}
assert.NoError(t, err)
assert.Equalf(t, tc.expectedMaxUnAvailable, int32(existingPDB.Spec.MaxUnavailable.IntValue()), "[%s]: incorrect minAvailable count in pdb", tc.name)
assert.Equalf(t, tc.expectedMaxUnAvailable, int32(existingPDBV1Beta1.Spec.MaxUnavailable.IntValue()), "[%s]: incorrect minAvailable count in pdb", tc.name)

// check for PDBV1 version
c = createFakeCluster(t, tc.cephCluster, "v1.21.0")
err = c.reconcileMonPDB()
assert.NoError(t, err)
existingPDBV1 := &policyv1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), types.NamespacedName{Name: monPDBName, Namespace: mockNamespace}, existingPDBV1)
if tc.errorExpected {
assert.Error(t, err)
continue
}
assert.NoError(t, err)
assert.Equalf(t, tc.expectedMaxUnAvailable, int32(existingPDBV1.Spec.MaxUnavailable.IntValue()), "[%s]: incorrect minAvailable count in pdb", tc.name)

// reconcile mon PDB again to test update
err = c.reconcileMonPDB()
Expand All @@ -124,41 +141,77 @@ func TestReconcileMonPDB(t *testing.T) {
}

func TestAllowMonDrain(t *testing.T) {
fakeNamespaceName := types.NamespacedName{Namespace: mockNamespace, Name: monPDBName}
// check for PDBV1 version
c := createFakeCluster(t, &cephv1.CephCluster{
Spec: cephv1.ClusterSpec{
DisruptionManagement: cephv1.DisruptionManagementSpec{
ManagePodBudgets: true,
},
},
}, "v1.21.0")
t.Run("allow mon drain for K8s version v1.21.0", func(t *testing.T) {
// change MaxUnavailable mon PDB to 1
err := c.allowMonDrain(fakeNamespaceName)
assert.NoError(t, err)
existingPDBV1 := &policyv1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), fakeNamespaceName, existingPDBV1)
assert.NoError(t, err)
assert.Equal(t, 1, int(existingPDBV1.Spec.MaxUnavailable.IntValue()))
})
fakeNamespaceName := types.NamespacedName{Namespace: mockNamespace, Name: monPDBName}
t.Run("allow mon drain", func(t *testing.T) {
// check for PDBV1Beta1 version
c = createFakeCluster(t, &cephv1.CephCluster{
Spec: cephv1.ClusterSpec{
DisruptionManagement: cephv1.DisruptionManagementSpec{
ManagePodBudgets: true,
},
},
}, "v1.20.0")
t.Run("allow mon drain for K8s version v1.20.0", func(t *testing.T) {
// change MaxUnavailable mon PDB to 1
err := c.allowMonDrain(fakeNamespaceName)
assert.NoError(t, err)
existingPDB := &policyv1beta1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), fakeNamespaceName, existingPDB)
existingPDBV1Beta1 := &policyv1beta1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), fakeNamespaceName, existingPDBV1Beta1)
assert.NoError(t, err)
assert.Equal(t, 1, int(existingPDB.Spec.MaxUnavailable.IntValue()))
assert.Equal(t, 1, int(existingPDBV1Beta1.Spec.MaxUnavailable.IntValue()))
})
}

func TestBlockMonDrain(t *testing.T) {
fakeNamespaceName := types.NamespacedName{Namespace: mockNamespace, Name: monPDBName}
// check for PDBV1 version
c := createFakeCluster(t, &cephv1.CephCluster{
Spec: cephv1.ClusterSpec{
DisruptionManagement: cephv1.DisruptionManagementSpec{
ManagePodBudgets: true,
},
},
}, "v1.21.0")
t.Run("block mon drain for K8s version v1.21.0", func(t *testing.T) {
// change MaxUnavailable mon PDB to 0
err := c.blockMonDrain(fakeNamespaceName)
assert.NoError(t, err)
existingPDBV1 := &policyv1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), fakeNamespaceName, existingPDBV1)
assert.NoError(t, err)
assert.Equal(t, 0, int(existingPDBV1.Spec.MaxUnavailable.IntValue()))
})
fakeNamespaceName := types.NamespacedName{Namespace: mockNamespace, Name: monPDBName}
t.Run("block mon drain", func(t *testing.T) {
// check for PDBV1Beta1 version
c = createFakeCluster(t, &cephv1.CephCluster{
Spec: cephv1.ClusterSpec{
DisruptionManagement: cephv1.DisruptionManagementSpec{
ManagePodBudgets: true,
},
},
}, "v1.20.0")
t.Run("block mon drain for K8s version v1.20.0", func(t *testing.T) {
// change MaxUnavailable mon PDB to 0
err := c.blockMonDrain(fakeNamespaceName)
assert.NoError(t, err)
existingPDB := &policyv1beta1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), fakeNamespaceName, existingPDB)
existingPDBV1Beta1 := &policyv1beta1.PodDisruptionBudget{}
err = c.context.Client.Get(context.TODO(), fakeNamespaceName, existingPDBV1Beta1)
assert.NoError(t, err)
assert.Equal(t, 0, int(existingPDB.Spec.MaxUnavailable.IntValue()))
assert.Equal(t, 0, int(existingPDBV1Beta1.Spec.MaxUnavailable.IntValue()))
})
}
37 changes: 36 additions & 1 deletion pkg/operator/ceph/disruption/clusterdisruption/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (

"github.com/pkg/errors"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
mon "github.com/rook/rook/pkg/operator/ceph/cluster/mon"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/types"
)
Expand Down Expand Up @@ -87,6 +89,11 @@ func Add(mgr manager.Manager, context *controllerconfig.Context) error {
return err
}

usePDBV1, err := mon.UsePDBV1Version(reconcileClusterDisruption.context.ClusterdContext.Clientset)
if err != nil {
return err
}

// Only reconcile for PDB update event when allowed disruptions for the main OSD PDB is 0.
// This means that one of the OSD is down due to node drain or any other reason
pdbPredicate := predicate.Funcs{
Expand All @@ -95,6 +102,13 @@ func Add(mgr manager.Manager, context *controllerconfig.Context) error {
return false
},
UpdateFunc: func(e event.UpdateEvent) bool {
if usePDBV1 {
pdb, ok := e.ObjectNew.DeepCopyObject().(*policyv1.PodDisruptionBudget)
if !ok {
return false
}
return pdb.Name == osdPDBAppName && pdb.Status.DisruptionsAllowed == 0
}
pdb, ok := e.ObjectNew.DeepCopyObject().(*policyv1beta1.PodDisruptionBudget)
if !ok {
return false
Expand All @@ -108,13 +122,34 @@ func Add(mgr manager.Manager, context *controllerconfig.Context) error {
}

// Watch for main PodDisruptionBudget and enqueue the CephCluster in the namespace
if usePDBV1 {
err = c.Watch(
&source.Kind{Type: &policyv1.PodDisruptionBudget{}},
handler.EnqueueRequestsFromMapFunc(handler.MapFunc(func(obj client.Object) []reconcile.Request {
pdb, ok := obj.(*policyv1.PodDisruptionBudget)
if !ok {
// Not a pdb, returning empty
logger.Error("PDB handler received non-PDB")
return []reconcile.Request{}
}
namespace := pdb.GetNamespace()
req := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: namespace}}
return []reconcile.Request{req}
}),
),
pdbPredicate,
)
if err != nil {
return err
}
}
err = c.Watch(
&source.Kind{Type: &policyv1beta1.PodDisruptionBudget{}},
handler.EnqueueRequestsFromMapFunc(handler.MapFunc(func(obj client.Object) []reconcile.Request {
pdb, ok := obj.(*policyv1beta1.PodDisruptionBudget)
if !ok {
// Not a pdb, returning empty
logger.Errorf("PDB handler received non-PDB")
logger.Error("PDB handler received non-PDB")
return []reconcile.Request{}
}
namespace := pdb.GetNamespace()
Expand Down

0 comments on commit 6c09e99

Please sign in to comment.