
add fixed size buffer to handle fd in log tailer with feature flag #1577

Open · wants to merge 12 commits into main from fix-logs-fd-leak
Conversation

@movence (Contributor) commented Mar 3, 2025

Description of the issue

The current CWA strategy for log collection prioritizes log integrity without consideration for disk space. CWA holds onto open file descriptors until all the log lines have been successfully pushed to CloudWatch Logs. If another process deletes the file, the file descriptor held by the agent is unaffected. If CWA is unable to publish the logs, it will retry for 14 days. For some customers, this is the expected and preferred behavior, particularly for sensitive files such as security logs.

On Unix, when a file is removed by a process, the file name is unlinked from the underlying file instead of deleted. If there’s an open reference to the file via file descriptor, the file will not be deleted.
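This unlink behavior is easy to demonstrate in Go (the agent's language). The sketch below is illustrative and Unix-only: `demoUnlink` is a hypothetical helper, not agent code. It removes a file's name while holding the descriptor open and shows the data is still readable — exactly why the agent's held descriptors keep "deleted" log files pinned on disk.

```go
package main

import (
	"fmt"
	"os"
)

// demoUnlink shows that removing a file's name does not invalidate an
// already-open descriptor on Unix: the inode survives until the last
// descriptor referencing it is closed.
func demoUnlink() (nameGone bool, data string, err error) {
	f, err := os.CreateTemp("", "unlink-demo-*.log")
	if err != nil {
		return false, "", err
	}
	defer f.Close()

	if _, err = f.WriteString("hello\n"); err != nil {
		return false, "", err
	}

	// Unlink the name; our open descriptor keeps the inode alive.
	if err = os.Remove(f.Name()); err != nil {
		return false, "", err
	}
	_, statErr := os.Stat(f.Name())
	nameGone = os.IsNotExist(statErr)

	// Reading through the descriptor still works after the unlink.
	buf := make([]byte, 6)
	if _, err = f.ReadAt(buf, 0); err != nil {
		return nameGone, "", err
	}
	return nameGone, string(buf), nil
}

func main() {
	gone, data, err := demoUnlink()
	fmt.Println(gone, data, err)
}
```

The space used by such a file shows up in `df` but not in `du`, which is what makes these leaks confusing to diagnose.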

Description of changes

  • Add buffer-based (size 1) log handling in the tailer for file rotation
  • Add a new runSender goroutine; the tailer now sends a single event at a time to a channel that the sender reads from
  • Gate this new path behind the feature flag backpressure_drop
  • Add a feature toggle flag that is read from either the agent JSON config or an env var, with the agent config value taking precedence. This keeps the original behavior with auto_removal, which reads all contents from a rotated log file before deleting it.
  • The feature flag value is then added to env-config.json by the translator so it can be set as an env var on the agent's systemd process at startup

NOTE
The auto_removal setting takes precedence: when it is enabled, the new tailer-sender path is disabled. With auto_removal, all existing files are expected to be fully processed before they are deleted, whereas the backpressure_drop flag contradicts this by stopping the processing of previous files upon rotation. To ensure backward compatibility for customers who depend on auto_removal, we must retain the current behavior.

License

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Tests

Block the logs endpoint by pointing the API URI (e.g. logs.us-west-2.amazonaws.com) to localhost. Then use a bash script to generate 10 lines/sec to /var/log/test/application.log, rotating the log every 3 minutes with a ~30-minute bake period. Keep a maximum of 3 rotated files.

#### Agent version 1.300052.0
> pgrep -f "/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent"
21390
> ls -l /proc/$(pgrep -f "/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent")/fd | grep application | wc -l
12
> ls -l /proc/$(pgrep -f "/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent")/fd | grep application | head -5
lr-x------ 1 root root 64 Mar  6 03:12 10 -> /var/log/test/application.log.20250306_031458 (deleted)
lr-x------ 1 root root 64 Mar  6 03:12 11 -> /var/log/test/application.log.20250306_031530 (deleted)
lr-x------ 1 root root 64 Mar  6 03:12 12 -> /var/log/test/application.log.20250306_031602 (deleted)
lr-x------ 1 root root 64 Mar  6 03:12 13 -> /var/log/test/application.log.20250306_031634 (deleted)
lr-x------ 1 root root 64 Mar  6 03:12 14 -> /var/log/test/application.log.20250306_031706 (deleted)


#### With `backpressure_drop` enabled
> pgrep -f "/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent"
27016
> ls -l /proc/$(pgrep -f "/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent")/fd | grep application | wc -l
0


#### Agent config used
```
{
  "agent": {
    "debug": true,
    "run_as_user": "root"
  },
  "logs": {
    "backpressure_drop": true,
    "logs_collected": {
      "files": {
        "collect_list": [
          {
            "file_path": "/var/log/test/application.log*",
            "log_group_name": "application-logs",
            "log_stream_name": "{instance_id}-application",
            #"auto_removal": true
          }
        ]
      }
    }
  }
}
```

Requirements

Before committing the code, please complete the following steps:

  1. Run `make fmt` and `make fmt-sh`
  2. Run `make lint`

@movence force-pushed the fix-logs-fd-leak branch 2 times, most recently from 754d84a to 0dc46cd on March 4, 2025, 21:37
@movence marked this pull request as ready for review March 5, 2025 15:17
@movence requested a review from a team as a code owner March 5, 2025 15:17
@movence force-pushed the fix-logs-fd-leak branch from e3a0758 to 15d84b5 on March 6, 2025, 03:33
if !resetOffset && tail.curOffset > 0 {
err := tail.seekTo(SeekInfo{Offset: tail.curOffset, Whence: io.SeekStart})
if err != nil {
return err
Suggested change:
```diff
-return err
+return fmt.Errorf("unable to restore offset on reopen: %w", err)
```

@@ -210,6 +212,15 @@ func (tail *Tail) reopen() error {
break
}
OpenFileCount.Add(1)

tail.openReader()

This looks like logic that was similar to tailFileSync. Do we need to restart the watcher or any of the other parts of that function?


// helper to handle event publishing
publishEvent := func() {

nit: Would rather this were an actual method:

`func (ts *tailerSrc) publishEvent(msgBuf bytes.Buffer, fo *fileOffset)`

case <-ts.done:
return
default:
log.Printf("D! [logfile] tailer sender buffer is full, closing file %v", ts.tailer.Filename)

I still think there needs to be a delay before closing the file. Just because the buffer is backed up doesn't mean it's completely blocked. The concern is that this will result in heavy I/O with the file getting constantly closed and reopened.

Would suggest adding a timer.

@@ -310,6 +326,7 @@ func (ts *tailerSrc) cleanUp() {

if ts.outputFn != nil {
ts.outputFn(nil) // inform logs agent the tailer src's exit, to stop runSrcToDest
ts.outputFn = nil

Why is this needed?

log.Printf("D! [logfile] runSender buffer was closed for %s", ts.tailer.Filename)
return
}
if e != nil && ts.outputFn != nil {

Can this ever be false?

}

func NewLogFile() *LogFile {
backpressureDrop := envconfig.IsBackpressureDropEnabled()

Alternatively, this could be set in the log configuration. The translator could be the one that reads the environment variable or JSON config and sets the field. Would allow for more granular control this way since you could opt-in/opt-out per log file.

Comment on lines 263 to 270
for k, v := range tt.envVars {
os.Setenv(k, v)
}
defer func() {
for k := range tt.envVars {
os.Unsetenv(k)
}
}()

nit: `t.Setenv` will do the cleanup for us.

Suggested change:
```diff
-for k, v := range tt.envVars {
-	os.Setenv(k, v)
-}
-defer func() {
-	for k := range tt.envVars {
-		os.Unsetenv(k)
-	}
-}()
+for k, v := range tt.envVars {
+	t.Setenv(k, v)
+}
```
