Commit
Add unit test for replica scheduler
JacieChao committed Jun 15, 2018
1 parent e30821d commit 51fc723
Showing 4 changed files with 261 additions and 30 deletions.
2 changes: 1 addition & 1 deletion controller/instance_handler_test.go
@@ -56,7 +56,7 @@ func (s *TestSuite) TestSyncStatusWithPod(c *C) {
h := newTestInstanceHandler(kubeInformerFactory, kubeClient)

status := &types.InstanceStatus{}
- pod := newPod(tc.podPhase, TestPodName, TestNamespace)
+ pod := newPod(tc.podPhase, TestPodName, TestNamespace, TestNode1)
if pod != nil {
pod.Status.PodIP = tc.podIP
if tc.deleted {
22 changes: 4 additions & 18 deletions controller/replica_controller.go
@@ -371,24 +371,10 @@ func (rc *ReplicaController) CreatePodSpec(obj interface{}) (*v1.Pod, error) {
// will pin it down to the same host because we have data on it
if r.Spec.NodeID != "" {
pod.Spec.NodeName = r.Spec.NodeID
- } else {
- pod.Spec.Affinity = &v1.Affinity{
- PodAntiAffinity: &v1.PodAntiAffinity{
- PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{
- {
- Weight: 100,
- PodAffinityTerm: v1.PodAffinityTerm{
- LabelSelector: &metav1.LabelSelector{
- MatchLabels: map[string]string{
- longhornReplicaKey: r.Spec.VolumeName,
- },
- },
- TopologyKey: "kubernetes.io/hostname",
- },
- },
- },
- },
- }
}
+ // error out if NodeID and DataPath weren't filled in by the scheduler
+ if r.Spec.NodeID == "" || r.Spec.DataPath == "" {
+ return nil, fmt.Errorf("no available node for replica %v", r)
+ }

if r.Spec.RestoreName != "" && r.Spec.RestoreFrom != "" {
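The change above drops the pod anti-affinity fallback from CreatePodSpec: a replica is now expected to arrive with Spec.NodeID and Spec.DataPath already assigned by the replica scheduler, otherwise pod creation errors out. A minimal sketch of that expected ordering, assuming a hypothetical helper in the controller package and the scheduler import path shown below (neither is part of this commit):

package controller

import (
	"k8s.io/api/core/v1"

	longhorn "github.com/rancher/longhorn-manager/k8s/pkg/apis/longhorn/v1alpha1"
	"github.com/rancher/longhorn-manager/scheduler"
)

// scheduleAndCreatePod is a hypothetical illustration only: run the scheduler
// first so NodeID and DataPath are filled in, then build the pod spec.
func scheduleAndCreatePod(s *scheduler.ReplicaScheduler, rc *ReplicaController, r *longhorn.Replica) (*v1.Pod, error) {
	scheduled, err := s.ScheduleReplica(r)
	if err != nil {
		// no node satisfied the scheduling constraints
		return nil, err
	}
	// scheduled.Spec.NodeID and scheduled.Spec.DataPath are set at this point,
	// so the new error check in CreatePodSpec will not trigger
	return rc.CreatePodSpec(scheduled)
}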
36 changes: 25 additions & 11 deletions controller/replica_controller_test.go
@@ -53,7 +53,7 @@ func newReplica(desireState, currentState types.InstanceState, failedAt string)
}
}

- func newPod(phase v1.PodPhase, name, namespace string) *v1.Pod {
+ func newPod(phase v1.PodPhase, name, namespace, nodeID string) *v1.Pod {
if phase == "" {
return nil
}
@@ -66,6 +66,9 @@ func newPod(phase v1.PodPhase, name, namespace string) *v1.Pod {
Name: name,
Namespace: namespace,
},
+ Spec: v1.PodSpec{
+ NodeName: nodeID,
+ },
Status: v1.PodStatus{
Phase: phase,
PodIP: ip,
@@ -123,26 +126,35 @@ func (s *TestSuite) TestSyncReplica(c *C) {

// pod expectation
expectedPods int

+ // replica exception
+ NodeID string
+ DataPath string
}{
"replica keep stopped": {
types.InstanceStateStopped, types.InstanceStateStopped,
types.InstanceStateStopped, false,
- 0,
+ 0, TestNode1, TestDefaultDataPath,
},
"replica start": {
types.InstanceStateRunning, types.InstanceStateStopped,
types.InstanceStateRunning, false,
- 1,
+ 1, TestNode1, TestDefaultDataPath,
},
"replica keep running": {
types.InstanceStateRunning, types.InstanceStateRunning,
types.InstanceStateRunning, false,
- 1,
+ 1, TestNode1, TestDefaultDataPath,
},
"replica stop": {
types.InstanceStateStopped, types.InstanceStateRunning,
types.InstanceStateStopped, false,
- 0,
+ 0, TestNode1, TestDefaultDataPath,
},
"replica error": {
types.InstanceStateRunning, types.InstanceStateStopped,
types.InstanceStateRunning, true,
1, "", "",
},
}

@@ -164,13 +176,15 @@ func (s *TestSuite) TestSyncReplica(c *C) {
// Need to add to both the indexer store and the fake clientset, since they
// haven't been connected yet
replica := newReplica(tc.desireState, tc.currentState, "")
+ replica.Spec.NodeID = tc.NodeID
+ replica.Spec.DataPath = tc.DataPath
err = rIndexer.Add(replica)
c.Assert(err, IsNil)
_, err = lhClient.LonghornV1alpha1().Replicas(replica.Namespace).Create(replica)
c.Assert(err, IsNil)

if tc.currentState == types.InstanceStateRunning {
- pod := newPod(v1.PodRunning, replica.Name, replica.Namespace)
+ pod := newPod(v1.PodRunning, replica.Name, replica.Namespace, replica.Spec.NodeID)
err = pIndexer.Add(pod)
c.Assert(err, IsNil)
_, err = kubeClient.CoreV1().Pods(replica.Namespace).Create(pod)
@@ -182,12 +196,12 @@ func (s *TestSuite) TestSyncReplica(c *C) {
c.Assert(err, NotNil)
} else {
c.Assert(err, IsNil)
}

- // check fake clientset for resource update
- podList, err := kubeClient.CoreV1().Pods(replica.Namespace).List(metav1.ListOptions{})
- c.Assert(err, IsNil)
- c.Assert(podList.Items, HasLen, tc.expectedPods)
+ // check fake clientset for resource update
+ podList, err := kubeClient.CoreV1().Pods(replica.Namespace).List(metav1.ListOptions{})
+ c.Assert(err, IsNil)
+ c.Assert(podList.Items, HasLen, tc.expectedPods)
}

// TODO State change won't work for now since pod state wasn't changed
//updatedReplica, err := lhClient.LonghornV1alpha1().Replicas(rc.namespace).Get(replica.Name, metav1.GetOptions{})
231 changes: 231 additions & 0 deletions scheduler/replica_scheduler_test.go
@@ -0,0 +1,231 @@
package scheduler

import (
"fmt"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/controller"
"testing"

"github.com/rancher/longhorn-manager/datastore"
longhorn "github.com/rancher/longhorn-manager/k8s/pkg/apis/longhorn/v1alpha1"
lhfake "github.com/rancher/longhorn-manager/k8s/pkg/client/clientset/versioned/fake"
lhinformerfactory "github.com/rancher/longhorn-manager/k8s/pkg/client/informers/externalversions"
"github.com/rancher/longhorn-manager/types"
"github.com/rancher/longhorn-manager/util"
"github.com/stretchr/testify/require"
)

const (
TestNamespace = "default"
TestIP1 = "1.2.3.4"
TestIP2 = "5.6.7.8"
TestIP3 = "9.10.11.12"
TestNode1 = "test-node-name-1"
TestNode2 = "test-node-name-2"
TestNode3 = "test-node-name-3"

TestOwnerID1 = TestNode1
TestEngineImage = "longhorn-engine:latest"

TestVolumeName = "test-volume"
TestVolumeSize = 1073741824
TestVolumeStaleTimeout = 60

TestDefaultDataPath = "/var/lib/rancher/longhorn"

TestDaemon1 = "longhorn-manager-1"
TestDaemon2 = "longhorn-manager-2"
TestDaemon3 = "longhorn-manager-3"
)

func newReplicaScheduler(lhInformerFactory lhinformerfactory.SharedInformerFactory, kubeInformerFactory informers.SharedInformerFactory,
lhClient *lhfake.Clientset, kubeClient *fake.Clientset) *ReplicaScheduler {
fmt.Printf("testing NewReplicaScheduler\n")

volumeInformer := lhInformerFactory.Longhorn().V1alpha1().Volumes()
engineInformer := lhInformerFactory.Longhorn().V1alpha1().Engines()
replicaInformer := lhInformerFactory.Longhorn().V1alpha1().Replicas()
engineImageInformer := lhInformerFactory.Longhorn().V1alpha1().EngineImages()
nodeInformer := lhInformerFactory.Longhorn().V1alpha1().Nodes()

podInformer := kubeInformerFactory.Core().V1().Pods()
cronJobInformer := kubeInformerFactory.Batch().V1beta1().CronJobs()
daemonSetInformer := kubeInformerFactory.Apps().V1beta2().DaemonSets()

ds := datastore.NewDataStore(volumeInformer, engineInformer, replicaInformer, engineImageInformer, lhClient,
podInformer, cronJobInformer, daemonSetInformer, kubeClient, TestNamespace, nodeInformer)

return NewReplicaScheduler(ds)
}

func newDaemonPod(phase v1.PodPhase, name, namespace, nodeID, podIP string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{
"app": "longhorn-manager",
},
},
Spec: v1.PodSpec{
NodeName: nodeID,
},
Status: v1.PodStatus{
Phase: phase,
PodIP: podIP,
},
}
}

func newNode(name, namespace string, allowScheduling bool, nodeState types.NodeState) *longhorn.Node {
return &longhorn.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: types.NodeSpec{
AllowScheduling: allowScheduling,
Disks: map[string]types.DiskSpec{
TestDefaultDataPath: {
Path: TestDefaultDataPath,
},
},
},
Status: types.NodeStatus{
State: nodeState,
},
}
}

func newVolume(name string, replicaCount int) *longhorn.Volume {
return &longhorn.Volume{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Finalizers: []string{
longhorn.SchemeGroupVersion.Group,
},
},
Spec: types.VolumeSpec{
NumberOfReplicas: replicaCount,
Size: TestVolumeSize,
OwnerID: TestOwnerID1,
StaleReplicaTimeout: TestVolumeStaleTimeout,
EngineImage: TestEngineImage,
},
}
}

func newReplicaForVolume(v *longhorn.Volume) *longhorn.Replica {
return &longhorn.Replica{
ObjectMeta: metav1.ObjectMeta{
Name: v.Name + "-r-" + util.RandomID(),
Labels: map[string]string{
"longhornvolume": v.Name,
},
},
Spec: types.ReplicaSpec{
InstanceSpec: types.InstanceSpec{
OwnerID: v.Spec.OwnerID,
VolumeName: v.Name,
VolumeSize: v.Spec.Size,
EngineImage: TestEngineImage,
DesireState: types.InstanceStateStopped,
},
},
}
}

func TestReplicaScheduler_ScheduleReplica(t *testing.T) {
assert := require.New(t)

kubeClient := fake.NewSimpleClientset()
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())

lhClient := lhfake.NewSimpleClientset()
lhInformerFactory := lhinformerfactory.NewSharedInformerFactory(lhClient, controller.NoResyncPeriodFunc())

vIndexer := lhInformerFactory.Longhorn().V1alpha1().Volumes().Informer().GetIndexer()
rIndexer := lhInformerFactory.Longhorn().V1alpha1().Replicas().Informer().GetIndexer()
nIndexer := lhInformerFactory.Longhorn().V1alpha1().Nodes().Informer().GetIndexer()
pIndexer := kubeInformerFactory.Core().V1().Pods().Informer().GetIndexer()

s := newReplicaScheduler(lhInformerFactory, kubeInformerFactory, lhClient, kubeClient)

// create daemon pods
daemon1 := newDaemonPod(v1.PodRunning, TestDaemon1, TestNamespace, TestNode1, TestIP1)
p, err := kubeClient.CoreV1().Pods(TestNamespace).Create(daemon1)
assert.Nil(err)
pIndexer.Add(p)
daemon2 := newDaemonPod(v1.PodRunning, TestDaemon2, TestNamespace, TestNode2, TestIP2)
p, err = kubeClient.CoreV1().Pods(TestNamespace).Create(daemon2)
assert.Nil(err)
pIndexer.Add(p)
daemon3 := newDaemonPod(v1.PodRunning, TestDaemon3, TestNamespace, TestNode3, TestIP3)
p, err = kubeClient.CoreV1().Pods(TestNamespace).Create(daemon3)
assert.Nil(err)
pIndexer.Add(p)

// create test node
node1 := newNode(TestNode1, TestNamespace, true, types.NodeStateUp)
n1, err := lhClient.Longhorn().Nodes(TestNamespace).Create(node1)
assert.Nil(err)
assert.NotNil(n1)
nIndexer.Add(n1)

// node2 is not available for scheduling (AllowScheduling is false)
node2 := newNode(TestNode2, TestNamespace, false, types.NodeStateUp)
n2, err := lhClient.Longhorn().Nodes(TestNamespace).Create(node2)
assert.Nil(err)
assert.NotNil(n2)
nIndexer.Add(n2)

// node3 is not available for scheduling (node state is down)
node3 := newNode(TestNode3, TestNamespace, true, types.NodeStateDown)
n3, err := lhClient.Longhorn().Nodes(TestNamespace).Create(node3)
assert.Nil(err)
assert.NotNil(n3)
nIndexer.Add(n3)

// create a volume and replicas for the scheduler
v := newVolume(TestVolumeName, 2)
volume, err := lhClient.LonghornV1alpha1().Volumes(TestNamespace).Create(v)
assert.Nil(err)
assert.NotNil(volume)
vIndexer.Add(volume)

replica1 := newReplicaForVolume(volume)
r1, err := lhClient.LonghornV1alpha1().Replicas(TestNamespace).Create(replica1)
assert.Nil(err)
assert.NotNil(r1)
rIndexer.Add(r1)

replica2 := newReplicaForVolume(volume)
r2, err := lhClient.LonghornV1alpha1().Replicas(TestNamespace).Create(replica2)
assert.Nil(err)
assert.NotNil(r2)
rIndexer.Add(r2)

// validate scheduler
r, err := s.ScheduleReplica(r1)
assert.Nil(err)
assert.NotNil(r)
// assert the replica was not scheduled to node2 or node3
assert.NotEqual(node2.Name, r.Spec.NodeID)
assert.NotEqual(node3.Name, r.Spec.NodeID)
assert.Equal(node1.Name, r.Spec.NodeID)

assert.Contains(r.Spec.DataPath, TestDefaultDataPath)

r, err = s.ScheduleReplica(r2)
assert.Nil(err)
assert.NotNil(r)
// assert the replica was not scheduled to node2 or node3
assert.NotEqual(node2.Name, r.Spec.NodeID)
assert.NotEqual(node3.Name, r.Spec.NodeID)
assert.Equal(node1.Name, r.Spec.NodeID)

assert.Contains(r.Spec.DataPath, TestDefaultDataPath)
}
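The new test can be run on its own with standard Go tooling, e.g. go test ./scheduler -run TestReplicaScheduler_ScheduleReplica -v (assuming the repository's usual vendored dependency setup).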
