-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
coalescing_context.go
135 lines (123 loc) · 3.36 KB
/
coalescing_context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package libkbfs
import (
"reflect"
"sync"
"time"
"golang.org/x/net/context"
)
// CoalescingContext allows many contexts to be treated as one. It waits on
// the Context.Done() channels of all of its contexts and, once every one of
// them has fired, this CoalescingContext is canceled. It is also canceled when
// the CancelFunc returned by NewCoalescingContext is called. At any point
// before that, a context can be added with AddContext, and it will
// subsequently also be part of the wait condition.
// TODO: add timeout channel in case there is a goroutine leak.
type CoalescingContext struct {
// Embedded parent context. It supplies Value(); Deadline, Done and Err are
// overridden by methods on *CoalescingContext.
context.Context
// doneCh is closed by loop() when this context becomes canceled; it is the
// channel handed out by Done().
doneCh chan struct{}
// mutateCh carries contexts from AddContext to the loop() goroutine, which
// appends them to selects.
mutateCh chan context.Context
// selects is the case list passed to reflect.Select. Index 0 is mutateCh,
// index 1 is the cancel channel, and every later entry is the Done()
// channel of one coalesced context. Within this file it is only modified
// by NewCoalescingContext and by loop().
selects []reflect.SelectCase
// start guards the one-time launch of the loop() goroutine.
start sync.Once
}
// Positions of the fixed entries at the front of CoalescingContext.selects.
// Everything at or beyond numExplicitlyHandledSelects is the Done() channel of
// a coalesced context.
const (
	// mutateChanSelectIndex is the slot of the channel that delivers newly
	// added contexts.
	mutateChanSelectIndex int = iota
	// closeChanSelectIndex is the slot of the channel closed by the
	// CancelFunc.
	closeChanSelectIndex
	// numExplicitlyHandledSelects is the number of fixed slots above; a
	// select list of exactly this length has no contexts left to wait on.
	numExplicitlyHandledSelects
)
// loop is the body of the background goroutine launched by startLoop. It
// repeatedly selects over ctx.selects and reacts to whichever case fires:
//
//   - mutateChanSelectIndex: a context arrived from AddContext; its Done()
//     channel is appended to the select list.
//   - closeChanSelectIndex: the CancelFunc was called; doneCh is closed and
//     the goroutine exits.
//   - any other index: one of the coalesced contexts is done; its case is
//     dropped, and if none remain doneCh is closed and the goroutine exits.
func (ctx *CoalescingContext) loop() {
	for {
		// recvOK is deliberately ignored: only the index of the case that
		// fired matters here.
		chosen, val, _ := reflect.Select(ctx.selects)
		switch chosen {
		case mutateChanSelectIndex:
			// Request to mutate the select list. The two-value assertion is
			// required: when a nil context is sent, val.Interface() yields a
			// nil interface{}, and a single-value assertion on it would panic
			// in this goroutine instead of reaching the nil check.
			newCase, ok := val.Interface().(context.Context)
			if ok && newCase != nil {
				ctx.appendContext(newCase)
			}
		case closeChanSelectIndex:
			// Explicitly canceled.
			close(ctx.doneCh)
			return
		default:
			// The chosen channel has fired. Remove it from our select list so
			// it is not selected again.
			ctx.selects = append(ctx.selects[:chosen], ctx.selects[chosen+1:]...)
			// If only the explicitly handled cases remain, every coalesced
			// context is done and so is this one.
			if len(ctx.selects) == numExplicitlyHandledSelects {
				close(ctx.doneCh)
				return
			}
		}
	}
}
// appendContext adds a receive case for other's Done() channel to the end of
// the select list, making other part of the wait condition.
func (ctx *CoalescingContext) appendContext(other context.Context) {
	doneCase := reflect.SelectCase{
		Dir:  reflect.SelectRecv,
		Chan: reflect.ValueOf(other.Done()),
	}
	ctx.selects = append(ctx.selects, doneCase)
}
// NewCoalescingContext creates a new CoalescingContext that initially waits
// on parent alone, and returns it together with a CancelFunc that cancels it
// explicitly. The context _must_ be canceled to avoid a goroutine leak.
//
// The background goroutine is not started here; it is started on the first
// call to Done, Err or AddContext.
//
// The returned CancelFunc is safe to call multiple times and from multiple
// goroutines simultaneously; calls after the first do nothing.
func NewCoalescingContext(parent context.Context) (*CoalescingContext, context.CancelFunc) {
	ctx := &CoalescingContext{
		// Make the parent's `Value()` method available to consumers of this
		// context. For example, this maintains the parent's log debug tags.
		// TODO: Make _all_ parents' values available.
		Context:  parent,
		doneCh:   make(chan struct{}),
		mutateCh: make(chan context.Context),
	}
	closeCh := make(chan struct{})
	// The order of these two cases must match mutateChanSelectIndex and
	// closeChanSelectIndex.
	ctx.selects = []reflect.SelectCase{
		{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(ctx.mutateCh),
		},
		{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(closeCh),
		},
	}
	ctx.appendContext(parent)
	// A check-then-close on closeCh is not atomic: two concurrent callers
	// could both see it open and both close it, which panics. sync.Once makes
	// the close happen exactly once regardless of how cancel is invoked.
	var closeOnce sync.Once
	cancelFunc := func() {
		closeOnce.Do(func() {
			close(closeCh)
		})
	}
	return ctx, cancelFunc
}
// startLoop launches the loop goroutine the first time it is called; every
// later call is a no-op.
func (ctx *CoalescingContext) startLoop() {
	spawn := func() {
		go ctx.loop()
	}
	ctx.start.Do(spawn)
}
// Deadline overrides the embedded parent's Deadline() and always reports that
// no deadline is set.
func (ctx *CoalescingContext) Deadline() (time.Time, bool) {
	var noDeadline time.Time
	return noDeadline, false
}
// Done returns a channel that is closed once the CoalescingContext has been
// canceled. Calling it starts the background loop if it is not yet running.
func (ctx *CoalescingContext) Done() <-chan struct{} {
	// The loop goroutine is what eventually closes doneCh, so make sure it
	// is running before handing the channel out.
	ctx.startLoop()
	var done <-chan struct{} = ctx.doneCh
	return done
}
// Err reports context.Canceled once the CoalescingContext has been canceled,
// and nil while it is still live. Calling it starts the background loop if it
// is not yet running.
func (ctx *CoalescingContext) Err() error {
	ctx.startLoop()
	// Non-blocking probe of doneCh: a closed channel is immediately readable.
	select {
	case <-ctx.doneCh:
		return context.Canceled
	default:
		return nil
	}
}
// AddContext adds other to the set of contexts that we're waiting on. It
// blocks until the loop goroutine has accepted other, in which case it returns
// nil, or until the CoalescingContext is canceled, in which case it returns
// context.Canceled.
func (ctx *CoalescingContext) AddContext(other context.Context) error {
	ctx.startLoop()
	var err error
	select {
	case <-ctx.doneCh:
		err = context.Canceled
	case ctx.mutateCh <- other:
		// Handed off to the loop goroutine; err stays nil.
	}
	return err
}