/
drainer.go
116 lines (91 loc) · 2.36 KB
/
drainer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package runtime
import (
"context"
"errors"
"sync"
)
// NewDrainer creates new drainer.
func NewDrainer() *Drainer {
return &Drainer{
shutdown: make(chan struct{}, 1),
}
}
// Drainer is used in controllers to ensure graceful shutdown.
type Drainer struct {
subscriptionsMu sync.Mutex
draining bool
subscriptions []*DrainSubscription
shutdown chan struct{}
}
// Drain initializes drain sequence waits for it to succeed until the context is canceled.
func (d *Drainer) Drain(ctx context.Context) error {
d.subscriptionsMu.Lock()
if d.draining {
d.subscriptionsMu.Unlock()
return errors.New("already draining")
}
d.draining = true
for _, s := range d.subscriptions {
select {
case s.events <- DrainEvent{}:
default:
}
}
d.subscriptionsMu.Unlock()
for {
d.subscriptionsMu.Lock()
l := len(d.subscriptions)
d.subscriptionsMu.Unlock()
if l == 0 {
return nil
}
select {
case <-d.shutdown:
case <-ctx.Done():
return ctx.Err()
}
}
}
// Subscribe should be called from a controller that needs graceful shutdown.
func (d *Drainer) Subscribe() *DrainSubscription {
d.subscriptionsMu.Lock()
defer d.subscriptionsMu.Unlock()
subscription := &DrainSubscription{
events: make(chan DrainEvent, 1),
drainer: d,
}
if d.draining {
subscription.events <- DrainEvent{}
}
d.subscriptions = append(d.subscriptions, subscription)
return subscription
}
// DrainSubscription keeps ingoing and outgoing events channels.
type DrainSubscription struct {
drainer *Drainer
events chan DrainEvent
}
// EventCh returns drain events channel.
func (s *DrainSubscription) EventCh() <-chan DrainEvent {
return s.events
}
// Cancel the subscription which triggers drain to shutdown.
func (s *DrainSubscription) Cancel() {
s.drainer.subscriptionsMu.Lock()
for i, sub := range s.drainer.subscriptions {
if sub == s {
s.drainer.subscriptions = append(s.drainer.subscriptions[:i], s.drainer.subscriptions[i+1:]...)
break
}
}
s.drainer.subscriptionsMu.Unlock()
select {
case s.drainer.shutdown <- struct{}{}:
default:
}
}
// DrainEvent is sent to the events channel when drainer starts the shutdown sequence.
type DrainEvent struct{}