Skip to content

Commit

Permalink
koordlet: revise cgroup update err when unsupported or path not exist
Browse files Browse the repository at this point in the history
Signed-off-by: saintube <saintube@foxmail.com>
  • Loading branch information
saintube committed May 31, 2023
1 parent ca52ffb commit 2391957
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 59 deletions.
2 changes: 2 additions & 0 deletions pkg/koordlet/resourceexecutor/cgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func cgroupFileWriteIfDifferent(cgroupTaskDir string, r sysutil.Resource, value
if currentErr != nil {
return false, currentErr
}
// FIXME(saintube): Instead of handling cpuset resource in writing function, we should use a updater and do
// MergeUpdate in resourceexecutor's LeveledUpdateBatch.
if r.ResourceType() == sysutil.CPUSetCPUSName && cpuset.IsEqualStrCpus(currentValue, value) {
return false, nil
}
Expand Down
37 changes: 26 additions & 11 deletions pkg/koordlet/resourceexecutor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,12 @@ func Test_NewDefaultConfig(t *testing.T) {
}

func Test_InitFlags(t *testing.T) {
cmdArgs := []string{
"",
"--resource-force-update-seconds=120",
}
fs := flag.NewFlagSet(cmdArgs[0], flag.ExitOnError)

type fields struct {
ResourceForceUpdateSeconds int
}
type args struct {
fs *flag.FlagSet
fs *flag.FlagSet
cmdArgs []string
}
tests := []struct {
name string
Expand All @@ -54,18 +49,38 @@ func Test_InitFlags(t *testing.T) {
fields: fields{
ResourceForceUpdateSeconds: 120,
},
args: args{fs: fs},
args: args{
fs: flag.NewFlagSet("", flag.ExitOnError),
cmdArgs: []string{
"",
"--resource-force-update-seconds=120",
},
},
},
{
name: "not default 1",
fields: fields{
ResourceForceUpdateSeconds: 90,
},
args: args{
fs: flag.NewFlagSet("", flag.ExitOnError),
cmdArgs: []string{
"",
"--resource-force-update-seconds=90",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
raw := &Config{
want := &Config{
ResourceForceUpdateSeconds: tt.fields.ResourceForceUpdateSeconds,
}
c := NewDefaultConfig()
c.InitFlags(tt.args.fs)
tt.args.fs.Parse(cmdArgs[1:])
assert.Equal(t, raw, c)
err := tt.args.fs.Parse(tt.args.cmdArgs[1:])
assert.NoError(t, err, err)
assert.Equal(t, want, c)
})
}
}
75 changes: 47 additions & 28 deletions pkg/koordlet/resourceexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (e *ResourceUpdateExecutorImpl) Update(cacheable bool, resource ResourceUpd
// UpdateBatch updates a batch of resources with the given cacheable attribute.
// TODO: merge and resolve conflicts of batch updates from multiple callers.
func (e *ResourceUpdateExecutorImpl) UpdateBatch(cacheable bool, updaters ...ResourceUpdater) {
failures, unsupported := 0, 0
failures := 0
if cacheable {
if !e.gcStarted {
klog.Error("failed to cacheable update resources, err: cache GC is not started")
Expand All @@ -84,38 +84,30 @@ func (e *ResourceUpdateExecutorImpl) UpdateBatch(cacheable bool, updaters ...Res

for _, updater := range updaters {
isUpdated, err := e.updateByCache(updater)
if err != nil && sysutil.IsResourceUnsupportedErr(err) {
unsupported++
klog.V(5).Infof("failed to cacheable update unsupported resource %s to %v, isUpdated %v, err: %v",
updater.Key(), updater.Value(), isUpdated, err)
continue
} else if err != nil {
if err != nil {
failures++
klog.V(4).Infof("failed to cacheable update resource %s to %v, isUpdated %v, err: %v",
updater.Key(), updater.Value(), isUpdated, err)
continue
}

klog.V(5).Infof("successfully cacheable update resource %s to %v, isUpdated %v",
updater.Key(), updater.Value(), isUpdated)
}
} else {
for _, updater := range updaters {
err := e.update(updater)
if err != nil && sysutil.IsResourceUnsupportedErr(err) {
unsupported++
klog.V(5).Infof("failed to update unsupported resource %s to %v, err: %v",
updater.Key(), updater.Value(), err)
continue
} else if err != nil {
if err != nil {
failures++
klog.V(4).Infof("failed to update resource %s to %v, err: %v", updater.Key(), updater.Value(), err)
continue
}

klog.V(5).Infof("successfully update resource %s to %v", updater.Key(), updater.Value())
}
}
klog.V(6).Infof("finished batch updating resources, isCacheable %v, total %v, failures %v, unsupported %v",
cacheable, len(updaters), failures, unsupported)
klog.V(6).Infof("finished batch updating resources, isCacheable %v, total %v, failures %v",
cacheable, len(updaters), failures)
}

func (e *ResourceUpdateExecutorImpl) LeveledUpdateBatch(updaters [][]ResourceUpdater) {
Expand All @@ -135,14 +127,17 @@ func (e *ResourceUpdateExecutorImpl) LeveledUpdateBatch(updaters [][]ResourceUpd
}

mergedUpdater, err := updater.MergeUpdate()
if err != nil && (sysutil.IsResourceUnsupportedErr(err) || IsCgroupDirErr(err)) {
klog.V(5).Infof("failed merge update resource %s, err: %v", updater.Key(), err)
if err != nil && e.isUpdateErrIgnored(err) {
klog.V(5).Infof("failed to merge update resource %s to %v, ignored err: %v",
updater.Key(), updater.Value(), err)
continue
} else if err != nil {
klog.V(4).Infof("failed merge update resource %s, err: %v", updater.Key(), err)
}
if err != nil {
klog.V(4).Infof("failed to merge update resource %s to %v, err: %v",
updater.Key(), updater.Value(), err)
continue
}
klog.V(6).Infof("successfully merge update resource %s to %v", updater.Key(), updater.Value())
klog.V(5).Infof("successfully merge update resource %s to %v", updater.Key(), updater.Value())

if mergedUpdater == nil {
skipMerge[updater.Key()] = true
Expand Down Expand Up @@ -171,10 +166,7 @@ func (e *ResourceUpdateExecutorImpl) LeveledUpdateBatch(updaters [][]ResourceUpd
continue
}
err = updater.update()
if err != nil && (sysutil.IsResourceUnsupportedErr(err) || IsCgroupDirErr(err)) {
klog.V(5).Infof("failed update resource %s, err: %v", updater.Key(), err)
continue
} else if err != nil {
if err != nil {
klog.V(4).Infof("failed update resource %s, err: %v", updater.Key(), err)
continue
}
Expand All @@ -194,12 +186,16 @@ func (e *ResourceUpdateExecutorImpl) LeveledUpdateBatch(updaters [][]ResourceUpd
// TODO: run single executor when the qos manager starts.
func (e *ResourceUpdateExecutorImpl) Run(stopCh <-chan struct{}) {
e.onceRun.Do(func() {
_ = e.ResourceCache.Run(stopCh)
klog.V(4).Info("starting ResourceUpdateExecutor successfully")
e.gcStarted = true
e.run(stopCh)
})
}

func (e *ResourceUpdateExecutorImpl) run(stopCh <-chan struct{}) {
_ = e.ResourceCache.Run(stopCh)
klog.V(4).Info("starting ResourceUpdateExecutor successfully")
e.gcStarted = true
}

func (e *ResourceUpdateExecutorImpl) needUpdate(updater ResourceUpdater) bool {
preResource, _ := e.ResourceCache.Get(updater.Key())
if preResource == nil {
Expand All @@ -222,8 +218,12 @@ func (e *ResourceUpdateExecutorImpl) needUpdate(updater ResourceUpdater) bool {

func (e *ResourceUpdateExecutorImpl) update(updater ResourceUpdater) error {
err := updater.update()
if err != nil && e.isUpdateErrIgnored(err) {
klog.V(5).Infof("failed to update resource %s to %v, ignored err: %v", updater.Key(), updater.Value(), err)
return nil
}
if err != nil {
klog.V(4).Infof("failed to update resource %s to %v, err: %v", updater.Key(), updater.Value(), err)
klog.V(5).Infof("failed to update resource %s to %v, err: %v", updater.Key(), updater.Value(), err)
return err
}
klog.V(6).Infof("successfully update resource %s to %v", updater.Key(), updater.Value())
Expand All @@ -233,6 +233,10 @@ func (e *ResourceUpdateExecutorImpl) update(updater ResourceUpdater) error {
func (e *ResourceUpdateExecutorImpl) updateByCache(updater ResourceUpdater) (bool, error) {
if e.needUpdate(updater) {
err := updater.update()
if err != nil && e.isUpdateErrIgnored(err) {
klog.V(5).Infof("failed to cacheable update resource %s to %v, ignored err: %v", updater.Key(), updater.Value(), err)
return false, nil
}
if err != nil {
klog.V(5).Infof("failed to cacheable update resource %s to %v, err: %v", updater.Key(), updater.Value(), err)
return false, err
Expand All @@ -248,3 +252,18 @@ func (e *ResourceUpdateExecutorImpl) updateByCache(updater ResourceUpdater) (boo
}
return false, nil
}

func (e *ResourceUpdateExecutorImpl) isUpdateErrIgnored(err error) bool {
if err == nil {
return true
}
if sysutil.IsResourceUnsupportedErr(err) {
klog.V(6).Infof("update resource failed, ignored unsupported err: %v", err)
return true
}
if IsCgroupDirErr(err) {
klog.V(6).Infof("update resource failed, ignored cgroup not exist err: %v", err)
return true
}
return false
}
50 changes: 40 additions & 10 deletions pkg/koordlet/resourceexecutor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,16 @@ func TestResourceUpdateExecutor_Update(t *testing.T) {
assert.NoError(t, err)
testUpdater1, err := DefaultCgroupUpdaterFactory.New(sysutil.MemoryLimitName, "test", "1048576", &audit.EventHelper{})
assert.NoError(t, err)
testUpdater2, err := DefaultCgroupUpdaterFactory.New(sysutil.CPUSetCPUSName, "test", "0-31", &audit.EventHelper{})
assert.NoError(t, err)
testUpdater3, err := DefaultCgroupUpdaterFactory.New(sysutil.CPUSharesName, "test", "1024", &audit.EventHelper{})
assert.NoError(t, err)
testInvalidUpdater, err := DefaultCgroupUpdaterFactory.New(sysutil.CPUSetCPUSName, "test", "invalid content", &audit.EventHelper{})
assert.NoError(t, err)
type fields struct {
notStarted bool
updateErr bool
notStarted bool
pathNotExist bool
config *Config
}
type args struct {
isCacheable bool
Expand Down Expand Up @@ -93,36 +100,59 @@ func TestResourceUpdateExecutor_Update(t *testing.T) {
},
args: args{
isCacheable: true,
resource: testUpdater1,
resource: testUpdater2,
},
want: false,
wantErr: true,
},
{
name: "update error",
name: "cacheable update error",
args: args{
isCacheable: true,
resource: testInvalidUpdater,
},
want: false,
wantErr: true,
},
{
name: "ignore update error for path not exist",
fields: fields{
pathNotExist: true,
},
args: args{
isCacheable: false,
resource: testUpdater3,
},
want: true,
wantErr: false,
},
{
name: "ignore cacheable update error for path not exist",
fields: fields{
updateErr: true,
pathNotExist: true,
},
args: args{
isCacheable: true,
resource: testUpdater1,
resource: testUpdater3,
},
want: false,
wantErr: true,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
helper := sysutil.NewFileTestUtil(t)
defer helper.Cleanup()
if !tt.fields.updateErr { // make update non-exist file error
if !tt.fields.pathNotExist { // prepare test file
helper.WriteFileContents(tt.args.resource.Path(), "")
}

e := &ResourceUpdateExecutorImpl{
ResourceCache: cache.NewCacheDefault(),
Config: NewDefaultConfig(),
}
if tt.fields.config != nil {
e.Config = tt.fields.config
}
if !tt.fields.notStarted {
stop := make(chan struct{})
defer func() {
Expand All @@ -134,7 +164,7 @@ func TestResourceUpdateExecutor_Update(t *testing.T) {

got, gotErr := e.Update(tt.args.isCacheable, tt.args.resource)
assert.Equal(t, tt.want, got)
assert.Equal(t, tt.wantErr, gotErr != nil)
assert.Equal(t, tt.wantErr, gotErr != nil, gotErr)
})
}
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/koordlet/resourceexecutor/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func NewMergeableCgroupUpdaterIfCPUSetLooser(resourceType sysutil.ResourceType,
}

// NewDetailCgroupUpdater returns a new *CgroupResourceUpdater according to the given Resource, which is generally used
// for backwards compatibility. It it not guaranteed for updating successfully since it does not retrieve from the
// for backwards compatibility. It is not guaranteed for updating successfully since it does not retrieve from the
// known cgroup resources.
func NewDetailCgroupUpdater(resource sysutil.Resource, parentDir string, value string, updateFunc UpdateFunc, e *audit.EventHelper) (ResourceUpdater, error) {
return &CgroupResourceUpdater{
Expand Down Expand Up @@ -458,6 +458,10 @@ func MergeConditionIfCPUSetIsLooser(oldValue, newValue string) (string, bool, er
return newValue, false, fmt.Errorf("old value is not valid cpuset, err: %v", err)
}

// no need to merge if new cpuset is equal to old
if v.Equals(old) {
return newValue, false, nil
}
// no need to merge if new cpuset is a subset of the old
if v.IsSubsetOf(old) {
return newValue, false, nil
Expand Down Expand Up @@ -499,10 +503,10 @@ func BlkIOUpdateFunc(resource ResourceUpdater) error {
return cgroupBlkIOFileWriteIfDifferent(info.parentDir, info.file, info.Value())
}

// resourceType sysutil.ResourceType, parentDir string, value string, updateFunc UpdateFunc
func NewBlkIOResourceUpdater(resourceType sysutil.ResourceType, parentDir string, value string, e *audit.EventHelper) (ResourceUpdater, error) {
return NewCgroupUpdater(resourceType, parentDir, value, BlkIOUpdateFunc, e)
}

func cgroupBlkIOFileWriteIfDifferent(cgroupTaskDir string, file sysutil.Resource, value string) error {
var needUpdate bool
currentValue, currentErr := cgroupFileRead(cgroupTaskDir, file)
Expand Down
1 change: 1 addition & 0 deletions pkg/koordlet/runtimehooks/hooks/groupidentity/bvt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func Test_bvtPlugin_systemSupported(t *testing.T) {
testHelper := system.NewFileTestUtil(t)
defer testHelper.Cleanup()
testHelper.SetCgroupsV2(tt.fields.UseCgroupsV2)
testHelper.SetValidateResource(false)
if tt.fields.initPath != nil {
initCPUBvt(*tt.fields.initPath, 0, testHelper)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/koordlet/util/system/system_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package system

import (
Expand Down
1 change: 1 addition & 0 deletions pkg/koordlet/util/system/system_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package system

import (
Expand Down

0 comments on commit 2391957

Please sign in to comment.