Skip to content

Commit

Permalink
refecotr: nodelock
Browse files Browse the repository at this point in the history
Signed-off-by: googs1025 <googs1025@gmail.com>
  • Loading branch information
googs1025 committed May 8, 2024
1 parent 224c8d4 commit 673650c
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 39 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/util/k8s/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,5 +143,5 @@ func (s *Snapshot) Get(nodeName string) (*framework.NodeInfo, error) {
}

func (s *Snapshot) IsPVCUsedByPods(key string) bool {
return false
panic("not implemented")
}
16 changes: 8 additions & 8 deletions pkg/scheduler/plugins/util/k8s/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,45 +168,45 @@ func TestSnapshot(t *testing.T) {
snapshot := NewSnapshot(tc.nodeInfoMap)
nodeInfoList, err := snapshot.List()
if !reflect.DeepEqual(nodeInfoList, tc.expectedNodeInfos) || err != nil {
t.Errorf("unexpected list nodeInfos value (+got: %s/-want: %s), err: %s", tc.expectedNodeInfos, nodeInfoList, err)
t.Fatalf("unexpected list nodeInfos value (+got: %s/-want: %s), err: %s", tc.expectedNodeInfos, nodeInfoList, err)
}

_, err = snapshot.Get(nodeName)
if !reflect.DeepEqual(tc.expectErr, err) {
t.Errorf("unexpected get nodeInfos by nodeName value (+got: %T/-want: %T)", err, tc.expectErr)
t.Fatalf("unexpected get nodeInfos by nodeName value (+got: %T/-want: %T)", err, tc.expectErr)
}

nodeInfoList, err = snapshot.HavePodsWithAffinityList()
if !reflect.DeepEqual(tc.expectedNodeInfos, nodeInfoList) || err != nil {
t.Errorf("unexpected list HavePodsWithAffinity nodeInfos value (+got: %s/-want: %s), err: %s", nodeInfoList, tc.expectedNodeInfos, err)
t.Fatalf("unexpected list HavePodsWithAffinity nodeInfos value (+got: %s/-want: %s), err: %s", nodeInfoList, tc.expectedNodeInfos, err)
}

nodeInfoList, err = snapshot.HavePodsWithRequiredAntiAffinityList()
if !reflect.DeepEqual(tc.expectedNodeInfos, nodeInfoList) || err != nil {
t.Errorf("unexpected list PodsWithRequiredAntiAffinity nodeInfos value (+got: %s/-want: %s), err: %s", nodeInfoList, tc.expectedNodeInfos, err)
t.Fatalf("unexpected list PodsWithRequiredAntiAffinity nodeInfos value (+got: %s/-want: %s), err: %s", nodeInfoList, tc.expectedNodeInfos, err)
}

sel, err := labels.Parse("test==test")
pods, err := snapshot.Pods().List(sel)
if !reflect.DeepEqual(tc.expectedPods, pods) || err != nil {
t.Errorf("unexpected list pods value (+got: %s/-want: %s), err: %s", pods, tc.expectedNodeInfos, err)
t.Fatalf("unexpected list pods value (+got: %s/-want: %s), err: %s", pods, tc.expectedNodeInfos, err)
}

pods, err = snapshot.Pods().FilteredList(func(pod *v1.Pod) bool {
return true
}, sel)
if !reflect.DeepEqual(tc.expectedPods, pods) || err != nil {
t.Errorf("unexpected list filtered pods value (+got: %s/-want: %s), err: %s", pods, tc.expectedPods, err)
t.Fatalf("unexpected list filtered pods value (+got: %s/-want: %s), err: %s", pods, tc.expectedPods, err)
}

nodeInfos, err := snapshot.NodeInfos().List()
if !reflect.DeepEqual(tc.expectedNodeInfos, nodeInfos) || err != nil {
t.Errorf("unexpected list nodeInfos value (+got: %s/-want: %s), err: %s", nodeInfos, tc.expectedNodeInfos, err)
t.Fatalf("unexpected list nodeInfos value (+got: %s/-want: %s), err: %s", nodeInfos, tc.expectedNodeInfos, err)
}

getBool := snapshot.StorageInfos().IsPVCUsedByPods("test")
if !reflect.DeepEqual(false, getBool) {
t.Errorf("unexpected get StorageInfos PVCUsed value (+got: %v/-want: %v)", false, getBool)
t.Fatalf("unexpected get StorageInfos PVCUsed value (+got: %v/-want: %v)", false, getBool)
}
})
}
Expand Down
61 changes: 31 additions & 30 deletions pkg/scheduler/plugins/util/nodelock/nodelock.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -62,20 +63,11 @@ func UseClient(client kubernetes.Interface) error {
return nil
}

