forked from ipfs/kubo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
closer.go
183 lines (154 loc) · 5.66 KB
/
closer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package ctxcloser
import (
"sync"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
// CloseFunc is a function used to close a ContextCloser
type CloseFunc func() error
var nilCloseFunc = func() error { return nil }
// ContextCloser is an interface for services able to be opened and closed.
// It has a parent Context, and Children. But ContextCloser is not a proper
// "tree" like the Context tree. It is more like a Context-WaitGroup hybrid.
// It models a main object with a few children objects -- and, unlike the
// context -- concerns itself with the parent-child closing semantics:
//
// - Can define a CloseFunc (func() error) to be run at Close time.
// - Children call Children().Add(1) to be waited upon
// - Children can select on <-Closing() to know when they should shut down.
// - Close() will wait until all children call Children().Done()
// - <-Closed() signals when the service is completely closed.
//
// ContextCloser can be embedded into the main object itself. In that case,
// the closeFunc (if a member function) has to be set after the struct
// is intialized:
//
// type service struct {
// ContextCloser
// net.Conn
// }
//
// func (s *service) close() error {
// return s.Conn.Close()
// }
//
// func newService(ctx context.Context, c net.Conn) *service {
// s := &service{c}
// s.ContextCloser = NewContextCloser(ctx, s.close)
// return s
// }
//
type ContextCloser interface {
// Context is the context of this ContextCloser. It is "sort of" a parent.
Context() context.Context
// Children is a sync.Waitgroup for all children goroutines that should
// shut down completely before this service is said to be "closed".
// Follows the semantics of WaitGroup:
//
// Children().Add(1) // add one more dependent child
// Children().Done() // child signals it is done
//
Children() *sync.WaitGroup
// AddCloserChild registers a dependent ContextCloser child. The child will
// be closed when this parent is closed, and waited upon to finish. It is
// the functional equivalent of the following:
//
// go func(parent, child ContextCloser) {
// parent.Children().Add(1) // add one more dependent child
// <-parent.Closing() // wait until parent is closing
// child.Close() // signal child to close
// parent.Children().Done() // child signals it is done
// }(a, b)
//
AddCloserChild(c ContextCloser)
// Close is a method to call when you wish to stop this ContextCloser
Close() error
// Closing is a signal to wait upon, like Context.Done().
// It fires when the object should be closing (but hasn't yet fully closed).
// The primary use case is for child goroutines who need to know when
// they should shut down. (equivalent to Context().Done())
Closing() <-chan struct{}
// Closed is a method to wait upon, like Context.Done().
// It fires when the entire object is fully closed.
// The primary use case is for external listeners who need to know when
// this object is completly done, and all its children closed.
Closed() <-chan struct{}
}
// contextCloser is an OpenCloser with a cancellable context
type contextCloser struct {
ctx context.Context
cancel context.CancelFunc
// called to run the close logic.
closeFunc CloseFunc
// closed is released once the close function is done.
closed chan struct{}
// wait group for child goroutines
children sync.WaitGroup
// sync primitive to ensure the close logic is only called once.
closeOnce sync.Once
// error to return to clients of Close().
closeErr error
}
// NewContextCloser constructs and returns a ContextCloser. It will call
// cf CloseFunc before its Done() Wait signals fire.
func NewContextCloser(ctx context.Context, cf CloseFunc) ContextCloser {
if cf == nil {
cf = nilCloseFunc
}
ctx, cancel := context.WithCancel(ctx)
c := &contextCloser{
ctx: ctx,
cancel: cancel,
closeFunc: cf,
closed: make(chan struct{}),
}
c.Children().Add(1) // we're a child goroutine, to be waited upon.
go c.closeOnContextDone()
return c
}
func (c *contextCloser) Context() context.Context {
return c.ctx
}
func (c *contextCloser) Children() *sync.WaitGroup {
return &c.children
}
func (c *contextCloser) AddCloserChild(child ContextCloser) {
c.children.Add(1)
go func(parent, child ContextCloser) {
<-parent.Closing() // wait until parent is closing
child.Close() // signal child to close
parent.Children().Done() // child signals it is done
}(c, child)
}
// Close is the external close function. it's a wrapper around internalClose
// that waits on Closed()
func (c *contextCloser) Close() error {
c.internalClose()
<-c.Closed() // wait until we're totally done.
return c.closeErr
}
func (c *contextCloser) Closing() <-chan struct{} {
return c.Context().Done()
}
func (c *contextCloser) Closed() <-chan struct{} {
return c.closed
}
func (c *contextCloser) internalClose() {
go c.closeOnce.Do(c.closeLogic)
}
// the _actual_ close process.
func (c *contextCloser) closeLogic() {
// this function should only be called once (hence the sync.Once).
// and it will panic at the bottom (on close(c.closed)) otherwise.
c.cancel() // signal that we're shutting down (Closing)
c.closeErr = c.closeFunc() // actually run the close logic
c.children.Wait() // wait till all children are done.
close(c.closed) // signal that we're shut down (Closed)
}
// if parent context is shut down before we call Close explicitly,
// we need to go through the Close motions anyway. Hence all the sync
// stuff all over the place...
func (c *contextCloser) closeOnContextDone() {
<-c.Context().Done() // wait until parent (context) is done.
c.internalClose()
c.Children().Done()
}