Skip to content

Commit

Permalink
Add WIP unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
davidz627 committed Oct 9, 2019
1 parent 37f6972 commit b936885
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 22 deletions.
10 changes: 4 additions & 6 deletions pkg/attacher/lister.go
Expand Up @@ -25,12 +25,10 @@ import (
"google.golang.org/grpc"
)

type publishedNodeList []string

// Lister implements list operations against a remote CSI driver.
type VolumeLister interface {
// TODO Comment
ListVolumes(ctx context.Context) (map[string]publishedNodeList, error)
// TODO(dyzz) Comment
ListVolumes(ctx context.Context) (map[string][]string, error)
}

type volumeLister struct {
Expand All @@ -47,10 +45,10 @@ func NewVolumeLister(conn *grpc.ClientConn) VolumeLister {
}
}

func (a *volumeLister) ListVolumes(ctx context.Context) (map[string](publishedNodeList), error) {
func (a *volumeLister) ListVolumes(ctx context.Context) (map[string]([]string), error) {
client := csi.NewControllerClient(a.conn)

p := map[string]publishedNodeList{}
p := map[string][]string{}

tok := ""
for {
Expand Down
9 changes: 6 additions & 3 deletions pkg/controller/csi_handler.go
Expand Up @@ -115,7 +115,12 @@ func (h *csiHandler) ReconcileVA() {
}

for _, va := range vas {
nodeID := va.Annotations[vaNodeIDAnnotation]
nodeID, ok := va.Annotations[vaNodeIDAnnotation]
if !ok {
klog.Errorf("Failed to find node ID in VolumeAttachment annotation")
continue
}
// TODO(dyzz): Maybe all of these should just be an "error" then continue.
pvSpec, err := h.getPVSpec(va)
if err != nil {
klog.Fatalf("Failed to get PV Spec: %v", err)
Expand All @@ -132,12 +137,10 @@ func (h *csiHandler) ReconcileVA() {
}
attachedStatus := va.Status.Attached

klog.Infof("original handle: %v, node ID: %v, driver: %v", volumeHandle, nodeID, source.Driver)
volumeHandle, err = h.translator.RepairVolumeHandle(source.Driver, volumeHandle, nodeID)
if err != nil {
klog.Fatalf("Failed to repair volume handle: %v", err)
}
klog.Infof("reconstructed handle: %v", volumeHandle)

// If ListVolumes Attached Status is different, update.
found := false
Expand Down
79 changes: 72 additions & 7 deletions pkg/controller/csi_handler_test.go
Expand Up @@ -34,7 +34,6 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
core "k8s.io/client-go/testing"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/klog"
)

Expand All @@ -51,35 +50,35 @@ var (

var timeout = 10 * time.Millisecond

func csiHandlerFactory(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi attacher.Attacher) Handler {
func csiHandlerFactory(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi attacher.Attacher, lister attacher.VolumeLister) Handler {
return NewCSIHandler(
client,
testAttacherName,
csi,
nil, // TODO(dyzz) write unit tests and fix current ones
lister,
informerFactory.Core().V1().PersistentVolumes().Lister(),
informerFactory.Core().V1().Nodes().Lister(),
informerFactory.Storage().V1beta1().CSINodes().Lister(),
informerFactory.Storage().V1beta1().VolumeAttachments().Lister(),
&timeout,
true, /* supports PUBLISH_READONLY */
csitrans.New(),
fakeInTreeToCSITranslator{},
)
}

func csiHandlerFactoryNoReadOnly(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi attacher.Attacher) Handler {
func csiHandlerFactoryNoReadOnly(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi attacher.Attacher, lister attacher.VolumeLister) Handler {
return NewCSIHandler(
client,
testAttacherName,
csi,
nil,
lister,
informerFactory.Core().V1().PersistentVolumes().Lister(),
informerFactory.Core().V1().Nodes().Lister(),
informerFactory.Storage().V1beta1().CSINodes().Lister(),
informerFactory.Storage().V1beta1().VolumeAttachments().Lister(),
&timeout,
false, /* does not support PUBLISH_READONLY */
csitrans.New(),
fakeInTreeToCSITranslator{},
)
}

Expand Down Expand Up @@ -1266,6 +1265,72 @@ func TestCSIHandler(t *testing.T) {
runTests(t, csiHandlerFactory, tests)
}

func TestCSIHandlerReconcileVA(t *testing.T) {
vaGroupResourceVersion := schema.GroupVersionResource{
Group: storage.GroupName,
Version: "v1beta1",
Resource: "volumeattachments",
}
nID := map[string]string{
vaNodeIDAnnotation: testNodeName,
}

tests := []testCase{
{
name: "va attached actual state not attached",
initialObjects: []runtime.Object{
va(true /*attached*/, "" /*finalizer*/, nID /*annotations*/),
pv(),
},
listerResponse: map[string][]string{
// Intentionally empty
},
expectedActions: []core.Action{
core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName,
types.MergePatchType, patch(va(true /*attached*/, "" /*finalizer*/, nil /* annotations */),
va(false /*attached*/, "", nil))),
},
},
{
name: "va attached actual state attached",
initialObjects: []runtime.Object{
va(true /*attached*/, "" /*finalizer*/, nID /*annotations*/),
pv(),
},
listerResponse: map[string][]string{
testVolumeHandle: []string{testNodeName},
},
expectedActions: []core.Action{
// Intentionally empty
},
},
{
name: "va not attached actual state attached",
initialObjects: []runtime.Object{
va(false /*attached*/, "" /*finalizer*/, nID /*annotations*/),
pv(),
},
listerResponse: map[string][]string{
testVolumeHandle: []string{testNodeName},
},
expectedActions: []core.Action{
core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName,
types.MergePatchType, patch(va(false /*attached*/, "" /*finalizer*/, nil /* annotations */),
va(true /*attached*/, "", nil))),
},
},
{
name: "no vas",
initialObjects: []runtime.Object{},
listerResponse: map[string][]string{
testVolumeHandle: []string{testNodeName},
},
expectedActions: []core.Action{},
},
}
runTests(t, csiHandlerFactory, tests)
}

func TestCSIHandlerReadOnly(t *testing.T) {
vaGroupResourceVersion := schema.GroupVersionResource{
Group: storage.GroupName,
Expand Down
40 changes: 36 additions & 4 deletions pkg/controller/framework_test.go
Expand Up @@ -30,7 +30,8 @@ import (
"k8s.io/klog"

"encoding/json"
"k8s.io/api/core/v1"

v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -39,6 +40,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/util/workqueue"
csitrans "k8s.io/csi-translation-lib"
)

// This is an unit test framework. It is heavily inspired by serviceaccount
Expand Down Expand Up @@ -72,6 +74,8 @@ type testCase struct {
expectedActions []core.Action
// List of expected CSI calls
expectedCSICalls []csiCall
// Expected lister response
listerResponse map[string][]string
// Function to perform additional checks after the test finishes
additionalCheck func(t *testing.T, test testCase)
}
Expand Down Expand Up @@ -100,7 +104,7 @@ type csiCall struct {
delay time.Duration
}

type handlerFactory func(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi attacher.Attacher) Handler
type handlerFactory func(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi attacher.Attacher, lister attacher.VolumeLister) Handler

func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) {
for _, test := range tests {
Expand Down Expand Up @@ -176,8 +180,9 @@ func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) {

// Construct controller
csiConnection := &fakeCSIConnection{t: t, calls: test.expectedCSICalls}
handler := handlerFactory(client, informers, csiConnection)
ctrl := NewCSIAttachController(client, testAttacherName, handler, vaInformer, pvInformer, workqueue.DefaultControllerRateLimiter(), workqueue.DefaultControllerRateLimiter())
lister := &fakeLister{t: t, publishedNodes: test.listerResponse}
handler := handlerFactory(client, informers, csiConnection, lister)
ctrl := NewCSIAttachController(client, testAttacherName, handler, vaInformer, pvInformer, workqueue.DefaultControllerRateLimiter(), workqueue.DefaultControllerRateLimiter(), test.listerResponse != nil)

// Start the test by enqueueing the right event
if test.addedVA != nil {
Expand Down Expand Up @@ -213,6 +218,10 @@ func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) {
// There is still some work in the queue, process it now
continue
}
if test.listerResponse != nil {
// Reconcile VA with the actual state
ctrl.reconcileVA()
}
currentActionCount := len(client.Actions())
if currentActionCount < len(test.expectedActions) {
if lastReportedActionCount < currentActionCount {
Expand Down Expand Up @@ -380,6 +389,15 @@ func vaWithDetachError(va *storage.VolumeAttachment, message string) *storage.Vo
return va
}

type fakeLister struct {
t *testing.T
publishedNodes map[string][]string
}

func (l *fakeLister) ListVolumes(ctx context.Context) (map[string][]string, error) {
return l.publishedNodes, nil
}

// Fake CSIConnection implementation that check that Attach/Detach is called
// with the right parameters and it returns proper error code and metadata.
type fakeCSIConnection struct {
Expand Down Expand Up @@ -494,3 +512,17 @@ func (f *fakeCSIConnection) Close() error {
func (f *fakeCSIConnection) Probe(timeout time.Duration) error {
return nil
}

// TODO: Remove hardcoding for GCE tests and make more general
type fakeInTreeToCSITranslator struct{}

func (f fakeInTreeToCSITranslator) TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
t := csitrans.New()
return t.TranslateInTreePVToCSI(pv)
}
func (f fakeInTreeToCSITranslator) IsPVMigratable(pv *v1.PersistentVolume) bool {
return pv.Spec.GCEPersistentDisk != nil
}
func (f fakeInTreeToCSITranslator) RepairVolumeHandle(pluginName, volumeHandle, nodeID string) (string, error) {
return volumeHandle, nil
}
2 changes: 1 addition & 1 deletion pkg/controller/trivial_handler.go
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package controller

import (
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue"
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/trivial_handler_test.go
Expand Up @@ -33,7 +33,7 @@ import (
core "k8s.io/client-go/testing"
)

func trivialHandlerFactory(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi attacher.Attacher) Handler {
func trivialHandlerFactory(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi attacher.Attacher, lister attacher.VolumeLister) Handler {
return NewTrivialHandler(client)
}

Expand Down

0 comments on commit b936885

Please sign in to comment.