Skip to content

Commit

Permalink
device stats refactor, also added inheritDeviceTags option to send de…
Browse files Browse the repository at this point in the history
…vice tags to selfmon statistics backend, related to #192
  • Loading branch information
toni-moreno committed May 1, 2017
1 parent c355e19 commit 661a0f3
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 117 deletions.
4 changes: 2 additions & 2 deletions .bra.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[run]
init_cmds = [
["go", "build", "-o", "./bin/snmpcollector", "./pkg/"],
["go", "build", "-race","-o", "./bin/snmpcollector", "./pkg/"],
["./bin/snmpcollector"]
]
watch_all = true
Expand All @@ -10,6 +10,6 @@ watch_dirs = [
watch_exts = [".go", ".toml"]
build_delay = 1500
cmds = [
["go", "build", "-o", "./bin/snmpcollector", "./pkg/"],
["go", "build", "-race","-o", "./bin/snmpcollector", "./pkg/"],
["./bin/snmpcollector"]
]
2 changes: 2 additions & 0 deletions conf/sample.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
freq = 60
#prefix for measurement naming
prefix = ""
#inherit device tags
inheritdevicetags = true
#adds extra tags to the measurement config should be set as a csv - tag=value1,tag2=value2,...,tagN=valN
extratags = [ "instance=snmpcollector01" ]

Expand Down
50 changes: 15 additions & 35 deletions pkg/agent/device/measgather.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ import (
)

func (d *SnmpDevice) measConcurrentGatherAndSend() {
var totalGets int64
var totalErrors int64

startSnmpStats := time.Now()
var wg sync.WaitGroup
for _, m := range d.Measurements {
Expand All @@ -20,42 +17,34 @@ func (d *SnmpDevice) measConcurrentGatherAndSend() {
d.Debugf("-------Processing measurement : %s", m.ID)

nGets, nErrors, _ := m.GetData()
totalGets += nGets
totalErrors += nErrors

m.ComputeEvaluatedMetrics()
m.ComputeOidConditionalMetrics()

if nGets > 0 {
d.addGets(nGets)
d.stats.AddGets(nGets)
}
if nErrors > 0 {
d.addErrors(nErrors)
d.stats.AddErrors(nErrors)
}
//prepare batchpoint
points := m.GetInfluxPoint(d.TagMap)
startInfluxStats := time.Now()
if bpts != nil {
(*bpts).AddPoints(points)
//send data
d.Influx.Send(bpts)
} else {
d.Warnf("Can not send data to the output DB becaouse of batchpoint creation error")
}
elapsedInfluxStats := time.Since(startInfluxStats)
d.stats.AddSentDuration(elapsedInfluxStats)

}(m)
}
wg.Wait()
elapsedSnmpStats := time.Since(startSnmpStats)
d.Infof("snmp pooling took [%s] SNMP: Gets [%d] Errors [%d]", elapsedSnmpStats, totalGets, totalErrors)
d.setGatherStats(startSnmpStats, elapsedSnmpStats)
if d.selfmon != nil {
fields := map[string]interface{}{
"process_t": elapsedSnmpStats.Seconds(),
"getsent": totalGets,
"geterror": totalErrors,
}
d.selfmon.AddDeviceMetrics(d.cfg.ID, fields)
}

d.stats.SetGatherDuration(startSnmpStats, elapsedSnmpStats)
}

func (d *SnmpDevice) measSeqGatherAndSend() {
Expand All @@ -74,30 +63,21 @@ func (d *SnmpDevice) measSeqGatherAndSend() {
m.ComputeEvaluatedMetrics()
m.ComputeOidConditionalMetrics()

if nGets > 0 {
d.addGets(nGets)
}
if nErrors > 0 {
d.addErrors(nErrors)
}
//prepare batchpoint
points := m.GetInfluxPoint(d.TagMap)
if bpts != nil {
(*bpts).AddPoints(points)
}
}

elapsedSnmpStats := time.Since(startSnmpStats)
d.Infof("snmp pooling took [%s] SNMP: Gets [%d] Errors [%d]", elapsedSnmpStats, totalGets, totalErrors)
d.setGatherStats(startSnmpStats, elapsedSnmpStats)
if d.selfmon != nil {
fields := map[string]interface{}{
"process_t": elapsedSnmpStats.Seconds(),
"getsent": totalGets,
"geterror": totalErrors,
}
d.selfmon.AddDeviceMetrics(d.cfg.ID, fields)
if totalGets > 0 {
d.stats.AddGets(totalGets)
}
if totalErrors > 0 {
d.stats.AddErrors(totalErrors)
}
elapsedSnmpStats := time.Since(startSnmpStats)
d.stats.SetGatherDuration(startSnmpStats, elapsedSnmpStats)
/*************************
*
* Send data to InfluxDB process
Expand All @@ -111,6 +91,6 @@ func (d *SnmpDevice) measSeqGatherAndSend() {
d.Warnf("Can not send data to the output DB becaouse of batchpoint creation error")
}
elapsedInfluxStats := time.Since(startInfluxStats)
d.Infof("influx send took [%s]", elapsedInfluxStats)
d.stats.AddSentDuration(elapsedInfluxStats)

}
65 changes: 34 additions & 31 deletions pkg/agent/device/snmpdevice.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,13 @@ type SnmpDevice struct {
//snmpClient *gosnmp.GoSNMP
snmpClientMap map[string]*gosnmp.GoSNMP
Influx *output.InfluxDB `json:"-"`
LastError time.Time
//LastError time.Time
//Runtime stats
Requests int64
Gets int64
Errors int64
LastGatherTime time.Time
LastGatherDuration time.Duration
LastFltUpdateTime time.Time
LastFltUpdateDuration time.Duration
//runtime controls
stats DevStat //Runtime Internal statistic
Stats *DevStat //Public info for thread safe accessing to the data ()

//runtime controls
mutex sync.Mutex
ReloadLoopsPending int

DeviceActive bool
Expand All @@ -72,8 +68,7 @@ type SnmpDevice struct {
chLogLevel chan string
chExit chan bool
chFltUpdate chan bool
mutex sync.Mutex
selfmon *selfmon.SelfMon

CurLogLevel string
Gather func() `json:"-"`
}
Expand All @@ -90,18 +85,27 @@ func (d *SnmpDevice) GetLogFilePath() string {
return d.cfg.LogFile
}

func (d *SnmpDevice) setGatherStats(start time.Time, duration time.Duration) {
func (d *SnmpDevice) GetSelfThreadSafe() *SnmpDevice {
d.mutex.Lock()
defer d.mutex.Unlock()
d.LastGatherTime = start
d.LastGatherDuration = duration
d.Stats = d.GetBasicStats()
return d
}

func (d *SnmpDevice) setFltUpdateStats(start time.Time, duration time.Duration) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.LastFltUpdateTime = start
d.LastFltUpdateDuration = duration
// GetBasicStats get basic info for this device
func (d *SnmpDevice) GetBasicStats() *DevStat {

sum := 0
for _, m := range d.Measurements {
sum += len(m.OidSnmpMap)
}
stat := d.stats.ThSafeCopy()
stat.ReloadLoopsPending = d.ReloadLoopsPending
stat.DeviceActive = d.DeviceActive
stat.DeviceConnected = d.DeviceConnected
stat.NumMeasurements = len(d.Measurements)
stat.NumMetrics = sum
return stat
}

//ReloadLoopPending needs to be mutex excluded
Expand All @@ -120,16 +124,10 @@ func (d *SnmpDevice) getReloadLoopsPending() int {

func (d *SnmpDevice) decReloadLoopsPending() {
d.mutex.Lock()
defer d.mutex.Unlock()
if d.ReloadLoopsPending > 0 {
d.ReloadLoopsPending--
}
d.mutex.Unlock()
}

func (d *SnmpDevice) GetSelfThreadSafe() *SnmpDevice {
d.mutex.Lock()
defer d.mutex.Unlock()
return d
}

// GetOutSenderFromMap to get info about the sender will use
Expand Down Expand Up @@ -358,6 +356,8 @@ func (d *SnmpDevice) Init(c *config.SnmpDeviceCfg) error {
} else {
d.Warnf("No map detected in device")
}
// Init stats
d.stats.Init(d.cfg.ID, d.TagMap, d.log)

if d.cfg.ConcurrentGather == true {
d.Gather = d.measConcurrentGatherAndSend
Expand All @@ -382,7 +382,7 @@ func (d *SnmpDevice) End() {

// SetSelfMonitoring set the ouput device where send monitoring metrics
func (d *SnmpDevice) SetSelfMonitoring(cfg *selfmon.SelfMon) {
d.selfmon = cfg
d.stats.SetSelfMonitoring(cfg)
}

// InitSnmpConnect does the SNMP client conection and retrieve system info
Expand Down Expand Up @@ -417,7 +417,7 @@ func (d *SnmpDevice) startGatherGo(wg *sync.WaitGroup) {
startSnmp := time.Now()
d.InitDevMeasurements()
elapsedSnmp := time.Since(startSnmp)
d.setFltUpdateStats(startSnmp, elapsedSnmp)
d.stats.SetFltUpdateStats(startSnmp, elapsedSnmp)
d.Infof("snmp INIT runtime measurements/filters took [%s] ", elapsedSnmp)

} else {
Expand All @@ -438,7 +438,7 @@ func (d *SnmpDevice) startGatherGo(wg *sync.WaitGroup) {
startSnmp := time.Now()
d.InitDevMeasurements()
elapsedSnmp := time.Since(startSnmp)
d.setFltUpdateStats(startSnmp, elapsedSnmp)
d.stats.SetFltUpdateStats(startSnmp, elapsedSnmp)
d.Infof("snmp INIT runtime measurements/filters took [%s] ", elapsedSnmp)
// Round collection to nearest interval by sleeping
utils.WaitAlignForNextCicle(d.cfg.Freq, d.log)
Expand All @@ -455,8 +455,9 @@ func (d *SnmpDevice) startGatherGo(wg *sync.WaitGroup) {
* SNMP Gather data process
*
***************************/
d.resetCounters()
d.stats.ResetCounters()
d.Gather()
d.stats.Send()
/*******************************************
*
* Reload Indexes/Filters process(if needed)
Expand All @@ -481,9 +482,10 @@ func (d *SnmpDevice) startGatherGo(wg *sync.WaitGroup) {
m.InitBuildRuntime()
}
}

d.setReloadLoopsPending(d.cfg.UpdateFltFreq)
elapsedIdxUpdateStats := time.Since(startIdxUpdateStats)
d.setFltUpdateStats(startIdxUpdateStats, elapsedIdxUpdateStats)
d.stats.SetFltUpdateStats(startIdxUpdateStats, elapsedIdxUpdateStats)
d.Infof("Index reload took [%s]", elapsedIdxUpdateStats)
}
}
Expand All @@ -499,6 +501,7 @@ func (d *SnmpDevice) startGatherGo(wg *sync.WaitGroup) {
d.Infof("invoked EXIT from SNMP Gather process ")
return
case <-d.chFltUpdate:

d.setReloadLoopsPending(1)
case debug := <-d.chDebug:
d.StateDebug = debug
Expand Down
Loading

0 comments on commit 661a0f3

Please sign in to comment.