func setNodeLock(nodeName string, lockName string) error {
ctx := context.Background()
node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
klog.Errorln("get node failed", err.Error())
return err
}
if _, ok := node.ObjectMeta.Annotations[lockName]; ok {
klog.V(3).Infof("node %s is locked", nodeName)
return fmt.Errorf("node %s is locked", nodeName)
}
func updateNodeAnnotations(ctx context.Context, node *v1.Node, updateFunc func(annotations map[string]string)) error {
newNode := node.DeepCopy()
newNode.ObjectMeta.Annotations[lockName] = time.Now().Format(time.RFC3339)
_, err = kubeClient.CoreV1().Nodes().Update(ctx, newNode, metav1.UpdateOptions{})
updateFunc(newNode.ObjectMeta.Annotations)
nodeName := newNode.Name
_, err := kubeClient.CoreV1().Nodes().Update(ctx, newNode, metav1.UpdateOptions{})
for i := 0; i < MaxLockRetry && err != nil; i++ {
klog.ErrorS(err, "Failed to update node", "node", nodeName, "retry", i)
time.Sleep(100 * time.Millisecond)
Expand All @@ -84,18 +76,38 @@ func setNodeLock(nodeName string, lockName string) error {
klog.ErrorS(err, "Failed to get node when retry to update", "node", nodeName)
continue
}
newNode := node.DeepCopy()
newNode.ObjectMeta.Annotations[lockName] = time.Now().Format(time.RFC3339)
newNode = node.DeepCopy()
updateFunc(newNode.ObjectMeta.Annotations)
_, err = kubeClient.CoreV1().Nodes().Update(ctx, newNode, metav1.UpdateOptions{})
}
if err != nil {
klog.ErrorS(err, "Failed to update node", "node", nodeName)
return fmt.Errorf("failed to update node %s, exceeded retry count %d", nodeName, MaxLockRetry)
}
return nil
}

func setNodeLock(nodeName string, lockName string) error {
ctx := context.Background()
node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return err
}
if _, ok := node.ObjectMeta.Annotations[lockName]; ok {
klog.V(3).Infof("node %s is locked", nodeName)
return fmt.Errorf("node %s is locked", nodeName)
}
updateFunc := func(annotations map[string]string) {
annotations[lockName] = time.Now().Format(time.RFC3339)
}
err = updateNodeAnnotations(ctx, node, updateFunc)
if err != nil {
return fmt.Errorf("setNodeLock exceeds retry count %d", MaxLockRetry)
}
klog.InfoS("Node lock set", "node", nodeName)
return nil
}

// ReleaseNodeLock release a certain device lock on a certain node
func ReleaseNodeLock(nodeName string, lockName string) error {
ctx := context.Background()
node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
Expand All @@ -106,21 +118,10 @@ func ReleaseNodeLock(nodeName string, lockName string) error {
klog.V(3).InfoS("Node lock not set", "node", nodeName)
return nil
}
newNode := node.DeepCopy()
delete(newNode.ObjectMeta.Annotations, lockName)
_, err = kubeClient.CoreV1().Nodes().Update(ctx, newNode, metav1.UpdateOptions{})
for i := 0; i < MaxLockRetry && err != nil; i++ {
klog.ErrorS(err, "Failed to update node", "node", nodeName, "retry", i)
time.Sleep(100 * time.Millisecond)
node, err = kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
klog.ErrorS(err, "Failed to get node when retry to update", "node", nodeName)
continue
}
newNode := node.DeepCopy()
delete(newNode.ObjectMeta.Annotations, lockName)
_, err = kubeClient.CoreV1().Nodes().Update(ctx, newNode, metav1.UpdateOptions{})
updateFunc := func(annotations map[string]string) {
delete(annotations, lockName)
}
err = updateNodeAnnotations(ctx, node, updateFunc)
if err != nil {
return fmt.Errorf("releaseNodeLock exceeds retry count %d", MaxLockRetry)
}
Expand Down

0 comments on commit 673650c

Please sign in to comment.