Navigation Menu

Skip to content

Commit

Permalink
Fix #12 Aggregate parent with child stats.
Browse files Browse the repository at this point in the history
NumThreads will now count the sum of child processes and threads
per-process.
Also added a slice of the child pids to stats collection.
  • Loading branch information
nickjones committed Nov 28, 2015
1 parent ae19962 commit db78873
Showing 1 changed file with 72 additions and 28 deletions.
100 changes: 72 additions & 28 deletions agents/process_stats.go
Expand Up @@ -3,6 +3,7 @@ package agents
import (
"encoding/json"
"fmt"
"reflect"
"time"

log "github.com/Sirupsen/logrus"
Expand Down Expand Up @@ -37,6 +38,7 @@ type ProcessStatSample struct {
OpenFiles []process.OpenFilesStat // See gopsutil for description
NumThreads int32 // Number of threads in use by the child processes (if supported)
Pid int32 // Process ID for the group
ChildPids []int32
}

// NewProcessStats establishes a new AMQP channel and configures sampling period
Expand Down Expand Up @@ -104,35 +106,16 @@ func (ps *ProcessStatCollector) Sample() error {
hostnameRoutingKey = fmt.Sprintf(".%s", stat.Host.Hostname)
}

meminfo, err := proc.MemoryInfo()
if err != nil {
log.Warnf("Error encountered collecting memory stats: %s", err)
} else {
stat.Memory = *meminfo
}
// Collect for parent
stat.aggregateStatForProc(proc)

cputimes, err := proc.CPUTimes()
if err != nil {
log.Warnf("Error encountered collecting CPU stats: %s", err)
} else {
stat.CPUTimes = *cputimes
}

iocnt, err := proc.IOCounters()
if err != nil {
log.Warnf("Error encountered collecting I/O stats: %s", err)
} else {
stat.IOCounters = *iocnt
}

stat.OpenFiles, err = proc.OpenFiles()
if err != nil {
log.Warnf("Error encountered collecting open files stats: %s", err)
}

stat.NumThreads, err = proc.NumThreads()
if err != nil {
log.Warnf("Error encountered collecting thread count stats: %s", err)
children, err := proc.Children()
// Collect on children (if any)
if err == nil {
for _, cproc := range children {
stat.ChildPids = append(stat.ChildPids, cproc.Pid)
stat.aggregateStatForProc(cproc)
}
}

log.Debugf("Sample: %#v\n", stat)
Expand Down Expand Up @@ -161,3 +144,64 @@ func (ps *ProcessStatCollector) Sample() error {

return err
}

func (p *ProcessStatSample) aggregateStatForProc(proc *process.Process) {
meminfo, err := proc.MemoryInfo()
if err != nil {
log.Warnf("Error encountered collecting memory stats: %s", err)
} else {
src := reflect.ValueOf(meminfo).Elem()
dest := reflect.ValueOf(&p.Memory).Elem()
sum(&src, &dest)
}

cputimes, err := proc.CPUTimes()
if err != nil {
log.Warnf("Error encountered collecting CPU stats: %s", err)
} else {
src := reflect.ValueOf(cputimes).Elem()
dest := reflect.ValueOf(&p.CPUTimes).Elem()
sum(&src, &dest)
}

iocnt, err := proc.IOCounters()
if err != nil {
log.Warnf("Error encountered collecting I/O stats: %s", err)
} else {
src := reflect.ValueOf(iocnt).Elem()
dest := reflect.ValueOf(&p.IOCounters).Elem()
sum(&src, &dest)
}

openFiles, err := proc.OpenFiles()
if err != nil {
log.Warnf("Error encountered collecting open files stats: %s", err)
} else {
p.OpenFiles = append(p.OpenFiles, openFiles...)
}

// NOTE: This will end up counting separate pids as "threads"
numThreads, err := proc.NumThreads()
if err != nil {
log.Warnf("Error encountered collecting thread count stats: %s", err)
} else {
p.NumThreads += numThreads
}
}

func sum(src *reflect.Value, dest *reflect.Value) {
for i := 0; i < src.NumField(); i++ {
switch src.Field(i).Kind() {
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
var total uint64
total += src.Field(i).Uint()
total += dest.Field(i).Uint()
dest.Field(i).SetUint(total)
case reflect.Float32, reflect.Float64:
var total float64
total += src.Field(i).Float()
total += dest.Field(i).Float()
dest.Field(i).SetFloat(total)
}
}
}

0 comments on commit db78873

Please sign in to comment.