Skip to content

Commit

Permalink
remove resetStats from GatherOnce
Browse files Browse the repository at this point in the history
When the measruement tries to gather attached metrics is using the
GatherOnce function which was resetting measurement stats

This behaviour was causing that filter or other stats could be removed
based on the order of execution.

This PR removes the ResetStats call to delegate into the upper function
in order to control and manage stats behaviour, fixes #514
  • Loading branch information
sbengo committed Feb 15, 2022
1 parent 521c1c8 commit 1f356e9
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions pkg/data/measurement/measurement.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,7 @@ func (m *Measurement) GatherLoop(
// If connection and initialization are correct, mark the measurement as initiliazed and gather data for the first time
// Set influxClient as nil, the first time it shouldn't send metrics
m.initialized = true
m.stats.ResetCounters()
err := m.GatherOnce(gatherLock, varMap, tagMap, nil)
if err != nil {
// if error is because of no response from any metric
Expand All @@ -985,12 +986,6 @@ func (m *Measurement) GatherLoop(
}
utils.WaitAlignForNextCycle(gatherFreq, m.Log)

// Gather ticker initialization and stats
gatherTicker := time.NewTicker(time.Duration(gatherFreq) * time.Second)
defer gatherTicker.Stop()
m.stats.GatherFreq = gatherFreq
m.stats.SetGatherNextTime(time.Now().Add(time.Duration(gatherFreq) * time.Second).Unix())

// Filter ticker initialization and stats
// Measurement Filter Freq overrides Device Filter Freq (creating ticker)
filterFreq := gatherFreq * deviceUpdateFilterFreq
Expand All @@ -1011,6 +1006,12 @@ func (m *Measurement) GatherLoop(
defer updateFilterTicker.Stop()
}

// Gather ticker initialization and stats
gatherTicker := time.NewTicker(time.Duration(gatherFreq) * time.Second)
defer gatherTicker.Stop()
m.stats.GatherFreq = gatherFreq
m.stats.SetGatherNextTime(time.Now().Add(time.Duration(gatherFreq) * time.Second).Unix())

for {
m.Log.Info("MeasurementLoop new Iteration")
// In each iteration, if measurement is enabled and not connected, try again to connect
Expand All @@ -1035,6 +1036,7 @@ func (m *Measurement) GatherLoop(
// If connection and initialization are correct, mark the measurement as initialized and gather data for the first time
m.initialized = true
// Set influxClient as nil, the first time it shouldn't send metrics
m.stats.ResetCounters()
err := m.GatherOnce(gatherLock, varMap, tagMap, nil)
if err != nil {
// if error is because of no response from any metric
Expand All @@ -1047,12 +1049,18 @@ func (m *Measurement) GatherLoop(
}
m.stats.SetStatus(m.Active, m.Connected)
select {
case <-updateFilterTicker.C:
// compute next filter time ( needed to show in the UI )
m.stats.SetFilterNextTime(time.Now().Add(time.Duration(filterFreq) * time.Second).Unix())
// filter
m.filterUpdate()
case <-gatherTicker.C:
// we reset counter to start a clear stats counter iteration
// and ensure that he filters is always 0
m.stats.ResetCounters()
// check if we need to update the filters based on the ticker
select {
case <-updateFilterTicker.C:
m.stats.SetFilterNextTime(time.Now().Add(time.Duration(filterFreq) * time.Second).Unix())
m.filterUpdate()
default:
m.Log.Debugf("No filter ticker received")
}
// compute next gather time ( needed to show in the UI )
m.stats.SetGatherNextTime(time.Now().Add(time.Duration(gatherFreq) * time.Second).Unix())
// Gather
Expand All @@ -1077,6 +1085,7 @@ func (m *Measurement) GatherLoop(
case bus.FilterUpdate:
m.filterUpdate()
case bus.ForceGather:
m.stats.ResetCounters()
m.GatherOnce(gatherLock, varMap, tagMap, influxClient)
case bus.Enabled:
active, ok := val.Data.(bool)
Expand Down Expand Up @@ -1215,7 +1224,6 @@ func (m *Measurement) GatherOnce(
// Do not gather data if measurement is disabled or it doesn't have a connection or measurement is not initialized
if !m.Active || !m.Connected || !m.initialized {
m.Log.Infof("Skip measurement Gathering process Active[%t],Connected[%t],Initialized[%t]", m.Active, m.Connected, m.initialized)
m.stats.ResetCounters()
m.statsData.Lock()
m.Stats = m.getBasicStats()
m.statsData.Unlock()
Expand All @@ -1231,7 +1239,6 @@ func (m *Measurement) GatherOnce(
// Mark previous values as old so we can know if new metrics
// have been gathered
m.InvalidateMetrics()
m.stats.ResetCounters()

m.Log.Debugf("-------Processing measurement : %s", m.ID)

Expand Down

0 comments on commit 1f356e9

Please sign in to comment.