add fixed size buffer to handle fd in log tailer with feature flag #1577

Merged

merged 25 commits into aws:main from fix-logs-fd-leak on Mar 20, 2025

Conversation

movence (Contributor) commented Mar 3, 2025

Revisions

Rev2

  • rename the only supported backpressure_mode value to fd_release
  • add a timeout (5m) to the blocked wait for a new file in Reopen
  • remove unnecessary logging

Rev1

  • move the backpressure_drop feature flag to individual file-level config for granular control
  • rebase

Description of the issue

The current CWA strategy for log collection prioritizes log integrity without consideration for disk space. CWA holds onto open file descriptors until all the log lines have been successfully pushed to CloudWatch Logs. If another process deletes the file, the file descriptor held by the agent is unaffected. If CWA is unable to publish the logs, it retries for up to 14 days. For some customers this is the expected and preferred behavior, particularly for sensitive files such as security logs.

On Unix, when a process removes a file, the file name is unlinked from the underlying inode rather than the data being deleted. As long as any open file descriptor still references the file, the inode and its disk space are not reclaimed.
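
A minimal Go sketch (not part of the agent code) illustrating this behavior; the file path and output are assumptions for illustration only:

```go
package main

import (
	"fmt"
	"os"
)

func main() {
	// Assume another process will rotate/delete this file while we hold it open.
	f, err := os.Open("/var/log/test/application.log")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// The name is unlinked, but the inode stays alive because f still references it.
	if err := os.Remove("/var/log/test/application.log"); err != nil {
		panic(err)
	}

	// Data remains readable through the open descriptor; the disk space is only
	// reclaimed once every descriptor referencing the inode is closed.
	buf := make([]byte, 64)
	n, _ := f.Read(buf)
	fmt.Printf("still read %d bytes from the unlinked file\n", n)
}
```

This is why `ls -l /proc/<pid>/fd` in the tests below shows entries marked `(deleted)`: the agent still holds descriptors for rotated files that were removed.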

Description of changes

  • Add buffer-based (size of 1) log handling in the tailer for file rotation
  • Add a new runSender goroutine; the tailer now sends a single event at a time to a channel that the sender reads from (a minimal sketch of this hand-off is shown after this list)
  • Enable this new path behind the backpressure_drop feature flag
  • Add a feature toggle flag that is read from either the agent JSON config or an environment variable, with the value from the agent config taking precedence. This keeps the original behavior available for auto_removal, which reads all contents from a rotated log file before deleting it.
  • The feature flag value is then added to env-config.json by the translator so it can be set as an environment variable of the agent's systemd process at startup
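
A minimal sketch, assuming simplified types, of the single-slot buffer between the tailer and the new sender goroutine; the names (event, tailerSrc, enqueue) are illustrative and do not necessarily match the PR's actual identifiers:

```go
package tailer

type event struct {
	line   string
	offset int64
}

type tailerSrc struct {
	buffer chan event    // capacity 1: at most one in-flight event
	done   chan struct{}
	output func(event)   // pushes the event toward CloudWatch Logs
}

func newTailerSrc(output func(event)) *tailerSrc {
	ts := &tailerSrc{
		buffer: make(chan event, 1),
		done:   make(chan struct{}),
		output: output,
	}
	go ts.runSender()
	return ts
}

// enqueue is called by the tailer for each log line. When the downstream
// destination is blocked, the single-slot buffer fills up, which is the signal
// that lets the tailer close (release) the file descriptor instead of holding
// it until the backlog drains.
func (ts *tailerSrc) enqueue(e event) bool {
	select {
	case ts.buffer <- e:
		return true
	default:
		return false // buffer full; caller may release the fd and retry later
	}
}

// runSender drains the buffer and hands events to the output function.
func (ts *tailerSrc) runSender() {
	for {
		select {
		case e := <-ts.buffer:
			ts.output(e)
		case <-ts.done:
			return
		}
	}
}
```

With backpressure_drop disabled, the original path is used instead and the tailer keeps the descriptor open until all lines have been pushed.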

NOTE
When enabled, the auto_removal setting takes precedence and disables the tailer-sender changes for log entries. With auto_removal, all existing files are expected to be fully processed before they are deleted, whereas the backpressure_drop flag contradicts this behavior by stopping the processing of previous files upon rotation. To ensure backward compatibility for customers who depend on auto_removal, the current behavior is retained when it is set.
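
A hedged sketch of how that per-file precedence could be resolved; effectiveBackpressureDrop and its parameters are hypothetical names, not the agent's actual translator code:

```go
package config // hypothetical package, for illustration only

// effectiveBackpressureDrop decides whether the new tailer-sender path applies
// to a single collect_list entry. auto_removal wins because rotated files must
// be fully read before they are deleted.
func effectiveBackpressureDrop(autoRemoval, backpressureDrop bool) bool {
	if autoRemoval {
		return false // keep the original behavior for backward compatibility
	}
	return backpressureDrop
}
```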

License

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Tests

Block the CloudWatch Logs endpoint by pointing its API URI (e.g. logs.us-west-2.amazonaws.com) at localhost. Then use a bash script to generate 10 lines/sec to /var/log/test/application.log, rotating the log every 3 minutes over a ~30-minute bake period and keeping a maximum of 3 rotated files.
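
The PR used a bash script for this load; the following Go program is an equivalent sketch of the same pattern (10 lines/sec, rotate every 3 minutes, keep 3 rotated files). Paths and naming follow the test description; error handling is elided.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"time"
)

const logPath = "/var/log/test/application.log"

func main() {
	f, _ := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
	lineTicker := time.NewTicker(100 * time.Millisecond) // 10 lines/sec
	rotateTicker := time.NewTicker(3 * time.Minute)
	i := 0
	for {
		select {
		case <-lineTicker.C:
			i++
			fmt.Fprintf(f, "%s test line %d\n", time.Now().Format(time.RFC3339), i)
		case <-rotateTicker.C:
			f.Close()
			os.Rename(logPath, logPath+"."+time.Now().Format("20060102_150405"))
			f, _ = os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
			pruneRotated(3)
		}
	}
}

// pruneRotated keeps only the newest n rotated files.
func pruneRotated(n int) {
	matches, _ := filepath.Glob(logPath + ".*")
	sort.Strings(matches) // timestamp suffix sorts oldest first
	for len(matches) > n {
		os.Remove(matches[0])
		matches = matches[1:]
	}
}
```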

#### Agent version 1.300052.0
```
> pgrep -f "/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent"
21390
> ls -l /proc/$(pgrep -f "/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent")/fd | grep application | wc -l
12
> ls -l /proc/$(pgrep -f "/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent")/fd | grep application | head -5
lr-x------ 1 root root 64 Mar  6 03:12 10 -> /var/log/test/application.log.20250306_031458 (deleted)
lr-x------ 1 root root 64 Mar  6 03:12 11 -> /var/log/test/application.log.20250306_031530 (deleted)
lr-x------ 1 root root 64 Mar  6 03:12 12 -> /var/log/test/application.log.20250306_031602 (deleted)
lr-x------ 1 root root 64 Mar  6 03:12 13 -> /var/log/test/application.log.20250306_031634 (deleted)
lr-x------ 1 root root 64 Mar  6 03:12 14 -> /var/log/test/application.log.20250306_031706 (deleted)
```


#### With `backpressure_drop` enabled
```
> pgrep -f "/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent"
27016
> ls -l /proc/$(pgrep -f "/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent")/fd | grep application | wc -l
0
```


#### Agent config used
```json
{
  "agent": {
    "debug": true,
    "run_as_user": "root"
  },
  "logs": {
    "backpressure_drop": true,
    "logs_collected": {
      "files": {
        "collect_list": [
          {
            "file_path": "/var/log/test/application.log*",
            "log_group_name": "application-logs",
            "log_stream_name": "{instance_id}-application"
          }
        ]
      }
    }
  }
}
```
(auto_removal was left commented out for this run.)

Requirements

Before committing the code, please do the following steps:

  1. Run `make fmt` and `make fmt-sh`
  2. Run `make lint`

@movence movence force-pushed the fix-logs-fd-leak branch 2 times, most recently from 754d84a to 0dc46cd on March 4, 2025 21:37
@movence movence marked this pull request as ready for review March 5, 2025 15:17
@movence movence requested a review from a team as a code owner March 5, 2025 15:17
@movence movence force-pushed the fix-logs-fd-leak branch from e3a0758 to 15d84b5 on March 6, 2025 03:33
@@ -210,6 +212,15 @@ func (tail *Tail) reopen() error {
break
}
OpenFileCount.Add(1)

tail.openReader()
Contributor:
This looks like logic that was similar to tailFileSync. Do we need to restart the watcher or any of the other parts of that function?

@@ -310,6 +326,7 @@ func (ts *tailerSrc) cleanUp() {

if ts.outputFn != nil {
ts.outputFn(nil) // inform logs agent the tailer src's exit, to stop runSrcToDest
ts.outputFn = nil
Contributor:
Why is this needed?

Contributor Author (movence):
This is to prevent any unseen race condition caused by the added goroutines trying to call outputFn. A mutex has been added around this as a second layer of protection as well.

Contributor:
I think there has to be a better way to do this without setting the outputFn to nil. The race condition is in the outputFn itself right? Wouldn't it be better to fix it there?

log.Printf("D! [logfile] runSender buffer was closed for %s", ts.tailer.Filename)
return
}
if e != nil && ts.outputFn != nil {
Contributor:
Can this ever be false?

Contributor Author (movence):
Yes, via the cleanup path.

}

func NewLogFile() *LogFile {
backpressureDrop := envconfig.IsBackpressureDropEnabled()
Contributor:
Alternatively, this could be set in the log configuration. The translator could be the one that reads the environment variable or JSON config and sets the field. Would allow for more granular control this way since you could opt-in/opt-out per log file.

Comment on lines +263 to +208
for k, v := range tt.envVars {
os.Setenv(k, v)
}
defer func() {
for k := range tt.envVars {
os.Unsetenv(k)
}
}()
Contributor:
nit: t.Setenv will do the cleanup for us.

Suggested change
for k, v := range tt.envVars {
os.Setenv(k, v)
}
defer func() {
for k := range tt.envVars {
os.Unsetenv(k)
}
}()
for k, v := range tt.envVars {
t.Setenv(k, v)
}

Contributor Author (@movence) commented Mar 13, 2025:
Doesn't look like it. Dropping Unsetenv will fail subsequent tests after setting the env var.

Contributor Author (movence):
We might also have a leak in our tests from not properly closing files.

@movence movence force-pushed the fix-logs-fd-leak branch 2 times, most recently from 4fb59d8 to 3f2a3f3 Compare March 14, 2025 13:39

select {
case ts.buffer <- e:
// sent event after buffer gets freed up
if err := ts.tailer.Reopen(false); err != nil {
Contributor:
What happens if the file isn't closed before the buffer has capacity? It looks like it'll close and then reopen the file, which defeats the purpose of the timer.

Contributor:
What happens if a file is closed and deleted before it tries to reopen the file?

Contributor Author (movence):
Added a timeout to handle a deleted file without getting blocked indefinitely.
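
A rough sketch, purely for illustration, of what a bounded wait for a rotated/recreated file could look like; the function name, poll interval, and error handling are assumptions and not the PR's actual Reopen code:

```go
package logfile

import (
	"fmt"
	"os"
	"time"
)

// waitForFile polls for the file to reappear after rotation, giving up after
// the timeout (the PR mentions a 5-minute bound) so a deleted file does not
// block the goroutine indefinitely.
func waitForFile(path string, timeout time.Duration) (*os.File, error) {
	deadline := time.Now().Add(timeout)
	for {
		f, err := os.Open(path)
		if err == nil {
			return f, nil
		}
		if !os.IsNotExist(err) {
			return nil, err
		}
		if time.Now().After(deadline) {
			return nil, fmt.Errorf("timed out after %v waiting for %s", timeout, path)
		}
		time.Sleep(time.Second) // poll until the file shows up again
	}
}
```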


@@ -163,7 +179,12 @@ func (ts *tailerSrc) Done(offset fileOffset) {
}

func (ts *tailerSrc) Stop() {
close(ts.done)
ts.stopOnce.Do(func() {
Contributor:
Do we need this? Did you find in your testing that we had multiple Stop() invocations?

type BackpressureMode string

const (
LogBackpressureModeFDDrop BackpressureMode = "fd_release"
Contributor:
nit: Rename to match the value.

Suggested change
LogBackpressureModeFDDrop BackpressureMode = "fd_release"
BackpressureModeFDRelease BackpressureMode = "fd_release"

src.SetOutput(func(e LogEvent) {
if closed {
Contributor:
nit: Is this needed? Are we continuing to call the function after it's closed?

@@ -96,27 +101,33 @@ func NewTailerSrc(
maxEventSize int,
truncateSuffix string,
retentionInDays int,
backpressureMode string,
Contributor:
nit: Should be the enum type. There's no point in having an enum if it's just converted to a string everywhere.

@@ -287,8 +289,8 @@ func (t *LogFile) getTargetFiles(fileconfig *FileConfig) ([]string, error) {
var targetFileName string
var targetModTime time.Time
for matchedFileName, matchedFileInfo := range g.Match() {
t.Log.Debugf("Processing matched file: %s, ModTime: %v", matchedFileName, matchedFileInfo.ModTime())
Contributor:
nit: Remove these debug logs.

stateFileMode = 0644
bufferLimit = 50
stateFileMode = 0644
tailCloseInterval = 3 * time.Second
Contributor:
nit: It's less of an interval and more of a threshold.

jefchien previously approved these changes Mar 20, 2025
lisguo previously approved these changes Mar 20, 2025
@movence movence dismissed stale reviews from lisguo and jefchien via 35154e1 March 20, 2025 17:07
jefchien previously approved these changes Mar 20, 2025
// sent event after buffer gets freed up
if ts.tailer.IsFileClosed() { // skip file closing if not already closed
if err := ts.tailer.Reopen(false); err != nil {
log.Printf("E! [logfile] error reopening file %s: %v", ts.tailer.Filename, err)
Contributor:
nit: We might not want to log this if it just wasn't able to find the file anymore. Might be misleading.

fileBaseName := filepath.Base(matchedFileName)
if blacklistP != nil && blacklistP.MatchString(fileBaseName) {
continue
}
if !fileconfig.PublishMultiLogs {
t.Log.Debugf("Single file mode - current target: %s (ModTime: %v), candidate: %s (ModTime: %v)",
Contributor:
nit: I think this can become excessive logging as well depending on the number of files that match.

@sky333999 sky333999 merged commit 7e2fbb1 into aws:main Mar 20, 2025
7 checks passed