package s3

import (
	"bytes"
	"compress/gzip"
	"context"
	"encoding/csv"
	"errors"
	"io"
	"path"
	"strconv"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"
	"github.com/sirupsen/logrus"
	"github.com/stripe/veneur/plugins"
	"github.com/stripe/veneur/samplers"
)
// TODO set log level
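// Compile-time check that S3Plugin satisfies the plugins.Plugin interface.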
var _ plugins.Plugin = &S3Plugin{}
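// S3Plugin archives flushed metrics to an S3 bucket, writing one gzipped TSV
// object per flush. A minimal construction sketch (the session wiring, bucket
// name, hostname, and interval below are illustrative assumptions, not part
// of this package; session is github.com/aws/aws-sdk-go/aws/session):
//
//	sess := session.Must(session.NewSession())
//	plugin := &S3Plugin{
//		Logger:   logrus.New(),
//		Svc:      s3.New(sess),
//		S3Bucket: "my-metrics-bucket", // hypothetical bucket
//		Hostname: "veneur-host-01",    // hypothetical hostname
//		Interval: 10,                  // hypothetical flush interval
//	}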
type S3Plugin struct {
	Logger   *logrus.Logger
	Svc      s3iface.S3API
	S3Bucket string
	Hostname string
	Interval int
}
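// Flush encodes the given metrics as tab-delimited, gzip-compressed CSV with
// no header row and posts the result to the configured S3 bucket. It returns
// the first error encountered while encoding or uploading.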
func (p *S3Plugin) Flush(ctx context.Context, metrics []samplers.InterMetric) error {
	const Delimiter = '\t'
	const IncludeHeaders = false

	csv, err := EncodeInterMetricsCSV(metrics, Delimiter, IncludeHeaders, p.Hostname, p.Interval)
	if err != nil {
		p.Logger.WithFields(logrus.Fields{
			logrus.ErrorKey: err,
			"metrics":       len(metrics),
		}).Error("Could not marshal metrics before posting to s3")
		return err
	}

	err = p.S3Post(p.Hostname, csv, tsvGzFt)
	if err != nil {
		p.Logger.WithFields(logrus.Fields{
			logrus.ErrorKey: err,
			"metrics":       len(metrics),
		}).Error("Error posting to s3")
		return err
	}

	p.Logger.WithField("metrics", len(metrics)).Debug("Completed flush to s3")
	return nil
}
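// Name returns the name of this plugin, "s3".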
func (p *S3Plugin) Name() string {
	return "s3"
}
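// filetype identifies the encoding of an uploaded object and doubles as its
// file extension (see S3Path).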
type filetype string

const (
	jsonFt  filetype = "json"
	csvFt   filetype = "csv"
	tsvFt   filetype = "tsv"
	tsvGzFt filetype = "tsv.gz"
)
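// S3ClientUninitializedError is returned by S3Post when the plugin's Svc
// field has not been set.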
var S3ClientUninitializedError = errors.New("s3 client has not been initialized")
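// S3Post uploads data to the plugin's S3 bucket under a timestamped,
// hostname-scoped key generated by S3Path.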
func (p *S3Plugin) S3Post(hostname string, data io.ReadSeeker, ft filetype) error {
	if p.Svc == nil {
		return S3ClientUninitializedError
	}
	params := &s3.PutObjectInput{
		Bucket: aws.String(p.S3Bucket),
		Key:    S3Path(hostname, ft),
		Body:   data,
	}
	_, err := p.Svc.PutObject(params)
	return err
}
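// S3Path returns a date-partitioned object key for hostname and ft based on
// the current time, of the form 2006/01/02/<hostname>/<unix-timestamp>.<ft>.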
func S3Path(hostname string, ft filetype) *string {
	t := time.Now()
	filename := strconv.FormatInt(t.Unix(), 10) + "." + string(ft)
	return aws.String(path.Join(t.Format("2006/01/02"), hostname, filename))
}
// EncodeInterMetricsCSV returns a reader containing the gzipped CSV
// representation of the InterMetric data, one row per InterMetric.
// The AWS SDK requires seekable input, so an io.ReadSeeker is returned.
func EncodeInterMetricsCSV(metrics []samplers.InterMetric, delimiter rune, includeHeaders bool, hostname string, interval int) (io.ReadSeeker, error) {
	b := &bytes.Buffer{}
	gzw := gzip.NewWriter(b)
	w := csv.NewWriter(gzw)
	w.Comma = delimiter

	if includeHeaders {
		// Write the headers first.
		headers := [...]string{
			// The order here doesn't actually matter,
			// as long as the keys are right.
			TsvName:           TsvName.String(),
			TsvTags:           TsvTags.String(),
			TsvMetricType:     TsvMetricType.String(),
			TsvInterval:       TsvInterval.String(),
			TsvVeneurHostname: TsvVeneurHostname.String(),
			TsvValue:          TsvValue.String(),
			TsvTimestamp:      TsvTimestamp.String(),
			TsvPartition:      TsvPartition.String(),
		}
		w.Write(headers[:])
	}

	// TODO avoid edge case at midnight
	partitionDate := time.Now()
	for _, metric := range metrics {
		EncodeInterMetricCSV(metric, w, &partitionDate, hostname, interval)
	}

	w.Flush()
	gzw.Close()
	return bytes.NewReader(b.Bytes()), w.Error()
}
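// A round-trip sketch for consuming an encoded object (assumed usage, not
// part of this package): decompress the stream with compress/gzip, then
// parse it with encoding/csv using a tab delimiter.
//
//	rs, err := EncodeInterMetricsCSV(metrics, '\t', false, "myhost", 10)
//	// handle err ...
//	gzr, err := gzip.NewReader(rs)
//	// handle err ...
//	cr := csv.NewReader(gzr)
//	cr.Comma = '\t'
//	rows, err := cr.ReadAll()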