Skip to content

Commit

Permalink
Merge pull request #401 from VladoLavor/cpu-affinity
Browse files Browse the repository at this point in the history
Process manager: option to set CPU affinity
  • Loading branch information
VladoLavor committed Aug 14, 2019
2 parents 5002489 + d9b1f76 commit b526ab9
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 47 deletions.
32 changes: 23 additions & 9 deletions exec/processmanager/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"io"
"os"
"os/exec"
"time"

"github.com/ligato/cn-infra/exec/processmanager/status"
"github.com/ligato/cn-infra/exec/processmanager/template"
Expand Down Expand Up @@ -326,7 +327,9 @@ func (p *Plugin) processToTemplate(pr *Process) (*process.Template, error) {
}
return true
}(pr.options.notifyChan),
AutoTerminate: pr.options.autoTerm,
AutoTerminate: pr.options.autoTerm,
CpuAffinity: pr.options.cpuAffinity,
CpuAffinityDelay: pr.options.cpuAffinityDelay.String(),
}

return &process.Template{
Expand All @@ -347,29 +350,31 @@ func (p *Plugin) templateToProcess(tmp *process.Template) (*Process, error) {

pOptions := &POptions{}
if tmp.POptions != nil {
pOptions.args = tmp.POptions.Args
pOptions.args = tmp.POptions.GetArgs()
pOptions.outWriter = func(isSet bool) io.Writer {
if isSet {
return os.Stdout
}
return nil
}(tmp.POptions.OutWriter)
}(tmp.POptions.GetOutWriter())
pOptions.errWriter = func(isSet bool) io.Writer {
if isSet {
return os.Stderr
}
return nil
}(tmp.POptions.ErrWriter)
pOptions.detach = tmp.POptions.Detach
pOptions.restart = tmp.POptions.Restart
pOptions.runOnStartup = tmp.POptions.RunOnStartup
}(tmp.POptions.GetErrWriter())
pOptions.detach = tmp.POptions.GetDetach()
pOptions.restart = tmp.POptions.GetRestart()
pOptions.runOnStartup = tmp.POptions.GetRunOnStartup()
pOptions.notifyChan = func(notify bool) chan status.ProcessStatus {
if notify {
return make(chan status.ProcessStatus)
}
return nil
}(tmp.POptions.Notify)
pOptions.autoTerm = tmp.POptions.AutoTerminate
}(tmp.POptions.GetNotify())
pOptions.autoTerm = tmp.POptions.GetAutoTerminate()
pOptions.cpuAffinity = tmp.POptions.GetCpuAffinity()
pOptions.cpuAffinityDelay = p.parseDuration(tmp.POptions.GetCpuAffinityDelay())
}

return &Process{
Expand All @@ -384,3 +389,12 @@ func (p *Plugin) templateToProcess(tmp *process.Template) (*Process, error) {
cancelChan: make(chan struct{}),
}, nil
}

func (p *Plugin) parseDuration(durationStr string) (duration time.Duration) {
var err error
duration, err = time.ParseDuration(durationStr)
if err != nil {
p.Log.Errorf("failed to parse duration %s: %v", err)
}
return
}
41 changes: 36 additions & 5 deletions exec/processmanager/process_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"io"
"os"
"os/exec"
"strconv"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -81,6 +82,15 @@ func (p *Process) startProcess() (cmd *exec.Cmd, err error) {
go p.watch()
}

// set process CPU lock in another go routine
// since it may contain delayed startup
if p.options.cpuAffinity != "" {
go func() {
time.Sleep(p.options.cpuAffinityDelay)
p.setProcessCPUAffinity(cmd)
}()
}

if cmd.Process != nil {
_, err = p.sh.ReadStatusFromPID(cmd.Process.Pid)
}
Expand Down Expand Up @@ -149,7 +159,7 @@ func (p *Process) isAlive() bool {
return true
}

