Aggregated used ports at the NodeInfo level. #42524

Merged 1 commit on Apr 8, 2017
3 changes: 1 addition & 2 deletions plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -814,8 +814,7 @@ func PodFitsHostPorts(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.No
return true, nil, nil
}

- // TODO: Aggregate it at the NodeInfo level.
- existingPorts := GetUsedPorts(nodeInfo.Pods()...)
+ existingPorts := nodeInfo.UsedPorts()
for wport := range wantPorts {
if wport != 0 && existingPorts[wport] {
return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
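For reference, a minimal standalone sketch (not part of this diff) of the check PodFitsHostPorts performs against the aggregated map. The names fitsHostPorts, wantPorts, and usedPorts are illustrative; the real predicate works with the scheduler's v1.Pod and schedulercache.NodeInfo types:

```go
package main

import "fmt"

// fitsHostPorts mirrors the shape of the check in PodFitsHostPorts:
// a wanted host port conflicts only if it is non-zero and already
// marked used in the node's aggregated map.
func fitsHostPorts(wantPorts map[int]bool, usedPorts map[int]bool) bool {
	for wport := range wantPorts {
		if wport != 0 && usedPorts[wport] {
			return false
		}
	}
	return true
}

func main() {
	used := map[int]bool{80: true, 8080: true}
	fmt.Println(fitsHostPorts(map[int]bool{443: true}, used))  // true: 443 is free
	fmt.Println(fitsHostPorts(map[int]bool{8080: true}, used)) // false: 8080 is taken
}
```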
11 changes: 11 additions & 0 deletions plugin/pkg/scheduler/schedulercache/cache_test.go
@@ -67,6 +67,7 @@ func TestAssumePodScheduled(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[0]},
usedPorts: map[int]bool{80: true},
},
}, {
pods: []*v1.Pod{testPods[1], testPods[2]},
@@ -81,6 +82,7 @@ func TestAssumePodScheduled(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[1], testPods[2]},
usedPorts: map[int]bool{80: true, 8080: true},
},
}, { // test non-zero request
pods: []*v1.Pod{testPods[3]},
@@ -95,6 +97,7 @@ func TestAssumePodScheduled(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[3]},
usedPorts: map[int]bool{80: true},
},
}}

@@ -169,6 +172,7 @@ func TestExpirePod(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[1]},
usedPorts: map[int]bool{80: false, 8080: true},
},
}}

@@ -217,6 +221,7 @@ func TestAddPodWillConfirm(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[0]},
usedPorts: map[int]bool{80: true, 8080: false},
},
}}

@@ -261,6 +266,7 @@ func TestAddPodAfterExpiration(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{basePod},
usedPorts: map[int]bool{80: true},
},
}}

@@ -313,6 +319,7 @@ func TestUpdatePod(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[1]},
usedPorts: map[int]bool{8080: true},
}, {
requestedResource: &Resource{
MilliCPU: 100,
@@ -324,6 +331,7 @@ func TestUpdatePod(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[0]},
usedPorts: map[int]bool{80: true},
}},
}}

@@ -378,6 +386,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[1]},
usedPorts: map[int]bool{8080: true},
}, {
requestedResource: &Resource{
MilliCPU: 100,
@@ -389,6 +398,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[0]},
usedPorts: map[int]bool{80: true},
}},
}}

@@ -443,6 +453,7 @@ func TestRemovePod(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{basePod},
usedPorts: map[int]bool{80: true},
},
}}

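The expected usedPorts values above come from the host ports declared on the test pods' containers, and a removed or expired pod leaves its port behind with a false value (80: false in TestExpirePod) rather than having the key deleted. Below is a minimal sketch, with simplified stand-in types instead of the real v1.Pod, of how such a map is derived from a pod spec:

```go
package main

import "fmt"

// containerPort, container, and podSpec are stand-ins for the
// corresponding v1 types, reduced to the fields the host-port
// bookkeeping cares about.
type containerPort struct {
	HostPort int32
}

type container struct {
	Ports []containerPort
}

type podSpec struct {
	Containers []container
}

// collectHostPorts records the non-zero host ports declared by a pod
// spec, mirroring the iteration in NodeInfo.updateUsedPorts below.
func collectHostPorts(spec podSpec, used bool, into map[int]bool) {
	for i := range spec.Containers {
		for _, p := range spec.Containers[i].Ports {
			if p.HostPort != 0 { // 0 means no host port requested
				into[int(p.HostPort)] = used
			}
		}
	}
}

func main() {
	usedPorts := make(map[int]bool)
	web := podSpec{Containers: []container{{Ports: []containerPort{{HostPort: 80}}}}}
	api := podSpec{Containers: []container{{Ports: []containerPort{{HostPort: 8080}}}}}

	collectHostPorts(web, true, usedPorts)  // add pod using 80
	collectHostPorts(api, true, usedPorts)  // add pod using 8080
	collectHostPorts(web, false, usedPorts) // expire pod using 80

	fmt.Println(usedPorts) // map[80:false 8080:true], as in TestExpirePod
}
```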
40 changes: 39 additions & 1 deletion plugin/pkg/scheduler/schedulercache/node_info.go
@@ -36,6 +36,7 @@ type NodeInfo struct {

pods []*v1.Pod
podsWithAffinity []*v1.Pod
usedPorts map[int]bool

// Total requested resource of all pods on this node.
// It includes assumed pods which scheduler sends binding to apiserver but
@@ -100,6 +101,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
allocatableResource: &Resource{},
allowedPodNumber: 0,
generation: 0,
usedPorts: make(map[int]bool),
}
for _, pod := range pods {
ni.addPod(pod)
@@ -123,6 +125,13 @@ func (n *NodeInfo) Pods() []*v1.Pod {
return n.pods
}

func (n *NodeInfo) UsedPorts() map[int]bool {
if n == nil {
return nil
}
return n.usedPorts
}

// PodsWithAffinity returns all pods with (anti)affinity constraints on this node.
func (n *NodeInfo) PodsWithAffinity() []*v1.Pod {
if n == nil {
@@ -198,6 +207,12 @@ func (n *NodeInfo) Clone() *NodeInfo {
if len(n.pods) > 0 {
clone.pods = append([]*v1.Pod(nil), n.pods...)
}
if len(n.usedPorts) > 0 {
clone.usedPorts = make(map[int]bool)
for k, v := range n.usedPorts {
clone.usedPorts[k] = v
}
}
if len(n.podsWithAffinity) > 0 {
clone.podsWithAffinity = append([]*v1.Pod(nil), n.podsWithAffinity...)
}
@@ -213,7 +228,7 @@ func (n *NodeInfo) String() string {
for i, pod := range n.pods {
podKeys[i] = pod.Name
}
- return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v, NonZeroRequest: %#v}", podKeys, n.requestedResource, n.nonzeroRequest)
+ return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v, NonZeroRequest: %#v, UsedPort: %#v}", podKeys, n.requestedResource, n.nonzeroRequest, n.usedPorts)
}

func hasPodAffinityConstraints(pod *v1.Pod) bool {
@@ -239,6 +254,10 @@ func (n *NodeInfo) addPod(pod *v1.Pod) {
if hasPodAffinityConstraints(pod) {
n.podsWithAffinity = append(n.podsWithAffinity, pod)
}

// Consume host ports when a pod is added.
n.updateUsedPorts(pod, true)

n.generation++
}

@@ -286,7 +305,12 @@ func (n *NodeInfo) removePod(pod *v1.Pod) error {
}
n.nonzeroRequest.MilliCPU -= non0_cpu
n.nonzeroRequest.Memory -= non0_mem

// Release host ports when a pod is removed.
n.updateUsedPorts(pod, false)

n.generation++

return nil
}
}
@@ -318,6 +342,20 @@ func calculateResource(pod *v1.Pod) (res Resource, non0_cpu int64, non0_mem int6
return
}

func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, used bool) {
for j := range pod.Spec.Containers {
container := &pod.Spec.Containers[j]
for k := range container.Ports {
podPort := &container.Ports[k]
// "0" is explicitly ignored in PodFitsHostPorts,
// which is the only function that uses this value.
if podPort.HostPort != 0 {
n.usedPorts[int(podPort.HostPort)] = used
}
}
}
}

// Sets the overall node information.
func (n *NodeInfo) SetNode(node *v1.Node) error {
n.node = node
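Putting the node_info.go pieces together, here is a self-contained sketch (simplified types, not the scheduler's API) of the lifecycle this change maintains: ports are recorded on add, flipped to false on remove, and the map is deep-copied by Clone so a snapshot cannot be mutated through the original:

```go
package main

import "fmt"

// nodeInfo is a stripped-down stand-in for schedulercache.NodeInfo,
// keeping only the aggregated host-port bookkeeping introduced here.
type nodeInfo struct {
	usedPorts map[int]bool
}

func newNodeInfo() *nodeInfo {
	return &nodeInfo{usedPorts: make(map[int]bool)}
}

// updateUsedPorts marks host ports as used (true) or released (false).
// Released ports stay in the map with a false value, matching the
// expectations in cache_test.go above.
func (n *nodeInfo) updateUsedPorts(hostPorts []int, used bool) {
	for _, p := range hostPorts {
		if p != 0 { // 0 means "no host port requested"
			n.usedPorts[p] = used
		}
	}
}

// clone deep-copies the map, mirroring the loop added to NodeInfo.Clone.
func (n *nodeInfo) clone() *nodeInfo {
	c := newNodeInfo()
	for k, v := range n.usedPorts {
		c.usedPorts[k] = v
	}
	return c
}

func main() {
	ni := newNodeInfo()
	ni.updateUsedPorts([]int{80, 8080}, true) // pod added
	snapshot := ni.clone()
	ni.updateUsedPorts([]int{80}, false) // pod removed

	fmt.Println(ni.usedPorts)       // map[80:false 8080:true]
	fmt.Println(snapshot.usedPorts) // map[80:true 8080:true] (clone unaffected)
}
```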