-
Notifications
You must be signed in to change notification settings - Fork 16
/
topic.go
222 lines (192 loc) · 5.68 KB
/
topic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
package sns
import (
"bytes"
"context"
"errors"
"fmt"
"log"
"os"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/aws/aws-sdk-go/service/sns/snsiface"
"github.com/zerofox-oss/go-aws-msg/retryer"
msg "github.com/zerofox-oss/go-msg"
b64 "github.com/zerofox-oss/go-msg/decorators/base64"
)
// Topic configures and manages SNSAPI for sns.MessageWriter.
//
// It holds the SNS client and the target topic ARN that are handed to every
// MessageWriter created through NewWriter.
type Topic struct {
// Svc is the SNS API client given to each MessageWriter. The Option helpers
// in this file only work when it is a concrete *sns.SNS.
Svc snsiface.SNSAPI
// TopicARN is the ARN of the SNS topic that messages are published to.
TopicARN string
// session is the AWS session used to rebuild Svc when an Option changes
// the client configuration.
session *session.Session
}
// getConf returns a pointer to the aws.Config held by the Topic's SNS client.
//
// The pointer addresses the client's own Config field rather than a copy, so
// writes made through it are visible on that client. An error is returned
// when t.Svc is not a concrete *sns.SNS (i.e. it is some other
// snsiface.SNSAPI implementation, or nil).
func getConf(t *Topic) (*aws.Config, error) {
	if snsClient, isSNS := t.Svc.(*sns.SNS); isSNS {
		return &snsClient.Client.Config, nil
	}
	return nil, errors.New("svc could not be casted to a SNS client")
}
// Option is the signature of a function that modifies a `Topic` to set some
// configuration. It returns a non-nil error when the change cannot be
// applied.
type Option func(*Topic) error
// WithCustomRetryer returns an Option that installs `r` as the `Retryer` of
// the Topic's SNS client.
//
// The client is rebuilt from the Topic's session with the updated
// configuration. The Option fails when the Topic's Svc is not a *sns.SNS.
func WithCustomRetryer(r request.Retryer) Option {
	return func(t *Topic) error {
		conf, err := getConf(t)
		if err == nil {
			conf.Retryer = r
			t.Svc = sns.New(t.session, conf)
		}
		return err
	}
}
// WithRetries returns an Option that makes the Topic's SNS client retry up
// to `max` times, waiting `delay` between requests.
//
// It installs go-aws-msg's retryer.DefaultRetryer, wrapping the SDK's
// client.DefaultRetryer with NumMaxRetries set to `max` and Delay set to
// `delay`, then rebuilds the client from the Topic's session.
//
// This is needed in scenarios where credentials are automatically generated
// and the program starts before AWS finishes propagating them.
func WithRetries(delay time.Duration, max int) Option {
	return func(t *Topic) error {
		conf, confErr := getConf(t)
		if confErr != nil {
			return confErr
		}

		backoff := retryer.DefaultRetryer{
			Retryer: client.DefaultRetryer{NumMaxRetries: max},
			Delay:   delay,
		}
		conf.Retryer = backoff
		t.Svc = sns.New(t.session, conf)
		return nil
	}
}
// NewTopic builds an SNS-backed msg.Topic whose message bodies are
// base64-encoded before being published.
//
// Note: SQS has limited support for unicode characters.
//   - See http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-messages.html
//
// Because we use SNS and SQS together, we recommend that SNS messages are
// base64-encoded as a best practice. Use NewUnencodedTopic to skip the
// encoding step.
//
// Any error from NewUnencodedTopic is returned unchanged.
func NewTopic(topicARN string, opts ...Option) (msg.Topic, error) {
	unencoded, err := NewUnencodedTopic(topicARN, opts...)
	if err == nil {
		return b64.Encoder(unencoded), nil
	}
	return nil, err
}
// NewUnencodedTopic creates a concrete SNS msg.Topic.
//
// Messages published by the `Topic` returned will not have the body
// base64-encoded.
//
// Credentials are read from the environment (credentials.EnvProvider). The
// region defaults to "us-west-2" and may be overridden with AWS_REGION; a
// custom endpoint may be supplied with SNS_ENDPOINT. A default retryer
// (7 retries, 2s delay) is installed before `opts` are applied, so an option
// may replace it.
//
// An error is returned if the AWS session cannot be created, if the default
// retryer cannot be installed, or if any option fails. Option errors are
// wrapped, so callers can inspect the cause with errors.Is / errors.As.
func NewUnencodedTopic(topicARN string, opts ...Option) (msg.Topic, error) {
	conf := &aws.Config{
		Credentials: credentials.NewCredentials(&credentials.EnvProvider{}),
		Region:      aws.String("us-west-2"),
	}

	// You may override AWS_REGION, SNS_ENDPOINT
	// http://docs.aws.amazon.com/sdk-for-go/api/aws/client/#Config
	if r := os.Getenv("AWS_REGION"); r != "" {
		conf.Region = aws.String(r)
	}
	if url := os.Getenv("SNS_ENDPOINT"); url != "" {
		conf.Endpoint = aws.String(url)
	}

	sess, err := session.NewSession(conf)
	if err != nil {
		return nil, err
	}

	t := &Topic{
		Svc:      sns.New(sess),
		TopicARN: topicARN,
		session:  sess,
	}

	// Default retryer; applied first so caller-supplied options can override it.
	if err = WithRetries(2*time.Second, 7)(t); err != nil {
		return nil, err
	}

	for _, opt := range opts {
		if err = opt(t); err != nil {
			// %w keeps the option's error in the chain; the rendered
			// message is the same as with %s.
			return nil, fmt.Errorf("cannot set option: %w", err)
		}
	}

	return t, nil
}
// NewWriter creates a sns.MessageWriter bound to this Topic's SNS client and
// topic ARN.
//
// The writer starts with an empty attribute set and keeps `ctx`, which is
// later passed to the publish call made by Close.
func (t *Topic) NewWriter(ctx context.Context) msg.MessageWriter {
	w := new(MessageWriter)
	w.attributes = make(msg.Attributes)
	w.snsClient = t.Svc
	w.topicARN = t.TopicARN
	w.ctx = ctx
	return w
}
// MessageWriter writes data to an output SNS topic as configured via its
// topicARN.
//
// Data passed to Write is buffered; the message is published when Close is
// called.
type MessageWriter struct {
// Embedded interface. NewWriter in this file does not assign it.
msg.MessageWriter
// attributes are exposed through Attributes() and sent as SNS message
// attributes on Close when non-empty.
attributes msg.Attributes
// buf accumulates the bytes passed to Write until Close publishes them.
buf bytes.Buffer
// closed is set on the first Close; later Write and Close calls return
// msg.ErrClosedMessageWriter.
closed bool
// mux is held for the duration of Write and Close.
mux sync.Mutex
// snsClient is the client used to publish the message.
snsClient snsiface.SNSAPI
// topicARN is the ARN of the topic the message is published to.
topicARN string
// ctx is the context passed to PublishWithContext in Close.
ctx context.Context
}
// Attributes exposes the writer's msg.Attributes by pointer, so callers can
// modify the attribute set that Close will send.
func (w *MessageWriter) Attributes() *msg.Attributes {
	attrs := &w.attributes
	return attrs
}
// Close publishes the buffered body, together with any attributes, to the
// writer's SNS topic as a sns.PublishInput.
//
// The writer is marked closed on the first call, before publishing, so every
// later Close or Write returns msg.ErrClosedMessageWriter, even if the
// publish itself failed. The error from the publish call is returned as-is.
func (w *MessageWriter) Close() error {
	w.mux.Lock()
	defer w.mux.Unlock()

	if w.closed {
		return msg.ErrClosedMessageWriter
	}
	w.closed = true

	input := new(sns.PublishInput)
	input.Message = aws.String(w.buf.String())
	input.TopicArn = aws.String(w.topicARN)

	// Only attach attributes when there is at least one.
	if attrs := w.Attributes(); len(*attrs) != 0 {
		input.MessageAttributes = buildSNSAttributes(attrs)
	}

	log.Printf("[TRACE] writing to sns: %v", input)
	_, publishErr := w.snsClient.PublishWithContext(w.ctx, input)
	return publishErr
}
// Write appends p to the writer's internal buffer; nothing is sent until
// Close is called.
//
// Once Close has been called, Write returns 0 and
// msg.ErrClosedMessageWriter.
func (w *MessageWriter) Write(p []byte) (int, error) {
	w.mux.Lock()
	defer w.mux.Unlock()

	if !w.closed {
		return w.buf.Write(p)
	}
	return 0, msg.ErrClosedMessageWriter
}
// buildSNSAttributes converts msg.Attributes into SNS message attributes.
//
// Each key becomes one attribute of AWS data type "String" whose value is the
// key's values joined with commas. Commas inside individual values are not
// escaped.
func buildSNSAttributes(a *msg.Attributes) map[string]*sns.MessageAttributeValue {
	out := make(map[string]*sns.MessageAttributeValue, len(*a))
	for name, values := range *a {
		joined := strings.Join(values, ",")
		out[name] = &sns.MessageAttributeValue{
			DataType:    aws.String("String"),
			StringValue: aws.String(joined),
		}
	}
	return out
}