This repository has been archived by the owner on Sep 16, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub_consumer.go
143 lines (115 loc) · 3.63 KB
/
pubsub_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package consumer
import (
"log"
"os"
"time"
"cloud.google.com/go/pubsub"
"golang.org/x/net/context"
"google.golang.org/api/option"
)
// ContextDuration is the timeout applied to the contexts used when checking
// for or creating the topic and subscription, and when receiving messages.
const ContextDuration = 30 * time.Second
// googlePubSubConsumer is the Google Pub/Sub backed consumer. It wraps the
// subscription that its methods operate on.
type googlePubSubConsumer struct {
// Subscription is the Pub/Sub subscription that Remove deletes and Consume receives from.
Subscription *pubsub.Subscription
}
// Remove deletes the underlying Pub/Sub subscription by delegating to
// Subscription.Delete. The call is bounded by a 10-second timeout.
//
// It returns whatever error Subscription.Delete reports.
func (c *googlePubSubConsumer) Remove() error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	// Release the context's timer as soon as Delete returns; a discarded
	// cancel func keeps it alive until the deadline fires.
	defer cancel()
	return c.Subscription.Delete(ctx)
}
// googlePubSubMessage wraps a received Pub/Sub message.
type googlePubSubMessage struct {
// OriginalMessage is the underlying Pub/Sub message that Data and Done delegate to.
OriginalMessage *pubsub.Message
}
// Data returns the payload bytes of the wrapped Pub/Sub message.
func (m *googlePubSubMessage) Data() []byte {
	original := m.OriginalMessage
	return original.Data
}
// Done settles the wrapped Pub/Sub message: it calls Ack when ack is true and
// Nack otherwise.
func (m *googlePubSubMessage) Done(ack bool) {
	if !ack {
		m.OriginalMessage.Nack()
		return
	}
	m.OriginalMessage.Ack()
}
// defaultProjectId is the project ID newPubSubClient falls back to when the
// PUBSUB_PROJECT_ID environment variable is unset or empty.
var defaultProjectId = "emulator-project-id"
// newPubSubClient builds a Pub/Sub client configured from the environment.
//
// PUBSUB_PROJECT_ID selects the project, falling back to defaultProjectId when
// it is unset or empty. When PUBSUB_KEYFILE is non-empty its value is passed
// as a credentials file path; otherwise the client is created without any
// explicit credentials option.
//
// It returns the client and error exactly as reported by pubsub.NewClient.
func newPubSubClient() (*pubsub.Client, error) {
	project := defaultProjectId
	if fromEnv := os.Getenv("PUBSUB_PROJECT_ID"); fromEnv != "" {
		project = fromEnv
	}

	ctx := context.Background()
	if keyfile := os.Getenv("PUBSUB_KEYFILE"); keyfile != "" {
		// A key file is configured: hand it to the client as its credentials file.
		return pubsub.NewClient(ctx, project, option.WithCredentialsFile(keyfile))
	}
	// No key file configured: create the client without a credentials option.
	return pubsub.NewClient(ctx, project)
}
// NewConsumer returns a Consumer bound to the subscription subscriptionName on
// the topic topicName. The topic and the subscription are looked up and
// created when they do not exist yet.
//
// The process is terminated via log.Fatalf when the Pub/Sub client cannot be
// created.
func NewConsumer(topicName string, subscriptionName string) Consumer {
	client, clientErr := newPubSubClient()
	if clientErr != nil {
		log.Fatalf("Could not create PubSub client: %v", clientErr)
	}

	// The topic must be resolved first because the subscription is attached to it.
	return &googlePubSubConsumer{
		Subscription: ensureSubscription(client, ensureTopic(client, topicName), subscriptionName),
	}
}
// ensureTopic returns a handle to the topic named topicName, creating the
// topic first when it does not exist yet.
//
// The existence check and the creation share one context limited to
// ContextDuration. Any failure terminates the process via log.Fatalf.
func ensureTopic(pubsubClient *pubsub.Client, topicName string) *pubsub.Topic {
	ctx, cancel := context.WithTimeout(context.Background(), ContextDuration)
	// Release the context's timer on return; a discarded cancel func keeps it
	// alive until the deadline fires.
	defer cancel()

	topic := pubsubClient.Topic(topicName)
	exists, err := topic.Exists(ctx)
	if err != nil {
		log.Fatalf("Could not check if topic exists: %v", err)
	}
	if exists {
		return topic
	}

	topic, err = pubsubClient.CreateTopic(ctx, topicName)
	if err != nil {
		log.Fatalf("Could not create PubSub topic: %v", err)
	}
	return topic
}
// ensureSubscription returns a handle to the subscription named
// subscriptionName, creating it on the given topic when it does not exist yet.
//
// The existence check and the creation share one context limited to
// ContextDuration. Any failure terminates the process via log.Fatalf.
func ensureSubscription(pubsubClient *pubsub.Client, topic *pubsub.Topic, subscriptionName string) *pubsub.Subscription {
	ctx, cancel := context.WithTimeout(context.Background(), ContextDuration)
	// Release the context's timer on return; a discarded cancel func keeps it
	// alive until the deadline fires.
	defer cancel()

	subscription := pubsubClient.Subscription(subscriptionName)
	exists, err := subscription.Exists(ctx)
	if err != nil {
		log.Fatalf("Could not check if subscription exists: %v", err)
	}
	if exists {
		return subscription
	}

	// Only the topic is configured; every other subscription setting is left
	// at the zero value of pubsub.SubscriptionConfig.
	subscription, err = pubsubClient.CreateSubscription(ctx, subscriptionName, pubsub.SubscriptionConfig{Topic: topic})
	if err != nil {
		log.Fatalf("Could not create PubSub subscription: %v", err)
	}
	return subscription
}
// Consume starts a goroutine that pulls messages from the subscription and
// forwards each one, wrapped as a Message, on the returned unbuffered channel.
//
// The returned error is always nil; a receive failure terminates the process
// via log.Fatalf instead.
//
// NOTE(review): the receive context is limited to ContextDuration, so delivery
// presumably stops once that timeout elapses (see the pubsub Receive
// documentation), and the channel is never closed. Confirm with callers
// whether that is intended before changing it.
func (c *googlePubSubConsumer) Consume() (chan Message, error) {
	channel := make(chan Message)
	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), ContextDuration)
		// Release the context's timer once Receive returns; a discarded
		// cancel func keeps it alive until the deadline fires.
		defer cancel()

		err := c.Subscription.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
			// The send blocks until a reader takes the message off the channel.
			channel <- &googlePubSubMessage{OriginalMessage: msg}
		})
		if err != nil {
			log.Fatalf("Could not receive message from subscription: %v", err)
		}
	}()
	return channel, nil
}