-
Notifications
You must be signed in to change notification settings - Fork 1
/
pubsub.go
235 lines (203 loc) · 6.64 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
// Copyright (c) 2023 Z5Labs and Contributors
//
// This software is released under the MIT License.
// https://opensource.org/licenses/MIT
package pubsub
import (
"context"
"log/slog"
"github.com/z5labs/bedrock/pkg/noop"
"github.com/z5labs/bedrock/pkg/slogfield"
"github.com/z5labs/bedrock/queue"
"golang.org/x/sync/errgroup"
pubsubpb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"github.com/googleapis/gax-go/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
type pubsubPullClient interface {
Pull(context.Context, *pubsubpb.PullRequest, ...gax.CallOption) (*pubsubpb.PullResponse, error)
}
type consumerOptions struct {
commonOptions
maxNumOfMessages int32
}
// ConsumerOption are options for configuring the Consumer.
type ConsumerOption interface {
applyConsumer(*consumerOptions)
}
type consumerOptionFunc func(*consumerOptions)
func (f consumerOptionFunc) applyConsumer(co *consumerOptions) {
f(co)
}
// MaxNumOfMessages defines the maximum number of messages which
// Google Cloud PubSub will return in a single response.
//
// PubSub never returns more messages than this value (however,
// fewer messages might be returned). Must be a positive integer.
func MaxNumOfMessages(n int32) ConsumerOption {
return consumerOptionFunc(func(co *consumerOptions) {
co.maxNumOfMessages = n
})
}
// Consumer consumes messages from Google Cloud PubSub.
type Consumer struct {
log *slog.Logger
pubsub pubsubPullClient
subscription string
maxNumOfMessages int32
}
// NewConsumer returns a fully initialized Consumer.
func NewConsumer(opts ...ConsumerOption) *Consumer {
co := &consumerOptions{
commonOptions: commonOptions{
logHandler: noop.LogHandler{},
},
}
for _, opt := range opts {
opt.applyConsumer(co)
}
return &Consumer{
log: slog.New(co.logHandler),
pubsub: co.pubsub,
subscription: co.subscription,
maxNumOfMessages: co.maxNumOfMessages,
}
}
// Consume implements the queue.Consumer interface.
//
// A PullRequest is sent to Google Cloud PubSub with
// the configured options (e.g. max number of messages, etc.).
// An error is only returned in the case where the PubSub request
// fails or PubSub returns zero messages. In the case of zero messages,
// the error, queue.ErrNoItem, is returned which allows the queue based
// runtimes to disregard this as a failure and retry consuming messages.
func (c *Consumer) Consume(ctx context.Context) ([]*pubsubpb.ReceivedMessage, error) {
spanCtx, span := otel.Tracer("pubsub").Start(ctx, "Consumer.Consume")
defer span.End()
resp, err := c.pubsub.Pull(spanCtx, &pubsubpb.PullRequest{
Subscription: c.subscription,
MaxMessages: c.maxNumOfMessages,
})
if err != nil {
c.log.ErrorContext(spanCtx, "failed to pull pubsub for messages", slogfield.Error(err))
return nil, err
}
c.log.InfoContext(spanCtx, "received messages", slogfield.Int("num_of_messages", len(resp.ReceivedMessages)))
if len(resp.ReceivedMessages) == 0 {
return nil, queue.ErrNoItem
}
return resp.ReceivedMessages, nil
}
type pubsubAckClient interface {
Acknowledge(context.Context, *pubsubpb.AcknowledgeRequest, ...gax.CallOption) error
}
type batchAcknowledgeProcessorOptions struct {
commonOptions
inner queue.Processor[*pubsubpb.ReceivedMessage]
}
// BatchAcknowledgeProcessorOption are options for configuring the BatchAknowledgeProcessor.
type BatchAcknowledgeProcessorOption interface {
applyProcessor(*batchAcknowledgeProcessorOptions)
}
type batchAckProcessorOptionFunc func(*batchAcknowledgeProcessorOptions)
func (f batchAckProcessorOptionFunc) applyProcessor(bo *batchAcknowledgeProcessorOptions) {
f(bo)
}
// Processor configures the underlying single message queue.Processor
// which the BatchAcknowledgeProcessor calls when concurrently processing a
// batch of messages.
func Processor(p queue.Processor[*pubsubpb.ReceivedMessage]) BatchAcknowledgeProcessorOption {
return batchAckProcessorOptionFunc(func(bo *batchAcknowledgeProcessorOptions) {
bo.inner = p
})
}
// BatchAcknowledgeProcessor concurrently processes and acknowledges PubSub messages.
type BatchAcknowledgeProcessor struct {
log *slog.Logger
pubsub pubsubAckClient
subscription string
inner queue.Processor[*pubsubpb.ReceivedMessage]
}
// NewBatchAcknowledgeProcessor returns a fully initialized BatchAcknowledgeProcessor.
func NewBatchAcknowledgeProcessor(opts ...BatchAcknowledgeProcessorOption) *BatchAcknowledgeProcessor {
bo := &batchAcknowledgeProcessorOptions{
commonOptions: commonOptions{
logHandler: noop.LogHandler{},
},
}
for _, opt := range opts {
opt.applyProcessor(bo)
}
return &BatchAcknowledgeProcessor{
log: slog.New(bo.logHandler),
pubsub: bo.pubsub,
subscription: bo.subscription,
inner: bo.inner,
}
}
// Process implements the queue.Processor interface.
//
// Each message is processed concurrently using the processor
// that was provided to the BatchAcknowledgeProcessor when it was
// created. If the inner processor returns an error for a message,
// it will not be acknowledged in PubSub and will be reprocessed after
// the VisibilityTimeout expires. If no error is returned, the
// message will be collected with the other messages from the slice,
// msgs, to be acknowledged together in a single Acknowledge request to PubSub.
func (p *BatchAcknowledgeProcessor) Process(ctx context.Context, msgs []*pubsubpb.ReceivedMessage) error {
spanCtx, span := otel.Tracer("pubsub").Start(ctx, "BatchAcknowledgeProcessor.Process", trace.WithAttributes(
attribute.Int("num_of_messages", len(msgs)),
))
defer span.End()
msgCh := make(chan *pubsubpb.ReceivedMessage)
g, gctx := errgroup.WithContext(spanCtx)
for _, msg := range msgs {
msg := msg
g.Go(func() error {
err := p.inner.Process(gctx, msg)
if err != nil {
p.log.ErrorContext(gctx, "failed to process message", slogfield.Error(err))
return nil
}
msgCh <- msg
return nil
})
}
g2, _ := errgroup.WithContext(spanCtx)
g2.Go(func() error {
defer close(msgCh)
return g.Wait()
})
ackIds := make([]string, 0, len(msgs))
g2.Go(func() error {
for msg := range msgCh {
if msg == nil {
return nil
}
ackIds = append(ackIds, msg.AckId)
}
return nil
})
// Always try to acknowledge messages even if
// context has been cancelled.
_ = g2.Wait()
if len(ackIds) == 0 {
return nil
}
err := p.pubsub.Acknowledge(spanCtx, &pubsubpb.AcknowledgeRequest{
Subscription: p.subscription,
AckIds: ackIds,
})
if err != nil {
p.log.ErrorContext(
spanCtx,
"failed to batch acknowledge messages",
slogfield.Int("num_of_delete_entries", len(ackIds)),
slogfield.Error(err),
)
return err
}
return nil
}