/
process.go
383 lines (310 loc) · 11.1 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
/*
Copyright 2022 TriggerMesh Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package awssqssource
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"go.uber.org/zap"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/aws/aws-sdk-go/service/eventbridge"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/triggermesh/triggermesh/pkg/apis/sources/v1alpha1"
)
// A message processor processes SQS messages (sends as CloudEvent)
// sequentially, as soon as they are written to processQueue.
func (a *adapter) runMessagesProcessor(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case msg := <-a.processQueue:
a.sr.reportMessageDequeuedProcessCount()
a.logger.Debugw("Processing message", zap.String(logfieldMsgID, *msg.MessageId))
events, err := a.msgPrcsr.Process(msg)
if err != nil {
a.logger.Errorw("Failed to process SQS message", zap.Error(err),
zap.String(logfieldMsgID, *msg.MessageId))
continue
}
sendError := false
for _, event := range events {
if err := sendSQSEvent(ctx, a.ceClient, event); err != nil {
a.logger.Errorw("Failed to send event to the sink", zap.Error(err))
sendError = true
break
}
}
if sendError {
continue
}
a.deleteQueue <- msg
a.sr.reportMessageEnqueuedDeleteCount()
}
}
}
// sendSQSEvent sends a single SQS message as a CloudEvent to the event sink.
func sendSQSEvent(ctx context.Context, cli cloudevents.Client, event *cloudevents.Event) error {
if result := cli.Send(ctx, *event); !cloudevents.IsACK(result) {
return result
}
return nil
}
// MessageProcessor converts SQS messages to CloudEvents.
type MessageProcessor interface {
Process(*sqs.Message) ([]*cloudevents.Event, error)
}
var (
_ MessageProcessor = (*defaultMessageProcessor)(nil)
_ MessageProcessor = (*s3MessageProcessor)(nil)
)
// defaultMessageProcessor is the default message processor.
type defaultMessageProcessor struct {
ceSource string
}
// Process implements MessageProcessor.
func (p *defaultMessageProcessor) Process(msg *sqs.Message) ([]*cloudevents.Event, error) {
event, err := makeSQSEvent(msg, p.ceSource)
if err != nil {
return nil, fmt.Errorf("creating CloudEvent from SQS message: %w", err)
}
return []*cloudevents.Event{event}, nil
}
// makeSQSEvent returns a CloudEvent for a generic SQS message.
func makeSQSEvent(msg *sqs.Message, srcAttr string) (*cloudevents.Event, error) {
event := cloudevents.NewEvent()
event.SetType(v1alpha1.AWSEventType(sqs.ServiceName, v1alpha1.AWSSQSGenericEventType))
event.SetSource(srcAttr)
event.SetID(*msg.MessageId)
for name, val := range ceExtensionAttrsForMessage(msg) {
event.SetExtension(name, val)
}
if err := event.SetData(cloudevents.ApplicationJSON, toCloudEventData(msg)); err != nil {
return nil, fmt.Errorf("setting CloudEvent data: %w", err)
}
return &event, nil
}
// toCloudEventData returns a SQS message in a shape that is suitable for JSON
// serialization inside some CloudEvent data.
func toCloudEventData(msg *sqs.Message) interface{} {
if msg.Body == nil {
return msg
}
var data interface{}
data = msg
// if msg.Body contains raw JSON data, type it as json.RawMessage so it
// doesn't get encoded to base64 during the serialization of the
// CloudEvent data.
if json.Valid([]byte(*msg.Body)) {
data = &messageWithRawJSONBody{
Body: json.RawMessage([]byte(*msg.Body)),
Message: msg,
}
}
return data
}
// messageWithRawJSONBody is a SQS Message with a RawMessage-typed JSON body.
type messageWithRawJSONBody struct {
Body json.RawMessage
*sqs.Message
}
// s3MessageProcessor processes messages originating from S3 buckets.
type s3MessageProcessor struct {
// this value is set as the "source" CE context attribute when the S3
// processor handles messages which are not originating from S3
ceSourceFallback string
}
// Process implements MessageProcessor.
//
// This processor discards everything from the given message except its body,
// which must be in JSON format. If the body contains multiple records, each
// record is converted to an individual event.
//
// Expected events structure: https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
func (p *s3MessageProcessor) Process(msg *sqs.Message) ([]*cloudevents.Event, error) {
var events []*cloudevents.Event
bodyData := make(map[string]interface{})
if err := json.Unmarshal([]byte(*msg.Body), &bodyData); err != nil {
// if the data is not a JSON object, we can be certain the
// message didn't originate from S3, and fall back to the
// default processor's behaviour
event, err := makeSQSEvent(msg, p.ceSourceFallback)
if err != nil {
return nil, fmt.Errorf("creating CloudEvent from SQS message: %w", err)
}
return append(events, event), nil
}
var records []interface{}
recordsVal, hasRecords := bodyData["Records"]
if hasRecords {
records, hasRecords = recordsVal.([]interface{})
}
switch {
case hasRecords:
for _, record := range records {
event, err := makeS3EventFromRecord(record)
if err != nil {
return nil, fmt.Errorf("creating CloudEvent from S3 event record: %w", err)
}
events = append(events, event)
}
// special case: test events are sent whenever event notifications are
// re-configured in a S3 bucket
case isTestEventPayload(bodyData):
body := json.RawMessage([]byte(*msg.Body))
event := cloudevents.NewEvent()
event.SetType(v1alpha1.AWSEventType(s3.ServiceName, v1alpha1.AWSS3TestEventType))
event.SetSource("arn:aws:s3:::" + bodyData["Bucket"].(string))
event.SetID(*msg.MessageId)
if err := event.SetData(cloudevents.ApplicationJSON, body); err != nil {
return nil, fmt.Errorf("setting CloudEvent data: %w", err)
}
events = append(events, &event)
// instead of discarding non-S3 events, fall back to the default processor's behaviour
default:
event, err := makeSQSEvent(msg, p.ceSourceFallback)
if err != nil {
return nil, fmt.Errorf("creating CloudEvent from SQS message: %w", err)
}
events = append(events, event)
}
return events, nil
}
// makeS3EventFromRecord returns a CloudEvent for the given S3 event record.
func makeS3EventFromRecord(record interface{}) (*cloudevents.Event, error) {
recBytes, err := json.Marshal(record)
if err != nil {
return nil, fmt.Errorf("serializing S3 event record: %w", err)
}
data := json.RawMessage(recBytes)
recordData := record.(map[string]interface{})
eventName := recordData["eventName"].(string)
s3prop := recordData["s3"].(map[string]interface{})
bucketARN := s3prop["bucket"].(map[string]interface{})["arn"].(string)
objectKey := s3prop["object"].(map[string]interface{})["key"].(string)
event := cloudevents.NewEvent()
event.SetType(v1alpha1.AWSEventType(s3.ServiceName, ceTypeFromS3Event(eventName)))
event.SetSource(bucketARN)
event.SetSubject(objectKey)
if err := event.SetData(cloudevents.ApplicationJSON, data); err != nil {
return nil, fmt.Errorf("setting CloudEvent data: %w", err)
}
return &event, nil
}
// ceTypeFromS3Event returns the name of a S3 event in a format that is
// suitable for the "type" context attribute of a CloudEvent.
func ceTypeFromS3Event(eventName string) string {
// Example: "ObjectRemoved:DeleteMarkerCreated" -> "objectremoved"
return strings.ToLower(strings.SplitN(eventName, ":", 2)[0])
}
// isTestEventPayload checks whether the provided payload data corresponds to a
// test event from S3.
func isTestEventPayload(data map[string]interface{}) bool {
v, ok := data["Service"]
if !ok {
return false
}
if v, ok = v.(string); !ok || v != "Amazon S3" {
return false
}
v, ok = data["Event"]
if !ok {
return false
}
if v, ok = v.(string); !ok || v != "s3:TestEvent" {
return false
}
return true
}
// eventbridgeMessageProcessor processes messages originating from EventBridge.
type eventbridgeMessageProcessor struct {
// this value is set as the "source" CE context attribute on messages
// that originate from EventBridge
ceSource string
// this value is set as the "source" CE context attribute when the
// EventBridge processor handles messages which are not originating
// from EventBridge
ceSourceFallback string
}
// Process implements MessageProcessor.
//
// This processor discards everything from the given message except its body,
// which must be in JSON format.
//
// Expected events structure: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-events.html
func (p *eventbridgeMessageProcessor) Process(msg *sqs.Message) ([]*cloudevents.Event, error) {
var events []*cloudevents.Event
bodyData := make(map[string]interface{})
if err := json.Unmarshal([]byte(*msg.Body), &bodyData); err != nil {
// if the data is not a JSON object, we can be certain the
// message didn't originate from EventBridge, and fall back to
// the default processor's behaviour
event, err := makeSQSEvent(msg, p.ceSourceFallback)
if err != nil {
return nil, fmt.Errorf("creating CloudEvent from SQS message: %w", err)
}
return append(events, event), nil
}
switch {
case isEventBridgeEvent(bodyData):
event, err := makeEventBridgeEvent(bodyData, p.ceSource)
if err != nil {
return nil, fmt.Errorf("creating CloudEvent from EventBridge event: %w", err)
}
events = append(events, event)
// instead of discarding non-EventBridge events, fall back to the
// default processor's behaviour
default:
event, err := makeSQSEvent(msg, p.ceSourceFallback)
if err != nil {
return nil, fmt.Errorf("creating CloudEvent from SQS message: %w", err)
}
events = append(events, event)
}
return events, nil
}
// isEventBridgeEvent returns whether the given data represents a valid
// EventBridge event.
func isEventBridgeEvent(data map[string]interface{}) bool {
if _, ok := data["detail"]; !ok {
return false
}
if _, ok := data["detail-type"]; !ok {
return false
}
_, ok := data["source"]
return ok
}
// makeEventBridgeEvent returns a CloudEvent for the given EventBridge event.
func makeEventBridgeEvent(data map[string]interface{}, srcAttr string) (*cloudevents.Event, error) {
event := cloudevents.NewEvent()
event.SetType(v1alpha1.AWSEventType(eventbridge.EndpointsID, v1alpha1.AWSEventBridgeGenericEventType))
event.SetSource(srcAttr)
event.SetExtension("awseventssource", data["source"])
event.SetExtension("awseventsdetailtype", data["detail-type"])
if id, ok := data["id"]; ok {
event.SetID(id.(string))
}
if t, ok := data["time"]; ok {
if ts, err := time.Parse(time.RFC3339, t.(string)); err == nil {
event.SetTime(ts)
}
}
if err := event.SetData(cloudevents.ApplicationJSON, data); err != nil {
return nil, fmt.Errorf("setting CloudEvent data: %w", err)
}
return &event, nil
}