Commit
refactor: nodelock
Signed-off-by: googs1025 <googs1025@gmail.com>
googs1025 committed Jun 24, 2024
1 parent 224c8d4 commit 0e1e8b4
Showing 3 changed files with 53 additions and 120 deletions.
pkg/scheduler/plugins/util/k8s/snapshot.go (2 changes: 1 addition & 1 deletion)

@@ -143,5 +143,5 @@ func (s *Snapshot) Get(nodeName string) (*framework.NodeInfo, error) {
}

func (s *Snapshot) IsPVCUsedByPods(key string) bool {
return false
panic("not implemented")
}
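
For context, the stub previously answered false for every PVC, which let callers proceed as if the PVC lookup worked; panicking makes the unimplemented path fail loudly instead. A minimal in-package test sketch of the new behavior (the recover pattern and the nil argument to NewSnapshot are illustrative assumptions, not part of this commit):

func TestIsPVCUsedByPodsPanics(t *testing.T) {
	// Assumes NewSnapshot tolerates a nil nodeInfo map.
	s := NewSnapshot(nil)
	defer func() {
		// The stub should now panic rather than return false.
		if r := recover(); r == nil {
			t.Fatalf("expected IsPVCUsedByPods to panic")
		}
	}()
	s.StorageInfos().IsPVCUsedByPods("any-pvc-key")
}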
pkg/scheduler/plugins/util/k8s/snapshot_test.go (110 changes: 21 additions & 89 deletions)

@@ -2,75 +2,24 @@ package k8s

import (
"fmt"
"reflect"
"testing"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/scheduler/framework"
"reflect"
"testing"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/util"
)

func TestSnapshot(t *testing.T) {
var (
nodeName = "test-node"
pod1 = &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-1",
UID: types.UID("test-1"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("500"),
},
},
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 80,
Protocol: "TCP",
},
},
},
},
NodeName: nodeName,
},
}
pod2 = &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-2",
UID: types.UID("test-2"),
Labels: map[string]string{"test": "test"},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("1Ki"),
},
},
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 8080,
Protocol: "TCP",
},
},
},
},
NodeName: nodeName,
},
}
pod1 = util.BuildPod("node_info_cache_test", "test-1", nodeName,
v1.PodRunning, api.BuildResourceList("200m", "1Ki"), "pg2",
make(map[string]string), make(map[string]string))
pod2 = util.BuildPod("node_info_cache_test", "test-2", nodeName,
v1.PodRunning, api.BuildResourceList("200m", "1Ki"), "pg2",
map[string]string{"test": "test"}, make(map[string]string))
)

tests := []struct {
@@ -87,14 +36,8 @@ func TestSnapshot(t *testing.T) {
NonZeroRequested: &framework.Resource{},
Allocatable: &framework.Resource{},
Generation: 2,
UsedPorts: framework.HostPortInfo{
"127.0.0.1": map[framework.ProtocolPort]struct{}{
{Protocol: "TCP", Port: 80}: {},
{Protocol: "TCP", Port: 8080}: {},
},
},
ImageStates: map[string]*framework.ImageStateSummary{},
PVCRefCounts: map[string]int{},
ImageStates: map[string]*framework.ImageStateSummary{},
PVCRefCounts: map[string]int{},
Pods: []*framework.PodInfo{
{
Pod: pod1,
@@ -125,14 +68,8 @@ func TestSnapshot(t *testing.T) {
NonZeroRequested: &framework.Resource{},
Allocatable: &framework.Resource{},
Generation: 2,
UsedPorts: framework.HostPortInfo{
"127.0.0.1": map[framework.ProtocolPort]struct{}{
{Protocol: "TCP", Port: 80}: {},
{Protocol: "TCP", Port: 8080}: {},
},
},
ImageStates: map[string]*framework.ImageStateSummary{},
PVCRefCounts: map[string]int{},
ImageStates: map[string]*framework.ImageStateSummary{},
PVCRefCounts: map[string]int{},
Pods: []*framework.PodInfo{
{
Pod: pod1,
@@ -168,45 +105,40 @@ func TestSnapshot(t *testing.T) {
snapshot := NewSnapshot(tc.nodeInfoMap)
nodeInfoList, err := snapshot.List()
if !reflect.DeepEqual(nodeInfoList, tc.expectedNodeInfos) || err != nil {
t.Errorf("unexpected list nodeInfos value (+got: %s/-want: %s), err: %s", tc.expectedNodeInfos, nodeInfoList, err)
t.Fatalf("unexpected list nodeInfos value (+got: %s/-want: %s), err: %s", tc.expectedNodeInfos, nodeInfoList, err)
}

_, err = snapshot.Get(nodeName)
if !reflect.DeepEqual(tc.expectErr, err) {
t.Errorf("unexpected get nodeInfos by nodeName value (+got: %T/-want: %T)", err, tc.expectErr)
t.Fatalf("unexpected get nodeInfos by nodeName value (+got: %T/-want: %T)", err, tc.expectErr)
}

nodeInfoList, err = snapshot.HavePodsWithAffinityList()
if !reflect.DeepEqual(tc.expectedNodeInfos, nodeInfoList) || err != nil {
t.Errorf("unexpected list HavePodsWithAffinity nodeInfos value (+got: %s/-want: %s), err: %s", nodeInfoList, tc.expectedNodeInfos, err)
t.Fatalf("unexpected list HavePodsWithAffinity nodeInfos value (+got: %s/-want: %s), err: %s", nodeInfoList, tc.expectedNodeInfos, err)
}

nodeInfoList, err = snapshot.HavePodsWithRequiredAntiAffinityList()
if !reflect.DeepEqual(tc.expectedNodeInfos, nodeInfoList) || err != nil {
t.Errorf("unexpected list PodsWithRequiredAntiAffinity nodeInfos value (+got: %s/-want: %s), err: %s", nodeInfoList, tc.expectedNodeInfos, err)
t.Fatalf("unexpected list PodsWithRequiredAntiAffinity nodeInfos value (+got: %s/-want: %s), err: %s", nodeInfoList, tc.expectedNodeInfos, err)
}

sel, err := labels.Parse("test==test")
pods, err := snapshot.Pods().List(sel)
if !reflect.DeepEqual(tc.expectedPods, pods) || err != nil {
t.Errorf("unexpected list pods value (+got: %s/-want: %s), err: %s", pods, tc.expectedNodeInfos, err)
t.Fatalf("unexpected list pods value (+got: %s/-want: %s), err: %s", pods, tc.expectedNodeInfos, err)
}

pods, err = snapshot.Pods().FilteredList(func(pod *v1.Pod) bool {
return true
}, sel)
if !reflect.DeepEqual(tc.expectedPods, pods) || err != nil {
t.Errorf("unexpected list filtered pods value (+got: %s/-want: %s), err: %s", pods, tc.expectedPods, err)
t.Fatalf("unexpected list filtered pods value (+got: %s/-want: %s), err: %s", pods, tc.expectedPods, err)
}

nodeInfos, err := snapshot.NodeInfos().List()
if !reflect.DeepEqual(tc.expectedNodeInfos, nodeInfos) || err != nil {
t.Errorf("unexpected list nodeInfos value (+got: %s/-want: %s), err: %s", nodeInfos, tc.expectedNodeInfos, err)
}

getBool := snapshot.StorageInfos().IsPVCUsedByPods("test")
if !reflect.DeepEqual(false, getBool) {
t.Errorf("unexpected get StorageInfos PVCUsed value (+got: %v/-want: %v)", false, getBool)
t.Fatalf("unexpected list nodeInfos value (+got: %s/-want: %s), err: %s", nodeInfos, tc.expectedNodeInfos, err)
}
})
}
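Most of the 89 deletions in this test file come from replacing hand-rolled v1.Pod literals with volcano's test helpers. Judging only from the call sites above, util.BuildPod takes a namespace, name, node name, pod phase, resource list, pod-group name, labels, and annotations. A hedged sketch of the equivalence (the expanded literal is inferred from the deleted code; the helper may also set fields such as the UID):

// One helper call per pod...
pod2 := util.BuildPod("node_info_cache_test", "test-2", "test-node",
	v1.PodRunning, api.BuildResourceList("200m", "1Ki"), "pg2",
	map[string]string{"test": "test"}, make(map[string]string))

// ...stands in for a literal of roughly this shape:
expanded := &v1.Pod{
	ObjectMeta: metav1.ObjectMeta{
		Namespace: "node_info_cache_test",
		Name:      "test-2",
		UID:       types.UID("test-2"),
		Labels:    map[string]string{"test": "test"},
	},
	Spec: v1.PodSpec{
		NodeName: "test-node",
		Containers: []v1.Container{{
			Resources: v1.ResourceRequirements{
				Requests: api.BuildResourceList("200m", "1Ki"),
			},
		}},
	},
	Status: v1.PodStatus{Phase: v1.PodRunning},
}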
pkg/scheduler/plugins/util/nodelock/nodelock.go (61 changes: 31 additions & 30 deletions)

@@ -23,6 +23,7 @@ import (
"path/filepath"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
@@ -62,20 +63,11 @@ func UseClient(client kubernetes.Interface) error {
return nil
}

func setNodeLock(nodeName string, lockName string) error {
ctx := context.Background()
node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
klog.Errorln("get node failed", err.Error())
return err
}
if _, ok := node.ObjectMeta.Annotations[lockName]; ok {
klog.V(3).Infof("node %s is locked", nodeName)
return fmt.Errorf("node %s is locked", nodeName)
}
func updateNodeAnnotations(ctx context.Context, node *v1.Node, updateFunc func(annotations map[string]string)) error {
newNode := node.DeepCopy()
newNode.ObjectMeta.Annotations[lockName] = time.Now().Format(time.RFC3339)
_, err = kubeClient.CoreV1().Nodes().Update(ctx, newNode, metav1.UpdateOptions{})
updateFunc(newNode.ObjectMeta.Annotations)
nodeName := newNode.Name
_, err := kubeClient.CoreV1().Nodes().Update(ctx, newNode, metav1.UpdateOptions{})
for i := 0; i < MaxLockRetry && err != nil; i++ {
klog.ErrorS(err, "Failed to update node", "node", nodeName, "retry", i)
time.Sleep(100 * time.Millisecond)
@@ -84,18 +76,38 @@ func setNodeLock(nodeName string, lockName string) error {
klog.ErrorS(err, "Failed to get node when retry to update", "node", nodeName)
continue
}
newNode := node.DeepCopy()
newNode.ObjectMeta.Annotations[lockName] = time.Now().Format(time.RFC3339)
newNode = node.DeepCopy()
updateFunc(newNode.ObjectMeta.Annotations)
_, err = kubeClient.CoreV1().Nodes().Update(ctx, newNode, metav1.UpdateOptions{})
}
if err != nil {
klog.ErrorS(err, "Failed to update node", "node", nodeName)
return fmt.Errorf("failed to update node %s, exceeded retry count %d", nodeName, MaxLockRetry)
}
return nil
}
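
For comparison only: the loop above retries every update error up to MaxLockRetry times, re-reading the node before each attempt. client-go ships a narrower helper, retry.RetryOnConflict (from k8s.io/client-go/util/retry), which retries only optimistic-concurrency conflicts. A sketch of the same annotation update expressed with it, assuming the same kubeClient, ctx, node, and updateFunc as above (an alternative, not what this commit does):

err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
	// Re-read the node on each attempt so the mutation applies to a fresh copy.
	current, err := kubeClient.CoreV1().Nodes().Get(ctx, node.Name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	newNode := current.DeepCopy()
	updateFunc(newNode.Annotations)
	_, err = kubeClient.CoreV1().Nodes().Update(ctx, newNode, metav1.UpdateOptions{})
	return err
})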

func setNodeLock(nodeName string, lockName string) error {
ctx := context.Background()
node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return err
}
if _, ok := node.ObjectMeta.Annotations[lockName]; ok {
klog.V(3).Infof("node %s is locked", nodeName)
return fmt.Errorf("node %s is locked", nodeName)
}
updateFunc := func(annotations map[string]string) {
annotations[lockName] = time.Now().Format(time.RFC3339)
}
err = updateNodeAnnotations(ctx, node, updateFunc)
if err != nil {
return fmt.Errorf("setNodeLock exceeds retry count %d", MaxLockRetry)
}
klog.InfoS("Node lock set", "node", nodeName)
return nil
}

// ReleaseNodeLock release a certain device lock on a certain node
func ReleaseNodeLock(nodeName string, lockName string) error {
ctx := context.Background()
node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
@@ -106,21 +118,10 @@ func ReleaseNodeLock(nodeName string, lockName string) error {
klog.V(3).InfoS("Node lock not set", "node", nodeName)
return nil
}
newNode := node.DeepCopy()
delete(newNode.ObjectMeta.Annotations, lockName)
_, err = kubeClient.CoreV1().Nodes().Update(ctx, newNode, metav1.UpdateOptions{})
for i := 0; i < MaxLockRetry && err != nil; i++ {
klog.ErrorS(err, "Failed to update node", "node", nodeName, "retry", i)
time.Sleep(100 * time.Millisecond)
node, err = kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
klog.ErrorS(err, "Failed to get node when retry to update", "node", nodeName)
continue
}
newNode := node.DeepCopy()
delete(newNode.ObjectMeta.Annotations, lockName)
_, err = kubeClient.CoreV1().Nodes().Update(ctx, newNode, metav1.UpdateOptions{})
updateFunc := func(annotations map[string]string) {
delete(annotations, lockName)
}
err = updateNodeAnnotations(ctx, node, updateFunc)
if err != nil {
return fmt.Errorf("releaseNodeLock exceeds retry count %d", MaxLockRetry)
}
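Taken together, the refactor leaves setNodeLock and ReleaseNodeLock as thin wrappers that differ only in the closure they hand to updateNodeAnnotations: one writes an RFC3339 timestamp under the lock annotation, the other deletes it. A hedged sketch of the intended lifecycle (withNodeLock and the lock-name value are hypothetical; setNodeLock is unexported, so outside callers presumably go through an exported wrapper not shown in this diff):

// withNodeLock runs work while holding a per-node annotation lock.
func withNodeLock(nodeName, lockName string, work func() error) error {
	// Fails fast with "node ... is locked" if the annotation already exists.
	if err := setNodeLock(nodeName, lockName); err != nil {
		return err
	}
	defer func() {
		// Deleting the annotation releases the lock, with the same retry loop.
		if err := ReleaseNodeLock(nodeName, lockName); err != nil {
			klog.ErrorS(err, "Failed to release node lock", "node", nodeName)
		}
	}()
	return work()
}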
