forked from taggledevel2/ratchet
/
s3_writer.go
46 lines (38 loc) · 1.54 KB
/
s3_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package processors
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/dailyburn/ratchet/data"
"github.com/dailyburn/ratchet/util"
)
// S3Writer buffers each data payload passed to ProcessData and, on Finish,
// joins the buffered payloads and writes the result to a single S3 object.
//
// By default, we will not compress data before sending it upstream to S3. Set the `Compress`
// flag to true to use gzip compression before storing in S3 (if this flag is set to true,
// ".gz" will automatically be appended to the key name specified).
//
// By default, we will separate each iteration of data sent to `ProcessData` with a new line
// when we piece back together to send to S3. Change the `LineSeparator` attribute to change
// this behavior.
type S3Writer struct {
	data          []string // accumulated payloads, one entry per ProcessData call
	Compress      bool
	LineSeparator string
	config        *aws.Config // AWS client config built in NewS3Writer
	bucket        string      // destination S3 bucket
	key           string      // destination object key (".gz" appended by the writer when Compress is true)
}
// NewS3Writer returns an S3Writer that will store its accumulated data in the
// given bucket under the given key, authenticating with the supplied static
// AWS credentials in the supplied region. Compression defaults to off and the
// line separator defaults to "\n"; adjust the exported fields before Finish
// runs to change either.
func NewS3Writer(awsID, awsSecret, awsRegion, bucket, key string) *S3Writer {
	writer := &S3Writer{
		bucket:        bucket,
		key:           key,
		LineSeparator: "\n",
		Compress:      false,
	}

	staticCreds := credentials.NewStaticCredentials(awsID, awsSecret, "")

	// NOTE(review): SSL is explicitly disabled here, so requests (and the
	// signed credentials on them) travel over plain HTTP — confirm this is
	// intentional before using outside a trusted network.
	// .WithLogLevel(aws.LogDebugWithRequestRetries | aws.LogDebugWithRequestErrors)
	writer.config = aws.NewConfig().
		WithRegion(awsRegion).
		WithDisableSSL(true).
		WithCredentials(staticCreds)

	return writer
}
// ProcessData appends the incoming payload to the writer's in-memory buffer.
// Nothing is sent to outputChan or killChan; the actual S3 upload is deferred
// until Finish is called.
func (w *S3Writer) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) {
	payload := string(d)
	w.data = append(w.data, payload)
}
// Finish joins all buffered payloads with w.LineSeparator and writes the
// result to s3://<bucket>/<key> (gzip-compressed when w.Compress is true).
//
// NOTE(review): any return values of util.WriteS3Object are discarded here —
// if that helper reports an upload error, it is silently swallowed and
// killChan is never signaled. Verify WriteS3Object's signature and forward a
// non-nil error to killChan if one is returned.
func (w *S3Writer) Finish(outputChan chan data.JSON, killChan chan error) {
	util.WriteS3Object(w.data, w.config, w.bucket, w.key, w.LineSeparator, w.Compress)
}
// String implements fmt.Stringer, identifying this processor in pipeline
// logs and diagnostics.
func (w *S3Writer) String() string {
	const name = "S3Writer"
	return name
}