-
Notifications
You must be signed in to change notification settings - Fork 1k
/
options.go
147 lines (127 loc) · 4.69 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Original source: github.com/micro/go-micro/v3/events/options.go
package events
import "time"
// PublishOptions contains all the options which can be provided when publishing an event.
type PublishOptions struct {
	// Metadata contains any keys which can be used to query the data, for example a customer id.
	Metadata map[string]string
	// Timestamp to set for the event. If the timestamp is a zero value, the current time will be used.
	Timestamp time.Time
}
// PublishOption is a function that sets attributes on the PublishOptions it is given.
type PublishOption func(o *PublishOptions)
// WithMetadata returns a PublishOption that assigns md to the Metadata field
// of PublishOptions. The map is stored as given, not copied.
func WithMetadata(md map[string]string) PublishOption {
	return func(opts *PublishOptions) {
		opts.Metadata = md
	}
}
// WithTimestamp returns a PublishOption that assigns t to the Timestamp field
// of PublishOptions.
func WithTimestamp(t time.Time) PublishOption {
	return func(opts *PublishOptions) {
		opts.Timestamp = t
	}
}
// ConsumeOptions contains all the options which can be provided when subscribing to a topic.
type ConsumeOptions struct {
	// Group is the name of the consumer group. If two consumers have the same group, the events
	// are distributed between them.
	Group string
	// Offset is the time from which the messages should be consumed. If not provided,
	// the messages will be consumed starting from the moment the Subscription starts.
	Offset time.Time
	// AutoAck, if true (default true), automatically acknowledges every message so it will not be redelivered.
	// If false, each message needs to be manually acknowledged by the subscriber.
	// If processing is successful the message should be ack'ed to remove the message from the stream.
	// If processing is unsuccessful the message should be nack'ed (negative acknowledgement), which means it will
	// remain on the stream to be processed again.
	AutoAck bool
	// AckWait is the duration after which, if no ack is received, the message is requeued
	// in case auto ack is turned off. It is set together with AutoAck by WithAutoAck.
	AckWait time.Duration
	// RetryLimit indicates the number of times a message is retried.
	RetryLimit int
	// CustomRetries indicates whether to use RetryLimit. WithRetryLimit sets it to true;
	// while it is false, GetRetryLimit ignores RetryLimit and returns -1.
	CustomRetries bool
}
// ConsumeOption is a function that sets attributes on the ConsumeOptions it is given.
type ConsumeOption func(o *ConsumeOptions)
// WithGroup returns a ConsumeOption that sets the consumer group to be part of
// when consuming events.
func WithGroup(q string) ConsumeOption {
	return func(opts *ConsumeOptions) {
		opts.Group = q
	}
}
// WithOffset returns a ConsumeOption that sets the offset time at which to
// start consuming events.
func WithOffset(t time.Time) ConsumeOption {
	return func(opts *ConsumeOptions) {
		opts.Offset = t
	}
}
// WithAutoAck returns a ConsumeOption that sets both the AutoAck and AckWait
// fields on ConsumeOptions. ackWait is the duration after which, if no ack is
// received, the message is requeued in case auto ack is turned off.
func WithAutoAck(ack bool, ackWait time.Duration) ConsumeOption {
	return func(opts *ConsumeOptions) {
		opts.AutoAck, opts.AckWait = ack, ackWait
	}
}
// WithRetryLimit returns a ConsumeOption that stores retries in the RetryLimit
// field of ConsumeOptions and marks CustomRetries as true.
// Set to -1 for infinite retries (default).
func WithRetryLimit(retries int) ConsumeOption {
	return func(opts *ConsumeOptions) {
		opts.CustomRetries, opts.RetryLimit = true, retries
	}
}
// GetRetryLimit returns RetryLimit when CustomRetries is set, and -1 otherwise.
func (o ConsumeOptions) GetRetryLimit() int {
	if o.CustomRetries {
		return o.RetryLimit
	}
	// No custom limit was configured, so report -1.
	return -1
}
// WriteOptions contains all the options which can be provided when writing an event to a store.
type WriteOptions struct {
	// TTL is the duration the event should be recorded for. A zero value TTL indicates the event should
	// be stored indefinitely.
	TTL time.Duration
}
// WriteOption is a function that sets attributes on the WriteOptions it is given.
type WriteOption func(o *WriteOptions)
// WithTTL returns a WriteOption that assigns d to the TTL attribute of
// WriteOptions.
func WithTTL(d time.Duration) WriteOption {
	return func(opts *WriteOptions) {
		opts.TTL = d
	}
}
// ReadOptions contains all the options which can be provided when reading events from a store.
type ReadOptions struct {
	// Limit the number of results to return.
	Limit uint
	// Offset the results by this number, useful for paginated queries.
	Offset uint
}

// ReadOption is a function that sets attributes on the ReadOptions it is given.
type ReadOption func(o *ReadOptions)

// ReadLimit returns a ReadOption that sets the Limit attribute on ReadOptions
// to l.
func ReadLimit(l uint) ReadOption {
	return func(o *ReadOptions) {
		// Assign the caller-supplied parameter l (the letter), not the
		// numeric literal 1, so the requested limit is actually honoured.
		o.Limit = l
	}
}

// ReadOffset returns a ReadOption that sets the Offset attribute on
// ReadOptions to l.
func ReadOffset(l uint) ReadOption {
	return func(o *ReadOptions) {
		// Assign the caller-supplied parameter l (the letter), not the
		// numeric literal 1, so the requested offset is actually honoured.
		o.Offset = l
	}
}