forked from googleapis/google-cloud-go
/
keepalive.go
182 lines (158 loc) · 5.16 KB
/
keepalive.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"sync"
"time"
"golang.org/x/net/context"
)
// keepAlive keeps track of which Messages need to have their deadline extended, and
// periodically extends them.
// Messages are tracked by Ack ID.
type keepAlive struct {
s service
Ctx context.Context // The context to use when extending deadlines.
Sub string // The full name of the subscription.
ExtensionTick <-chan time.Time // ExtensionTick supplies the frequency with which to make extension requests.
Deadline time.Duration // How long to extend messages for each time they are extended. Should be greater than ExtensionTick frequency.
MaxExtension time.Duration // How long to keep extending each message's ack deadline before automatically removing it.
mu sync.Mutex
// key: ackID; value: time at which ack deadline extension should cease.
items map[string]time.Time
dr drain
wg sync.WaitGroup
}
// Start initiates the deadline extension loop. Stop must be called once keepAlive is no longer needed.
func (ka *keepAlive) Start() {
ka.items = make(map[string]time.Time)
ka.dr = drain{Drained: make(chan struct{})}
ka.wg.Add(1)
go func() {
defer ka.wg.Done()
for {
select {
case <-ka.Ctx.Done():
// Don't bother waiting for items to be removed: we can't extend them any more.
return
case <-ka.dr.Drained:
return
case <-ka.ExtensionTick:
live, expired := ka.getAckIDs()
ka.wg.Add(1)
go func() {
defer ka.wg.Done()
ka.extendDeadlines(live)
}()
for _, id := range expired {
ka.Remove(id)
}
}
}
}()
}
// Add adds an ack id to be kept alive.
// It should not be called after Stop.
func (ka *keepAlive) Add(ackID string) {
ka.mu.Lock()
defer ka.mu.Unlock()
ka.items[ackID] = time.Now().Add(ka.MaxExtension)
ka.dr.SetPending(true)
}
// Remove removes ackID from the list to be kept alive.
func (ka *keepAlive) Remove(ackID string) {
ka.mu.Lock()
defer ka.mu.Unlock()
// Note: If users NACKs a message after it has been removed due to
// expiring, Remove will be called twice with same ack id. This is OK.
delete(ka.items, ackID)
ka.dr.SetPending(len(ka.items) != 0)
}
// Stop waits until all added ackIDs have been removed, and cleans up resources.
// Stop may only be called once.
func (ka *keepAlive) Stop() {
ka.mu.Lock()
ka.dr.Drain()
ka.mu.Unlock()
ka.wg.Wait()
}
// getAckIDs returns the set of ackIDs that are being kept alive.
// The set is divided into two lists: one with IDs that should continue to be kept alive,
// and the other with IDs that should be dropped.
func (ka *keepAlive) getAckIDs() (live, expired []string) {
ka.mu.Lock()
defer ka.mu.Unlock()
return getKeepAliveAckIDs(ka.items)
}
func getKeepAliveAckIDs(items map[string]time.Time) (live, expired []string) {
now := time.Now()
for id, expiry := range items {
if expiry.Before(now) {
expired = append(expired, id)
} else {
live = append(live, id)
}
}
return live, expired
}
const maxExtensionAttempts = 2
func (ka *keepAlive) extendDeadlines(ackIDs []string) {
head, tail := ka.s.splitAckIDs(ackIDs)
for len(head) > 0 {
for i := 0; i < maxExtensionAttempts; i++ {
if ka.s.modifyAckDeadline(ka.Ctx, ka.Sub, ka.Deadline, head) == nil {
break
}
}
// NOTE: Messages whose deadlines we fail to extend will
// eventually be redelivered and this is a documented behaviour
// of the API.
//
// NOTE: If we fail to extend deadlines here, this
// implementation will continue to attempt extending the
// deadlines for those ack IDs the next time the extension
// ticker ticks. By then the deadline will have expired.
// Re-extending them is harmless, however.
//
// TODO: call Remove for ids which fail to be extended.
head, tail = ka.s.splitAckIDs(tail)
}
}
// A drain (once started) indicates via a channel when there is no work pending.
type drain struct {
started bool
pending bool
// Drained is closed once there are no items outstanding if Drain has been called.
Drained chan struct{}
}
// Drain starts the drain process. This cannot be undone.
func (d *drain) Drain() {
d.started = true
d.closeIfDrained()
}
// SetPending sets whether there is work pending or not. It may be called multiple times before or after Drain.
func (d *drain) SetPending(pending bool) {
d.pending = pending
d.closeIfDrained()
}
func (d *drain) closeIfDrained() {
if !d.pending && d.started {
// Check to see if d.Drained is closed before closing it.
// This allows SetPending(false) to be safely called multiple times.
select {
case <-d.Drained:
default:
close(d.Drained)
}
}
}