AWSEMFExporter - Add EMF Exporter to support exporting metrics to AWS CloudWatch #498
Conversation
It will be almost impossible for one of the maintainers to review 5000+ lines. Consider splitting this PR into more readable PRs; as a rule of thumb, 500 lines is considered a large PR.
// CloudWatch metrics namespace
Namespace string `mapstructure:"namespace"`
// CWLogs service endpoint
Endpoint string `mapstructure:"endpoint"`
Can you use one of the confignet addresses?
Did a quick skim - agree it'd be good to split this up, perhaps something like
- Initialize basic connection to cloudwatch
- Add translation of data formats
- Wire up translation to basic connection
- Add advanced connection features (STS)
Also, this is probably going to happen naturally if going with the above structure but there seems to be an overcoupling of sending logic and translation logic. I think having translation logic separate makes the code easier to reason about, see our translator package for xray exporter https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/master/exporter/awsxrayexporter/translator/sql.go
exporter/awsemfexporter/README.md
Outdated
The following exporter configuration parameters are supported. They mirror and have the same affect as the
comparable AWS X-Ray Daemon configuration values.
Suggested change:
- The following exporter configuration parameters are supported. They mirror and have the same affect as the
- comparable AWS X-Ray Daemon configuration values.
+ The following exporter configuration parameters are supported.
Don't think the x-ray daemon is relevant to readers of this file.
exporter/awsemfexporter/config.go
Outdated
// Config defines configuration for AWS EMF exporter.
type Config struct {
	configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
	// LogGroupName
Seems to be missing descriptions for these config fields. I think the receiver is a good example for what level of detail we should include here
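To illustrate the level of detail being asked for, here is a hypothetical sketch of documented config fields. The field names are taken from this PR's README table; the doc comments themselves are illustrative, not the final wording:

```go
package main

import "fmt"

// Config defines configuration for the AWS EMF exporter.
// (Sketch only: field set mirrors the README table in this PR.)
type Config struct {
	// Namespace is the CloudWatch metrics namespace under which metrics are published.
	Namespace string `mapstructure:"namespace"`
	// LogGroupName is the CloudWatch Logs group that receives the EMF log events.
	LogGroupName string `mapstructure:"log_group_name"`
	// LogStreamName is the CloudWatch Logs stream within the log group.
	LogStreamName string `mapstructure:"log_stream_name"`
	// Endpoint optionally overrides the default CloudWatch Logs service endpoint.
	Endpoint string `mapstructure:"endpoint"`
	// Region is the AWS region to send to; if empty, it is resolved from instance metadata.
	Region string `mapstructure:"region"`
}

func main() {
	cfg := Config{Namespace: "MyApp", LogGroupName: "/metrics/emf"}
	fmt.Println(cfg.Namespace, cfg.LogGroupName)
}
```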
@@ -0,0 +1,52 @@
package mapWithExpiry
Filename has typo
}

func (u *NonBlockingFifoQueue) Dequeue() (interface{}, bool) {
	u.Lock()
It's not non-blocking if you lock
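For what it's worth, a buffered Go channel gives a genuinely non-blocking drop-oldest queue without an explicit mutex. A sketch — the type name mirrors the PR's `NonBlockingFifoQueue`, but the implementation and semantics here are assumptions, not the PR's code:

```go
package main

import "fmt"

// NonBlockingFifoQueue drops the front element when full, using a buffered
// channel instead of a mutex, so Enqueue and Dequeue never block.
// (Sketch only: the name mirrors the PR's type; the implementation does not.)
type NonBlockingFifoQueue struct {
	ch chan interface{}
}

func NewNonBlockingFifoQueue(maxSize int) *NonBlockingFifoQueue {
	return &NonBlockingFifoQueue{ch: make(chan interface{}, maxSize)}
}

// Enqueue adds v, dropping the oldest element first if the queue is full.
func (q *NonBlockingFifoQueue) Enqueue(v interface{}) {
	for {
		select {
		case q.ch <- v:
			return
		default:
			// Queue full: drop the oldest element (if still there) and retry.
			select {
			case <-q.ch:
			default:
			}
		}
	}
}

// Dequeue returns the oldest element, or false if the queue is empty.
func (q *NonBlockingFifoQueue) Dequeue() (interface{}, bool) {
	select {
	case v := <-q.ch:
		return v, true
	default:
		return nil, false
	}
}

func main() {
	q := NewNonBlockingFifoQueue(2)
	q.Enqueue(1)
	q.Enqueue(2)
	q.Enqueue(3) // queue full: 1 is dropped
	v, ok := q.Dequeue()
	fmt.Println(v, ok) // 2 true
}
```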
)

// It is a FIFO queue with the functionality that dropping the front if the queue size reaches to the maxSize
type NonBlockingFifoQueue struct {
Just a thought, I didn't look in detail, but at first glance it seems strange to have a queue in the exporter. Shouldn't we be able to use the pipeline for queueing, batching, etc?
…t exporting metrics to CloudWatch
…t exporting metrics to CloudWatch, Fix unit tests timeout issue
Codecov Report
@@ Coverage Diff @@
## master #498 +/- ##
==========================================
+ Coverage 87.97% 89.25% +1.28%
==========================================
Files 251 260 +9
Lines 12012 12700 +688
==========================================
+ Hits 10568 11336 +768
+ Misses 1109 1007 -102
- Partials 335 357 +22
Split the original PR into several pieces. The current one only contains the very basic functionality to get metrics from pipeline, translate them into EMF format and publish to CloudWatch. It doesn't include any batching or queuing logic. The feature to correlate CloudWatch log group name and CloudWatch metrics namespace is not included in this version as well. These features will be added in subsequent PRs. tag @alolita on AWS side for her visibility on this PR.
const (
	// this is the retry count, the total attempts would be retry count + 1 at most.
	defaultRetryCount = 5
Default retries should be 1. This is usually too many retries for observability functionality not on the main execution path before dropping the data. In addition, this many retries will likely trigger throttling by CloudWatch API.
default:
	// ThrottlingException is handled here because the type cloudwatch.ThrottlingException is not yet available in public SDK
	// Retry request if ThrottlingException happens
	if awsErr.Code() == ErrCodeThrottlingException {
Retrying during throttling will never catch up. Just drop the data at this point. Possibly even the entire sequence.
}

//sleep some back off time before retries.
func backoffSleep(i int) {
Sleeping this long will never allow catchup
	retryCnt: *awsConfig.MaxRetries,
	logger:   logger,
}
if config.(*Config).ForceFlushInterval > 0 {
Upstream processors in the collector such as batch_retry should handle retries and queuing. Exporter should not need a timer. Just push any data received immediately. Only thing that needs to be handled is making sure maximum records is not exceeded per put. You will likely need a loop for that. See the X-ray exporter.
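The loop being suggested might look something like this sketch. It uses plain `int` slices as a stand-in for `[]*cloudwatchlogs.InputLogEvent`, and the 10,000-events-per-request limit is my reading of the PutLogEvents API reference — check the current documented value:

```go
package main

import "fmt"

// maxEventsPerPut reflects the CloudWatch Logs PutLogEvents limit of 10,000
// events per request (assumed here; verify against the current API reference).
const maxEventsPerPut = 10000

// chunk splits events into batches no larger than size, so each batch can be
// pushed immediately without a timer. []int stands in for the SDK event type.
func chunk(events []int, size int) [][]int {
	var batches [][]int
	for len(events) > size {
		batches = append(batches, events[:size])
		events = events[size:]
	}
	if len(events) > 0 {
		batches = append(batches, events)
	}
	return batches
}

func main() {
	events := make([]int, 25)
	fmt.Println(len(chunk(events, 10))) // 3 batches: 10, 10, 5
}
```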
//Put log events. The method mainly handles different possible error could be returned from server side, and retries them
//if necessary.
func (client *CloudWatchLogClient) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput, retryCnt int) *string {
Return error to collector pipeline instead of handling everything here. Requests that should not be retried (HTTP 4xx) must be wrapped in consumererror.Permanent
// Possible exceptions are combination of common errors (https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/CommonErrors.html)
// and API specific erros (e.g. https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html#API_PutLogEvents_Errors)
type CloudWatchLogClient struct {
Consider making the struct internal
exporter/awsemfexporter/factory.go
Outdated
)

// Factory is the factory for AWS EMF exporter.
type Factory struct {
Consider using the helper factory
…Add the pusher to check the batched payload size.
@shaochengwang can you split the PR as @bogdandrutu asked?
@tigrannajaryan @bogdandrutu - Ack, will follow up w/ @shaochengwang
Thanks a lot for reviewing this PR. I have tried to make this feature as simple as possible in the second revision. Splitting it further may break its logic and functionality. Although it has 6000+ lines, I think most of them are the dependencies list and unit test files; the source code change should not be huge.
exporter/awsemfexporter/README.md
Outdated
| `log_group_name` | Customized log group name | |
| `log_stream_name` | Customized log stream name | |
Can we fill the default values for these configurations?
cmd/otelcontribcol/components.go
Outdated
@@ -107,6 +108,7 @@ func components() (component.Factories, error) {
	&splunkhecexporter.Factory{},
	elasticexporter.NewFactory(),
	&alibabacloudlogserviceexporter.Factory{},
	&awsemfexporter.Factory{},
Suggest updating to NewFactory()
I've changed it in the latest version.
	e.Message(),
	e.Error(),
	e))
backoffSleep(i)
Should we wait if the error type is InvalidSequenceTokenException? It seems we can retry right away with the new token returned.
Good point.
exporter/awsemfexporter/pusher.go
Outdated
//The file name where this log event comes from
FileName string
//The offset for the input file
FilePosition int64
May not be needed?
// Shutdown stops the exporter and is invoked during shutdown.
func (emf *emfExporter) Shutdown(ctx context.Context) error {
	return nil
Can we force to flush the unpublished batched log events (in pushers) here?
We tested this implementation within AWS test environments. @kbrockhoff, pls help us review the PR. Thanks.
Please follow the process described here:
@anuraaga please review, we heard that this is urgent and we don't have time to review 5K-line PRs. But I trust that you, as one of the future maintainers of this component, can do this.
This exporter converts OpenTelemetry metrics to
[AWS CloudWatch Embedded Metric Format(EMF)](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html)
and then sends them directly to CloudWatch Logs using the
[PutLogEvents](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html) API.
Not really for this PR but is it easy to support both API sending and writing to STDOUT? I guess STDOUT can help in many cases too.
exporter/awsemfexporter/README.md
Outdated
| `no_verify_ssl` | Enable or disable TLS certificate verification. | false |
| `proxy_address` | Upload Structured Logs to AWS CloudWatch through a proxy. | |
| `region` | Send Structured Logs to AWS CloudWatch in a specific region. | determined by metadata |
| `local_mode` | Local mode to skip EC2 instance metadata check. | false |
Can we remove local_mode? I know our other exporter has it but I want to consider removing it there too - I had trouble explaining this flag to a user before. We use it to control returning errors, but I don't think that's actually a good idea. I think we also use it to flag whether to fetch region info, but it seems ok to just try in any case instead of having this confusing flag.
Yes, will remove this local_mode.
exporter/awsemfexporter/README.md
Outdated
| `endpoint` | Optionally override the default CloudWatch service endpoint. | |
| `no_verify_ssl` | Enable or disable TLS certificate verification. | false |
| `proxy_address` | Upload Structured Logs to AWS CloudWatch through a proxy. | |
| `region` | Send Structured Logs to AWS CloudWatch in a specific region. | determined by metadata |
Document the AWS_REGION environment variable here, since it's otherwise confusing that we have another mechanism for setting the region.
)

// GetAWSConfigSession returns AWS config and session instances.
func GetAWSConfigSession(logger *zap.Logger, cn connAttr, cfg *Config) (*aws.Config, *session.Session, error) {
Whoever merges all the AWS conn.go files into one under internal/aws will get a prize
)

const (
	// this is the retry count, the total attempts would be retry count + 1 at most.
Suggested change:
- // this is the retry count, the total attempts would be retry count + 1 at most.
+ // this is the retry count, the total attempts will be at most retry count + 1.
const (
	CleanInterval = 5 * time.Minute
	MinTimeDiff = 50 // We assume 50 micro-seconds is the minimal gap between two collected data sample to be valid to calculate delta
MinTimeDiffMicros
Alternatively, stick with the time package: 50 * time.Microsecond
serviceNamespace, svcNsOk := attributes[conventions.AttributeServiceNamespace]

if svcNameOk {
	svcAttrMode = ServiceNameOnly
This svcAttrMode seems unnecessarily complex, isn't it just

if svcNameOk && svcNsOk {
	namespace = fmt....
} else if svcNsOk {
	namespace = serviceNamespace
} else if svcNameOk {
	namespace = serviceName
}
	totalDroppedMetrics += len(metric.GetTimeseries())
	continue
}
//TODO: Handle OTLib as a dimension when it's supported
I think we're missing this since we converted to OC, but we should be sticking to the native OTel format
exporter/awsemfexporter/pusher.go
Outdated
pusher := newPusher(logGroupName, logStreamName, svcStructuredLog)

// For blocking queue, assuming the log batch payload size is 1MB. Set queue size to 2
Is this comment in the wrong place? No queue size here
exporter/awsemfexporter/pusher.go
Outdated
//* None of the log events in the batch can be older than 14 days or the
//retention period of the log group.
currentTime := time.Now().UTC()
utcTime := time.Unix(0, *logEvent.InputLogEvent.Timestamp*1e6).UTC()
Think you can use something in time instead of magic number 1e6
…ove logging with zap logger, remove unnecessary metrics transformation to OCMetrics
Found a couple more small but important points but should be straightforward. Just a bit more, thanks!
if response != nil {
	if response.RejectedLogEventsInfo != nil {
		rejectedLogEventsInfo := response.RejectedLogEventsInfo
Reminder that later we'll need metrics for these sort of failures
	err = wrapErrorIfBadRequest(&returnError)
}
if err != nil {
	emf.logger.Error("Experiences some errors when gracefully shutting down emf_exporter. Skipping to next pusher.", zap.Error(err))
Suggested change:
- emf.logger.Error("Experiences some errors when gracefully shutting down emf_exporter. Skipping to next pusher.", zap.Error(err))
+ emf.logger.Error("Error when gracefully shutting down emf_exporter. Skipping to next pusher.", zap.Error(err))
exporter/awsemfexporter/pusher.go
Outdated
func (logEvent *LogEvent) truncateIfNeeded() bool {
	if logEvent.eventPayloadBytes() > MaxEventPayloadBytes {
		log.Printf("W! logpusher: the single log event size is %v, which is larger than the max event payload allowed %v. Truncate the log event.", logEvent.eventPayloadBytes(), MaxEventPayloadBytes)
Oh can you make sure to use the zap logger throughout the PR? We don't use standard library logging in the collector
will do. Thanks
exporter/awsemfexporter/pusher.go
Outdated
//retention period of the log group.
currentTime := time.Now().UTC()
utcTime := time.Unix(0, *logEvent.InputLogEvent.Timestamp*int64(time.Millisecond)).UTC()
duration := currentTime.Sub(utcTime).Hours()
Is it ok to round the hours? How about converting 24*14 to a duration instead?
Should probably have constants for the durations for the staleness check
Agree it's better to use constants here.
Thanks!
Description:
This PR introduces an exporter for AWS CloudWatch Service. The exporter works by translating metrics into the Embedded Metric Format which enables you to ingest complex high-cardinality application data in the form of logs and to generate actionable metrics from them.
Testing:
Unit tests cover emf_exporter, the cloudwatchlogs client, pusher, publisher, and request handler.
Manually tested the end to end functionality on AWS CloudWatch console.
Documentation:
A README is included. It explains this awsemfexporter's Data Conversion, Configuration and AWS Credentials.
Notes:
Split the original PR into several pieces. The current one only contains the very basic functionality to get metrics from pipeline, translate them into EMF format and publish to CloudWatch. It doesn't do any batching or queuing. The feature to correlate CloudWatch log group name and CloudWatch metrics namespace is also not included in this version. These features will be added in subsequent PRs.
Although there are 4783 lines added in this PR, most of them are dependencies change and unit tests. The source code change is around 1200 lines.