-
Notifications
You must be signed in to change notification settings - Fork 211
/
runner.go
120 lines (103 loc) · 2.42 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package model
import (
"math/rand"
"reflect"
"strconv"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log"
)
// model is a participant of the simulated cluster. Every message the runner
// delivers is passed to OnMessage together with the Messenger driving the run.
type model interface {
	OnMessage(messenger Messenger, msg Message)
}
// newCluster returns a cluster that holds no models yet and remembers logger
// and rng for the models added later.
func newCluster(logger log.Log, rng *rand.Rand) *cluster {
	c := new(cluster)
	c.logger = logger
	c.rng = rng
	return c
}
// cluster is the collection of models that are driven together, plus the
// shared dependencies used to construct them.
type cluster struct {
	// rng is the random source passed to each model constructor.
	rng *rand.Rand
	// logger is the base logger; addCore derives a named logger from it.
	logger log.Log
	// models holds the registered models in the order they were added.
	models []model
}
// nextid returns the identifier for the next model to be added: the number of
// models registered so far, rendered as a decimal string.
func (r *cluster) nextid() string {
	registered := len(r.models)
	return strconv.Itoa(registered)
}
// add appends m to the cluster's models and returns the cluster itself so
// that calls can be chained.
func (r *cluster) add(m model) *cluster {
	grown := append(r.models, m)
	r.models = grown
	return r
}
// addCore builds a core model and registers it with the cluster. The core
// gets the next free id and a logger named "core-<id>" derived from the
// cluster logger. The cluster is returned for chaining.
func (r *cluster) addCore() *cluster {
	id := r.nextid()
	coreLogger := r.logger.Named("core-" + id)
	core := newCore(r.rng, id, coreLogger)
	return r.add(core)
}
// addHare creates a hare model from the cluster's random source and registers
// it. The cluster is returned for chaining.
func (r *cluster) addHare() *cluster {
	hare := newHare(r.rng)
	return r.add(hare)
}
// addBeacon creates a beacon model from the cluster's random source and
// registers it. The cluster is returned for chaining.
func (r *cluster) addBeacon() *cluster {
	beacon := newBeacon(r.rng)
	return r.add(beacon)
}
// iterate calls f once for every registered model, in registration order.
// The slice is captured before the first call, so models appended while
// iterating are not visited.
func (r *cluster) iterate(f func(m model)) {
	snapshot := r.models
	for i := 0; i < len(snapshot); i++ {
		f(snapshot[i])
	}
}
// newFailingRunner returns a failingRunner wired to the given cluster,
// messenger, monitors and random source. probability is stored as given;
// the layer id and the set of failable message types start at their zero
// values.
func newFailingRunner(c *cluster,
	messenger Messenger,
	monitors []Monitor,
	rng *rand.Rand,
	probability [2]int,
) *failingRunner {
	runner := new(failingRunner)
	runner.cluster = c
	runner.messenger = messenger
	runner.monitors = monitors
	runner.rng = rng
	runner.probability = probability
	return runner
}
// failingRunner drives a cluster layer by layer while randomly dropping some
// of the messages before they reach the models.
type failingRunner struct {
	// cluster holds the models that receive delivered messages.
	cluster *cluster
	// messenger is the queue messages and events are popped from and layer
	// boundary messages are sent to.
	messenger Messenger
	// monitors receive every event popped from the messenger.
	monitors []Monitor
	// probability controls message loss: a failable message is dropped when
	// probability[0] > rng.Intn(probability[1]).
	probability [2]int
	// rng is the random source used for the drop decision.
	rng *rand.Rand
	// lid is the current layer; next advances it by one.
	lid types.LayerID
	// failables is the set of message types that may be dropped. While it is
	// nil every message is considered failable.
	failables map[reflect.Type]struct{}
}
// next advances the runner by one layer: it increments the layer id, sends
// MessageLayerStart for that layer and consumes the queue, then sends
// MessageLayerEnd for the same layer and consumes the queue again.
func (r *failingRunner) next() {
	r.lid = r.lid.Add(1)

	start := MessageLayerStart{LayerID: r.lid}
	r.messenger.Send(start)
	r.consume()

	end := MessageLayerEnd{LayerID: r.lid}
	r.messenger.Send(end)
	r.consume()
}
// failable marks the dynamic type of each given message as eligible for
// dropping and returns the runner for chaining. The set is created on first
// use, so even a call without arguments leaves a non-nil (possibly empty) set
// behind.
func (r *failingRunner) failable(events ...Message) *failingRunner {
	if r.failables == nil {
		r.failables = make(map[reflect.Type]struct{})
	}
	for i := range events {
		kind := reflect.TypeOf(events[i])
		r.failables[kind] = struct{}{}
	}
	return r
}
// isFailable reports whether ev may be dropped. When a set of failable types
// exists, only messages whose dynamic type is in the set qualify; while the
// set is nil every message qualifies.
func (r *failingRunner) isFailable(ev Message) bool {
	if r.failables != nil {
		_, registered := r.failables[reflect.TypeOf(ev)]
		return registered
	}
	return true
}
// consume delivers everything currently queued on the messenger.
//
// Messages are popped until PopMessage returns nil. A message for which
// isFailable reports true is dropped, without reaching any model, when
// probability[0] > rng.Intn(probability[1]); every other message is handed to
// each model of the cluster. Afterwards events are popped until PopEvent
// returns nil and each of them is forwarded to every monitor.
//
// A non-positive probability[1] (for example the zero value) disables
// dropping altogether: rand.Intn panics for such a bound, so the random
// source is not consulted in that case.
func (r *failingRunner) consume() {
	for msg := r.messenger.PopMessage(); msg != nil; msg = r.messenger.PopMessage() {
		// The bound is checked first so that Intn is never called with
		// n <= 0. With a valid bound the random source is consulted only for
		// failable messages, one draw per message.
		if r.probability[1] > 0 && r.isFailable(msg) && r.probability[0] > r.rng.Intn(r.probability[1]) {
			continue
		}
		r.cluster.iterate(func(m model) {
			m.OnMessage(r.messenger, msg)
		})
	}
	for ev := r.messenger.PopEvent(); ev != nil; ev = r.messenger.PopEvent() {
		for _, monitor := range r.monitors {
			monitor.OnEvent(ev)
		}
	}
}