Skip to content

Commit

Permalink
Issue #15 Recover from stat collection panics
Browse files Browse the repository at this point in the history
Some platforms (RHEL6/CentOS 6) panic at runtime while collecting the
OpenFiles statistic. Potentially any one of these sources could
generate a panic; however, there's no reason to crash the entire stack
(including the important monitored process) if we can't collect some bit
of statistics. Each call into gopsutil is now wrapped in its own panic
recovery so that as much as possible is collected each cycle.
  • Loading branch information
nickjones committed Dec 7, 2015
1 parent 2b37cf5 commit 1164f79
Showing 1 changed file with 81 additions and 18 deletions.
99 changes: 81 additions & 18 deletions agents/process_stats.go
Expand Up @@ -26,6 +26,7 @@ type ProcessStatCollector struct {
exchange string
job *JobControl
ticker <-chan time.Time
hostInfo *host.HostInfoStat
}

// ProcessStatSample contains a single sample of the underlying process system usage.
Expand Down Expand Up @@ -54,6 +55,7 @@ func NewProcessStats(amqp *amqp.Connection, routingKey string,
exchange,
job,
nil,
nil,
}

psc.channel, err = psc.connection.Channel()
Expand Down Expand Up @@ -84,6 +86,10 @@ func NewProcessStats(amqp *amqp.Connection, routingKey string,
}
}(psc)

// Collect basic host info for conditional sampling of some data
// as a workaround for certain platforms.
psc.hostInfo, err = host.HostInfo()

return psc, nil
}

Expand Down Expand Up @@ -147,6 +153,46 @@ func (ps *ProcessStatCollector) Sample() error {
}

// aggregateStatForProc folds the statistics of a single process into the
// sample by running every per-statistic collector against proc, in a fixed
// order. Each statistic is gathered by its own method so that the panic
// recovery done per collector costs only that one statistic rather than the
// whole sample.
func (p *ProcessStatSample) aggregateStatForProc(proc *process.Process) {
	collectors := []func(*process.Process){
		p.collectMemInfo,
		p.collectCPUTimes,
		p.collectIOCounters,
		p.collectOpenFiles,
		// NOTE: This will end up counting separate pids as "threads"
		p.collectNumThreads,
		p.collectCPUPercent,
	}

	for _, collect := range collectors {
		collect(proc)
	}
}

// sum adds every unsigned-integer and floating-point field of the struct
// held by src onto the field at the same index of the struct held by dest.
// Fields of any other kind (strings, signed integers, ...) are left
// untouched. dest must be settable (e.g. obtained through
// reflect.ValueOf(&v).Elem()), otherwise the reflect setters panic.
func sum(src *reflect.Value, dest *reflect.Value) {
	fieldCount := src.NumField()
	for idx := 0; idx < fieldCount; idx++ {
		from := src.Field(idx)

		switch from.Kind() {
		case reflect.Float32, reflect.Float64:
			into := dest.Field(idx)
			// Accumulate from zero so the arithmetic matches a plain
			// running total exactly.
			var acc float64
			acc += from.Float()
			acc += into.Float()
			into.SetFloat(acc)
		case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
			into := dest.Field(idx)
			into.SetUint(from.Uint() + into.Uint())
		}
	}
}

