Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ovspinning: Set affinity of each thread #4470

Merged
merged 1 commit into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions go-controller/pkg/node/ovspinning/ovspinning_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,18 @@ func setProcessCPUAffinity(targetPIDStr string) error {
return nil
}

klog.Infof("Setting CPU affinity of PID(%d) to %s, was %s", targetPID, printCPUSet(currentProcessCPUs), printCPUSet(targetProcessCPUs))

err = unix.SchedSetaffinity(targetPID, &currentProcessCPUs)
taskIDs, err := getThreadsOfProcess(targetPID)
if err != nil {
return fmt.Errorf("can't set CPU affinity of PID(%d) to %s: %w", targetPID, printCPUSet(currentProcessCPUs), err)
return fmt.Errorf("can't get tasks of PID(%d):%w", targetPID, err)
}

klog.Infof("Setting CPU affinity of PID(%d) (ntasks=%d) to %s, was %s", targetPID, len(taskIDs), printCPUSet(currentProcessCPUs), printCPUSet(targetProcessCPUs))
for _, taskID := range taskIDs {
err = unix.SchedSetaffinity(taskID, &currentProcessCPUs)
if err != nil {
// The task may have been stopped, don't break the loop and continue setting CPU affinity on other tasks.
klog.Warningf("Error while setting CPU affinity of task(%d) PID(%d) to %s: %v", taskID, targetPID, printCPUSet(currentProcessCPUs), err)
}
}

return nil
Expand Down Expand Up @@ -237,3 +244,23 @@ func printCPUSet(cpus unix.CPUSet) string {
}
return strings.TrimRight(result.String(), ",")
}

// getThreadsOfProcess returns the list of thread IDs of the given process
func getThreadsOfProcess(pid int) ([]int, error) {
taskFolders, err := os.ReadDir(fmt.Sprintf("/proc/%d/task", pid))
if err != nil {
return nil, fmt.Errorf("unable to find %d tasks: %v", pid, err)
}

ret := []int{}
for _, taskFolder := range taskFolders {
taskID, err := strconv.Atoi(taskFolder.Name())
if err != nil {
return nil, fmt.Errorf("unable to get task ID of %d: %s, %v", pid, taskFolder.Name(), err)
}

ret = append(ret, taskID)
}

return ret, nil
}
33 changes: 22 additions & 11 deletions go-controller/pkg/node/ovspinning/ovspinning_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package ovspinning
import (
"context"
"fmt"
"io/ioutil"
"os"
"os/exec"
"runtime"
Expand All @@ -20,7 +19,6 @@ import (
)

func TestAlignCPUAffinity(t *testing.T) {

ovsDBPid, ovsDBStop := mockOvsdbProcess(t)
defer ovsDBStop()

Expand Down Expand Up @@ -69,7 +67,7 @@ func TestAlignCPUAffinity(t *testing.T) {
}

// Disable the feature by making the enabler file empty
ioutil.WriteFile(featureEnablerFile, []byte(""), 0)
os.WriteFile(featureEnablerFile, []byte(""), 0)
assert.NoError(t, err)

var tmpCPUset unix.CPUSet
Expand All @@ -82,14 +80,13 @@ func TestAlignCPUAffinity(t *testing.T) {
}

func TestIsFileNotEmpty(t *testing.T) {

defer mockFeatureEnableFile(t, "")()

result, err := isFileNotEmpty(featureEnablerFile)
assert.NoError(t, err)
assert.False(t, result)

ioutil.WriteFile(featureEnablerFile, []byte("1"), 0)
os.WriteFile(featureEnablerFile, []byte("1"), 0)
result, err = isFileNotEmpty(featureEnablerFile)
assert.NoError(t, err)
assert.True(t, result)
Expand Down Expand Up @@ -135,7 +132,6 @@ func TestPrintCPUSetRanges(t *testing.T) {

func mockOvsdbProcess(t *testing.T) (int, func()) {
ctx, stopCmd := context.WithCancel(context.Background())
defer stopCmd()

cmd := exec.CommandContext(ctx, "sleep", "10")

Expand All @@ -155,10 +151,8 @@ func mockOvsdbProcess(t *testing.T) (int, func()) {

func mockOvsVSwitchdProcess(t *testing.T) (int, func()) {
ctx, stopCmd := context.WithCancel(context.Background())
defer stopCmd()

cmd := exec.CommandContext(ctx, "sleep", "10")

cmd := exec.CommandContext(ctx, "go", "run", "testdata/fake_thread_process.go")
err := cmd.Start()
assert.NoError(t, err)

Expand All @@ -167,6 +161,13 @@ func mockOvsVSwitchdProcess(t *testing.T) (int, func()) {
return fmt.Sprintf("%d", cmd.Process.Pid), nil
}

// Ensure the fake process has some thread
assert.Eventually(t, func() bool {
tasks, err := getThreadsOfProcess(cmd.Process.Pid)
assert.NoError(t, err)
return len(tasks) > 1
}, time.Second, 100*time.Millisecond, "ovs-vswitchd fake process does not have enough threads")

return cmd.Process.Pid, func() {
stopCmd()
getOvsVSwitchdPIDFn = previousGetter
Expand All @@ -184,13 +185,13 @@ func setTickDuration(d time.Duration) func() {

func mockFeatureEnableFile(t *testing.T, data string) func() {

f, err := ioutil.TempFile("", "enable_dynamic_cpu_affinity")
f, err := os.CreateTemp("", "enable_dynamic_cpu_affinity")
assert.NoError(t, err)

previousValue := featureEnablerFile
featureEnablerFile = f.Name()

ioutil.WriteFile(featureEnablerFile, []byte(data), 0)
os.WriteFile(featureEnablerFile, []byte(data), 0)
assert.NoError(t, err)

return func() {
Expand All @@ -207,6 +208,16 @@ func assertPIDHasSchedAffinity(t *testing.T, pid int, expectedCPUSet unix.CPUSet

return actual == expectedCPUSet
}, time.Second, 10*time.Millisecond, "pid[%d] Expected CPUSet %0x != Actual CPUSet %0x", pid, expectedCPUSet, actual)

tasks, err := getThreadsOfProcess(pid)
assert.NoError(t, err)

for _, task := range tasks {
err := unix.SchedGetaffinity(task, &actual)
assert.NoError(t, err)
assert.Equal(t, expectedCPUSet, actual,
"task[%d] of process[%d] Expected CPUSet %0x != Actual CPUSet %0x", task, pid, expectedCPUSet, actual)
}
}

func assertNeverPIDHasSchedAffinity(t *testing.T, pid int, targetCPUSet unix.CPUSet) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//go:build testing_ovspinning
// +build testing_ovspinning

package main

// This file is meant to be used in unit tests to validate the behavior of the ovspinnging package.
// The purpose is to simulate a process with multiple threads, like the ovs-vswitchd daemon and Go programs
// spawns a pool of thread by default

import (
"time"
)

func main() {
time.Sleep(100 * time.Second)
}
Loading