-
Notifications
You must be signed in to change notification settings - Fork 74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cloudwatch Logs producer #166
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code does not compile.
Also:
- Don't use "_" in filenames please. Go has special meanings for those postfixes. All lowercase, one word.
- Please provide an integration test if possible
producer/cloudwatch_logs.go
Outdated
// Configure initializes this producer with values from a plugin config. | ||
func (prod *CloudwatchLogs) Configure(conf core.PluginConfigReader) error { | ||
prod.SetStopCallback(prod.flush) | ||
stream := conf.GetString("stream", "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using struct tags for stream and group (non-pointer) and only leave the checks on empty in the config method
producer/cloudwatch_logs.go
Outdated
// do anything reasonable with rejected logs. Ignore it. | ||
// Meybe expose some statistics for rejected counters. | ||
resp, err := dst.svc.PutLogEvents(prod.cloudwatchLogsParams) | ||
if err == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if err != nil there should be an error message logged
producer/cloudwatch_logs.go
Outdated
|
||
type CloudwatchLogs struct { | ||
core.BufferedProducer `gollumdoc:"embed_type"` | ||
stream *string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make these strings non-pointer.
- It will make it easier to process your config
- Strings are not scattered around in memory (cache locality, memory fragmentation, etc - won't hurt you here but in general I would consider it a good practice)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've created them as pointers to easily set them in upload params.
producer/cloudwatch_logs.go
Outdated
} | ||
err = prod.createStream() | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fine like this, but you could also do it like this:
func (prod *CloudwatchLogs) create() error {
if err := prod.createGroup(); err != nil {
return err
}
return prod.createStream()
}
I'll leave it up to you which version you like more.
producer/cloudwatch_logs.go
Outdated
// http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogStream.html | ||
func (prod *CloudwatchLogs) createStream() error { | ||
params := &cloudwatchlogs.CreateLogStreamInput{ | ||
LogGroupName: &prod.group, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not compile ...?
LogGroupName
is a *string
, not a **string
.
producer/cloudwatch_logs.go
Outdated
} | ||
} | ||
|
||
func (prod *Console) Produce(workers *sync.WaitGroup) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be func (prod *CloudwatchLogs) ...
producer/cloudwatch_logs.go
Outdated
func (prod *Console) Produce(workers *sync.WaitGroup) { | ||
defer prod.WorkerDone() | ||
prod.AddMainWorker(workers) | ||
prod.MessageControlLoop(prod.printMessage) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
printMessage function does not exist.
You probably want to call upload
here?
If you use the buffered producer you will only get single messages though.
If you want to send batches of messages you should have a look at core.BatchProducer
producer/cloudwatch_logs.go
Outdated
"time" | ||
) | ||
|
||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use //
when doing comments.
Some people use /* .. */
to temporarily remove code and its super annoying if you have block comments inside your code that force you to create multiple blocks.
producer/cloudwatch_logs.go
Outdated
// Put log events and update sequence token. | ||
// Possible errors http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html | ||
func (prod *CloudwatchLogs) upload() { | ||
logevents := make([]*cloudwatchlogs.InputLogEvent, 0, len(events)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where does events
come from?
Where should I put code to:
It should be launched before first upload to cloudwatch. Is |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also check the current linter issues: https://travis-ci.org/trivago/gollum/jobs/252132692
producer/cloudwatchlogs.go
Outdated
// PutLogEvents 5 requests/second/log stream. | ||
putLogEventsDelay = 200 * time.Millisecond | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please provide a documentation and a config example . Look here for an example: https://github.com/trivago/gollum/blob/master/producer/elasticsearch.go#L31
producer/cloudwatchlogs.go
Outdated
|
||
type CloudwatchLogs struct { | ||
core.BufferedProducer `gollumdoc:"embed_type"` | ||
stream string `config:"Stream" default:""` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Stream" is quite close to the standard "Streams".
As by line 93 I would rename this to "LogStreamPrefix". Avoids ambiguity in the config.
producer/cloudwatchlogs.go
Outdated
type CloudwatchLogs struct { | ||
core.BufferedProducer `gollumdoc:"embed_type"` | ||
stream string `config:"Stream" default:""` | ||
group string `config:"Group" default:""` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To align with my previous proposal I would rename this to "LogGroup".
producer/cloudwatchlogs.go
Outdated
// Put log events and update sequence token. | ||
// Possible errors http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html | ||
func (prod *CloudwatchLogs) upload(msg *core.Message) { | ||
logevents := make([]*cloudwatchlogs.InputLogEvent, 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You create logevents here and upload them later, but there is no data added to it ...?
Also this looks like you actually want to upload more than one event at a time.
As of this I would suggest you base this plugin on core.BatchProducer which gives you a list of messages instead of just one message.
I would suggest to have a look at producer.Kinesis for that.
producer/cloudwatchlogs.go
Outdated
if conf.GetString("group", "") == "" { | ||
prod.Logger.Error("group name can not be empty") | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prod.service is not initialized at all.
If you need help about creation of the session and config object have a look at producer.Kinesis, producer.S3 or consumer.Kinesis (and of course the AWS SDK docs)
Like written in my code comments - for AWS a core.BatchProducer is normally the best option as you get a batch of messages instead of single messages. As an advice - have a look at the other AWS producers (Kinesis and S3). Most of the common stuff is in there.
Normally we use TextToJSON or write a custom formatter for that. After that you can use the different JSON processors (processJSON, templateJSON) to modify your message. This process will change with 0.6.0 (see #171) but for now this is how it works. Also - please test your plugin on AWS and notify us when this actually works. |
Thx for your feedback. I will let you know when I am finished. |
How can I set default max batch size ? In kinesis producer dosc state that |
The batch settings comes from the embedded batchedproducer. But the docs are not correct. The new configuration have to look like Batch:
MaxCount: 8192
FlushCount: 4096
TimeoutSec: 5 We will update all docs by #100 |
Ok. Is there any way to set this in code ? I would like to set max limits based on AWS limits so they will not be exceeded by user configuring gollum. |
To fix this i would recommend this change to fix the current issue: // Configure initializes this producer with values from a plugin config.
func (prod *AwsCloudwatchLogs) Configure(conf core.PluginConfigReader) {
if prod.stream == "" {
prod.Logger.Error("LogStream can not be empty")
}
if prod.group == "" {
prod.Logger.Error("LogGroup can not be empty")
}
if conf.GetInt("Batch/MaxCount", maxBatchEvents) > maxBatchEvents {
conf.Errors.Pushf("Batch/MaxCount must be below %d", maxBatchEvents)
}
}
In this case the doc part should look like this example: // AwsCloudwatchLogs producer plugin
//
// The AwsCloudwatchLogs producer plugin sends messages to
// AWS Cloudwatch Logs service.
//
// Parameters
//
// - LogStream: xxxx
//
// - LogGroup: xxx
//
// Examples
//
// This example ...:
//
// CwLogs:
// Type: AwsCloudwatchLogs:
// LogStream: stream_name
// LogGroup: group_name
// Credential:
// Type: shared
// After the requested changes this PR should be mergeable! :) |
I will catch you later. I am busy now. Thx for reply. |
Hi. Do you need any thing else from this PR ? |
@luqasz : Sorry for the delay! We will merge this PR after the final v0.5.0 release. Last time I saw no further issues with this PR - so i think we are able to merge this directly after the release. |
Handle errors in a separate function. Create group, stream if needed. Fixes errors when no group / stream is present when uploading.
I've allowed myself to add a separate function to handle upload results. Usefull when group / stream was removed during program runtime. Gollum will create group / stream (without uploading batch) so next uploads will succeed. |
@arnecls @msiebeneicher Do you need anything from me to merge this PR ? |
LGTM. If @msiebeneicher does not find anything I'll do the merge on Wednesday when I'm back at the office. |
Merged. Will be released along with 0.5.2 |
Hi
I'd like to add cloudwatch logs producer to gollum. AWS puts some limitations on this service which are written in this plugin constants. I have couple of problems here.
Can you please give some advice / guide ?