Skip to content

Commit

Permalink
[WIP] cgroupv2: add init cgroup
Browse files Browse the repository at this point in the history
  • Loading branch information
shankerwangmiao committed Jul 24, 2021
1 parent 3ba70c6 commit 02a1447
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 4 deletions.
143 changes: 139 additions & 4 deletions worker/cgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/moby/moby/pkg/reexec"
cgv1 "github.com/containerd/cgroups"
cgv2 "github.com/containerd/cgroups/v2"
contspecs "github.com/opencontainers/runtime-spec/specs-go"
)

type cgroupHook struct {
Expand Down Expand Up @@ -66,6 +67,7 @@ func waitExec () {

func initCgroup(cfg *cgroupConfig) (error) {

logger.Debugf("Initializing cgroup")
baseGroup := cfg.Group
//subsystem := cfg.Subsystem

Expand All @@ -78,27 +80,160 @@ func initCgroup(cfg *cgroupConfig) (error) {
cfg.isUnified = cgv1.Mode() == cgv1.Unified

if cfg.isUnified {
var err error
logger.Debugf("Cgroup V2 detected")
g := baseGroup
if g == "" {
logger.Debugf("Detecting my cgroup path")
var err error
if g, err = cgv2.NestedGroupPath(""); err != nil {
return err
}
}
logger.Infof("Using cgroup path: %s", g)

var err error
if cfg.cgMgrV2, err = cgv2.LoadManager("/sys/fs/cgroup", g); err != nil {
return err
}
if baseGroup == "" {
logger.Debugf("Creating a sub group and move all processes into it")
wkrMgr, err := cfg.cgMgrV2.NewChild("__worker", nil);
if err != nil {
return err
}
for {
logger.Debugf("Reading pids")
procs, err := cfg.cgMgrV2.Procs(false)
if err != nil {
logger.Errorf("Cannot read pids in that group")
return err
}
if len(procs) == 0 {
break
}
for _, p := range(procs) {
if err := wkrMgr.AddProc(p); err != nil{
if errors.Is(err, syscall.ESRCH) {
logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
} else {
return err
}
}
}
}
} else {
logger.Debugf("Trying to create a sub group in that group")
testMgr, err := cfg.cgMgrV2.NewChild("__test", nil);
if err != nil {
logger.Errorf("Cannot create a sub group in the cgroup")
return err
}
if err := testMgr.Delete(); err != nil {
return err
}
procs, err := cfg.cgMgrV2.Procs(false)
if err != nil {
logger.Errorf("Cannot read pids in that group")
return err
}
if len(procs) != 0 {
return fmt.Errorf("There are remaining processes in cgroup %s", baseGroup)
}
}
} else {
var err error
logger.Debugf("Cgroup V1 detected")
var pather cgv1.Path
if baseGroup != "" {
pather = cgv1.StaticPath(baseGroup)
} else {
pather = cgv1.NestedPath("")
pather = (func(p cgv1.Path) (cgv1.Path){
return func(subsys cgv1.Name) (string, error){
path, err := p(subsys);
if err != nil {
return "", err
}
if path == "/" {
return "", cgv1.ErrControllerNotActive
}
return path, err
}
})(cgv1.NestedPath(""))
}
logger.Infof("Loading cgroup")
var err error
if cfg.cgMgrV1, err = cgv1.Load(cgv1.V1, pather); err != nil {
return err
}
logger.Debugf("Available subsystems:")
for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
p, err := pather(subsys.Name())
if err != nil {
return err
}
logger.Debugf("%s: %s", subsys.Name(), p)
}
if baseGroup == "" {
logger.Debugf("Creating a sub group and move all processes into it")
wkrMgr, err := cfg.cgMgrV1.New("__worker", &contspecs.LinuxResources{});
if err != nil {
return err
}
for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
logger.Debugf("Reading pids for subsystem %s", subsys.Name())
for {
procs, err := cfg.cgMgrV1.Processes(subsys.Name(), false)
if err != nil {
p, err := pather(subsys.Name())
if err != nil {
return err
}
logger.Errorf("Cannot read pids in group %s of subsystem %s", p, subsys.Name())
return err
}
if len(procs) == 0 {
break
}
for _, proc := range(procs) {
if err := wkrMgr.Add(proc); err != nil {
if errors.Is(err, syscall.ESRCH) {
logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
} else {
return err
}
}
}
}
}
} else {
logger.Debugf("Trying to create a sub group in that group")
testMgr, err := cfg.cgMgrV1.New("__test", &contspecs.LinuxResources{});
if err != nil {
logger.Errorf("Cannot create a sub group in the cgroup")
return err
}
if err := testMgr.Delete(); err != nil {
return err
}
for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
logger.Debugf("Reading pids for subsystem %s", subsys.Name())
procs, err := cfg.cgMgrV1.Processes(subsys.Name(), false)
if err != nil {
p, err := pather(subsys.Name())
if err != nil {
return err
}
logger.Errorf("Cannot read pids in group %s of subsystem %s", p, subsys.Name())
return err
}
if len(procs) != 0 {
p, err := pather(subsys.Name())
if err != nil {
return err
}
return fmt.Errorf("There are remaining processes in cgroup %s of subsystem %s", p, subsys.Name())
}
}
}
}

return nil
Expand All @@ -123,7 +258,7 @@ func newCgroupHook(p mirrorProvider, cfg cgroupConfig, memLimit MemBytes) *cgrou
emptyHook: emptyHook{
provider: p,
},
basePath: basePath,
basePath: basePath,
baseGroup: baseGroup,
subsystem: subsystem,
}
Expand Down
4 changes: 4 additions & 0 deletions worker/cgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ sleep 30
So(err, ShouldBeNil)

cgcf := cgroupConfig{BasePath: "/sys/fs/cgroup", Group: "tunasync", Subsystem: "cpu"}
err = initCgroup(&cgcf)
So(err, ShouldBeNil)
cg := newCgroupHook(provider, cgcf, 0)
provider.AddHook(cg)

Expand Down Expand Up @@ -135,6 +137,8 @@ sleep 30
So(err, ShouldBeNil)

cgcf := cgroupConfig{BasePath: "/sys/fs/cgroup", Group: "tunasync", Subsystem: "cpu"}
err = initCgroup(&cgcf)
So(err, ShouldBeNil)
cg := newCgroupHook(provider, cgcf, 512 * units.MiB)
provider.AddHook(cg)

Expand Down
6 changes: 6 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ func NewTUNASyncWorker(cfg *Config) *Worker {
w.httpClient = httpClient
}

if cfg.Cgroup.Enable {
if err := initCgroup(&cfg.Cgroup); err != nil {
logger.Errorf("Error initializing Cgroup: %s", err.Error())
return nil
}
}
w.initJobs()
w.makeHTTPServer()
return w
Expand Down

0 comments on commit 02a1447

Please sign in to comment.