-
Notifications
You must be signed in to change notification settings - Fork 0
/
outgoingChannelLoadBalancer.go
160 lines (125 loc) · 3.91 KB
/
outgoingChannelLoadBalancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package libp2p
import (
"context"
"sync"
p2p "github.com/kalyan3104/k-core-p2p"
)
var _ ChannelLoadBalancer = (*OutgoingChannelLoadBalancer)(nil)
const defaultSendChannel = "default send channel"
// OutgoingChannelLoadBalancer is a component that evenly balances requests to be sent
type OutgoingChannelLoadBalancer struct {
mut sync.RWMutex
chans []chan *SendableData
mainChan chan *SendableData
names []string
//namesChans is defined only for performance purposes as to fast search by name
//iteration is done directly on slices as that is used very often and is about 50x
//faster then an iteration over a map
namesChans map[string]chan *SendableData
cancelFunc context.CancelFunc
ctx context.Context //we need the context saved here in order to call appendChannel from exported func AddChannel
}
// NewOutgoingChannelLoadBalancer creates a new instance of a ChannelLoadBalancer instance
func NewOutgoingChannelLoadBalancer() *OutgoingChannelLoadBalancer {
ctx, cancelFunc := context.WithCancel(context.Background())
oclb := &OutgoingChannelLoadBalancer{
chans: make([]chan *SendableData, 0),
names: make([]string, 0),
namesChans: make(map[string]chan *SendableData),
mainChan: make(chan *SendableData),
cancelFunc: cancelFunc,
ctx: ctx,
}
oclb.appendChannel(defaultSendChannel)
return oclb
}
func (oplb *OutgoingChannelLoadBalancer) appendChannel(channel string) {
oplb.names = append(oplb.names, channel)
ch := make(chan *SendableData)
oplb.chans = append(oplb.chans, ch)
oplb.namesChans[channel] = ch
go func() {
for {
var obj *SendableData
select {
case obj = <-ch:
case <-oplb.ctx.Done():
log.Debug("closing OutgoingChannelLoadBalancer's append channel go routine")
return
}
oplb.mainChan <- obj
}
}()
}
// AddChannel adds a new channel to the throttler, if it does not exists
func (oplb *OutgoingChannelLoadBalancer) AddChannel(channel string) error {
if channel == defaultSendChannel {
return p2p.ErrChannelCanNotBeReAdded
}
oplb.mut.Lock()
defer oplb.mut.Unlock()
for _, name := range oplb.names {
if name == channel {
return nil
}
}
oplb.appendChannel(channel)
return nil
}
// RemoveChannel removes an existing channel from the throttler
func (oplb *OutgoingChannelLoadBalancer) RemoveChannel(channel string) error {
if channel == defaultSendChannel {
return p2p.ErrChannelCanNotBeDeleted
}
oplb.mut.Lock()
defer oplb.mut.Unlock()
index := -1
for idx, name := range oplb.names {
if name == channel {
index = idx
break
}
}
if index == -1 {
return p2p.ErrChannelDoesNotExist
}
sendableChan := oplb.chans[index]
//remove the index-th element in the chan slice
copy(oplb.chans[index:], oplb.chans[index+1:])
oplb.chans[len(oplb.chans)-1] = nil
oplb.chans = oplb.chans[:len(oplb.chans)-1]
//remove the index-th element in the names slice
copy(oplb.names[index:], oplb.names[index+1:])
oplb.names = oplb.names[:len(oplb.names)-1]
close(sendableChan)
delete(oplb.namesChans, channel)
return nil
}
// GetChannelOrDefault fetches the required channel or the default if the channel is not present
func (oplb *OutgoingChannelLoadBalancer) GetChannelOrDefault(channel string) chan *SendableData {
oplb.mut.RLock()
defer oplb.mut.RUnlock()
ch := oplb.namesChans[channel]
if ch != nil {
return ch
}
return oplb.chans[0]
}
// CollectOneElementFromChannels gets the waiting object from mainChan. It is a blocking call.
func (oplb *OutgoingChannelLoadBalancer) CollectOneElementFromChannels() *SendableData {
select {
case obj := <-oplb.mainChan:
return obj
case <-oplb.ctx.Done():
return nil
}
}
// Close finishes all started go routines in this instance
func (oplb *OutgoingChannelLoadBalancer) Close() error {
oplb.cancelFunc()
return nil
}
// IsInterfaceNil returns true if there is no value under the interface
func (oplb *OutgoingChannelLoadBalancer) IsInterfaceNil() bool {
return oplb == nil
}