forked from stripe/veneur
-
Notifications
You must be signed in to change notification settings - Fork 0
/
s3.go
134 lines (111 loc) · 3.24 KB
/
s3.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package s3
import (
"bytes"
"compress/gzip"
"encoding/csv"
"errors"
"io"
"path"
"strconv"
"time"
"github.com/Sirupsen/logrus"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/stripe/veneur/plugins"
"github.com/stripe/veneur/samplers"
)
// TODO set log level

// Compile-time assertion that *S3Plugin implements plugins.Plugin.
var _ plugins.Plugin = &S3Plugin{}

// S3Plugin flushes veneur metrics to an S3 bucket as gzipped TSV objects.
type S3Plugin struct {
	Logger *logrus.Logger // destination for flush status and error logs
	Svc s3iface.S3API // S3 client; S3Post returns S3ClientUninitializedError if nil
	S3Bucket string // NOTE(review): not referenced in this file — S3Post uses the package-level S3Bucket var; confirm intent
	Hostname string // passed to EncodeDDMetricsCSV by Flush as the veneur hostname
}
// Flush implements plugins.Plugin. It encodes the given metrics as a
// gzipped, tab-delimited TSV (no header row) and uploads the result to S3
// under a date/hostname-derived key. Encoding or upload failures are
// logged and returned to the caller.
func (p *S3Plugin) Flush(metrics []samplers.DDMetric, hostname string) error {
	const Delimiter = '\t'
	const IncludeHeaders = false

	// Named "encoded" rather than "csv" so it does not shadow the
	// encoding/csv package imported at the top of the file.
	encoded, err := EncodeDDMetricsCSV(metrics, Delimiter, IncludeHeaders, p.Hostname)
	if err != nil {
		p.Logger.WithFields(logrus.Fields{
			logrus.ErrorKey: err,
			"metrics":       len(metrics),
		}).Error("Could not marshal metrics before posting to s3")
		return err
	}

	err = p.S3Post(hostname, encoded, tsvGzFt)
	if err != nil {
		p.Logger.WithFields(logrus.Fields{
			logrus.ErrorKey: err,
			"metrics":       len(metrics),
		}).Error("Error posting to s3")
		return err
	}

	p.Logger.WithField("metrics", len(metrics)).Debug("Completed flush to s3")
	return nil
}
// Name returns the identifier under which this plugin is registered
// with veneur.
func (p *S3Plugin) Name() string {
	const pluginName = "s3"
	return pluginName
}
// filetype identifies the serialization format (and therefore the file
// extension) used for objects posted to S3.
type filetype string

const (
	// Each constant carries an explicit filetype type. In the original,
	// only jsonFt was typed; the others were untyped string constants,
	// which weakens type checking at S3Post/S3Path call sites.
	jsonFt  filetype = "json"
	csvFt   filetype = "csv"
	tsvFt   filetype = "tsv"
	tsvGzFt filetype = "tsv.gz"
)
// S3Bucket is the destination bucket for all flushed objects.
// NOTE(review): mutable package-level state — consider wiring this through
// S3Plugin configuration instead; confirm no external package mutates it.
var S3Bucket = "stripe-veneur"

// S3ClientUninitializedError is returned by S3Post when Svc is nil.
// Go convention would name this ErrS3ClientUninitialized, but it is
// exported, so renaming would break existing callers.
var S3ClientUninitializedError = errors.New("s3 client has not been initialized")
// S3Post uploads data to the package-level S3Bucket under a key derived
// from the hostname, the current date, and the given filetype extension.
// It returns S3ClientUninitializedError if no client has been configured.
func (p *S3Plugin) S3Post(hostname string, data io.ReadSeeker, ft filetype) error {
	svc := p.Svc
	if svc == nil {
		return S3ClientUninitializedError
	}

	_, err := svc.PutObject(&s3.PutObjectInput{
		Bucket: aws.String(S3Bucket),
		Key:    S3Path(hostname, ft),
		Body:   data,
	})
	return err
}
// S3Path builds the object key for a flush happening now, shaped as
// "YYYY/MM/DD/<hostname>/<unix-timestamp>.<ft>", and returns it as the
// *string the AWS SDK expects.
func S3Path(hostname string, ft filetype) *string {
	now := time.Now()
	name := strconv.FormatInt(now.Unix(), 10) + "." + string(ft)
	key := path.Join(now.Format("2006/01/02"), hostname, name)
	return aws.String(key)
}
// EncodeDDMetricsCSV returns a reader containing the gzipped CSV representation of the
// DDMetrics data, one row per DDMetric.
// the AWS sdk requires seekable input, so we return a ReadSeeker here.
// On any encoding or compression error it returns a nil reader and the error.
func EncodeDDMetricsCSV(metrics []samplers.DDMetric, delimiter rune, includeHeaders bool, hostname string) (io.ReadSeeker, error) {
	b := &bytes.Buffer{}
	gzw := gzip.NewWriter(b)
	w := csv.NewWriter(gzw)
	w.Comma = delimiter

	if includeHeaders {
		// Write the headers first
		headers := [...]string{
			// the order here doesn't actually matter
			// as long as the keys are right
			TsvName:           TsvName.String(),
			TsvTags:           TsvTags.String(),
			TsvMetricType:     TsvMetricType.String(),
			TsvHostname:       TsvHostname.String(),
			TsvDeviceName:     TsvDeviceName.String(),
			TsvInterval:       TsvInterval.String(),
			TsvVeneurHostname: TsvVeneurHostname.String(),
			TsvValue:          TsvValue.String(),
			TsvTimestamp:      TsvTimestamp.String(),
			TsvPartition:      TsvPartition.String(),
		}
		// any Write error surfaces through w.Error() below
		w.Write(headers[:])
	}

	// TODO avoid edge case at midnight
	partitionDate := time.Now()
	for _, metric := range metrics {
		EncodeDDMetricCSV(metric, w, &partitionDate, hostname)
	}

	// Flush the csv layer first, then check its accumulated error before
	// trusting the buffer contents.
	w.Flush()
	if err := w.Error(); err != nil {
		return nil, err
	}
	// Close flushes the gzip footer; the original dropped this error,
	// which could hand the caller a truncated/corrupt gzip stream.
	if err := gzw.Close(); err != nil {
		return nil, err
	}
	return bytes.NewReader(b.Bytes()), nil
}