
S3 log support #32

Merged
mheffner merged 16 commits into main from s3-log-support on Mar 5, 2026

Conversation

@mheffner (Contributor) commented Feb 20, 2026

This introduces the framework for supporting S3 event notifications. S3 event notifications will invoke the Lambda function when an AWS service writes new logs to an S3 bucket (the S3 object creation event). While this PR mostly lays the groundwork for future support of S3 log-based AWS services, it currently supports CloudTrail logs from S3 (in addition to the existing CloudTrail CloudWatch support).

A single S3 notification may contain multiple records from the creation of multiple S3 objects, each of which needs to be read, parsed, and converted. By default this will process five S3 objects concurrently and will emit logs in batches of up to 1k to the logs pipeline. This may mean that the ordering of the S3 objects, and of their logs, is not maintained: S3 objects listed later in the event notification may be loaded, parsed, and exported before earlier ones. In theory this can be avoided by setting FORWARDER_S3_MAX_PARALLEL_OBJECTS=1. However, S3 event notifications may fire multiple Lambda invocations concurrently, so ordering is not guaranteed in principle.
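For illustration, here is a minimal sketch of that fan-out and batching shape. Only FORWARDER_S3_MAX_PARALLEL_OBJECTS and the defaults (five concurrent objects, 1k-log batches) come from this PR; the helpers and types below are hypothetical stand-ins, not the forwarder's actual API.

```rust
use futures::stream::{self, StreamExt};

type Log = serde_json::Value;

// Hypothetical stand-ins for the forwarder's real load/parse and export steps.
async fn load_and_parse(_object_key: String) -> Vec<Log> { Vec::new() }
async fn emit_batch(_batch: Vec<Log>) {}

const DEFAULT_MAX_PARALLEL_OBJECTS: usize = 5;
const MAX_BATCH: usize = 1_000;

async fn process_objects(object_keys: Vec<String>) {
    let max_parallel: usize = std::env::var("FORWARDER_S3_MAX_PARALLEL_OBJECTS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(DEFAULT_MAX_PARALLEL_OBJECTS);

    // Up to `max_parallel` objects are loaded and parsed at once; results
    // arrive in completion order, not notification order.
    let mut results = stream::iter(object_keys)
        .map(load_and_parse)
        .buffer_unordered(max_parallel);

    // Flush to the logs pipeline in batches of up to 1k records.
    let mut batch = Vec::with_capacity(MAX_BATCH);
    while let Some(logs) = results.next().await {
        for log in logs {
            batch.push(log);
            if batch.len() >= MAX_BATCH {
                emit_batch(std::mem::take(&mut batch)).await;
            }
        }
    }
    if !batch.is_empty() {
        emit_batch(batch).await;
    }
}
```

Because results are consumed in completion order, a later object that parses quickly can be exported before an earlier, larger one, which is the ordering caveat described above.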

There's some refactoring that I plan to do later to clean up sharing between the CloudWatch and S3 logs support. In addition, the CW logs support should be broken out of the parse module, similar to this new s3logs support.

The PR required some changes to the acker component because we don't know how many acks to expect ahead of time. Instead of buffering all acks, we spawn a listener to consume the acks/nacks concurrently with processing the S3 objects.
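A rough sketch of that shape, assuming a tokio mpsc channel carries the acks; the Ack type and function names are illustrative, not the acker's real API.

```rust
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

// Illustrative ack type, not the forwarder's real one.
enum Ack {
    Ok,
    Nack(String),
}

/// Drain acks/nacks as they arrive instead of buffering a known count up
/// front; the handle resolves with the failure count once all senders drop.
fn spawn_ack_listener(mut ack_rx: mpsc::Receiver<Ack>) -> JoinHandle<usize> {
    tokio::spawn(async move {
        let mut failures = 0;
        while let Some(ack) = ack_rx.recv().await {
            match ack {
                Ack::Ok => {}
                Ack::Nack(reason) => {
                    eprintln!("nack: {reason}");
                    failures += 1;
                }
            }
        }
        failures
    })
}
```

The S3 objects are then processed while the listener runs; once the sender side is dropped, awaiting the handle yields the number of nacks seen.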

Fixes: #5

@mheffner marked this pull request as ready for review on February 25, 2026 at 22:28
@mheffner requested a review from rjenkins on February 25, 2026 at 22:28
let md = MessageMetadata::forwarder(counter.increment());
if let Err(e) = self
.logs_tx
.send(Message::new(Some(md), vec![log], None))

Could this block indefinitely?

Contributor Author

In theory this should not block indefinitely if the function is configured correctly because the maximum retry duration should equal the maximum function duration. This would mean that requests can't sit in the pipeline indefinitely, eventually unblocking the channel.

It's possible this could block slightly past the maximum function runtime, since there's a bit of a race between clearing the queue and then continuing to send here. However, in that case Lambda would likely tear the function down and respin the container. For persistent failures downstream, this would likely create back pressure on Lambda, causing some backoff.


cool

let json_map = self.parse_message_to_map(message, &mut lr);

match json_map {
Ok(None) => {}

can you just return lr here?

mut json_map: serde_json::Map<String, JsonValue>,
) -> LogRecord {
let mut lr = LogRecord {
time_unix_nano: (timestamp * 1_000_000) as u64,

probably not likely but could this silently wrap around?

Contributor Author

Unlikely, but yeah pushing a wrapper that'll fall back to now_nanos just in case.
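For reference, a sketch of what such a guard could look like, assuming the incoming timestamp is in milliseconds (per the `* 1_000_000` above); the actual wrapper in the PR may differ.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

fn now_nanos() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos() as u64)
        .unwrap_or(0)
}

/// Convert a millisecond timestamp to nanoseconds, falling back to the
/// current time if the value is negative or the multiplication overflows.
fn timestamp_nanos(timestamp_ms: i64) -> u64 {
    u64::try_from(timestamp_ms)
        .ok()
        .and_then(|ms| ms.checked_mul(1_000_000))
        .unwrap_or_else(now_nanos)
}
```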


// Wait for the first task to finish if we've hit the concurrency limit, then
// stream its results immediately rather than accumulating them.
while tasks.len() >= max_concurrent {

Does this go one over max concurrent intentionally?

Contributor Author

No, it shouldn't. The > check is mostly just a safeguard.
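For context, a minimal sketch of that drain-before-push pattern (illustrative only, using FuturesUnordered; the PR's actual task handling may differ). Draining until the count drops below the limit before pushing the next task means the set never holds more than `max_concurrent` in-flight tasks.

```rust
use futures::stream::{FuturesUnordered, StreamExt};

// Stand-in for loading, parsing, and exporting one S3 object.
async fn process_object(key: String) -> usize {
    key.len()
}

async fn process_all(keys: Vec<String>, max_concurrent: usize) -> usize {
    let mut tasks = FuturesUnordered::new();
    let mut total = 0;

    for key in keys {
        // Wait for a running task to finish once the limit is reached, so
        // there are never more than `max_concurrent` tasks in flight.
        while tasks.len() >= max_concurrent {
            if let Some(n) = tasks.next().await {
                total += n;
            }
        }
        tasks.push(process_object(key));
    }

    // Drain whatever is still in flight.
    while let Some(n) = tasks.next().await {
        total += n;
    }
    total
}
```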


// Load object from S3
let object_data =
load_s3_object(&self.s3_client, bucket_name, object_key, &self.request_id).await?;

Can this block indefinitely or do we rely on timeouts from the S3 client?

Contributor Author

The S3 client's timeouts and retries should prevent that.
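For reference, a sketch of how timeouts and retries are typically configured on the Rust AWS SDK S3 client; the durations and attempt count are illustrative, not necessarily what this forwarder uses.

```rust
use std::time::Duration;

use aws_config::retry::RetryConfig;
use aws_config::timeout::TimeoutConfig;
use aws_config::BehaviorVersion;

async fn build_s3_client() -> aws_sdk_s3::Client {
    // Bound each attempt and the overall operation so a stuck GET cannot
    // hold the Lambda invocation open indefinitely.
    let timeouts = TimeoutConfig::builder()
        .connect_timeout(Duration::from_secs(5))
        .operation_attempt_timeout(Duration::from_secs(30))
        .operation_timeout(Duration::from_secs(120))
        .build();

    let config = aws_config::defaults(BehaviorVersion::latest())
        .retry_config(RetryConfig::standard().with_max_attempts(3))
        .timeout_config(timeouts)
        .load()
        .await;

    aws_sdk_s3::Client::new(&config)
}
```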

@rjenkins left a comment

nice

@mheffner merged commit 66b2be3 into main on Mar 5, 2026
3 checks passed

Development

Successfully merging this pull request may close these issues.

Support S3 log sources
