add fixed size buffer to handle fd in log tailer with feature flag #1577
base: main
Conversation
Force-pushed from 754d84a to 0dc46cd
Force-pushed from e3a0758 to 15d84b5
plugins/inputs/logfile/tail/tail.go
Outdated
	if !resetOffset && tail.curOffset > 0 {
		err := tail.seekTo(SeekInfo{Offset: tail.curOffset, Whence: io.SeekStart})
		if err != nil {
			return err
Suggested change:
-			return err
+			return fmt.Errorf("unable to restore offset on reopen: %w", err)
@@ -210,6 +212,15 @@ func (tail *Tail) reopen() error {
			break
		}
		OpenFileCount.Add(1)

		tail.openReader()
This looks like logic that was similar to tailFileSync. Do we need to restart the watcher or any of the other parts of that function?
plugins/inputs/logfile/tailersrc.go
Outdated
	// helper to handle event publishing
	publishEvent := func() {
nit: Would rather this were an actual method.
func (ts *tailerSrc) publishEvent(msgBuf bytes.Buffer, fo *fileOffset)
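For illustration, a minimal sketch of that refactor with stand-in types; the real tailerSrc, fileOffset, and event types in tailersrc.go differ:

```go
// Sketch only: these are stand-in types, not the actual ones in tailersrc.go.
package logfile

import "bytes"

type fileOffset struct{ offset int64 }

type logEvent struct {
	msg    string
	offset fileOffset
}

type tailerSrc struct {
	outputFn func(*logEvent)
}

// publishEvent replaces the inline "publishEvent := func() {...}" helper: it wraps
// whatever has accumulated in msgBuf into an event and hands it to the output function.
func (ts *tailerSrc) publishEvent(msgBuf bytes.Buffer, fo *fileOffset) {
	if msgBuf.Len() == 0 || ts.outputFn == nil {
		return
	}
	ts.outputFn(&logEvent{msg: msgBuf.String(), offset: *fo})
}
```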
plugins/inputs/logfile/tailersrc.go
Outdated
		case <-ts.done:
			return
		default:
			log.Printf("D! [logfile] tailer sender buffer is full, closing file %v", ts.tailer.Filename)
I still think there needs to be a delay before closing the file. Just because the buffer is backed up doesn't mean it's completely blocked. The concern is that this will result in heavy I/O with the file getting constantly closed and reopened.
Would suggest adding a timer.
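One way to implement that, sketched with stand-in names (sendCh, done, and dropAfter are assumptions, not the actual fields in tailersrc.go): give the sender a grace period before concluding it is blocked.

```go
// Sketch only: names are stand-ins for the real channels/fields in tailersrc.go.
package logfile

import "time"

// trySend first attempts a non-blocking send; if the buffer is full, it waits up to
// dropAfter before giving up, so a briefly backed-up sender doesn't trigger a
// close/reopen cycle every time the buffer happens to be full.
func trySend(sendCh chan<- string, done <-chan struct{}, line string, dropAfter time.Duration) bool {
	select {
	case sendCh <- line:
		return true
	case <-done:
		return false
	default:
	}

	// Buffer is full right now; allow a grace period before treating it as blocked.
	timer := time.NewTimer(dropAfter)
	defer timer.Stop()
	select {
	case sendCh <- line:
		return true
	case <-done:
		return false
	case <-timer.C:
		// Still blocked after the grace period: the caller may now close the file.
		return false
	}
}
```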
@@ -310,6 +326,7 @@ func (ts *tailerSrc) cleanUp() {

	if ts.outputFn != nil {
		ts.outputFn(nil) // inform logs agent the tailer src's exit, to stop runSrcToDest
		ts.outputFn = nil
Why is this needed?
plugins/inputs/logfile/tailersrc.go
Outdated
log.Printf("D! [logfile] runSender buffer was closed for %s", ts.tailer.Filename) | ||
return | ||
} | ||
if e != nil && ts.outputFn != nil { |
Can this ever be false?
}

func NewLogFile() *LogFile {
	backpressureDrop := envconfig.IsBackpressureDropEnabled()
Alternatively, this could be set in the log configuration. The translator could be the one that reads the environment variable or JSON config and sets the field. Would allow for more granular control this way since you could opt-in/opt-out per log file.
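Sketched below is what per-file opt-in might look like; the struct and JSON key names are hypothetical, not the agent's actual configuration schema:

```go
// Sketch only: hypothetical config shape, not the agent's real file config or schema.
package logfile

type fileEntry struct {
	FilePath string `json:"file_path"`
	// Per-file opt-in. The translator would populate this from the JSON config,
	// falling back to the process-wide environment variable when it is absent.
	BackpressureDrop *bool `json:"backpressure_drop,omitempty"`
}

// backpressureDropEnabled prefers an explicit per-file setting and otherwise
// falls back to the global (env-var driven) default.
func (f *fileEntry) backpressureDropEnabled(globalDefault bool) bool {
	if f.BackpressureDrop != nil {
		return *f.BackpressureDrop
	}
	return globalDefault
}
```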
			for k, v := range tt.envVars {
				os.Setenv(k, v)
			}
			defer func() {
				for k := range tt.envVars {
					os.Unsetenv(k)
				}
			}()
nit: t.Setenv will do the cleanup for us.
Suggested change:
-			for k, v := range tt.envVars {
-				os.Setenv(k, v)
-			}
-			defer func() {
-				for k := range tt.envVars {
-					os.Unsetenv(k)
-				}
-			}()
+			for k, v := range tt.envVars {
+				t.Setenv(k, v)
+			}
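Worth noting that t.Setenv restores the original value via t.Cleanup when the test ends, and it cannot be used in tests that call t.Parallel, which is the safer behavior for process-wide environment state.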
Force-pushed from c39defc to 7313d25
Description of the issue
The current CWA strategy for log collection prioritizes log integrity without consideration for disk space. CWA holds onto open file descriptors until all the log lines have been successfully pushed to CloudWatch Logs. If another process deletes the file, the file descriptor held by the agent is unaffected. If CWA is unable to publish the logs, it retries for 14 days. For some customers this is the expected and preferred behavior, particularly for sensitive files such as security logs.
On Unix, when a process removes a file, the file name is unlinked from the underlying inode rather than the data being deleted. As long as there is an open file descriptor referencing the file, the inode and its disk space are not reclaimed, so a deleted-but-still-open log keeps consuming space.
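As a standalone illustration of that behavior (not part of this change), the snippet below removes a file's name while holding its descriptor open and then reads the data back; the inode and its disk space are only released once the descriptor is closed.

```go
// Standalone demo of Unix unlink semantics; unrelated to the agent code itself.
package main

import (
	"fmt"
	"io"
	"os"
)

func main() {
	f, err := os.CreateTemp("", "unlink-demo-*.log")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	if _, err := f.WriteString("still readable after unlink\n"); err != nil {
		panic(err)
	}

	// Remove the name: this unlinks the directory entry, but the inode (and its
	// disk space) survives for as long as our descriptor remains open.
	if err := os.Remove(f.Name()); err != nil {
		panic(err)
	}

	buf := make([]byte, 64)
	n, err := f.ReadAt(buf, 0)
	if err != nil && err != io.EOF {
		panic(err)
	}
	fmt.Printf("read %d bytes from the unlinked file: %q\n", n, buf[:n])
}
```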
Description of changes
- Introduce a new feature flag, backpressure_drop. When it is enabled, the tailer-sender path uses a fixed-size buffer and closes the file descriptor when that buffer is full, instead of keeping the file open until every line has been published.
- The existing auto_removal behavior, which reads all contents from a rotated log file before deleting it, is left unchanged.
- The flag is written to env-config.json by the translator so it can be added as an environment variable of the agent's systemd process at startup time (a sketch of such a check follows below).
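For reference, a flag driven purely by an environment variable could be checked with a helper like the one below; the variable name is a placeholder, and the real name and parsing live in the agent's envconfig package behind IsBackpressureDropEnabled.

```go
// Sketch only: the env var name is a placeholder; the actual lookup lives in the
// agent's envconfig package (see envconfig.IsBackpressureDropEnabled).
package envconfig

import (
	"os"
	"strconv"
)

const backpressureDropEnvVar = "CWAGENT_BACKPRESSURE_DROP" // placeholder name

// isBackpressureDropEnabled reads the flag once from the process environment; the
// translator writes the value into env-config.json so systemd exports it to the
// agent process at startup.
func isBackpressureDropEnabled() bool {
	v, err := strconv.ParseBool(os.Getenv(backpressureDropEnvVar))
	return err == nil && v
}
```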
NOTE
The auto_removal setting takes precedence, disabling any tailer-sender modifications for log entries when enabled. With auto_removal, all existing files are expected to be fully processed before they are deleted. However, the backpressure_drop flag contradicts this behavior by stopping the processing of previous files upon rotation. To ensure backward compatibility for customers who depend on the auto_removal functionality, we must retain the current behavior.

License
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
Tests
Block the logs endpoint by pointing the API URI (e.g. logs.us-west-2.amazonaws.com) to localhost. Then use a bash script to generate 10 lines/sec to /var/log/test/application.log, rotating the log every 3 minutes with a ~30-minute bake period. Keep a maximum of 3 rotated files.
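The generator script itself isn't part of the PR; a rough Go equivalent of the setup described above (10 lines/sec, rotate every 3 minutes, keep at most 3 rotated files; the path and timings simply mirror the description) would be:

```go
// Rough stand-in for the bash log generator described above; not part of the PR.
package main

import (
	"fmt"
	"os"
	"time"
)

const logPath = "/var/log/test/application.log"

// rotate shifts application.log.2 -> .3, .1 -> .2, current -> .1, keeping at most 3.
func rotate() {
	os.Remove(logPath + ".3")
	os.Rename(logPath+".2", logPath+".3")
	os.Rename(logPath+".1", logPath+".2")
	os.Rename(logPath, logPath+".1")
}

func main() {
	lineTick := time.NewTicker(100 * time.Millisecond) // ~10 lines/sec
	rotateTick := time.NewTicker(3 * time.Minute)
	defer lineTick.Stop()
	defer rotateTick.Stop()

	i := 0
	for {
		select {
		case <-rotateTick.C:
			rotate()
		case t := <-lineTick.C:
			f, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
			if err != nil {
				continue
			}
			fmt.Fprintf(f, "%s line %d\n", t.Format(time.RFC3339Nano), i)
			f.Close()
			i++
		}
	}
}
```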
Requirements
Before committing the code, please do the following steps.
- make fmt and make fmt-sh
- make lint