// WaitOnCommand waits until the command completes
// waits until the command completes
func (p *Process) waitOnProcess() (*os.ProcessState, error) {
if p.command == nil || p.command.Process == nil {
return &os.ProcessState{}, nil
Expand All @@ -158,7 +168,7 @@ func (p *Process) waitOnProcess() (*os.ProcessState, error) {
return p.command.Process.Wait()
}

// Delete stops the process and internal watcher
// stops the process and internal watcher
func (p *Process) deleteProcess() error {
if p.command == nil || p.command.Process == nil {
return nil
Expand All @@ -173,15 +183,36 @@ func (p *Process) deleteProcess() error {
return nil
}

// WaitOnCommand waits until the command completes
// sends custom signal to process
func (p *Process) signalToProcess(signal os.Signal) error {
if p.command == nil || p.command.Process == nil {
p.log.Warn("Attempt to send signal to non-running process")
err := errors.Errorf("attempt to send signal to non-running process")
p.log.Error(err)
return err
}

return p.command.Process.Signal(signal)
}

// setProcessCpuAffinity process CPU affinity. Unsuccessful CPU assignment does not return error
// (message is only printed).
func (p *Process) setProcessCPUAffinity(cmd *exec.Cmd) {
if cmd == nil || cmd.Process == nil || p.options == nil || p.options.cpuAffinity == "" {
return
}
if _, err := strconv.ParseUint(p.options.cpuAffinity, 16, 64); err != nil {
p.log.Errorf("Provided CPU affinity value %s for process %s is not valid (error: %v)",
p.options.cpuAffinity, "supervisor", err)
return
}
if _, err := exec.Command("taskset", "-p", p.options.cpuAffinity,
strconv.Itoa(cmd.Process.Pid)).Output(); err != nil {
p.log.Errorf("Failed to assign CPU affinity to process %s: %v", "supervisor", err)
return
}
p.log.Warnf("CPU affinity %s set to process %s", p.options.cpuAffinity, p.name)
}

// Periodically tries to 'ping' process. If the process is unresponsive, marks it as terminated. Otherwise the process
// status is updated. If process status was changed, notification is sent. In addition, terminated processes are
// restarted if allowed by policy, and dead processes are cleaned up.
Expand Down Expand Up @@ -282,4 +313,4 @@ func (p *Process) closeNotifyChan() {

p.isWatched = false
p.log.Debugf("Process %s watcher stopped", p.name)
}
}
13 changes: 13 additions & 0 deletions exec/processmanager/process_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package processmanager

import (
"io"
"time"

"github.com/ligato/cn-infra/exec/processmanager/status"
)
Expand Down Expand Up @@ -47,6 +48,10 @@ type POptions struct {

// auto-terminate
autoTerm bool

// cpu affinity
cpuAffinity string
cpuAffinityDelay time.Duration
}

// POption is helper function to set process options
Expand Down Expand Up @@ -111,3 +116,11 @@ func AutoTerminate() POption {
p.autoTerm = true
}
}

// CPUAffinityMask allows to set CPU affinity to given process
func CPUAffinityMask(affinity string, delay time.Duration) POption {
return func(p *POptions) {
p.cpuAffinity = affinity
p.cpuAffinityDelay = delay
}
}
55 changes: 37 additions & 18 deletions exec/processmanager/template/model/process/process.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 17 additions & 14 deletions exec/processmanager/template/model/process/process.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,24 @@ syntax = "proto3";
// Package process provides a data model for process manager plugin template
package process;

message Template { /* Template is process definition which can be stored as file and read on plugin
startup */
string name = 1; /* Serves as a filename for given template */
string cmd = 2; /* Process command */
message Template { /* Template is process definition which can be stored as file and read on plugin
startup */
string name = 1; /* Serves as a filename for given template */
string cmd = 2; /* Process command */
message pOptions {
repeated string args = 1; /* Arguments process will be started with */
bool out_writer = 2; /* Use output writer. Note: only os.StdOut can be used this way */
bool err_writer = 3; /* Use error writer. Note: only os.StdErr can be used this way */
int32 restart = 4; /* Number of automatic restarts, <0> means no restart, <-1> always restart */
bool detach = 5; /* Set to true if process should be detached from parent application on startup */
bool run_on_startup = 6; /* Set to true if process is expected to start right after plugin initialization */
bool notify = 7; /* Create a notification channel (can be obtained via `GetNotification`) where
all status change events can be watched*/
bool auto_terminate = 8; /* This option ensures that every process which turns zombie/dead is automatically
released */
repeated string args = 1; /* Arguments process will be started with */
bool out_writer = 2; /* Use output writer. Note: only os.StdOut can be used this way */
bool err_writer = 3; /* Use error writer. Note: only os.StdErr can be used this way */
int32 restart = 4; /* Number of automatic restarts, <0> means no restart, <-1> always restart */
bool detach = 5; /* Set to true if process should be detached from parent application on startup */
bool run_on_startup = 6; /* Set to true if process is expected to start right after plugin initialization */
bool notify = 7; /* Create a notification channel (can be obtained via `GetNotification`) where
all status change events can be watched*/
bool auto_terminate = 8; /* This option ensures that every process which turns zombie/dead is automatically
released */
string cpu_affinity = 9; /* Allows to set CPU affinity to given process. Value has to be in hexadecimal
format (see taskset command) */
string cpu_affinity_delay = 10; /* Postpone CPU affinity setup if needed */
}
pOptions p_options = 3;
}
29 changes: 28 additions & 1 deletion exec/supervisor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package supervisor

import (
"fmt"
"time"

"github.com/pkg/errors"
)
Expand Down Expand Up @@ -43,8 +44,19 @@ func (e EventType) String() string {
// and hooks are special commands which are executed when certain event related to
// one of the processes occurs
type Config struct {
// Bond supervisor process to given set of CPUs. Plugin uses taskset to assign process
// to CPUs and uses the same hexadecimal format. Invalid value prints error but does
// not terminate the process.
// It is recommended to use this option only for testing, operating system CPU schedulers
// are in general more superior in managing CPU cycles.
SvCPUAffinityMask string `json:"sv-cpu-affinity-mask"`

// A list of programs started by the supervisor.
Programs []Program
Hooks []Hook

// A list of hooks managed by supervisor plugin. Hooks are additional commands or scripts
// called after some specific process events.
Hooks []Hook
}

// Program is a single program representation
Expand All @@ -66,6 +78,21 @@ type Program struct {
// when the program is restarted since the operating system sends events in order
// termination -> starting -> sleeping/idle
Restarts int `json:"restarts"`

// Bond process to given set of CPUs. Plugin uses taskset to assign process to CPUs
// and uses the same hexadecimal format. Invalid value prints error message but does
// not terminate the process.
// Note: use only when you know what you are doing, do not try to outsmart OS CPU
// scheduling. If a program has its own config file to manage CPUs, prioritize it.
// Keep in mind that incorrect use may slow down certain applications or that the
// application may contain its own CPU manager which overrides this value.
// Warning: Locking process to CPU does NOT keep other processes off that CPU.
CPUAffinityMask string `json:"cpu-affinity-mask"`

// This field can postpone CPU affinity setup for given time. Some processes may
// manipulate CPU scheduling during startup, this option allows to "bypass" it,
// waiting until the process is fully loaded and then lock it.
CPUAffinitySetupDelay time.Duration `json:"cpu-affinity-setup-delay"`
}

// Hook is a procedure called when a program gets into certain state.
Expand Down

0 comments on commit b526ab9

Please sign in to comment.