-
-
Notifications
You must be signed in to change notification settings - Fork 158
/
Copy pathexporter.go
87 lines (70 loc) · 2.31 KB
/
exporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package pubsubexporter
import (
"cloud.google.com/go/pubsub"
"context"
"encoding/json"
"github.com/thomaspoignant/go-feature-flag/exporter"
"github.com/thomaspoignant/go-feature-flag/utils/fflog"
"google.golang.org/api/option"
)
// Exporter publishes feature events on a Google Cloud PubSub topic.
//
// The exported fields are configuration provided by the caller. The unexported
// fields are internal state: the topic publisher is created lazily by
// initPublisher the first time Export is called and is reused afterwards.
type Exporter struct {
// ProjectID is a project to which the PubSub topic belongs.
// It is handed to the client constructor when the publisher is initialised.
ProjectID string
// Topic is the name of a topic on which messages will be published.
Topic string
// Options are Google Cloud API options to connect to PubSub.
// They are forwarded unchanged to the client constructor.
Options []option.ClientOption
// PublishSettings controls the bundling of published messages.
// If not set pubsub.DefaultPublishSettings are used.
// When non-nil, the pointed-to value is copied onto the topic.
PublishSettings *pubsub.PublishSettings
// EnableMessageOrdering enables the delivery of ordered keys.
// The value is copied onto the topic's EnableMessageOrdering field.
// NOTE(review): Export does not set an ordering key on the messages it
// publishes; confirm whether enabling this option has any effect.
EnableMessageOrdering bool
// newClientFunc is used only for unit testing purposes.
// When nil, initPublisher replaces it with pubsub.NewClient.
newClientFunc func(context.Context, string, ...option.ClientOption) (*pubsub.Client, error)
// publisher facilitates publishing messages on a PubSub topic.
// It is nil until initPublisher has completed successfully.
publisher *pubsub.Topic
}
// Export publishes one PubSub message for each exporter.FeatureEvent received.
//
// The topic publisher is initialised on the first call and reused on later
// calls. Every event is encoded as JSON and published with an "emitter"
// attribute; the publish result of each message is awaited before the next
// event is handled. The logger argument is unused.
//
// The first error met (publisher initialisation, JSON encoding or publishing)
// is returned unchanged and stops the export, so events located after the
// failing one are not published. It returns nil when every event was handled.
func (e *Exporter) Export(ctx context.Context, _ *fflog.FFLogger, featureEvents []exporter.FeatureEvent) error {
	if e.publisher == nil {
		initErr := e.initPublisher(ctx)
		if initErr != nil {
			return initErr
		}
	}

	for i := range featureEvents {
		payload, err := json.Marshal(featureEvents[i])
		if err != nil {
			return err
		}

		msg := &pubsub.Message{
			Data:       payload,
			Attributes: map[string]string{"emitter": "GO Feature Flag"},
		}

		// Wait on this message's result before moving on to the next event.
		result := e.publisher.Publish(ctx, msg)
		if _, err = result.Get(ctx); err != nil {
			return err
		}
	}

	return nil
}
// IsBulk reports whether the exporter works in bulk mode. The PubSub exporter
// publishes each exporter.FeatureEvent as its own message, so the answer is
// always false.
func (e *Exporter) IsBulk() bool {
	const bulk = false
	return bulk
}
// initPublisher builds the PubSub topic publisher from the exporter
// configuration and stores it in e.publisher.
//
// A client is created for ProjectID with Options, using newClientFunc or
// pubsub.NewClient when none was injected (the default is also stored back on
// the exporter). The topic handle for Topic then receives the ordering flag
// and, when provided, a copy of PublishSettings.
//
// It returns the client creation error unchanged, in which case e.publisher is
// left untouched; otherwise it returns nil.
func (e *Exporter) initPublisher(ctx context.Context) error {
	factory := e.newClientFunc
	if factory == nil {
		factory = pubsub.NewClient
		e.newClientFunc = factory
	}

	client, err := factory(ctx, e.ProjectID, e.Options...)
	if err != nil {
		return err
	}

	publisher := client.Topic(e.Topic)
	publisher.EnableMessageOrdering = e.EnableMessageOrdering
	if settings := e.PublishSettings; settings != nil {
		publisher.PublishSettings = *settings
	}

	e.publisher = publisher
	return nil
}