forked from grailbio/reflow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cloudwatch.go
133 lines (117 loc) · 3.69 KB
/
cloudwatch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Copyright 2017 GRAIL, Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package local
import (
"errors"
"io"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
"github.com/grailbio/reflow/log"
)
// TODO(pgopal) - Put this code into a separate package and inject the
// implementation via Config.

// errDropped is the error returned by cloudWatchLogsStream.Output when
// the shared buffer has no free slot and the message is discarded.
var errDropped = errors.New("dropped log message: buffer full")
// streamType names the kind of output a log stream carries. Its string
// value is appended to the stream prefix to form the log stream name.
type streamType string

const (
	// stdout identifies a stream carrying standard output.
	stdout streamType = "stdout"
	// stderr identifies a stream carrying standard error. The type is
	// spelled out explicitly: a constant with its own expression does
	// not inherit the type of the preceding one, so without it stderr
	// would be an untyped string constant rather than a streamType.
	stderr streamType = "stderr"
)
// remoteStream is a sink for logs that are shipped to a remote (cloud)
// service. It hands out per-stream outputters and must be closed once
// every stream has finished writing; closing it while a stream is still
// writing can cause a panic.
type remoteStream interface {
	// NewStream returns an outputter for the stream identified by the
	// given prefix and stream type. The result of creating two
	// outputters that write to the same output stream is undefined.
	NewStream(prefix string, sType streamType) (log.Outputter, error)

	io.Closer
}
// logEntry is a single queued log message together with the name of the
// log stream it is destined for.
type logEntry struct {
	// stream is the destination log stream name; msg is the message text.
	stream, msg string
}
// cloudWatchLogs is a client that streams log messages to Amazon
// CloudWatch Logs. Messages are queued on buffer and sent to the
// service by a background goroutine.
type cloudWatchLogs struct {
	// client is the CloudWatch Logs API used for every request.
	client cloudwatchlogsiface.CloudWatchLogsAPI
	// group is the log group that all streams of this client belong to.
	// stream is not referenced by any code visible in this file.
	group, stream string
	// buffer queues entries waiting to be sent to the service.
	buffer chan logEntry
}
// newCloudWatchLogs returns a remoteStream backed by Amazon CloudWatch
// Logs that writes into the given log group. The group is created if
// needed; a "resource already exists" response from the service is
// treated as success, any other error is returned. On success the
// background sender goroutine is already running, and the returned
// client can be used to create streams and log to them.
func newCloudWatchLogs(client cloudwatchlogsiface.CloudWatchLogsAPI, group string) (remoteStream, error) {
	input := &cloudwatchlogs.CreateLogGroupInput{LogGroupName: aws.String(group)}
	if _, err := client.CreateLogGroup(input); err != nil {
		// Only an AWS error carrying the already-exists code is tolerated.
		aerr, isAWS := err.(awserr.Error)
		if !isAWS || aerr.Code() != cloudwatchlogs.ErrCodeResourceAlreadyExistsException {
			return nil, err
		}
	}
	cwl := &cloudWatchLogs{
		client: client,
		group:  group,
		// Queue capacity: up to 1024 entries may be pending at once.
		buffer: make(chan logEntry, 1024),
	}
	cwl.loop()
	return cwl, nil
}
// NewStream creates a log stream named "<prefix>/<sType>" in the
// client's log group and returns an outputter that writes to it.
//
// A "resource already exists" response from the service is treated as
// success. Every other error is returned, including errors that are not
// of type awserr.Error (which were previously ignored, yielding a stream
// whose creation had actually failed).
func (c *cloudWatchLogs) NewStream(prefix string, sType streamType) (log.Outputter, error) {
	stream := &cloudWatchLogsStream{
		client: c,
		name:   prefix + "/" + string(sType),
	}
	_, err := c.client.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
		LogGroupName:  aws.String(c.group),
		LogStreamName: aws.String(stream.name),
	})
	if err != nil {
		// Tolerate only the AWS already-exists code; the same rule is
		// applied when the log group is created.
		aerr, ok := err.(awserr.Error)
		if !ok || aerr.Code() != cloudwatchlogs.ErrCodeResourceAlreadyExistsException {
			return nil, err
		}
	}
	return stream, nil
}
// Close closes the buffer channel, which lets the sender goroutine
// started by loop finish once it has received the entries still queued.
// Close does not wait for that goroutine and always returns nil.
//
// Sending on or closing an already closed channel panics, so Close must
// be called at most once and only after all streams are done writing.
func (c *cloudWatchLogs) Close() error {
close(c.buffer)
return nil
}
// loop starts the background goroutine that drains c.buffer and sends
// each entry to CloudWatch Logs as a single-event PutLogEvents request.
// The goroutine exits when c.buffer is closed and empty.
//
// Each event is stamped with the time it is sent, in milliseconds since
// the Unix epoch. A failed request is logged and its entry is not
// retried.
func (c *cloudWatchLogs) loop() {
	go func() {
		// tokens holds, per stream name, the sequence token returned by
		// the most recent successful put; a missing key yields nil.
		tokens := make(map[string]*string)
		for entry := range c.buffer {
			nowMillis := time.Now().UnixNano() / 1000000
			input := &cloudwatchlogs.PutLogEventsInput{
				LogEvents: []*cloudwatchlogs.InputLogEvent{{
					Message:   aws.String(entry.msg),
					Timestamp: aws.Int64(nowMillis),
				}},
				LogGroupName:  aws.String(c.group),
				LogStreamName: aws.String(entry.stream),
				SequenceToken: tokens[entry.stream],
			}
			out, err := c.client.PutLogEvents(input)
			if err != nil {
				log.Errorf("cloudWatchLogs.PutLogEvent: %v", err)
				continue
			}
			tokens[entry.stream] = out.NextSequenceToken
		}
	}()
}
// cloudWatchLogsStream is the outputter returned by
// cloudWatchLogs.NewStream. It tags messages with its stream name and
// queues them on the owning client's buffer.
type cloudWatchLogsStream struct {
// client is the cloudWatchLogs whose buffer receives this stream's messages.
client *cloudWatchLogs
// name is the log stream name, built as prefix + "/" + stream type.
name string
}
// Output queues msg for delivery to CloudWatch Logs under this stream's
// name. The send never blocks: when the client's buffer is full the
// message is discarded and errDropped is returned. calldepth is not
// used.
func (s *cloudWatchLogsStream) Output(calldepth int, msg string) error {
	entry := logEntry{stream: s.name, msg: msg}
	select {
	case s.client.buffer <- entry:
		// Queued; the sender goroutine picks it up from the buffer.
	default:
		return errDropped
	}
	return nil
}