forked from s12v/awsbeats
/
client.go
157 lines (137 loc) · 5.12 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package firehose
import (
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/codec"
"github.com/elastic/beats/libbeat/outputs/codec/json"
"github.com/elastic/beats/libbeat/publisher"
)
// client is a beats output client that forwards published events to an
// AWS Kinesis Firehose delivery stream.
type client struct {
	firehose           *firehose.Firehose // AWS SDK Firehose service client
	deliveryStreamName string             // target Firehose delivery stream
	beatName           string             // beat name passed to the event encoder
	encoder            codec.Codec        // serializes events (JSON) before sending
	timeout            time.Duration      // request timeout from FirehoseConfig; NOTE(review): not applied anywhere in this file — confirm intended use
	observer           outputs.Observer   // pipeline metrics sink (batches/acked/dropped)
}
// newClient constructs a Firehose output client from the given AWS session
// and output configuration. It never fails; the error return exists to fit
// the factory signature expected by the outputs registry.
func newClient(sess *session.Session, config *FirehoseConfig, observer outputs.Observer, beat beat.Info) (*client, error) {
	enc := json.New(beat.Version, json.Config{Pretty: false, EscapeHTML: true})
	c := &client{
		firehose:           firehose.New(sess),
		deliveryStreamName: config.DeliveryStreamName,
		beatName:           beat.Beat,
		encoder:            enc,
		timeout:            config.Timeout,
		observer:           observer,
	}
	return c, nil
}
// String identifies this output in logs and metrics.
func (client *client) String() string {
	const outputName = "firehose"
	return outputName
}
// Close releases resources held by the client. The Firehose SDK client is
// stateless HTTP, so there is nothing to tear down.
func (client *client) Close() error { return nil }
// Connect satisfies the outputs connection interface. Firehose requests are
// plain HTTPS calls, so no persistent connection is established here.
func (client *client) Connect() error { return nil }
// Publish submits the batch to Firehose. The batch is ACKed only when every
// event was accepted; otherwise the remainder is scheduled for retry.
func (client *client) Publish(batch publisher.Batch) error {
	pending, _ := client.publishEvents(batch.Events())
	if len(pending) > 0 {
		// Mark the failed events to retry
		// Ref: https://github.com/elastic/beats/blob/c4af03c51373c1de7daaca660f5d21b3f602771c/libbeat/outputs/elasticsearch/client.go#L234
		batch.RetryEvents(pending)
	} else {
		// We have to ACK only when all the submission succeeded
		// Ref: https://github.com/elastic/beats/blob/c4af03c51373c1de7daaca660f5d21b3f602771c/libbeat/outputs/elasticsearch/client.go#L232
		batch.ACK()
	}
	// This shouldn't be an error object according to other official beats' implementations
	// Ref: https://github.com/elastic/beats/blob/c4af03c51373c1de7daaca660f5d21b3f602771c/libbeat/outputs/kafka/client.go#L119
	return nil
}
// publishEvents encodes and sends the given events to Firehose. It returns
// the subset of events that must be retried, along with any transport error
// from the PutRecordBatch call.
func (client *client) publishEvents(events []publisher.Event) ([]publisher.Event, error) {
	logger := logp.NewLogger("firehose")
	observer := client.observer
	observer.NewBatch(len(events))

	okEvents, records, dropped := client.mapEvents(events)
	// Use the *f variants: the sugared logger's Debug/Info do not interpolate
	// printf-style verbs.
	logger.Debugf("mapped %d of %d events to records (%d dropped)", len(records), len(events), dropped)
	observer.Dropped(dropped)

	res, err := client.sendRecords(records)

	// res.RequestResponses is index-aligned with the records we sent, i.e.
	// with okEvents — NOT with the original events slice, which may contain
	// dropped entries. Passing events here would retry the wrong ones.
	// Also guard against a nil response (whole-call failure) to avoid a
	// nil-pointer dereference.
	var failed []publisher.Event
	if res != nil {
		failed = collectFailedEvents(res, okEvents)
	}
	if err != nil && len(failed) == 0 {
		// Whole-batch failure (e.g. network error): retry everything mapped.
		failed = okEvents
	}
	// Count as acked only the events that will not be retried.
	observer.Acked(len(okEvents) - len(failed))
	if len(failed) > 0 {
		logger.Infof("retrying %d events on error: %v", len(failed), err)
	}
	return failed, err
}
// mapEvents converts events into Firehose records. It returns the events
// that encoded successfully (index-aligned with the returned records) and
// the number of events dropped because encoding failed.
func (client *client) mapEvents(events []publisher.Event) ([]publisher.Event, []*firehose.Record, int) {
	logger := logp.NewLogger("firehose")
	dropped := 0
	records := make([]*firehose.Record, 0, len(events))
	okEvents := make([]publisher.Event, 0, len(events))
	for i := range events {
		// Index instead of ranging by value: mapEvent takes a pointer, and
		// indexing avoids aliasing a per-iteration copy.
		record, err := client.mapEvent(&events[i])
		if err != nil {
			// Warnf, not Warn: the sugared logger's Warn does not
			// interpolate printf-style verbs.
			logger.Warnf("failed to map event(%v): %v", events[i], err)
			dropped++
			continue
		}
		okEvents = append(okEvents, events[i])
		records = append(records, record)
	}
	return okEvents, records, dropped
}
// mapEvent encodes a single event into a Firehose record, copying the
// encoder's output and appending a trailing newline so downstream S3
// objects are valid ndjson.
func (client *client) mapEvent(event *publisher.Event) (*firehose.Record, error) {
	serializedEvent, err := client.encoder.Encode(client.beatName, &event.Content)
	if err != nil {
		// Log only for guaranteed events; either way the event cannot be
		// sent, so the error is returned to the caller. (The original
		// duplicated the return across both branches.)
		if event.Guaranteed() {
			// Errorf, not Error: the sugared logger's Error does not
			// interpolate printf-style verbs.
			logp.NewLogger("firehose").Errorf("Unable to encode event: %v", err)
		}
		return nil, err
	}
	// The encoder may reuse its internal buffer across calls, so the bytes
	// must be copied out; otherwise every record in a batch can end up
	// holding the same Data.
	// See https://github.com/elastic/beats/blob/5a6630a8bc9b9caf312978f57d1d9193bdab1ac7/libbeat/outputs/kafka/client.go#L163-L164
	buf := make([]byte, len(serializedEvent)+1)
	copy(buf, serializedEvent)
	// Firehose doesn't automatically add trailing new-line on after each record.
	// This ends up a stream->firehose->s3 pipeline to produce useless s3 objects.
	// No ndjson, but a sequence of json objects without separators...
	// Fix it just adding a new-line.
	//
	// See https://stackoverflow.com/questions/43010117/writing-properly-formatted-json-to-s3-to-load-in-athena-redshift
	buf[len(buf)-1] = '\n'
	return &firehose.Record{Data: buf}, nil
}
// sendRecords submits all records to the configured delivery stream in a
// single PutRecordBatch call and returns the raw API response.
func (client *client) sendRecords(records []*firehose.Record) (*firehose.PutRecordBatchOutput, error) {
	input := &firehose.PutRecordBatchInput{
		DeliveryStreamName: aws.String(client.deliveryStreamName),
		Records:            records,
	}
	return client.firehose.PutRecordBatch(input)
}
// collectFailedEvents returns the events whose corresponding record in res
// was rejected by Firehose. events must be index-aligned with the records
// sent in the request. A nil res (e.g. the API call itself failed) yields
// no failed events — previously this dereferenced res unconditionally and
// could panic.
func collectFailedEvents(res *firehose.PutRecordBatchOutput, events []publisher.Event) []publisher.Event {
	if res == nil || aws.Int64Value(res.FailedPutCount) == 0 {
		return []publisher.Event{}
	}
	// Pre-size with the reported failure count to avoid repeated growth.
	failedEvents := make([]publisher.Event, 0, aws.Int64Value(res.FailedPutCount))
	for i, r := range res.RequestResponses {
		// A non-empty ErrorCode marks a per-record failure.
		if aws.StringValue(r.ErrorCode) != "" {
			failedEvents = append(failedEvents, events[i])
		}
	}
	return failedEvents
}