-
Notifications
You must be signed in to change notification settings - Fork 3
/
distribution.go
279 lines (246 loc) · 9.04 KB
/
distribution.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
package policy
import (
"github.com/paust-team/pirius/bootstrapping"
"github.com/paust-team/pirius/bootstrapping/topic"
"github.com/paust-team/pirius/helper"
"github.com/paust-team/pirius/logger"
"go.uber.org/zap"
"sync"
)
// DistributionPolicyExecutor reacts to publisher/subscriber membership changes of a topic
// (OnPublisherAdded, OnPublisherRemoved, OnSubscriberAdded, OnSubscriberRemoved) by
// rewriting the topic's fragment mappings and subscription mappings.
type DistributionPolicyExecutor struct {
// Executor is embedded; NewDistributionPolicyExecutor leaves it at its zero value.
Executor
// flusher is embedded; the constructor fills its bootstrapper and mu fields.
flusher
}
// NewDistributionPolicyExecutor builds a DistributionPolicyExecutor whose embedded
// flusher is bound to the given bootstrap service and starts with a fresh mutex.
// The embedded Executor is left at its zero value.
func NewDistributionPolicyExecutor(bootstrapper *bootstrapping.BootstrapService) *DistributionPolicyExecutor {
	executor := new(DistributionPolicyExecutor)
	executor.flusher = flusher{
		mu:           sync.Mutex{},
		bootstrapper: bootstrapper,
	}
	return executor
}
// OnPublisherAdded : registers the fragments a newly connected publisher writes to and
// hands them out to the topic's subscribers.
//
// While holding the topic lock it:
//  1. resolves the publisher's fragments through getFragmentsToPublish;
//  2. reconciles that list with the number of subscribers: missing fragments are
//     allocated with getAssignableFragmentId, surplus fragments are cut off so that
//     only the first numSubscribers remain (or only the first one when the topic has
//     no subscriber);
//  3. marks every remaining fragment Active with the publisher's id and host;
//  4. appends one fragment to each subscriber's subscription, picked round-robin.
//
// id is the publisher id, topicName the topic it publishes to and host the address
// stored on its fragments. Errors from locking, the lookups or fragment id allocation
// are returned unchanged; in that case no mapping update has been issued.
func (d *DistributionPolicyExecutor) OnPublisherAdded(id string, topicName string, host string) error {
	lock := d.bootstrapper.NewTopicLock(topicName)
	if err := lock.Lock(); err != nil {
		return err
	}
	defer lock.Unlock()

	// get topic fragment mappings
	fragMappings, err := d.GetTopicFragmentMappings(topicName)
	if err != nil {
		return err
	}
	pubsFragmentIds, err := getFragmentsToPublish(d.bootstrapper.CoordClientTopicWrapper, topicName, id, fragMappings)
	if err != nil {
		return err
	}
	subscribers, err := d.bootstrapper.GetSubscribers(topicName)
	if err != nil {
		return err
	}

	numSubscribers := len(subscribers)
	numPublishFragments := len(pubsFragmentIds)

	switch {
	case numPublishFragments < numSubscribers:
		// Allocate the missing fragments. The scratch map lists every id that must
		// not be handed out: all fragments already known for the topic (they may
		// belong to other publishers and would otherwise be overwritten below) plus
		// this publisher's own fragments, which may not be in fragMappings yet.
		// NOTE(review): relies on getAssignableFragmentId avoiding ids present in
		// the map it is given — confirm against its implementation.
		reserved := make(topic.FragMappingInfo, len(fragMappings)+numSubscribers)
		for fragmentId, info := range fragMappings {
			reserved[fragmentId] = info
		}
		for _, fragmentId := range pubsFragmentIds {
			reserved[fragmentId] = topic.FragInfo{}
		}
		for missing := numSubscribers - numPublishFragments; missing > 0; missing-- {
			newFragmentId, err := getAssignableFragmentId(topicName, reserved)
			if err != nil {
				return err
			}
			reserved[newFragmentId] = topic.FragInfo{}
			pubsFragmentIds = append(pubsFragmentIds, newFragmentId)
		}
	case numPublishFragments > numSubscribers:
		// Too many fragments: keep one per subscriber, but never fewer than one.
		keep := numSubscribers
		if keep == 0 {
			keep = 1
		}
		pubsFragmentIds = pubsFragmentIds[:keep]
	}

	// set publishing fragments as active
	for _, fragmentId := range pubsFragmentIds {
		fragMappings[fragmentId] = topic.FragInfo{
			State:       topic.Active,
			PublisherId: id,
			Address:     host,
		}
	}
	d.UpdateTopicFragments(topicName, fragMappings)
	logger.Info("update fragments to active state", zap.String("topic", topicName), zap.Uints("fragments", pubsFragmentIds))

	// update subscription info
	// round-robin fragment assignment per subscriber
	subscriptionMappings, err := d.GetSubscriptionMappings(topicName)
	if err != nil {
		return err
	}
	selectFragment := helper.RoundRobinSelection(pubsFragmentIds)
	for _, subscriberId := range subscribers {
		// NOTE(review): the fragment is appended even if the subscriber already lists
		// it — confirm callers never re-add a publisher that is still registered.
		subsFragmentIds := subscriptionMappings[subscriberId]
		subscriptionMappings[subscriberId] = append(subsFragmentIds, selectFragment())
	}
	d.UpdateSubscriptionMappings(topicName, subscriptionMappings)
	logger.Info("update subscription info", zap.String("topic", topicName))

	return nil
}
// OnPublisherRemoved : handles a publisher leaving a topic.
//
// Under the topic lock, every fragment returned by findPublisherFragments for the
// publisher is rewritten as Inactive (publisher id kept, address cleared), and those
// fragment ids are filtered out of every subscription of the topic.
// Errors from locking or from the lookups are returned unchanged.
func (d *DistributionPolicyExecutor) OnPublisherRemoved(id string, topicName string) error {
	topicLock := d.bootstrapper.NewTopicLock(topicName)
	if lockErr := topicLock.Lock(); lockErr != nil {
		return lockErr
	}
	defer topicLock.Unlock()

	fragMappings, err := d.GetTopicFragmentMappings(topicName)
	if err != nil {
		return err
	}
	removedFragmentIds, err := findPublisherFragments(d.bootstrapper.CoordClientTopicWrapper, topicName, id)
	if err != nil {
		return err
	}

	// every fragment of the departed publisher gets the same inactive record
	inactive := topic.FragInfo{
		State:       topic.Inactive,
		PublisherId: id,
		Address:     "",
	}
	for _, fragmentId := range removedFragmentIds {
		fragMappings[fragmentId] = inactive
	}
	d.UpdateTopicFragments(topicName, fragMappings)
	logger.Info("update fragments to inactive state", zap.String("topic", topicName), zap.Uints("fragments", removedFragmentIds))

	// update subscription info
	subscriptionMappings, err := d.GetSubscriptionMappings(topicName)
	if err != nil {
		return err
	}

	// drop the now-inactive fragments from each subscription
	for subscriberId, assigned := range subscriptionMappings {
		var kept []uint
		for _, fragmentId := range assigned {
			if helper.IsContains(fragmentId, removedFragmentIds) {
				continue // fragment belonged to the removed publisher
			}
			kept = append(kept, fragmentId)
		}
		subscriptionMappings[subscriberId] = kept
	}
	d.UpdateSubscriptionMappings(topicName, subscriptionMappings)
	logger.Info("update subscription info", zap.String("topic", topicName))

	return nil
}
// OnSubscriberAdded : builds the subscription of a newly connected subscriber.
//
// Under the topic lock the outcome depends on the current state of the topic:
//   - no active publisher: the subscriber is stored with an empty subscription;
//   - the subscriber already holds as many fragments as there are active publishers:
//     nothing is changed;
//   - no subscription exists for the topic yet: the subscriber receives every active
//     fragment of every alive publisher;
//   - otherwise: for each alive publisher one fragment is activated for this
//     subscriber — its first inactive fragment if it has one, else a new id from
//     getAssignableFragmentId — and the fragment mappings are updated.
//
// Errors from locking, the lookups or fragment id allocation are returned unchanged.
func (d *DistributionPolicyExecutor) OnSubscriberAdded(id string, topicName string) error {
	topicLock := d.bootstrapper.NewTopicLock(topicName)
	if lockErr := topicLock.Lock(); lockErr != nil {
		return lockErr
	}
	defer topicLock.Unlock()

	fragMappings, err := d.GetTopicFragmentMappings(topicName)
	if err != nil {
		return err
	}
	subscriptionMappings, err := d.GetSubscriptionMappings(topicName)
	if err != nil {
		return err
	}

	var assigned []uint
	publishers, numActivePublishers := topic.ConvertToPublisherInfo(fragMappings)
	switch {
	case numActivePublishers == 0:
		// no publisher to subscribe to: the subscription stays empty

	case len(subscriptionMappings[id]) == numActivePublishers:
		logger.Info("skip assign subscription of subscriber",
			zap.String("topic", topicName),
			zap.String("subscriber", id),
			zap.Int("num-active-publishers", numActivePublishers),
			zap.Uints("assigned-fragments", subscriptionMappings[id]))
		return nil

	case len(subscriptionMappings) == 0:
		// first subscription of the topic: take over all active fragments
		for _, info := range publishers {
			if info.Alive {
				assigned = append(assigned, info.ActiveFragments...)
			}
		}

	default:
		// other subscriptions exist: give this subscriber its own fragment per publisher
		for publisherId, info := range publishers {
			if !info.Alive {
				continue
			}

			var fragmentId uint
			if inactive := info.InActiveFragments; len(inactive) > 0 {
				fragmentId = inactive[0]
			} else if fragmentId, err = getAssignableFragmentId(topicName, fragMappings); err != nil {
				return err
			}

			// record the fragment as active, whether it is new or was inactive before
			fragMappings[fragmentId] = topic.FragInfo{
				State:       topic.Active,
				PublisherId: publisherId,
				Address:     info.Address,
			}
			assigned = append(assigned, fragmentId)
		}
		d.UpdateTopicFragments(topicName, fragMappings)
	}

	// update subscription info
	subscriptionMappings[id] = assigned
	d.UpdateSubscriptionMappings(topicName, subscriptionMappings)
	logger.Info("add subscription of subscriber",
		zap.String("topic", topicName), zap.String("subscriber", id), zap.Uints("fragments", assigned))

	return nil
}
// OnSubscriberRemoved : handles a subscriber leaving a topic.
//
// Under the topic lock the subscriber's entry is deleted from the subscription
// mappings. The fragments it was subscribed to are then rewritten as Stale (publisher
// id kept, address cleared), unless helper.HasAllElements reports true for the active
// fragment ids and the subscriber's fragment ids, in which case the fragment mappings
// are left untouched.
// Errors from locking or from the lookups are returned unchanged.
func (d *DistributionPolicyExecutor) OnSubscriberRemoved(id string, topicName string) error {
	topicLock := d.bootstrapper.NewTopicLock(topicName)
	if lockErr := topicLock.Lock(); lockErr != nil {
		return lockErr
	}
	defer topicLock.Unlock()

	subscriptionMappings, err := d.GetSubscriptionMappings(topicName)
	if err != nil {
		return err
	}

	// remember what the subscriber was reading before dropping its subscription
	releasedFragmentIds := subscriptionMappings[id]
	delete(subscriptionMappings, id)
	d.UpdateSubscriptionMappings(topicName, subscriptionMappings)
	logger.Info("remove subscription of subscriber", zap.String("topic", topicName), zap.String("subscriber", id))

	fragMappings, err := d.GetTopicFragmentMappings(topicName)
	if err != nil {
		return err
	}

	// collect the ids of all fragments that are currently active
	var activeFragmentIds []uint
	for fragmentId, info := range fragMappings {
		if info.State != topic.Active {
			continue
		}
		activeFragmentIds = append(activeFragmentIds, fragmentId)
	}

	// presumably true when the released fragments are the only active ones — confirm
	// against helper.HasAllElements; the fragments must then stay active
	if helper.HasAllElements(activeFragmentIds, releasedFragmentIds) {
		logger.Info("skip update fragments: at least one fragment of each publisher should be active state",
			zap.String("topic", topicName),
			zap.Uints("total-active-fragments", activeFragmentIds),
			zap.Uints("subscribing-fragments", releasedFragmentIds))
		return nil
	}

	for _, fragmentId := range releasedFragmentIds {
		owner := fragMappings[fragmentId].PublisherId
		fragMappings[fragmentId] = topic.FragInfo{
			State:       topic.Stale,
			PublisherId: owner,
			Address:     "",
		}
	}
	d.UpdateTopicFragments(topicName, fragMappings)
	logger.Info("update fragments to stale state", zap.String("topic", topicName), zap.Uints("fragments", releasedFragmentIds))

	return nil
}