-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpublisher.go
64 lines (56 loc) · 1.4 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package cloud_pubsub
import (
"context"
"encoding/json"
"log"
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
)
type Data struct {
Id string
Publisher string
Action string
RequestTime string
Data interface{}
}
type Publisher interface {
PublishMessage(ctx context.Context, data Data) (messageID string, err error)
}
func NewCloudPublisher(projectID, topicID, credentialsJSON string) (Publisher, error) {
ctx := context.Background()
var client *pubsub.Client
var err error
if credentialsJSON != "" {
client, err = pubsub.NewClient(ctx, projectID, option.WithCredentialsJSON([]byte(credentialsJSON)))
} else {
client, err = pubsub.NewClient(ctx, projectID)
}
if err != nil {
log.Printf("publisher.NewClient err: %v\n", err)
return nil, err
}
cleanUpCloudClient(client)
return &CloudPublisher{
ProjectID: projectID,
TopicID: topicID,
Client: client,
}, nil
}
type CloudPublisher struct {
ProjectID string
TopicID string
Client *pubsub.Client
}
func (cp *CloudPublisher) PublishMessage(ctx context.Context, pubSubData Data) (messageID string, err error) {
topic := cp.Client.Topic(cp.TopicID)
byteData, _ := json.Marshal(pubSubData)
result := topic.Publish(ctx, &pubsub.Message{
Data: byteData,
})
messageID, err = result.Get(ctx)
if err != nil {
log.Printf("publisher.Get: %v\n", err)
return messageID, err
}
return messageID, nil
}