-
Notifications
You must be signed in to change notification settings - Fork 178
/
engine.go
152 lines (128 loc) · 4.22 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package splitter
import (
"fmt"
"sync"
"github.com/hashicorp/go-multierror"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/network"
)
// Engine is the splitter engine, which maintains a list of registered engines
// and passes every event it receives to each of these engines in parallel.
type Engine struct {
enginesMu sync.RWMutex
unit *engine.Unit // used to manage concurrency & shutdown
log zerolog.Logger // used to log relevant actions with context
engines map[network.Engine]struct{} // stores registered engines
channel network.Channel // the channel that this splitter listens on
}
// New creates a new splitter engine.
func New(
log zerolog.Logger,
channel network.Channel,
) *Engine {
return &Engine{
unit: engine.NewUnit(),
log: log.With().Str("engine", "splitter").Logger(),
engines: make(map[network.Engine]struct{}),
channel: channel,
}
}
// RegisterEngine registers a new engine with the splitter. Events
// that are received by the splitter after the engine has registered
// will be passed down to it.
func (e *Engine) RegisterEngine(engine network.Engine) {
e.enginesMu.Lock()
defer e.enginesMu.Unlock()
e.engines[engine] = struct{}{}
}
// UnregisterEngine unregisters an engine with the splitter. After
// the engine has been unregistered, the splitter will stop passing
// events to it. If the given engine was never registered, this is
// a noop.
func (e *Engine) UnregisterEngine(engine network.Engine) {
e.enginesMu.Lock()
defer e.enginesMu.Unlock()
delete(e.engines, engine)
}
// Ready returns a ready channel that is closed once the engine has fully
// started.
func (e *Engine) Ready() <-chan struct{} {
return e.unit.Ready()
}
// Done returns a done channel that is closed once the engine has fully stopped.
func (e *Engine) Done() <-chan struct{} {
return e.unit.Done()
}
// SubmitLocal submits an event originating on the local node.
func (e *Engine) SubmitLocal(event interface{}) {
e.unit.Launch(func() {
e.enginesMu.RLock()
defer e.enginesMu.RUnlock()
for eng := range e.engines {
e.enginesMu.RUnlock()
eng.SubmitLocal(event)
e.enginesMu.RLock()
}
})
}
// Submit submits the given event from the node with the given origin ID
// for processing in a non-blocking manner. It returns instantly and logs
// a potential processing error internally when done.
func (e *Engine) Submit(channel network.Channel, originID flow.Identifier, event interface{}) {
e.unit.Launch(func() {
if channel != e.channel {
e.log.Fatal().Err(fmt.Errorf("received event on unknown channel")).Str("channel", channel.String())
}
e.enginesMu.RLock()
defer e.enginesMu.RUnlock()
for eng := range e.engines {
e.enginesMu.RUnlock()
eng.Submit(channel, originID, event)
e.enginesMu.RLock()
}
})
}
// ProcessLocal processes an event originating on the local node.
func (e *Engine) ProcessLocal(event interface{}) error {
return e.unit.Do(func() error {
return e.process(func(downstream network.Engine) error {
return downstream.ProcessLocal(event)
})
})
}
// Process processes the given event from the node with the given origin ID
// in a blocking manner. It returns the potential processing error when
// done.
func (e *Engine) Process(channel network.Channel, originID flow.Identifier, event interface{}) error {
return e.unit.Do(func() error {
if channel != e.channel {
return fmt.Errorf("received event on unknown channel %s", channel)
}
return e.process(func(downstream network.Engine) error {
return downstream.Process(channel, originID, event)
})
})
}
// process calls the given function in parallel for all the engines that have
// registered with this splitter.
func (e *Engine) process(processFunc func(network.Engine) error) error {
count := 0
errors := make(chan error)
e.enginesMu.RLock()
for eng := range e.engines {
e.enginesMu.RUnlock()
count += 1
go func(downstream network.Engine) {
errors <- processFunc(downstream)
}(eng)
e.enginesMu.RLock()
}
e.enginesMu.RUnlock()
var multiErr *multierror.Error
for i := 0; i < count; i++ {
multiErr = multierror.Append(multiErr, <-errors)
}
return multiErr.ErrorOrNil()
}