Use the aws-s3
input to retrieve logs from S3 objects that are pointed to by
S3 notification events read from an SQS queue, or by directly polling the list of S3 objects in an S3 bucket.
The SQS notification method is preferred: polling the list of S3 objects is expensive
in terms of performance and cost, and should only be used when no SQS
notification can be attached to the S3 buckets. This input can, for example, be
used to receive S3 access logs to monitor detailed records for the requests that
are made to a bucket. This input also supports S3 notifications delivered from SNS to SQS.
The SQS notification method is enabled by setting the queue_url
configuration value.
The S3 bucket list polling method is enabled by setting the bucket_arn
configuration value.
Both values cannot be set at the same time, and at least one of them must be set.
When using the SQS notification method, this input depends on S3 notifications delivered
to an SQS queue for s3:ObjectCreated:*
events. You must create an SQS queue and configure S3
to publish events to the queue.
When processing an S3 object referenced by an SQS message, if half of the configured visibility timeout has passed and processing is still ongoing, the visibility timeout of that SQS message is reset so that the message does not return to the queue in the middle of processing. If an error occurs while processing the S3 object, processing stops and the SQS message is returned to the queue.
{beatname_lc}.inputs:
- type: aws-s3
  queue_url: https://sqs.ap-southeast-1.amazonaws.com/1234/test-s3-queue
  credential_profile_name: elastic-beats
  expand_event_list_from_field: Records
When using the direct polling of S3 objects in an S3 bucket,
the number of workers that will process the listed S3 objects must be set
through the number_of_workers
config.
The listing of the S3 bucket is polled according to the time interval defined by the
bucket_list_interval
config. The default value is 120s.
{beatname_lc}.inputs:
- type: aws-s3
  bucket_arn: arn:aws:s3:::test-s3-bucket
  number_of_workers: 5
  bucket_list_interval: 300s
  credential_profile_name: elastic-beats
  expand_event_list_from_field: Records
The aws-s3
input can also poll third-party S3-compatible services, such as self-hosted Minio.
Using non-AWS S3-compatible buckets requires the use of access_key_id
and secret_access_key
for authentication.
To specify the S3 bucket name, use the non_aws_bucket_name
config, and the endpoint
must be set to replace the default API endpoint.
endpoint
should be a full URI in the form of http(s)://<s3 endpoint>;
when non_aws_bucket_name
is used, it will be used as the API endpoint of the service.
No endpoint
is needed if using the native AWS S3 service hosted at amazonaws.com.
Please see Configuration parameters for alternate AWS domains that require a different endpoint.
{beatname_lc}.inputs:
- type: aws-s3
  non_aws_bucket_name: test-s3-bucket
  number_of_workers: 5
  bucket_list_interval: 300s
  access_key_id: xxxxxxx
  secret_access_key: xxxxxxx
  endpoint: https://s3.example.com:9000
  expand_event_list_from_field: Records
The aws-s3
input supports the following configuration options plus the
[{beatname_lc}-input-aws-s3-common-options] described later.
The maximum duration of the AWS API call. If it exceeds the timeout, the AWS API
call will be interrupted. The default AWS API timeout is 120s.
The API timeout must be longer than the sqs.wait_time value.
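For example, a minimal sketch extending the API timeout; the option key api_timeout is an assumption based on this description (it is not spelled out in the surrounding text), and the value is illustrative:
{beatname_lc}.inputs:
- type: aws-s3
  queue_url: https://sqs.ap-southeast-1.amazonaws.com/1234/test-s3-queue
  api_timeout: 180s   # must stay longer than sqs.wait_time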
The size in bytes of the buffer that each harvester uses when fetching a file.
This only applies to non-JSON logs. The default is 16 KiB.
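For example, a hedged snippet raising the per-harvester buffer for large plain-text objects (buffer_size is the option described above; the 32 KiB value is illustrative):
{beatname_lc}.inputs:
- type: aws-s3
  queue_url: https://sqs.ap-southeast-1.amazonaws.com/1234/test-s3-queue
  buffer_size: 32KiB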
A standard MIME type describing the format of the object data. This
can be set to override the MIME type that was given to the object when
it was uploaded. For example: application/json.
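For example, a sketch forcing objects to be treated as JSON regardless of the MIME type stored with them (content_type is the option described above; the queue URL is reused from the earlier example):
{beatname_lc}.inputs:
- type: aws-s3
  queue_url: https://sqs.ap-southeast-1.amazonaws.com/1234/test-s3-queue
  content_type: application/json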
The file encoding to use for reading data that contains international characters. This only applies to non-JSON logs. See [_encoding_3].
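For example, a minimal sketch for reading non-JSON logs stored in a non-UTF-8 encoding (encoding is the option described above; utf-16be is just one commonly supported value, used here for illustration):
{beatname_lc}.inputs:
- type: aws-s3
  queue_url: https://sqs.ap-southeast-1.amazonaws.com/1234/test-s3-queue
  encoding: utf-16be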
If the fileset using this input expects to receive multiple messages bundled
under a specific field, then the config option expand_event_list_from_field
can be assigned the name of that field. This setting splits
the messages under the group value into separate events. For example, CloudTrail
logs are in JSON format and events are found under the JSON object "Records".
{
  "Records": [
    {
      "eventVersion": "1.07",
      "eventTime": "2019-11-14T00:51:00Z",
      "awsRegion": "us-east-1",
      "eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE"
    },
    {
      "eventVersion": "1.07",
      "eventTime": "2019-11-14T00:52:00Z",
      "awsRegion": "us-east-1",
      "eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE"
    }
  ]
}
Note: When the expand_event_list_from_field
parameter is given in the config, the
aws-s3 input will assume the logs are in JSON format and decode them as JSON.
The content type will not be checked. If a file has "application/json" content type,
expand_event_list_from_field
becomes required to read the JSON file.
If the SQS queue will have events that correspond to files that
{beatname_uc} shouldn't process, file_selectors
can be used to limit
the files that are downloaded. This is a list of selectors which are
made up of regex
and expand_event_list_from_field
options. The regex
should match the S3 object key in the SQS message, and the
optional expand_event_list_from_field
is the same as the global
setting. If file_selectors
is given, then any global
expand_event_list_from_field
value is ignored in favor of the ones
specified in the file_selectors.
Regex syntax is the same as the Go
language. Files that don't match one of the regexes won't be
processed. content_type, parsers, include_s3_metadata, max_bytes,
buffer_size, and encoding
may also be set for each file selector.
file_selectors:
  - regex: '/CloudTrail/'
    expand_event_list_from_field: 'Records'
  - regex: '/CloudTrail-Digest/'
  - regex: '/CloudTrail-Insight/'
    expand_event_list_from_field: 'Records'
Moved to AWS credentials options.
This input can include S3 object metadata in the generated events for use in
follow-on processing. You must specify the list of keys to include. By default
none are included. If the key exists in the S3 response, then it will be included
in the event as aws.s3.metadata.<key>,
where the key name has been normalized
to all lowercase.
include_s3_metadata:
  - last-modified
  - x-amz-version-id
The maximum number of bytes that a single log message can have. All bytes after
max_bytes
are discarded and not sent. This setting is especially useful for
multiline log messages, which can get large. This only applies to non-JSON logs.
The default is 10 MiB.
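For example, a hedged snippet raising max_bytes for very large multiline messages (the 20 MiB value is illustrative):
{beatname_lc}.inputs:
- type: aws-s3
  queue_url: https://sqs.ap-southeast-1.amazonaws.com/1234/test-s3-queue
  max_bytes: 20MiB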
The maximum number of SQS messages that can be inflight at any time. Defaults to 5.
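For example, a sketch increasing the in-flight SQS message limit; the option key max_number_of_messages is an assumption based on this description, and the value 10 is illustrative:
{beatname_lc}.inputs:
- type: aws-s3
  queue_url: https://sqs.ap-southeast-1.amazonaws.com/1234/test-s3-queue
  max_number_of_messages: 10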
beta[]
This option expects a list of parsers that non-JSON logs go through.
Available parsers:
- multiline
In this example, {beatname_uc} is reading multiline messages that
consist of XML that starts with the <Event>
tag.
{beatname_lc}.inputs:
- type: aws-s3
  ...
  parsers:
    - multiline:
        pattern: "^<Event"
        negate: true
        match: after
See the available parser settings in detail below.
beta[]
Options that control how {beatname_uc} deals with log messages that span
multiple lines. See [multiline-examples] for more information about
configuring multiline options.
URL of the AWS SQS queue that messages will be received from. (Required when bucket_arn
and non_aws_bucket_name
are not set.)
The duration that the received SQS messages are hidden from subsequent retrieve
requests after being retrieved by a ReceiveMessage
request. The default visibility timeout is 300s. The maximum is 12h.
{beatname_uc} will
automatically reset the visibility timeout of a message after 1/2 of the
duration passes to prevent a message that is still being processed from
returning to the queue.
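For example, a hedged snippet lengthening the visibility timeout for slow processing; the option key visibility_timeout is an assumption based on this description, and 600s is illustrative:
{beatname_lc}.inputs:
- type: aws-s3
  queue_url: https://sqs.ap-southeast-1.amazonaws.com/1234/test-s3-queue
  visibility_timeout: 600s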
The maximum number of times an SQS message should be received (retried) before
deleting it. This feature prevents poison-pill messages (messages that can be
received but can't be processed) from consuming resources. The number of times
a message has been received is tracked using the ApproximateReceiveCount
SQS attribute. The default value is 5.
If you have configured a dead letter queue, then you can set this value to
-1
to disable deletion on failure.
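For example, a sketch that disables deletion on failure when a dead letter queue is configured; the option key sqs.max_receive_count is an assumption based on this description:
{beatname_lc}.inputs:
- type: aws-s3
  queue_url: https://sqs.ap-southeast-1.amazonaws.com/1234/test-s3-queue
  sqs.max_receive_count: -1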
Inline Javascript source code.
sqs.notification_parsing_script.source: >
  function parse(notification) {
    var evts = [];
    var evt = new S3EventV2();
    evt.SetS3BucketName(notification.bucket);
    evt.SetS3ObjectKey(notification.path);
    evts.push(evt);
    return evts;
  }
Path to a script file to load. Relative paths are interpreted as
relative to the path.config
directory. Globs are expanded.
This loads filter.js
from disk.
sqs.notification_parsing_script.file: ${path.config}/filter.js
List of script files to load. The scripts are concatenated together.
Relative paths are interpreted as relative to the path.config
directory. Globs are expanded.
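For example, a hedged snippet loading several script files; the file names are hypothetical:
sqs.notification_parsing_script.files:
  - ${path.config}/filter1.js
  - ${path.config}/filter2.js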
A dictionary of parameters that are passed to the register
function of the script.
Parameters can be passed to the script by adding params
to the config.
This allows a script to be made reusable. When using params,
the code must define a register(params)
function to receive the parameters.
sqs.notification_parsing_script:
  params:
    provider: aws:s3
  source: >
    var params = {provider: ""};
    function register(scriptParams) {
      params = scriptParams;
    }
    function parse(notification) {
      var evts = [];
      var evt = new S3EventV2();
      evt.SetS3BucketName(notification.bucket);
      evt.SetS3ObjectKey(notification.path);
      evt.SetProvider(params.provider);
      evts.push(evt);
      return evts;
    }
This sets an execution timeout for the process
function. When
the process
function takes longer than the timeout
period the function
is interrupted. You can set this option to prevent a script from running for
too long (like preventing an infinite while
loop). By default there is no
timeout.
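For example, a sketch capping script execution at one second; the option key sqs.notification_parsing_script.timeout is an assumption based on this description, and the value is illustrative:
sqs.notification_parsing_script.timeout: 1s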
This sets the maximum number of Javascript VM sessions
that will be cached to avoid reallocation.
The maximum duration that an SQS ReceiveMessage
call should wait for a message
to arrive in the queue before returning. The default value is 20s. The maximum
value is 20s.
ARN of the AWS S3 bucket that will be polled for the list operation. (Required when queue_url
and non_aws_bucket_name
are not set.)
Name of the S3 bucket that will be polled for the list operation. Required for third-party S3-compatible services. (Required when queue_url
and bucket_arn
are not set.)
Time interval for polling the listing of the S3 bucket. Defaults to 120s.
Prefix to apply for the list request to the S3 bucket. Default empty.
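For example, a hedged snippet restricting the listing to objects under a given prefix; the option key bucket_list_prefix is an assumption based on this description, and the prefix is a placeholder:
{beatname_lc}.inputs:
- type: aws-s3
  bucket_arn: arn:aws:s3:::test-s3-bucket
  number_of_workers: 5
  bucket_list_prefix: AWSLogs/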
Number of workers that will process the S3 objects listed. (Required when bucket_arn
is set.)
Name of the 3rd party S3 bucket provider like backblaze or GCP.
The following endpoints/providers will be detected automatically:
Domain | Provider
amazonaws.com, amazonaws.com.cn, c2s.sgov.gov, c2s.ic.gov | aws
backblazeb2.com | backblaze
wasabisys.com | wasabi
digitaloceanspaces.com | digitalocean
dream.io | dreamhost
scw.cloud | scaleway
googleapis.com | gcp
cloud.it | arubacloud
linodeobjects.com | linode
vultrobjects.com | vultr
appdomain.cloud | ibm
aliyuncs.com | alibaba
oraclecloud.com | oracle
exo.io | exoscale
upcloudobjects.com | upcloud
ilandcloud.com | iland
zadarazios.com | zadara
Enabling this option sets the bucket name as a path in the API call instead of a subdomain. When enabled,
https://<bucket-name>.s3.<region>.<provider>.com becomes https://s3.<region>.<provider>.com/<bucket-name>.
This is only supported with third-party S3 providers. AWS does not support path style.
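For example, a sketch enabling path-style addressing for a third-party provider; the option key path_style is an assumption based on this description, and the endpoint and credentials are placeholders reused from the earlier example:
{beatname_lc}.inputs:
- type: aws-s3
  non_aws_bucket_name: test-s3-bucket
  endpoint: https://s3.example.com:9000
  access_key_id: xxxxxxx
  secret_access_key: xxxxxxx
  path_style: true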
In order to make AWS API calls, the aws-s3
input requires AWS credentials. Please
see AWS credentials options for more details.
The bucket ARN to backup processed files to. This will copy the processed file after it has been fully read.
When using the non_aws_bucket_name,
please use non_aws_backup_to_bucket_name
accordingly.
Naming of the backed-up files can be controlled with backup_to_bucket_prefix.
This prefix will be prepended to the object key when backing it up to another (or the same) bucket.
The bucket name to backup processed files to. Use this parameter when not using AWS buckets. This will copy the processed file after it has been fully read.
When using the bucket_arn,
please use backup_to_bucket_arn
accordingly.
Naming of the backed-up files can be controlled with backup_to_bucket_prefix.
Controls whether fully processed files will be deleted from the bucket.
Can only be used together with the backup functionality.
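For example, a hedged snippet combining the backup options described above; the backup bucket ARN and prefix are placeholders:
{beatname_lc}.inputs:
- type: aws-s3
  queue_url: https://sqs.ap-southeast-1.amazonaws.com/1234/test-s3-queue
  backup_to_bucket_arn: arn:aws:s3:::archive-s3-bucket
  backup_to_bucket_prefix: processed/
  delete_after_backup: true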
Specific AWS permissions are required for the IAM user to access SQS and S3
when using the SQS notifications method:
s3:GetObject
sqs:ReceiveMessage
sqs:ChangeMessageVisibility
sqs:DeleteMessage
A reduced set of S3 permissions is required for the IAM user to access S3
when using the S3 bucket list polling method:
s3:GetObject
s3:ListBucket
s3:GetBucketLocation
If backup_to_bucket_arn
or non_aws_backup_to_bucket_name
is set, the following permission is required as well:
s3:PutObject
If delete_after_backup
is set, the following permission is required as well:
s3:DeleteObject
Enable bucket notification: any new object creation in the S3 bucket will also
create a notification through SQS. Please see
create-sqs-queue-for-notification
for more details.
If you would like to use the bucket notification in multiple different consumers
(other than {beatname_lc}), you should use an SNS topic for the bucket notification.
Please see create-SNS-topic-for-notification
for more details. The SQS queue will be configured as a
subscriber to the SNS topic.
When using the SQS notifications method, multiple {beatname_uc} instances can read from the same SQS queues at the same time.
To horizontally scale processing when there are large amounts of log data
flowing into an S3 bucket, you can run multiple {beatname_uc} instances that
read from the same SQS queues at the same time. No additional configuration is
required.
Using SQS ensures that each message in the queue is processed only once even
when multiple {beatname_uc} instances are running in parallel. To prevent
{beatname_uc} from receiving and processing the message more than once, set the
visibility timeout.
The visibility timeout begins when SQS returns a message to {beatname_uc}. During
this time, {beatname_uc} processes and deletes the message. However, if {beatname_uc}
fails before deleting the message and your system doesn’t call the DeleteMessage
action for that message before the visibility timeout expires, the message
becomes visible to other {beatname_uc} instances, and the message is received
again. By default, the visibility timeout is set to 5 minutes for the aws-s3 input
in {beatname_uc}. 5 minutes is sufficient time for {beatname_uc} to read SQS
messages and process related S3 log files.
When using the S3 bucket list polling method, be aware that if multiple {beatname_uc} instances are running,
they can list the same S3 bucket at the same time. Since the state of the ingested S3 objects is persisted
(upon completion of each listing operation) in the path.data
configuration,
and multiple {beatname_uc} instances cannot share the same path.data,
this will produce repeated ingestion of the S3 objects.
Therefore, when using the S3 bucket list polling method, scaling should be
vertical, with a single bigger {beatname_uc} instance and a higher number_of_workers
config value.
Under some circumstances you might want to listen to events that do not follow
the standard SQS notification format. To be able to parse them, it is possible to
define a custom script that will take care of processing them and generating the
required list of S3 events used to download the files.
The sqs.notification_parsing_script
executes Javascript code to process an event.
It uses a pure Go implementation of ECMAScript 5.1 and has no external dependencies.
It can be configured by embedding Javascript in your configuration file or by pointing
the processor at external file(s). Only one of the options sqs.notification_parsing_script.source
, sqs.notification_parsing_script.file
, and sqs.notification_parsing_script.files
can be set at the same time.
The script requires a parse(notification)
function that receives the notification as
a raw string and returns a list of S3EventV2
objects. This raw string can then be
processed as needed, e.g.: JSON.parse(n)
or the provided helper for XML, new XMLDecoder(n).
If the script defines a test()
function it will be invoked when it is loaded. Any exceptions thrown will cause the processor to fail to load. This can be used to make assertions about the behavior of the script.
function parse(n) {
  var m = JSON.parse(n);
  var evts = [];
  var files = m.files;
  var bucket = m.bucket;
  if (!Array.isArray(files) || (files.length == 0) || bucket == null || bucket == "") {
    return evts;
  }
  files.forEach(function(f){
    var evt = new S3EventV2();
    evt.SetS3BucketName(bucket);
    evt.SetS3ObjectKey(f.path);
    evts.push(evt);
  });
  return evts;
}
function test() {
  // parse() expects the notification as a raw string, so serialize the sample object.
  var events = parse(JSON.stringify({bucket: "aBucket", files: [{path: "path/to/file"}]}));
  if (events.length !== 1) {
    throw "expecting one event";
  }
  if (events[0].S3.Bucket.Name !== "aBucket") {
    throw "expected bucket === aBucket";
  }
  if (events[0].S3.Object.Key !== "path/to/file") {
    throw "expected key === path/to/file";
  }
}
The S3EventV2
object returned by the parse
method.

Method | Description
new S3EventV2() | Returns a new S3EventV2 object. Example: var evt = new S3EventV2();
SetAWSRegion(string) | Sets the AWS region. Example: evt.SetAWSRegion("us-east-1");
SetProvider(string) | Sets the provider. Example: evt.SetProvider("provider");
SetEventName(string) | Sets the event name. Example: evt.SetEventName("event-type");
SetEventSource(string) | Sets the event source. Example: evt.SetEventSource("aws:s3");
SetS3BucketName(string) | Sets the bucket name. Example: evt.SetS3BucketName("bucket-name");
SetS3BucketARN(string) | Sets the bucket ARN. Example: evt.SetS3BucketARN("bucket-ARN");
SetS3ObjectKey(string) | Sets the object key. Example: evt.SetS3ObjectKey("path/to/object");
In order to be able to retrieve an S3 object successfully, at least S3.Object.Key
and S3.Bucket.Name
properties must be set (using the provided setters). The other
properties will be used as metadata in the resulting event when available.
To help with XML decoding, an XMLDecoder
class is provided.
Example XML input:
<catalog>
  <book seq="1">
    <author>William H. Gaddis</author>
    <title>The Recognitions</title>
    <review>One of the great seminal American novels of the 20th century.</review>
  </book>
</catalog>
Will produce the following output:
{
  "catalog": {
    "book": {
      "author": "William H. Gaddis",
      "review": "One of the great seminal American novels of the 20th century.",
      "seq": "1",
      "title": "The Recognitions"
    }
  }
}
Method | Description
new XMLDecoder(string) | Returns a new XMLDecoder object to decode the provided string. Example: var dec = new XMLDecoder(n);
PrependHyphenToAttr() | Causes the decoder to prepend a hyphen (-) to all XML attribute names. Example: dec.PrependHyphenToAttr();
LowercaseKeys() | Causes the decoder to transform all key names to lowercase. Example: dec.LowercaseKeys();
Decode() | Reads the XML string and returns a map containing the data. Example: var m = dec.Decode();
This input exposes metrics under the HTTP monitoring endpoint.
These metrics are exposed under the /inputs
path. They can be used to
observe the activity of the input.
Metric | Description
sqs_messages_received_total | Number of SQS messages received (not necessarily processed fully).
sqs_visibility_timeout_extensions_total | Number of SQS visibility timeout extensions.
sqs_messages_inflight_gauge | Number of SQS messages inflight (gauge).
sqs_messages_returned_total | Number of SQS messages returned to queue (happens on errors implicitly after visibility timeout passes).
sqs_messages_deleted_total | Number of SQS messages deleted.
sqs_message_processing_time | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
sqs_lag_time | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received, expressed in nanoseconds.
s3_objects_requested_total | Number of S3 objects downloaded.
s3_objects_listed_total | Number of S3 objects returned by list operations.
s3_objects_processed_total | Number of S3 objects that matched file_selectors rules.
s3_objects_acked_total | Number of S3 objects processed that were fully ACKed.
s3_bytes_processed_total | Number of S3 bytes processed.
s3_events_created_total | Number of events created from processing S3 data.
s3_objects_inflight_gauge | Number of S3 objects inflight (gauge).
s3_object_processing_time | Histogram of the elapsed S3 object processing times in nanoseconds (start of download to completion of parsing).