add fixed size buffer to handle fd in log tailer with feature flag #1577
Conversation
Force-pushed from 754d84a to 0dc46cd
Force-pushed from e3a0758 to 15d84b5
@@ -210,6 +212,15 @@ func (tail *Tail) reopen() error {
		break
	}
	OpenFileCount.Add(1)

	tail.openReader()
This looks like logic that was similar to tailFileSync. Do we need to restart the watcher or any of the other parts of that function?
plugins/inputs/logfile/tailersrc.go
Outdated
@@ -310,6 +326,7 @@ func (ts *tailerSrc) cleanUp() {

	if ts.outputFn != nil {
		ts.outputFn(nil) // inform logs agent the tailer src's exit, to stop runSrcToDest
		ts.outputFn = nil
Why is this needed?
This is to prevent any unseen race condition caused by the added goroutines trying to call outputFn. A mutex has been added around this to double protect as well.
I think there has to be a better way to do this without setting the outputFn to nil. The race condition is in the outputFn itself right? Wouldn't it be better to fix it there?
plugins/inputs/logfile/tailersrc.go
Outdated
			log.Printf("D! [logfile] runSender buffer was closed for %s", ts.tailer.Filename)
			return
		}
		if e != nil && ts.outputFn != nil {
Can this ever be false?
Yes, via the cleanup path.
plugins/inputs/logfile/logfile.go
Outdated
}

func NewLogFile() *LogFile {
	backpressureDrop := envconfig.IsBackpressureDropEnabled()
Alternatively, this could be set in the log configuration. The translator could be the one that reads the environment variable or JSON config and sets the field. Would allow for more granular control this way since you could opt-in/opt-out per log file.
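If the flag were surfaced per log file as suggested, the agent JSON might look roughly like this (a hypothetical sketch; the exact key name and placement in the merged change may differ):

```json
{
  "logs": {
    "logs_collected": {
      "files": {
        "collect_list": [
          {
            "file_path": "/var/log/test/application.log",
            "log_group_name": "application",
            "backpressure_mode": "fd_release"
          }
        ]
      }
    }
  }
}
```

The translator would then read this per-file key (or fall back to the environment variable) when constructing each tailer, giving opt-in/opt-out per log file.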
for k, v := range tt.envVars {
	os.Setenv(k, v)
}
defer func() {
	for k := range tt.envVars {
		os.Unsetenv(k)
	}
}()
nit: t.Setenv will do the cleanup for us.
Suggested change:
-for k, v := range tt.envVars {
-	os.Setenv(k, v)
-}
-defer func() {
-	for k := range tt.envVars {
-		os.Unsetenv(k)
-	}
-}()
+for k, v := range tt.envVars {
+	t.Setenv(k, v)
+}
Doesn't look like it. Dropping Unsetenv will fail subsequent tests after setting the env var.
We might also have leak in our tests by not properly closing files.
Force-pushed from 4fb59d8 to 3f2a3f3
Force-pushed from d97315f to e144b1e
plugins/inputs/logfile/tailersrc.go
Outdated
		select {
		case ts.buffer <- e:
			// sent event after buffer gets freed up
			if err := ts.tailer.Reopen(false); err != nil {
What happens if the file isn't closed before the buffer has capacity? It looks like it'll close and then reopen the file, which defeats the purpose of the timer.
What happens if a file is closed and deleted before it tries to reopen the file?
Added a timeout to handle deleted files without getting blocked indefinitely.
@@ -163,7 +179,12 @@ func (ts *tailerSrc) Done(offset fileOffset) {
	}

func (ts *tailerSrc) Stop() {
	close(ts.done)
	ts.stopOnce.Do(func() {
Do we need this? Did you find in your testing that we had multiple Stop() invocations?
Force-pushed from 37f9101 to 9ed151a
internal/logscommon/const.go
Outdated
type BackpressureMode string

const (
	LogBackpressureModeFDDrop BackpressureMode = "fd_release"
nit: Rename to match the value.
Suggested change:
-	LogBackpressureModeFDDrop BackpressureMode = "fd_release"
+	BackpressureModeFDRelease BackpressureMode = "fd_release"
src.SetOutput(func(e LogEvent) {
	if closed {
nit: Is this needed? Are we continuing to call the function after it's closed?
plugins/inputs/logfile/tailersrc.go
Outdated
@@ -96,27 +101,33 @@ func NewTailerSrc(
	maxEventSize int,
	truncateSuffix string,
	retentionInDays int,
	backpressureMode string,
nit: Should be the enum type. There's no point in having an enum if it's just converted to a string everywhere.
plugins/inputs/logfile/logfile.go
Outdated
@@ -287,8 +289,8 @@ func (t *LogFile) getTargetFiles(fileconfig *FileConfig) ([]string, error) {
	var targetFileName string
	var targetModTime time.Time
	for matchedFileName, matchedFileInfo := range g.Match() {
		t.Log.Debugf("Processing matched file: %s, ModTime: %v", matchedFileName, matchedFileInfo.ModTime())
nit: Remove these debug logs.
plugins/inputs/logfile/tailersrc.go
Outdated
stateFileMode = 0644
bufferLimit = 50
stateFileMode = 0644
tailCloseInterval = 3 * time.Second
nit: It's less of an interval and more of a threshold.
Force-pushed from bb19fd4 to 35154e1
// sent event after buffer gets freed up
if ts.tailer.IsFileClosed() { // skip file closing if not already closed
	if err := ts.tailer.Reopen(false); err != nil {
		log.Printf("E! [logfile] error reopening file %s: %v", ts.tailer.Filename, err)
nit: We might not want to log this if it just wasn't able to find the file anymore. Might be misleading.
plugins/inputs/logfile/logfile.go
Outdated
fileBaseName := filepath.Base(matchedFileName)
if blacklistP != nil && blacklistP.MatchString(fileBaseName) {
	continue
}
if !fileconfig.PublishMultiLogs {
	t.Log.Debugf("Single file mode - current target: %s (ModTime: %v), candidate: %s (ModTime: %v)",
nit: I think this can become excessive logging as well depending on the number of files that match.
Revisions

Rev2
- Changed the backpressure_mode value to fd_release
- Reopen changes
Rev1
- Moved the backpressure_drop feature flag to individual file level config for granular control

Description of the issue
The current CWA strategy for log collection prioritizes log integrity without consideration for disk space. CWA holds onto open file descriptors until all the log lines have been successfully pushed to CloudWatch Logs. If another process deletes the file, the file descriptor held by the agent will be unaffected. If CWA is unable to publish the logs, it will retry for 14 days. For some customers, this is the expectation and preferred behavior particularly for sensitive files such as security logs.
On Unix, when a file is removed by a process, the file name is unlinked from the underlying file instead of deleted. If there’s an open reference to the file via file descriptor, the file will not be deleted.
Description of changes
- Added a backpressure_drop feature flag
- This differs from auto_removal, which reads all contents from a rotated log file before deleting it
- The flag is written to env-config.json by the translator so it can be added as an env var of the agent's systemd process at startup time

NOTE
The auto_removal setting takes precedence, disabling any tailer-sender modifications for log entries when enabled. With auto_removal, all existing files are expected to be fully processed before they are deleted. However, the backpressure_drop flag contradicts this behavior by stopping the processing of previous files upon rotation. To ensure backward compatibility for customers who depend on the auto_removal functionality, we must retain the current behavior.

License
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
Tests
Block the logs endpoint by pointing the API URI (e.g. logs.us-west-2.amazonaws.com) to localhost. Then use a bash script to generate 10 lines/sec to /var/log/test/application.log, rotating the log every 3 minutes with a ~30-minute bake period. Keep a maximum of 3 rotated files.

Requirements
Before committing the code, please do the following steps:
- Run make fmt and make fmt-sh
- Run make lint