/
client.go
153 lines (129 loc) · 3.93 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package grouper
import (
"sync"
"github.com/tedsuo/ifrit"
)
/*
DynamicClient provides a client with group controls and event notifications.
A client can use the insert channel to add members to the group. When the group
becomes full, the insert channel blocks until a running process exits the group.
Once there are no more members to be added, the client can close the dynamic
group, preventing new members from being added.
*/
type DynamicClient interface {
/*
EntranceListener provides a new buffered channel of entrance events, which are
emited every time an inserted process is ready. To help prevent race conditions,
every new channel is populated with previously emited events, up to it's buffer
size.
*/
EntranceListener() <-chan EntranceEvent
/*
ExitListener provides a new buffered channel of exit events, which are emited
every time an inserted process exits. To help prevent race conditions, every
new channel is populated with previously emited events, up to it's buffer size.
*/
ExitListener() <-chan ExitEvent
/*
CloseNotifier provides a new unbuffered channel, which will emit a single event
once the group has been closed.
*/
CloseNotifier() <-chan struct{}
/*
Inserter provides an unbuffered channel for adding members to a group. When the
group becomes full, the insert channel blocks until a running process exits.
Once the group is closed, insert channels block forever.
*/
Inserter() chan<- Member
/*
Close causes a dynamic group to become a static group. This means that no new
members may be inserted, and the group will exit once all members have
completed.
*/
Close()
Get(name string) (ifrit.Process, bool)
}
type memberRequest struct {
Name string
Response chan ifrit.Process
}
/*
dynamicClient implements DynamicClient.
*/
type dynamicClient struct {
insertChannel chan Member
getMemberChannel chan memberRequest
completeNotifier chan struct{}
closeNotifier chan struct{}
closeOnce *sync.Once
entranceBroadcaster *entranceEventBroadcaster
exitBroadcaster *exitEventBroadcaster
}
func newClient(bufferSize int) dynamicClient {
return dynamicClient{
insertChannel: make(chan Member),
getMemberChannel: make(chan memberRequest),
completeNotifier: make(chan struct{}),
closeNotifier: make(chan struct{}),
closeOnce: new(sync.Once),
entranceBroadcaster: newEntranceEventBroadcaster(bufferSize),
exitBroadcaster: newExitEventBroadcaster(bufferSize),
}
}
func (c dynamicClient) Get(name string) (ifrit.Process, bool) {
req := memberRequest{
Name: name,
Response: make(chan ifrit.Process),
}
select {
case c.getMemberChannel <- req:
p, ok := <-req.Response
if !ok {
return nil, false
}
return p, true
case <-c.completeNotifier:
return nil, false
}
}
func (c dynamicClient) memberRequests() chan memberRequest {
return c.getMemberChannel
}
func (c dynamicClient) Inserter() chan<- Member {
return c.insertChannel
}
func (c dynamicClient) insertEventListener() <-chan Member {
return c.insertChannel
}
func (c dynamicClient) EntranceListener() <-chan EntranceEvent {
return c.entranceBroadcaster.Attach()
}
func (c dynamicClient) broadcastEntrance(event EntranceEvent) {
c.entranceBroadcaster.Broadcast(event)
}
func (c dynamicClient) closeEntranceBroadcaster() {
c.entranceBroadcaster.Close()
}
func (c dynamicClient) ExitListener() <-chan ExitEvent {
return c.exitBroadcaster.Attach()
}
func (c dynamicClient) broadcastExit(event ExitEvent) {
c.exitBroadcaster.Broadcast(event)
}
func (c dynamicClient) closeExitBroadcaster() {
c.exitBroadcaster.Close()
}
func (c dynamicClient) closeBroadcasters() error {
c.entranceBroadcaster.Close()
c.exitBroadcaster.Close()
close(c.completeNotifier)
return nil
}
func (c dynamicClient) Close() {
c.closeOnce.Do(func() {
close(c.closeNotifier)
})
}
func (c dynamicClient) CloseNotifier() <-chan struct{} {
return c.closeNotifier
}