/
action.go
146 lines (126 loc) · 4.06 KB
/
action.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package fdr
import (
"context"
"encoding/json"
"errors"
"io"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/m-mizutani/goerr"
"github.com/m-mizutani/hatchery/pkg/domain/config"
"github.com/m-mizutani/hatchery/pkg/domain/interfaces"
"github.com/m-mizutani/hatchery/pkg/domain/model"
"github.com/m-mizutani/hatchery/pkg/infra"
"github.com/m-mizutani/hatchery/pkg/utils"
)
type fdrMessage struct {
Bucket string `json:"bucket"`
Cid string `json:"cid"`
FileCount int64 `json:"fileCount"`
Files []file `json:"files"`
PathPrefix string `json:"pathPrefix"`
Timestamp int64 `json:"timestamp"`
TotalSize int64 `json:"totalSize"`
}
type file struct {
Checksum string `json:"checksum"`
Path string `json:"path"`
Size int64 `json:"size"`
}
type fdrClients struct {
infra *infra.Clients
sqs interfaces.SQS
s3 interfaces.S3
}
func Exec(ctx context.Context, clients *infra.Clients, req *config.FalconDataReplicatorImpl) error {
// Create an AWS session
awsSession, err := session.NewSession(&aws.Config{
Region: aws.String(req.AwsRegion),
Credentials: credentials.NewCredentials(&credentials.StaticProvider{
Value: credentials.Value{
AccessKeyID: req.AwsAccessKeyId,
SecretAccessKey: req.AwsSecretAccessKey,
},
}),
})
if err != nil {
return goerr.Wrap(err, "failed to create AWS session").With("req", req)
}
// Create AWS service clients
sqsClient := clients.NewSQS(awsSession)
s3Client := clients.NewS3(awsSession)
prefix := config.LogObjNamePrefix(req, utils.CtxNow(ctx))
// Receive messages from SQS queue
input := &sqs.ReceiveMessageInput{
QueueUrl: aws.String(req.SqsUrl),
}
if req.MaxMessages != nil {
input.MaxNumberOfMessages = aws.Int64(int64(*req.MaxMessages))
}
for i := 0; ; i++ {
if req.MaxPulls != nil && i >= *req.MaxPulls {
break
}
c := &fdrClients{infra: clients, sqs: sqsClient, s3: s3Client}
if err := copy(ctx, c, input, model.CSBucket(req.Bucket), prefix); err != nil {
if err == errNoMoreMessage {
break
}
return err
}
}
return nil
}
var (
errNoMoreMessage = errors.New("no more message")
)
func copy(ctx context.Context, clients *fdrClients, input *sqs.ReceiveMessageInput, bucket model.CSBucket, prefix model.CSObjectName) error {
result, err := clients.sqs.ReceiveMessageWithContext(ctx, input)
if err != nil {
return goerr.Wrap(err, "failed to receive messages from SQS").With("input", input)
}
if len(result.Messages) == 0 {
return errNoMoreMessage
}
// Iterate over received messages
for _, message := range result.Messages {
// Get the S3 object key from the message
var msg fdrMessage
if err := json.Unmarshal([]byte(*message.Body), &msg); err != nil {
return goerr.Wrap(err, "failed to unmarshal message").With("message", *message.Body)
}
for _, file := range msg.Files {
// Download the object from S3
s3Input := &s3.GetObjectInput{
Bucket: aws.String(msg.Bucket),
Key: aws.String(file.Path),
}
s3Obj, err := clients.s3.GetObjectWithContext(ctx, s3Input)
if err != nil {
return goerr.Wrap(err, "failed to download object from S3").With("msg", msg)
}
defer utils.SafeClose(s3Obj.Body)
csObj := prefix + model.CSObjectName(file.Path)
w := clients.infra.CloudStorage().NewObjectWriter(ctx, bucket, csObj)
if _, err := io.Copy(w, s3Obj.Body); err != nil {
return goerr.Wrap(err, "failed to write object to GCS").With("msg", msg)
}
if err := w.Close(); err != nil {
return goerr.Wrap(err, "failed to close object writer").With("msg", msg)
}
utils.CtxLogger(ctx).Info("FDR: object forwarded from S3 to GCS", "s3", s3Input, "gcsObj", csObj)
}
// Delete the message from SQS
_, err = clients.sqs.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{
QueueUrl: input.QueueUrl,
ReceiptHandle: message.ReceiptHandle,
})
if err != nil {
return goerr.Wrap(err, "failed to delete message from SQS")
}
}
return nil
}