-
Notifications
You must be signed in to change notification settings - Fork 30
/
lib_nodes_oneway.go
169 lines (147 loc) · 4.5 KB
/
lib_nodes_oneway.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package lib
import (
"log"
"math/rand"
"reflect"
"time"
)
type oneWayNode struct {
index, size int
state []byte
inNeighborsChannels []<-chan []byte
outNeighborsChannels []chan<- []byte
inNeighbors []Node
outNeighbors []Node
inNeighborsCases []reflect.SelectCase
stats statsNode
}
func (v *oneWayNode) ReceiveMessage(index int) []byte {
message := <-v.inNeighborsChannels[index]
if message != nil {
v.stats.receivedMessages++
}
return message
}
func (v *oneWayNode) ReceiveAnyMessage() (int, []byte) {
for {
index, value, ok := reflect.Select(v.inNeighborsCases)
if !ok {
continue
}
message := value.Interface().([]byte)
if message != nil {
v.stats.receivedMessages++
}
return index, message
}
}
func (v *oneWayNode) ReceiveMessageIfAvailable(index int) []byte {
neighborsCasesDefault := make([]reflect.SelectCase, 2)
neighborsCasesDefault[0] = v.inNeighborsCases[index]
neighborsCasesDefault[1] = reflect.SelectCase{Dir: reflect.SelectDefault}
_, value, ok := reflect.Select(neighborsCasesDefault)
if !ok {
return nil
}
message := value.Interface().([]byte)
if message != nil {
v.stats.receivedMessages++
}
return message
}
func (v *oneWayNode) ReceiveMessageWithTimeout(index int, timeout time.Duration) []byte {
neighborsCasesTimeout := make([]reflect.SelectCase, 2)
neighborsCasesTimeout[0] = v.inNeighborsCases[index]
neighborsCasesTimeout[1] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(time.After(timeout))}
num, value, ok := reflect.Select(neighborsCasesTimeout)
if !ok || num == 1 {
return nil
}
message := value.Interface().([]byte)
if message != nil {
v.stats.receivedMessages++
}
return message
}
func (v *oneWayNode) SendMessage(index int, message []byte) {
log.Println("Node", v.GetIndex(), "sends message to neighbor", index)
v.outNeighborsChannels[index] <- message
if message != nil {
v.stats.sentMessages++
}
}
func (v *oneWayNode) GetInChannelsCount() int {
return len(v.inNeighborsChannels)
}
func (v *oneWayNode) GetOutChannelsCount() int {
return len(v.outNeighborsChannels)
}
func (v *oneWayNode) GetInNeighbors() []Node {
return v.inNeighbors
}
func (v *oneWayNode) GetOutNeighbors() []Node {
return v.outNeighbors
}
func (v *oneWayNode) GetIndex() int {
return v.index
}
func (v *oneWayNode) GetState() []byte {
return v.state
}
func (v *oneWayNode) SetState(state []byte) {
v.state = state
}
func (v *oneWayNode) GetSize() int {
return v.size
}
func (v *oneWayNode) StartProcessing() {
<-v.stats.inConfirm
log.Println("Node", v.GetIndex(), "started")
}
func (v *oneWayNode) FinishProcessing(finish bool) {
log.Println("Node", v.GetIndex(), "finished")
v.stats.outConfirm <- counterMessage{
finish: finish,
sentMessages: v.stats.sentMessages,
receivedMessages: v.stats.receivedMessages,
}
v.stats.sentMessages, v.stats.receivedMessages = 0, 0
}
func (v *oneWayNode) IgnoreFutureMessages() {
go func() {
for {
_, _, _ = reflect.Select(v.inNeighborsCases)
}
}()
}
func (v *oneWayNode) Close() {
for _, channel := range v.outNeighborsChannels {
close(channel)
}
for _, channel := range v.inNeighborsChannels {
for range channel {
}
}
}
func (v *oneWayNode) shuffleTopology() {
rand.Shuffle(len(v.inNeighborsChannels), func(i, j int) {
v.inNeighborsChannels[i], v.inNeighborsChannels[j] = v.inNeighborsChannels[j], v.inNeighborsChannels[i]
v.inNeighbors[i], v.inNeighbors[j] = v.inNeighbors[j], v.inNeighbors[i]
v.inNeighborsCases[i], v.inNeighborsCases[j] = v.inNeighborsCases[j], v.inNeighborsCases[i]
v.outNeighborsChannels[i], v.outNeighborsChannels[j] = v.outNeighborsChannels[j], v.outNeighborsChannels[i]
v.outNeighbors[i], v.outNeighbors[j] = v.outNeighbors[j], v.outNeighbors[i]
})
}
func addOneWayConnection(firstNode, secondNode *oneWayNode, channel chan []byte) {
firstNode.outNeighborsChannels = append(firstNode.outNeighborsChannels, channel)
firstNode.outNeighbors = append(firstNode.outNeighbors, secondNode)
secondNode.inNeighborsChannels = append(secondNode.inNeighborsChannels, channel)
secondNode.inNeighbors = append(secondNode.inNeighbors, firstNode)
secondNode.inNeighborsCases = append(
secondNode.inNeighborsCases,
reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(channel)})
}
func addTwoWayConnection(firstNode, secondNode *oneWayNode, firstChan, secondChan chan []byte) {
addOneWayConnection(firstNode, secondNode, firstChan)
addOneWayConnection(secondNode, firstNode, secondChan)
}