// collectMemInfo queries proc.MemoryInfo and accumulates the result into
// p.Memory via sum. A failed query is logged as a warning, and a panic
// raised anywhere in the function is recovered and logged as a warning so
// the caller keeps running.
// NOTE(review): the diff elides the lines between the error log and the
// dest assignment (presumably the else branch that builds src) — confirm
// against the full file.
func (p *ProcessStatSample) collectMemInfo(proc *process.Process) {
// Recover so that a panic in this collector does not propagate to the caller.
defer func() {
if e := recover(); e != nil {
log.Warnf("Recovered from panic on memory stats collection. Maybe unsupported on this platform.")
}
}()
meminfo, err := proc.MemoryInfo()
if err != nil {
log.Warnf("Error encountered collecting memory stats: %s", err)
Expand All @@ -155,7 +201,14 @@ func (p *ProcessStatSample) aggregateStatForProc(proc *process.Process) {
// dest is the settable view of the sample's running memory totals.
dest := reflect.ValueOf(&p.Memory).Elem()
sum(&src, &dest)
}
}

// collectCPUTimes queries proc.CPUTimes and accumulates the result into
// p.CPUTimes via sum. A failed query is logged as a warning, and a panic
// raised anywhere in the function is recovered and logged as a warning so
// the caller keeps running.
// NOTE(review): the diff elides the lines between the error log and the
// dest assignment (presumably the else branch that builds src) — confirm
// against the full file.
func (p *ProcessStatSample) collectCPUTimes(proc *process.Process) {
// Recover so that a panic in this collector does not propagate to the caller.
defer func() {
if e := recover(); e != nil {
log.Warnf("Recovered from panic on CPU times collection. Maybe unsupported on this platform.")
}
}()
cputimes, err := proc.CPUTimes()
if err != nil {
log.Warnf("Error encountered collecting CPU stats: %s", err)
Expand All @@ -164,7 +217,14 @@ func (p *ProcessStatSample) aggregateStatForProc(proc *process.Process) {
// dest is the settable view of the sample's running CPU time totals.
dest := reflect.ValueOf(&p.CPUTimes).Elem()
sum(&src, &dest)
}
}

// collectIOCounters queries proc.IOCounters and accumulates the result into
// p.IOCounters via sum. A failed query is logged as a warning, and a panic
// raised anywhere in the function is recovered and logged as a warning so
// the caller keeps running.
// NOTE(review): the diff elides the lines between the error log and the
// dest assignment (presumably the else branch that builds src) — confirm
// against the full file.
func (p *ProcessStatSample) collectIOCounters(proc *process.Process) {
// Recover so that a panic in this collector does not propagate to the caller.
defer func() {
if e := recover(); e != nil {
log.Warnf("Recovered from panic on IO counters collection. Maybe unsupported on this platform.")
}
}()
iocnt, err := proc.IOCounters()
if err != nil {
log.Warnf("Error encountered collecting I/O stats: %s", err)
Expand All @@ -173,22 +233,42 @@ func (p *ProcessStatSample) aggregateStatForProc(proc *process.Process) {
// dest is the settable view of the sample's running I/O counter totals.
dest := reflect.ValueOf(&p.IOCounters).Elem()
sum(&src, &dest)
}
}

// collectOpenFiles appends the open files reported by proc.OpenFiles to
// p.OpenFiles. A failed query is logged as a warning and leaves the sample
// unchanged; a panic raised during collection is recovered and logged as a
// warning instead of propagating to the caller.
func (p *ProcessStatSample) collectOpenFiles(proc *process.Process) {
	defer func() {
		if recovered := recover(); recovered == nil {
			return
		}
		log.Warnf("Recovered from panic on Open Files collection. Maybe unsupported on this platform.")
	}()

	files, err := proc.OpenFiles()
	if err != nil {
		log.Warnf("Error encountered collecting open files stats: %s", err)
		return
	}
	p.OpenFiles = append(p.OpenFiles, files...)
}

// collectNumThreads adds the thread count reported by proc.NumThreads to
// p.NumThreads. A failed query is logged as a warning and leaves the sample
// unchanged; a panic raised during collection is recovered and logged as a
// warning instead of propagating to the caller.
//
// NOTE: This will end up counting separate pids as "threads"
func (p *ProcessStatSample) collectNumThreads(proc *process.Process) {
	defer func() {
		if recovered := recover(); recovered == nil {
			return
		}
		log.Warnf("Recovered from panic on Number of Threads collection. Maybe unsupported on this platform.")
	}()

	count, err := proc.NumThreads()
	if err != nil {
		log.Warnf("Error encountered collecting thread count stats: %s", err)
		return
	}
	p.NumThreads += count
}

// collectCPUPercent queries proc.CPUPercent with a zero interval and adds
// the result to p.CPUPercent. A panic raised anywhere in the function is
// recovered and logged as a warning so the caller keeps running.
// NOTE(review): the diff elides the lines inside the err != nil check
// (presumably a warning log followed by an else branch) — confirm against
// the full file.
func (p *ProcessStatSample) collectCPUPercent(proc *process.Process) {
// Recover so that a panic in this collector does not propagate to the caller.
defer func() {
if e := recover(); e != nil {
log.Warnf("Recovered from panic on CPU Percent collection. Maybe unsupported on this platform.")
}
}()
// Use 0 interval to get difference since the last call
cpuPercent, err := proc.CPUPercent(0 * time.Second)
if err != nil {
Expand All @@ -197,20 +277,3 @@ func (p *ProcessStatSample) aggregateStatForProc(proc *process.Process) {
p.CPUPercent += cpuPercent
}
}

// sum walks the fields of the struct held by src and, for each field whose
// kind is an unsigned integer or a float, stores src+dest back into the
// field at the same index of dest. All other field kinds are skipped. dest
// must be settable, otherwise the reflect setters panic.
func sum(src *reflect.Value, dest *reflect.Value) {
	for i := 0; i < src.NumField(); i++ {
		srcField := src.Field(i)
		kind := srcField.Kind()

		isUnsigned := kind == reflect.Uint ||
			kind == reflect.Uint8 ||
			kind == reflect.Uint16 ||
			kind == reflect.Uint32 ||
			kind == reflect.Uint64
		isFloat := kind == reflect.Float32 || kind == reflect.Float64
		if !isUnsigned && !isFloat {
			continue
		}

		destField := dest.Field(i)
		if isUnsigned {
			destField.SetUint(srcField.Uint() + destField.Uint())
			continue
		}

		// Float path: accumulate from zero, source first, then destination.
		var total float64
		total += srcField.Float()
		total += destField.Float()
		destField.SetFloat(total)
	}
}

0 comments on commit 1164f79

Please sign in to comment.