Skip to content

Commit

Permalink
add locking around tail state
Browse files Browse the repository at this point in the history
  • Loading branch information
dayvar14 committed Jan 23, 2024
1 parent 1b37419 commit 6ceb5e2
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 20 deletions.
3 changes: 2 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (a *Agent) runPersistLoop(

a.persistLoop(ctx, ticker)

log.Printf("D! [agent] Stopping the persist loop)")
log.Printf("D! [agent] Stopping the persist loop")
}

func (a *Agent) persistLoop(
Expand All @@ -363,6 +363,7 @@ func (a *Agent) persistLoop(
for {
select {
case <-ticker.Elapsed():
log.Printf("D! [agent] Persisting state")
err := a.Config.Persister.Store()
if err != nil {
log.Printf("E! [agent] Error writing state on persist loop: %v", err)
Expand Down
1 change: 1 addition & 0 deletions cmd/telegraf/agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
## The interval of time at which the state is persisted into the statefile.
## When set to 0s, persist interval is disabled.
## Example: statefile_persist_interval = "10m"
## Note: This state may not be consistent across all plugins.
# statefile_persist_interval = "0s"

## Rounds persist interval to 'statefile_persist_interval'
Expand Down
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ type AgentConfig struct {
Statefile string `toml:"statefile"`

// PersistInterval The interval in which the state will be stored into the
// statefile. When set to 0, persist interval is disabled.
// statefile. When set to 0, persist interval is disabled. Note: This state may not
// be consistent across all plugins.
PersistInterval Duration `toml:"statefile_persist_interval"`

// RoundPersistInterval rounds persist interval to 'PersistInterval'.
Expand Down
2 changes: 1 addition & 1 deletion docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ The agent table configures Telegraf and the defaults used across all plugins.

- **statefile_persist_interval**:
The interval of time at which the state is persisted into the statefile. When
set to 0, persist interval is disabled.
set to 0, persist interval is disabled. Note: This state may not be consistent across all plugins.

- **round_statefile_persist_interval**:
Rounds persist interval to 'statefile_persist_interval'
Expand Down
59 changes: 42 additions & 17 deletions plugins/inputs/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@ type Tail struct {
Filters []string `toml:"filters"`
filterColors bool

Log telegraf.Logger `toml:"-"`
tailers map[string]*tail.Tail
offsets map[string]int64
parserFunc telegraf.ParserFunc
wg sync.WaitGroup
Log telegraf.Logger `toml:"-"`
tailers map[string]*tail.Tail
offsets map[string]int64
offsetsMutex sync.Mutex
parserFunc telegraf.ParserFunc
wg sync.WaitGroup

acc telegraf.TrackingAccumulator

Expand Down Expand Up @@ -108,23 +109,43 @@ func (t *Tail) Init() error {
}

func (t *Tail) GetState() interface{} {
t.offsetsMutex.Lock()
defer t.offsetsMutex.Unlock()

if !t.Pipe && !t.FromBeginning {
for _, tailer := range t.tailers {
t.storeOffsets(tailer)
offset, err := tailer.Tell()
if err == nil {
t.Log.Debugf("Recording offset %d for %q", offset, tailer.Filename)
t.offsets[tailer.Filename] = offset
} else {
t.Log.Errorf("Recording offset for %q: %s", tailer.Filename, err.Error())
}
}
}

return t.offsets
offsetsCopy := make(map[string]int64, len(t.offsets))

for k, v := range t.offsets {
offsetsCopy[k] = v
}

return offsetsCopy
}

// SetState merges previously saved per-file offsets into the plugin.
//
// The state must be a map[string]int64; any other type is rejected with an
// error and the current offsets are left untouched. Entries are copied into
// t.offsets while holding offsetsMutex, so the update does not race with
// other code that takes the same lock.
func (t *Tail) SetState(state interface{}) error {
	restored, isOffsetMap := state.(map[string]int64)
	if !isOffsetMap {
		return errors.New("state has to be of type 'map[string]int64'")
	}

	// Hold the lock for the whole merge so readers never observe a
	// partially applied state.
	t.offsetsMutex.Lock()
	defer t.offsetsMutex.Unlock()

	for file, pos := range restored {
		t.offsets[file] = pos
	}
	return nil
}

Expand Down Expand Up @@ -189,6 +210,7 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {

var seek *tail.SeekInfo
if !t.Pipe && !fromBeginning {
t.offsetsMutex.Lock()
if offset, ok := t.offsets[file]; ok {
t.Log.Debugf("Using offset %d for %q", offset, file)
seek = &tail.SeekInfo{
Expand All @@ -201,6 +223,7 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
Offset: 0,
}
}
t.offsetsMutex.Unlock()
}

tailer, err := tail.TailFile(file,
Expand Down Expand Up @@ -376,9 +399,18 @@ func (t *Tail) receiver(parser telegraf.Parser, tailer *tail.Tail) {
}

func (t *Tail) Stop() {
t.offsetsMutex.Lock()
defer t.offsetsMutex.Unlock()

for _, tailer := range t.tailers {
if !t.Pipe && !t.FromBeginning {
t.storeOffsets(tailer)
offset, err := tailer.Tell()
if err == nil {
t.Log.Debugf("Recording offset %d for %q", offset, tailer.Filename)
t.offsets[tailer.Filename] = offset
} else {
t.Log.Errorf("Recording offset for %q: %s", tailer.Filename, err.Error())
}
}
err := tailer.Stop()
if err != nil {
Expand All @@ -395,16 +427,9 @@ func (t *Tail) Stop() {
offsets[k] = v
}
offsetsMutex.Unlock()
}

func (t *Tail) storeOffsets(tailer *tail.Tail) {
offset, err := tailer.Tell()
if err == nil {
t.Log.Debugf("Recording offset %d for %q", offset, tailer.Filename)
t.offsets[tailer.Filename] = offset
} else {
t.Log.Errorf("Recording offset for %q: %s", tailer.Filename, err.Error())
}
// Clear out tailers or state offsets will always be zero
t.tailers = make(map[string]*tail.Tail)
}

func (t *Tail) SetParserFunc(fn telegraf.ParserFunc) {
Expand Down

0 comments on commit 6ceb5e2

Please sign in to comment.