Batches `log` events to AWS Kinesis Data Stream via the `PutRecords` API endpoint.
{% hint style="warning" %}
The `aws_kinesis_streams` sink is in beta. Please see the current enhancements and bugs for known issues. We kindly ask that you add any missing issues, as it will help shape the roadmap of this component.
{% endhint %}
The `aws_kinesis_streams` sink batches `log` events to AWS Kinesis Data Stream via the `PutRecords` API endpoint.
{% code-tabs %} {% code-tabs-item title="vector.toml (simple)" %}
[sinks.my_sink_id]
# REQUIRED - General
type = "aws_kinesis_streams" # must be: "aws_kinesis_streams"
inputs = ["my-source-id"]
region = "us-east-1"
stream_name = "my-stream"
# REQUIRED - Requests
encoding = "json" # enum: "json" or "text"
# For a complete list of options see the "advanced" tab above.
{% endcode-tabs-item %} {% code-tabs-item title="vector.toml (advanced)" %}
[sinks.aws_kinesis_streams_sink]
#
# General
#
# The component type
#
# * required
# * no default
# * must be: "aws_kinesis_streams"
type = "aws_kinesis_streams"
# A list of upstream source or transform IDs. See Config Composition for more
# info.
#
# * required
# * no default
inputs = ["my-source-id"]
# The AWS region where the target Kinesis stream resides.
#
# * required
# * no default
region = "us-east-1"
# The stream name of the target Kinesis stream.
#
# * required
# * no default
stream_name = "my-stream"
# Custom endpoint for use with AWS-compatible services.
#
# * optional
# * no default
endpoint = "127.0.0.0:5000"
# Enables/disables the sink healthcheck upon start.
#
# * optional
# * default: true
healthcheck = true
# The log field used as the Kinesis record's partition key value.
#
# * optional
# * no default
partition_key_field = "user_id"
#
# Requests
#
# The encoding format used to serialize the events before flushing.
#
# * required
# * no default
# * enum: "json" or "text"
encoding = "json"
encoding = "text"
# The window used for the `rate_limit_num` option.
#
# * optional
# * default: 1
# * unit: seconds
rate_limit_duration = 1
# The maximum number of requests allowed within the `rate_limit_duration`
# window.
#
# * optional
# * default: 5
rate_limit_num = 5
# The maximum number of in-flight requests allowed at any given time.
#
# * optional
# * default: 5
request_in_flight_limit = 5
# The maximum time a request can take before being aborted.
#
# * optional
# * default: 30
# * unit: seconds
request_timeout_secs = 30
# The maximum number of retries to make for failed requests.
#
# * optional
# * default: 5
retry_attempts = 5
# The amount of time to wait before attempting a failed request again.
#
# * optional
# * default: 5
# * unit: seconds
retry_backoff_secs = 5
#
# Batching
#
# The maximum size of a batch before it is flushed.
#
# * optional
# * default: 1049000
# * unit: bytes
batch_size = 1049000
# The maximum age of a batch before it is flushed.
#
# * optional
# * default: 1
# * unit: seconds
batch_timeout = 1
#
# Buffer
#
[sinks.aws_kinesis_streams_sink.buffer]
# The buffer's type / location. `disk` buffers are persistent and will be
# retained between restarts.
#
# * optional
# * default: "memory"
# * enum: "memory" or "disk"
type = "memory"
type = "disk"
# The behavior when the buffer becomes full.
#
# * optional
# * default: "block"
# * enum: "block" or "drop_newest"
when_full = "block"
when_full = "drop_newest"
# The maximum size of the buffer on the disk.
#
# * optional
# * no default
# * unit: bytes
max_size = 104900000
# The maximum number of events allowed in the buffer.
#
# * optional
# * default: 500
# * unit: events
num_items = 500
{% endcode-tabs-item %} {% endcode-tabs %}
The `aws_kinesis_streams` sink batches `log` events up to the `batch_size` or `batch_timeout` options. When flushed, Vector will write to AWS Kinesis Data Stream via the `PutRecords` API endpoint. The encoding is dictated by the `encoding` option. For example:
POST / HTTP/1.1
Host: kinesis.<region>.<domain>
Content-Length: <byte_size>
Content-Type: application/x-amz-json-1.1
Connection: Keep-Alive
X-Amz-Target: Kinesis_20131202.PutRecords
{
"Records": [
{
"Data": "<base64_encoded_event>",
"PartitionKey": "<partition_key>"
},
{
"Data": "<base64_encoded_event>",
"PartitionKey": "<partition_key>"
},
{
"Data": "<base64_encoded_event>",
"PartitionKey": "<partition_key>"
}
],
"StreamName": "<stream_name>"
}
Vector checks for AWS credentials in the following order:

1. The `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables.
2. The `credential_process` command in the AWS config file (usually located at `~/.aws/config`).
3. The AWS credentials file (usually located at `~/.aws/credentials`).
4. The IAM instance profile (will only work if running on an EC2 instance with an instance profile/role).

If credentials are not found, the healthcheck will fail and an error will be logged.
In general, we recommend using instance profiles/roles whenever possible. In cases where this is not possible you can generate an AWS access key for any user within your AWS account. AWS provides a detailed guide on how to do this.
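For reference, the AWS credentials file checked in step 3 follows the standard AWS format (values below are placeholders, not real keys):

```ini
; ~/.aws/credentials
[default]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY
```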
The `aws_kinesis_streams` sink buffers & batches data as shown in the diagram above. You'll notice that Vector treats these as sink-specific concepts rather than global ones. This isolates sinks, ensuring service disruptions are contained and delivery guarantees are honored.
The `buffer.type` option allows you to control buffer resource usage:

| Type | Description |
| --- | --- |
| `memory` | **Pros:** Fast. **Cons:** Not persisted across restarts. Possible data loss in the event of a crash. Uses more memory. |
| `disk` | **Pros:** Persisted across restarts, durable. Uses much less memory. **Cons:** Slower, see below. |
The `buffer.when_full` option allows you to control the behavior when the buffer overflows:

| Type | Description |
| --- | --- |
| `block` | Applies back pressure until the buffer makes room. This will help to prevent data loss but will cause data to pile up on the edge. |
| `drop_newest` | Drops new data as it's received. This data is lost. This should be used when performance is the highest priority. |
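For example, to favor durability over speed you could switch this sink to a disk buffer (values illustrative):

```toml
[sinks.aws_kinesis_streams_sink.buffer]
type = "disk"           # persist the buffer across restarts
when_full = "block"     # apply back pressure instead of dropping data
max_size = 104900000    # cap the on-disk buffer at ~100 MB
```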
Batches are flushed when 1 of 2 conditions are met:

- The batch age meets or exceeds the configured `batch_timeout` (default: `1 second`).
- The batch size meets or exceeds the configured `batch_size` (default: `1049000 bytes`).
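For instance, to flush smaller batches more frequently, both options can be tuned together (values illustrative, not recommendations):

```toml
[sinks.aws_kinesis_streams_sink]
# Flush when a batch reaches ~500 KB or is 2 seconds old,
# whichever happens first.
batch_size = 500000 # bytes
batch_timeout = 2   # seconds
```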
This component offers an at least once delivery guarantee if your pipeline is configured to achieve this.
The aws_kinesis_streams
sink encodes events before writing
them downstream. This is controlled via the encoding
option which accepts
the following options:
| Encoding | Description |
| --- | --- |
| `json` | The payload will be encoded as a single JSON payload. |
| `text` | The payload will be encoded as new line delimited text, each line representing the value of the `"message"` key. |
By default, the `encoding` chosen is dynamic based on the explicit/implicit nature of the event's structure. For example, if this event is parsed (explicit structuring), Vector will use `json` to encode the structured data. If the event was not explicitly structured, the `text` encoding will be used.
To further explain why Vector adopts this default, take the simple example of accepting data over the `tcp` source and then connecting it directly to the `aws_kinesis_streams` sink. It is less surprising that the outgoing data reflects the incoming data exactly, since it was not explicitly structured.
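As an illustration (field names and values hypothetical), the same event might serialize as follows under each encoding:

```
# encoding = "text" — the value of the "message" key, newline delimited
GET /index.html 200

# encoding = "json" — the full structured event as a JSON object
{"timestamp": "2019-05-02T00:23:22Z", "message": "GET /index.html 200", "host": "10.2.22.122"}
```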
Environment variables are supported through all of Vector's configuration. Simply add `${MY_ENV_VAR}` in your Vector configuration file and the variable will be replaced before being evaluated. You can learn more in the Environment Variables section.
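For example, the region and stream name could be pulled from the environment (the variable names here are illustrative):

```toml
[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = ["my-source-id"]
region = "${AWS_REGION}"          # resolved from the environment at load time
stream_name = "${KINESIS_STREAM}" # variable name is illustrative
encoding = "json"
```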
Health checks ensure that the downstream service is accessible and ready to accept data. This check is performed upon sink initialization.
If the health check fails an error will be logged and Vector will proceed to start. If you'd like to exit immediately upon health check failure, you can pass the `--require-healthy` flag:
vector --config /etc/vector/vector.toml --require-healthy
And finally, if you'd like to disable health checks entirely for this sink you can set the `healthcheck` option to `false`.
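For example:

```toml
[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = ["my-source-id"]
region = "us-east-1"
stream_name = "my-stream"
encoding = "json"
healthcheck = false # skip the startup health check for this sink
```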
By default, Vector issues random 16-byte values for each Kinesis record's partition key, evenly distributing records across your Kinesis partitions. Depending on your use case this might not be sufficient since random distribution does not preserve order. To override this, you can supply the `partition_key_field` option. This option represents a field on your event to use for the partition key value instead. This is useful if you have a field already on your event, and it also pairs nicely with the `add_fields` transform.
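A sketch of this pairing (the `user_id` field and transform names are hypothetical):

```toml
# Ensure every event carries the field used as the partition key.
[transforms.add_partition_key]
type = "add_fields"
inputs = ["my-source-id"]

[transforms.add_partition_key.fields]
user_id = "fallback-user" # static fallback; real events may already carry this field

[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = ["add_partition_key"]
region = "us-east-1"
stream_name = "my-stream"
encoding = "json"
partition_key_field = "user_id" # this field's value becomes the partition key
```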
Kinesis requires a value for the partition key, and therefore if the key is missing or the value is blank the event will be dropped and a `warning` level log event will be logged. As such, the field specified in the `partition_key_field` option should always contain a value.
If the value provided exceeds the maximum allowed length of 256 characters, Vector will truncate the value and use the first 256 characters.
Vector will coerce the value into a string.
This is generally outside the scope of Vector but worth touching on. When you supply your own partition key it opens up the possibility for "hot spots", and you should be aware of your data distribution for the key you're providing. Kinesis provides the ability to manually split shards to accommodate this. If the key you're using is dynamic and unpredictable, we highly recommend reconsidering your ordering policy to allow for even and random distribution.
Vector offers a few levers to control the rate and volume of requests to the downstream service. Start with the `rate_limit_duration` and `rate_limit_num` options to ensure Vector does not exceed the specified number of requests in the specified window. You can further control the pace at which this window is saturated with the `request_in_flight_limit` option, which will guarantee no more than the specified number of requests are in-flight at any given time.
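Together, these defaults correspond to the following configuration (shown explicitly for illustration):

```toml
[sinks.aws_kinesis_streams_sink]
# At most 5 requests per 1-second window...
rate_limit_duration = 1 # seconds
rate_limit_num = 5
# ...and never more than 5 requests in flight at once.
request_in_flight_limit = 5
```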
Please note, Vector's defaults are carefully chosen and it should be rare that you need to adjust these. If you found a good reason to do so, please share it with the Vector team by opening an issue.
Vector will retry failed requests (status `== 429`, `>= 500`, and `!= 501`). Other responses will not be retried. You can control the number of retry attempts and backoff rate with the `retry_attempts` and `retry_backoff_secs` options.
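For example (defaults shown explicitly):

```toml
[sinks.aws_kinesis_streams_sink]
retry_attempts = 5     # retry a failed request up to 5 times...
retry_backoff_secs = 5 # ...waiting 5 seconds between attempts
```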
To ensure the pipeline does not halt when a service fails to respond, Vector will abort requests after `30 seconds`. This can be adjusted with the `request_timeout_secs` option.
It is highly recommended that you do not lower this value below the service's internal timeout, as this could create orphaned requests, pile on retries, and result in duplicate data downstream.
The best place to start with troubleshooting is to check the Vector logs. This is typically located at `/var/log/vector.log`. Then proceed to follow the Troubleshooting Guide.
If the Troubleshooting Guide does not resolve your issue, please:

- Check for any open `aws_kinesis_streams_sink` issues.
- If you encountered a bug, please file a bug report.
- If you encountered a missing feature, please file a feature request.
- If you need help, join our chat/forum community. You can post a question and search previous questions.