PWX-29714 Controller to reconcile PortworxDiag objects
Signed-off-by: Piyush Nimbalkar <pnimbalkar@purestorage.com>
piyush-nimbalkar committed Apr 14, 2023
1 parent 158207f commit c2a317c
Showing 5 changed files with 374 additions and 10 deletions.
33 changes: 31 additions & 2 deletions cmd/operator/operator.go
@@ -13,6 +13,7 @@ import (
_ "github.com/libopenstorage/operator/drivers/storage/portworx"
"github.com/libopenstorage/operator/pkg/apis"
"github.com/libopenstorage/operator/pkg/controller/csr"
"github.com/libopenstorage/operator/pkg/controller/portworxdiag"
"github.com/libopenstorage/operator/pkg/controller/storagecluster"
"github.com/libopenstorage/operator/pkg/controller/storagenode"
_ "github.com/libopenstorage/operator/pkg/log"
@@ -49,6 +50,7 @@ const (
flagRateLimiterQPS = "rate-limiter-qps"
flagRateLimiterBurst = "rate-limiter-burst"
flagEnableProfiling = "pprof"
flagEnableDiagController = "diag-controller"
flagDisableCacheFor = "disable-cache-for"
defaultLockObjectName = "openstorage-operator"
defaultResyncPeriod = 30 * time.Second
@@ -109,6 +111,10 @@ func main() {
Name: flagEnableProfiling,
Usage: "Enable Portworx Operator profiling using pprof (default: false)",
},
cli.BoolFlag{
Name: flagEnableDiagController,
Usage: "Enable Portworx Diag Controller (default: false)",
},
cli.StringFlag{
Name: flagDisableCacheFor,
Usage: "Comma separated object types to disable from cache to reduce memory usage, for example \"Pod,ConfigMap,Deployment,PersistentVolume\"",
@@ -145,6 +151,8 @@ func run(c *cli.Context) {
}()
}

diagControllerEnabled := c.Bool(flagEnableDiagController)

config, err := rest.InClusterConfig()
if err != nil {
log.Fatalf("Error getting cluster config: %v", err)
@@ -173,13 +181,22 @@ func run(c *cli.Context) {
storageClusterController := storagecluster.Controller{Driver: d}
err = storageClusterController.RegisterCRD()
if err != nil {
log.Fatalf("Error registering CRD's for StorageCluster controller: %v", err)
log.Fatalf("Error registering CRDs for StorageCluster controller: %v", err)
}

storageNodeController := storagenode.Controller{Driver: d}
err = storageNodeController.RegisterCRD()
if err != nil {
log.Fatalf("Error registering CRD's for StorageNode controller: %v", err)
log.Fatalf("Error registering CRDs for StorageNode controller: %v", err)
}

var diagController portworxdiag.Controller
if diagControllerEnabled {
diagController = portworxdiag.Controller{Driver: d}
err = diagController.RegisterCRD()
if err != nil {
log.Fatalf("Error registering CRDs for PortworxDiag controller: %v", err)
}
}

// TODO: Don't move createManager above register CRD section. This part will be refactored because of a bug,
@@ -252,6 +269,12 @@ func run(c *cli.Context) {
log.Fatalf("Error initializing certificate signing request controller: %v", err)
}

if diagControllerEnabled {
if err := diagController.Init(mgr); err != nil {
log.Fatalf("Error initializing portworx diag controller: %v", err)
}
}

if err := storageClusterController.StartWatch(); err != nil {
log.Fatalf("Error start watch on storage cluster controller: %v", err)
}
@@ -264,6 +287,12 @@ func run(c *cli.Context) {
log.Fatalf("Error starting watch on certificate signing request controller: %v", err)
}

if diagControllerEnabled {
if err := diagController.StartWatch(); err != nil {
log.Fatalf("Error starting watch on portworx diag controller: %v", err)
}
}

if c.BoolT(flagMigration) {
log.Info("Migration is enabled")
migrationHandler := migration.New(&storageClusterController)
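
The operator.go changes above gate the new PortworxDiag controller behind the --diag-controller flag and drive it through the same RegisterCRD / Init / StartWatch sequence used by the StorageCluster and StorageNode controllers. The controller implementation itself (pkg/controller/portworxdiag/controller.go) is among the changed files not rendered on this page, so the skeleton below is only a sketch of the surface operator.go relies on; the field names, imports, and wiring are assumptions inferred from the calls above and the tests further down, not the actual code from this commit.

// Hypothetical skeleton of pkg/controller/portworxdiag/controller.go, inferred
// from the calls made in operator.go above and in controller_test.go below.
// Field names, types, and wiring here are assumptions; the real file in this
// commit may differ.
package portworxdiag

import (
	"context"
	"fmt"

	"github.com/libopenstorage/operator/drivers/storage"
	"k8s.io/client-go/tools/record"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Controller reconciles PortworxDiag objects.
type Controller struct {
	Driver   storage.Driver
	client   client.Client
	recorder record.EventRecorder
	ctrl     controller.Controller
}

// RegisterCRD creates or updates the PortworxDiag CRD; the tests point the
// controller at the manifests under deploy/crds.
func (c *Controller) RegisterCRD() error {
	// apiextensions-based create/update of the portworxdiags CRD would go here.
	return nil
}

// Init wires the reconciler into the controller-runtime manager.
func (c *Controller) Init(mgr manager.Manager) error {
	c.client = mgr.GetClient()
	c.recorder = mgr.GetEventRecorderFor("portworxdiag-controller")

	var err error
	c.ctrl, err = controller.New("portworxdiag-controller", mgr, controller.Options{Reconciler: c})
	return err
}

// StartWatch begins watching PortworxDiag objects.
func (c *Controller) StartWatch() error {
	if c.ctrl == nil {
		return fmt.Errorf("controller not initialized")
	}
	// c.ctrl.Watch(...) on PortworxDiag objects would be registered here; the
	// exact call depends on the controller-runtime version in use.
	return nil
}

// Reconcile handles a single PortworxDiag event; the diag collection logic is
// omitted from this sketch.
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	return reconcile.Result{}, nil
}
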
7 changes: 0 additions & 7 deletions pkg/apis/portworx/v1/portworxdiag.go
@@ -4,13 +4,6 @@ import (
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// PortworxDiagResourceName is name for "portworxdiag" resource.
PortworxDiagResourceName = "portworxdiag"
// PortworxDiagResourcePlural is plural for "portworxdiag" resource.
PortworxDiagResourcePlural = "portworxdiags"
)

// PortworxDiagSpec is the spec used to define a portworx diag.
type PortworxDiagSpec struct {
// Configuration for diags collection of the main Portworx component.
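
With PortworxDiagResourceName and PortworxDiagResourcePlural removed from the API package, callers build the fully qualified CRD name from the plural string and the API group directly, which is what the new controller_test.go below does. A minimal sketch of that composition follows; the "portworx.io" group shown in the comment is an assumption, and the printed value depends on what SchemeGroupVersion declares in this package.

// Minimal sketch showing how the fully qualified CRD name is composed without
// the removed constants; mirrors what controller_test.go does below.
package main

import (
	"fmt"

	portworxv1 "github.com/libopenstorage/operator/pkg/apis/portworx/v1"
)

func main() {
	// Plural name + "." + API group.
	crdName := "portworxdiags" + "." + portworxv1.SchemeGroupVersion.Group
	fmt.Println(crdName) // e.g. "portworxdiags.portworx.io" if the group is "portworx.io" (assumed)
}
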
200 changes: 200 additions & 0 deletions pkg/controller/portworxdiag/controller_test.go
@@ -0,0 +1,200 @@
package portworxdiag

import (
"context"
"reflect"
"testing"
"time"

"github.com/golang/mock/gomock"
portworxv1 "github.com/libopenstorage/operator/pkg/apis/portworx/v1"
"github.com/libopenstorage/operator/pkg/mock"
apiextensionsops "github.com/portworx/sched-ops/k8s/apiextensions"
coreops "github.com/portworx/sched-ops/k8s/core"
"github.com/stretchr/testify/require"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
fakeextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
kversion "k8s.io/apimachinery/pkg/version"
fakediscovery "k8s.io/client-go/discovery/fake"
fakek8sclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/libopenstorage/operator/pkg/client/clientset/versioned/scheme"
testutil "github.com/libopenstorage/operator/pkg/util/test"
)

func TestInit(t *testing.T) {
mockCtrl := gomock.NewController(t)

fakeClient := fakek8sclient.NewSimpleClientset()
k8sClient := testutil.FakeK8sClient()
coreops.SetInstance(coreops.New(fakeClient))
recorder := record.NewFakeRecorder(10)

mgr := mock.NewMockManager(mockCtrl)
mgr.EXPECT().GetClient().Return(k8sClient).AnyTimes()
mgr.EXPECT().GetScheme().Return(scheme.Scheme).AnyTimes()
mgr.EXPECT().GetEventRecorderFor(gomock.Any()).Return(recorder).AnyTimes()
mgr.EXPECT().SetFields(gomock.Any()).Return(nil).AnyTimes()
mgr.EXPECT().Add(gomock.Any()).Return(nil).AnyTimes()
mgr.EXPECT().GetLogger().Return(log.Log.WithName("test")).AnyTimes()

controller := Controller{
client: k8sClient,
recorder: recorder,
}
err := controller.Init(mgr)
require.NoError(t, err)

ctrl := mock.NewMockController(mockCtrl)
controller.ctrl = ctrl
ctrl.EXPECT().Watch(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()

err = controller.StartWatch()
require.NoError(t, err)
}

func TestRegisterCRD(t *testing.T) {
fakeClient := fakek8sclient.NewSimpleClientset()
fakeClient.Discovery().(*fakediscovery.FakeDiscovery).FakedServerVersion = &kversion.Info{
GitVersion: "v1.23.0",
}
fakeExtClient := fakeextclient.NewSimpleClientset()
coreops.SetInstance(coreops.New(fakeClient))
apiextensionsops.SetInstance(apiextensionsops.New(fakeExtClient))
group := portworxv1.SchemeGroupVersion.Group
portworxDiagCRDName := "portworxdiags" + "." + group

// When the CRDs are created, just update their status so the validation
// does not get stuck until timeout.
go func() {
err := testutil.ActivateCRDWhenCreated(fakeExtClient, portworxDiagCRDName)
require.NoError(t, err)
}()

controller := Controller{}

// Should fail if the CRD specs are not found
err := controller.RegisterCRD()
require.Error(t, err)

// Set the correct crd path
crdBaseDir = func() string {
return "../../../deploy/crds"
}
defer func() {
crdBaseDir = getCRDBasePath
}()

err = controller.RegisterCRD()
require.NoError(t, err)

crds, err := fakeExtClient.ApiextensionsV1().
CustomResourceDefinitions().
List(context.TODO(), metav1.ListOptions{})
require.NoError(t, err)
require.Len(t, crds.Items, 1)

pdCRD, err := fakeExtClient.ApiextensionsV1().
CustomResourceDefinitions().
Get(context.TODO(), portworxDiagCRDName, metav1.GetOptions{})
require.NoError(t, err)
require.Equal(t, portworxDiagCRDName, pdCRD.Name)
require.Equal(t, portworxv1.SchemeGroupVersion.Group, pdCRD.Spec.Group)
require.Len(t, pdCRD.Spec.Versions, 1)
require.Equal(t, portworxv1.SchemeGroupVersion.Version, pdCRD.Spec.Versions[0].Name)
require.True(t, pdCRD.Spec.Versions[0].Served)
require.True(t, pdCRD.Spec.Versions[0].Storage)
subresource := &apiextensionsv1.CustomResourceSubresources{
Status: &apiextensionsv1.CustomResourceSubresourceStatus{},
}
require.Equal(t, subresource, pdCRD.Spec.Versions[0].Subresources)
require.NotEmpty(t, pdCRD.Spec.Versions[0].Schema.OpenAPIV3Schema.Properties)
require.Equal(t, apiextensionsv1.NamespaceScoped, pdCRD.Spec.Scope)
require.Equal(t, "portworxdiag", pdCRD.Spec.Names.Singular)
require.Equal(t, "portworxdiags", pdCRD.Spec.Names.Plural)
require.Equal(t, reflect.TypeOf(portworxv1.PortworxDiag{}).Name(), pdCRD.Spec.Names.Kind)
require.Equal(t, reflect.TypeOf(portworxv1.PortworxDiagList{}).Name(), pdCRD.Spec.Names.ListKind)
require.Equal(t, []string{"pxdiag"}, pdCRD.Spec.Names.ShortNames)

// If CRDs are already present, then should update it
pdCRD.ResourceVersion = "1000"
_, err = fakeExtClient.ApiextensionsV1().
CustomResourceDefinitions().
Update(context.TODO(), pdCRD, metav1.UpdateOptions{})
require.NoError(t, err)

// The fake client overwrites the status in Update call which real client
// does not. This will keep the CRD activated so validation does not get stuck.
go func() {
err := keepCRDActivated(fakeExtClient, portworxDiagCRDName)
require.NoError(t, err)
}()

// If CRDs are already present, then should not fail
err = controller.RegisterCRD()
require.NoError(t, err)

crds, err = fakeExtClient.ApiextensionsV1().
CustomResourceDefinitions().
List(context.TODO(), metav1.ListOptions{})
require.NoError(t, err)
require.Len(t, crds.Items, 1)
require.Equal(t, portworxDiagCRDName, crds.Items[0].Name)

pdCRD, err = fakeExtClient.ApiextensionsV1().
CustomResourceDefinitions().
Get(context.TODO(), portworxDiagCRDName, metav1.GetOptions{})
require.NoError(t, err)
require.Equal(t, "1000", pdCRD.ResourceVersion)
}

func TestReconcileOfDeletedDiag(t *testing.T) {
k8sClient := testutil.FakeK8sClient()
recorder := record.NewFakeRecorder(1)
controller := Controller{
client: k8sClient,
recorder: recorder,
}

request := reconcile.Request{
NamespacedName: types.NamespacedName{
Name: "does-not-exist",
Namespace: "test-ns",
},
}
result, err := controller.Reconcile(context.TODO(), request)
require.NoError(t, err)
require.Empty(t, result)
require.Len(t, recorder.Events, 0)
}

func keepCRDActivated(fakeClient *fakeextclient.Clientset, crdName string) error {
return wait.Poll(1*time.Second, 1*time.Minute, func() (bool, error) {
crd, err := fakeClient.ApiextensionsV1().
CustomResourceDefinitions().
Get(context.TODO(), crdName, metav1.GetOptions{})
if err != nil {
return false, err
}
if len(crd.Status.Conditions) == 0 {
crd.Status.Conditions = []apiextensionsv1.CustomResourceDefinitionCondition{{
Type: apiextensionsv1.Established,
Status: apiextensionsv1.ConditionTrue,
}}
_, err = fakeClient.ApiextensionsV1().
CustomResourceDefinitions().
UpdateStatus(context.TODO(), crd, metav1.UpdateOptions{})
if err != nil {
return false, err
}
return true, nil
}
return false, nil
})
}
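
TestReconcileOfDeletedDiag above expects that reconciling a PortworxDiag which no longer exists returns an empty result without recording any event. The snippet below sketches a reconcile path that would satisfy that expectation; it is an assumption about the controller's behavior (controller.go is not rendered on this page) and trims the struct down to just the client field this path needs.

// Hypothetical reconcile path consistent with TestReconcileOfDeletedDiag:
// a missing object is treated as already deleted and reconciliation ends
// quietly. This is a sketch, not the controller.go shipped in this commit.
package portworxdiag

import (
	"context"

	portworxv1 "github.com/libopenstorage/operator/pkg/apis/portworx/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type Controller struct {
	client client.Client
}

func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	diag := &portworxv1.PortworxDiag{}
	if err := c.client.Get(ctx, req.NamespacedName, diag); err != nil {
		if errors.IsNotFound(err) {
			// Object already deleted; nothing to reconcile and no event to emit.
			return reconcile.Result{}, nil
		}
		return reconcile.Result{}, err
	}
	// Diag collection reconciliation for existing objects would follow here.
	return reconcile.Result{}, nil
}
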
(2 of the 5 changed files in this commit are not rendered on this page.)
