Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable file back state in static policy #54409

Merged
merged 2 commits into from
Nov 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
CgroupsPerQOS: s.CgroupsPerQOS,
CgroupRoot: s.CgroupRoot,
CgroupDriver: s.CgroupDriver,
KubeletRootDir: s.RootDirectory,
ProtectKernelDefaults: s.ProtectKernelDefaults,
NodeAllocatableConfig: cm.NodeAllocatableConfig{
KubeReservedCgroupName: s.KubeReservedCgroup,
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/cm/container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type NodeConfig struct {
CgroupsPerQOS bool
CgroupRoot string
CgroupDriver string
KubeletRootDir string
ProtectKernelDefaults bool
NodeAllocatableConfig
ExperimentalQOSReserved map[v1.ResourceName]int64
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/cm/container_manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
nodeConfig.ExperimentalCPUManagerReconcilePeriod,
machineInfo,
cm.GetNodeAllocatableReservation(),
nodeConfig.KubeletRootDir,
)
if err != nil {
glog.Errorf("failed to initialize cpu manager: %v", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/cm/cpumanager/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_test(
"//pkg/kubelet/cm/cpumanager/state:go_default_library",
"//pkg/kubelet/cm/cpumanager/topology:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
Expand Down
11 changes: 10 additions & 1 deletion pkg/kubelet/cm/cpumanager/cpu_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/status"
"path"
)

// ActivePodsFunc is a function that returns a list of pods to reconcile.
Expand All @@ -44,6 +45,9 @@ type runtimeService interface {

type policyName string

// CPUManagerStateFileName is the name file name where cpu manager stores it's state
const CPUManagerStateFileName = "cpu_manager_state"

// Manager interface provides methods for Kubelet to manage pod cpus.
type Manager interface {
// Start is called during Kubelet initialization.
Expand Down Expand Up @@ -99,13 +103,16 @@ func NewManager(
reconcilePeriod time.Duration,
machineInfo *cadvisorapi.MachineInfo,
nodeAllocatableReservation v1.ResourceList,
stateFileDirecory string,
) (Manager, error) {
var policy Policy
var stateHandle state.State

switch policyName(cpuPolicyName) {

case PolicyNone:
policy = NewNonePolicy()
stateHandle = state.NewMemoryState()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found a small edge case when testing this patch manually:

  1. Run some pinned containers with the static policy
  2. Stop kubelet
  3. Terminate containers
  4. Restart Kubelet with the none policy
  5. Restart Kubelet with the static policy

The RemoveContainer calls that we use to clean the state file are delivered to the None policy, which drops them on the floor. When the static policy comes up again, the previously pinned CPUs are effectively leaked. Since this is a subtle issue in itself, I prefer we land this patch as-is and create an issue to fix this edge case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@flyingcougar if you want to fix this as part of this PR, that's OK too. @balajismaniam and I discussed it offline and think it would make sense to also use the file backed state for the none policy. That way, in step 4 above, we panic and tell the operator to delete the state file. WDYT?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that we need to use the file state in the none policy to execute that code! Otherwise we'll never read/write the policy name in the file. Please correct me if I'm wrong, were you able to reproduce the issue described above?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added back LGTM; let's take this up in a new PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracking this in issue #55736


case PolicyStatic:
topo, err := topology.Discover(machineInfo)
Expand Down Expand Up @@ -134,16 +141,18 @@ func NewManager(
reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
policy = NewStaticPolicy(topo, numReservedCPUs)
stateHandle = state.NewFileState(path.Join(stateFileDirecory, CPUManagerStateFileName), policy.Name())

default:
glog.Errorf("[cpumanager] Unknown policy \"%s\", falling back to default policy \"%s\"", cpuPolicyName, PolicyNone)
policy = NewNonePolicy()
stateHandle = state.NewMemoryState()
}

manager := &manager{
policy: policy,
reconcilePeriod: reconcilePeriod,
state: state.NewMemoryState(),
state: stateHandle,
machineInfo: machineInfo,
nodeAllocatableReservation: nodeAllocatableReservation,
}
Expand Down
126 changes: 126 additions & 0 deletions pkg/kubelet/cm/cpumanager/cpu_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ package cpumanager
import (
"fmt"
"reflect"
"strings"
"testing"
"time"

cadvisorapi "github.com/google/cadvisor/info/v1"
"io/ioutil"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -29,6 +33,7 @@ import (
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"os"
)

type mockState struct {
Expand Down Expand Up @@ -223,6 +228,127 @@ func TestCPUManagerAdd(t *testing.T) {
}
}

func TestCPUManagerGenerate(t *testing.T) {
testCases := []struct {
description string
cpuPolicyName string
nodeAllocatableReservation v1.ResourceList
isTopologyBroken bool
panicMsg string
expectedPolicy string
expectedError error
skipIfPermissionsError bool
}{
{
description: "set none policy",
cpuPolicyName: "none",
nodeAllocatableReservation: nil,
expectedPolicy: "none",
},
{
description: "invalid policy name",
cpuPolicyName: "invalid",
nodeAllocatableReservation: nil,
expectedPolicy: "none",
},
{
description: "static policy",
cpuPolicyName: "static",
nodeAllocatableReservation: v1.ResourceList{v1.ResourceCPU: *resource.NewQuantity(3, resource.DecimalSI)},
expectedPolicy: "static",
skipIfPermissionsError: true,
},
{
description: "static policy - broken topology",
cpuPolicyName: "static",
nodeAllocatableReservation: v1.ResourceList{},
isTopologyBroken: true,
expectedError: fmt.Errorf("could not detect number of cpus"),
skipIfPermissionsError: true,
},
{
description: "static policy - broken reservation",
cpuPolicyName: "static",
nodeAllocatableReservation: v1.ResourceList{},
panicMsg: "unable to determine reserved CPU resources for static policy",
skipIfPermissionsError: true,
},
{
description: "static policy - no CPU resources",
cpuPolicyName: "static",
nodeAllocatableReservation: v1.ResourceList{v1.ResourceCPU: *resource.NewQuantity(0, resource.DecimalSI)},
panicMsg: "the static policy requires systemreserved.cpu + kubereserved.cpu to be greater than zero",
skipIfPermissionsError: true,
},
}

mockedMachineInfo := cadvisorapi.MachineInfo{
NumCores: 4,
Topology: []cadvisorapi.Node{
{
Cores: []cadvisorapi.Core{
{
Id: 0,
Threads: []int{0},
},
{
Id: 1,
Threads: []int{1},
},
{
Id: 2,
Threads: []int{2},
},
{
Id: 3,
Threads: []int{3},
},
},
},
},
}

for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
machineInfo := &mockedMachineInfo
if testCase.isTopologyBroken {
machineInfo = &cadvisorapi.MachineInfo{}
}
sDir, err := ioutil.TempDir("/tmp/", "cpu_manager_test")
if err != nil {
t.Errorf("cannot create state file: %s", err.Error())
}
defer os.RemoveAll(sDir)
defer func() {
if err := recover(); err != nil {
if testCase.panicMsg != "" {
if !strings.Contains(err.(string), testCase.panicMsg) {
t.Errorf("Unexpected panic message. Have: %q wants %q", err, testCase.panicMsg)
}
} else {
t.Errorf("Unexpected panic: %q", err)
}
} else if testCase.panicMsg != "" {
t.Error("Expected panic hasn't been raised")
}
}()

mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, testCase.nodeAllocatableReservation, sDir)
if testCase.expectedError != nil {
if !strings.Contains(err.Error(), testCase.expectedError.Error()) {
t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), testCase.expectedError.Error())
}
} else {
rawMgr := mgr.(*manager)
if rawMgr.policy.Name() != testCase.expectedPolicy {
t.Errorf("Unexpected policy name. Have: %q wants %q", rawMgr.policy.Name(), testCase.expectedPolicy)
}
}
})

}
}

func TestCPUManagerRemove(t *testing.T) {
mgr := &manager{
policy: &mockPolicy{
Expand Down
42 changes: 39 additions & 3 deletions pkg/kubelet/cm/cpumanager/policy_static.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,45 @@ func (p *staticPolicy) Name() string {
}

func (p *staticPolicy) Start(s state.State) {
// Configure the shared pool to include all detected CPU IDs.
allCPUs := p.topology.CPUDetails.CPUs()
s.SetDefaultCPUSet(allCPUs)
if err := p.validateState(s); err != nil {
glog.Errorf("[cpumanager] static policy invalid state: %s\n", err.Error())
panic("[cpumanager] - please drain node and remove policy state file")
}
}

func (p *staticPolicy) validateState(s state.State) error {
tmpAssignments := s.GetCPUAssignments()
tmpDefaultCPUset := s.GetDefaultCPUSet()

// Default cpuset cannot be empty when assignments exist
if tmpDefaultCPUset.IsEmpty() {
if len(tmpAssignments) != 0 {
return fmt.Errorf("default cpuset cannot be empty")
}
// state is empty initialize
allCPUs := p.topology.CPUDetails.CPUs()
s.SetDefaultCPUSet(allCPUs)
return nil
}

// State has already been initialized from file (is not empty)
// 1 Check if the reserved cpuset is not part of default cpuset because:
// - kube/system reserved have changed (increased) - may lead to some containers not being able to start
// - user tampered with file
if !p.reserved.Intersection(tmpDefaultCPUset).Equals(p.reserved) {
return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"",
p.reserved.String(), tmpDefaultCPUset.String())
}

// 2. Check if state for static policy is consistent
for cID, cset := range tmpAssignments {
// None of the cpu in DEFAULT cset should be in s.assignments
if !tmpDefaultCPUset.Intersection(cset).IsEmpty() {
return fmt.Errorf("container id: %s cpuset: \"%s\" overlaps with default cpuset \"%s\"",
cID, cset.String(), tmpDefaultCPUset.String())
}
}
return nil
}

// assignableCPUs returns the set of unassigned CPUs minus the reserved set.
Expand Down
76 changes: 66 additions & 10 deletions pkg/kubelet/cm/cpumanager/policy_static_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type staticPolicyTest struct {
expErr error
expCPUAlloc bool
expCSet cpuset.CPUSet
expPanic bool
}

func TestStaticPolicyName(t *testing.T) {
Expand All @@ -51,18 +52,73 @@ func TestStaticPolicyName(t *testing.T) {
}

func TestStaticPolicyStart(t *testing.T) {
policy := NewStaticPolicy(topoSingleSocketHT, 1).(*staticPolicy)

st := &mockState{
assignments: state.ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(),
testCases := []staticPolicyTest{
{
description: "non-corrupted state",
topo: topoDualSocketHT,
stAssignments: state.ContainerCPUAssignments{
"0": cpuset.NewCPUSet(0),
},
stDefaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
expCSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
},
{
description: "empty cpuset",
topo: topoDualSocketHT,
numReservedCPUs: 1,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(),
expCSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
},
{
description: "reserved cores 0 & 6 are not present in available cpuset",
topo: topoDualSocketHT,
numReservedCPUs: 2,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1),
expPanic: true,
},
{
description: "assigned core 2 is still present in available cpuset",
topo: topoDualSocketHT,
stAssignments: state.ContainerCPUAssignments{
"0": cpuset.NewCPUSet(0, 1, 2),
},
stDefaultCPUSet: cpuset.NewCPUSet(2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
expPanic: true,
},
}
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
defer func() {
if err := recover(); err != nil {
if !testCase.expPanic {
t.Errorf("unexpected panic occured: %q", err)
}
} else if testCase.expPanic {
t.Error("expected panic doesn't occured")
}
}()
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs).(*staticPolicy)
st := &mockState{
assignments: testCase.stAssignments,
defaultCPUSet: testCase.stDefaultCPUSet,
}
policy.Start(st)

policy.Start(st)
for cpuid := 1; cpuid < policy.topology.NumCPUs; cpuid++ {
if !st.defaultCPUSet.Contains(cpuid) {
t.Errorf("StaticPolicy Start() error. expected cpuid %d to be present in defaultCPUSet", cpuid)
}
if !testCase.stDefaultCPUSet.IsEmpty() {
for cpuid := 1; cpuid < policy.topology.NumCPUs; cpuid++ {
if !st.defaultCPUSet.Contains(cpuid) {
t.Errorf("StaticPolicy Start() error. expected cpuid %d to be present in defaultCPUSet", cpuid)
}
}
}
if !st.GetDefaultCPUSet().Equals(testCase.expCSet) {
t.Errorf("State CPUSet is different than expected. Have %q wants: %q", st.GetDefaultCPUSet(),
testCase.expCSet)
}

})
}
}

Expand Down