forked from looplab/eventhorizon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
eventbus.go
231 lines (194 loc) · 5.57 KB
/
eventbus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
// Copyright (c) 2018 - The Event Horizon authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import (
"context"
"fmt"
"log"
"sync"
"time"
eh "github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/codec/json"
)
// DefaultQueueSize is the default queue size per handler for publishing events.
var DefaultQueueSize = 1000
// EventBus is a local event bus that delegates handling of published events
// to all matching registered handlers, in order of registration.
type EventBus struct {
group *Group
registered map[eh.EventHandlerType]struct{}
registeredMu sync.RWMutex
errCh chan eh.EventBusError
wg sync.WaitGroup
codec eh.EventCodec
}
// NewEventBus creates a EventBus.
func NewEventBus(options ...Option) *EventBus {
b := &EventBus{
group: NewGroup(),
registered: map[eh.EventHandlerType]struct{}{},
errCh: make(chan eh.EventBusError, 100),
codec: &json.EventCodec{},
}
// Apply configuration options.
for _, option := range options {
if option == nil {
continue
}
option(b)
}
return b
}
// Option is an option setter used to configure creation.
type Option func(*EventBus)
// WithCodec uses the specified codec for encoding events.
func WithCodec(codec eh.EventCodec) Option {
return func(b *EventBus) {
b.codec = codec
}
}
// WithGroup uses a specified group for transmitting events.
func WithGroup(g *Group) Option {
return func(b *EventBus) {
b.group = g
}
}
// HandlerType implements the HandlerType method of the eventhorizon.EventHandler interface.
func (b *EventBus) HandlerType() eh.EventHandlerType {
return "eventbus"
}
// HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface.
func (b *EventBus) HandleEvent(ctx context.Context, event eh.Event) error {
data, err := b.codec.MarshalEvent(ctx, event)
if err != nil {
return fmt.Errorf("could not marshal event: %w", err)
}
return b.group.publish(ctx, data)
}
// AddHandler implements the AddHandler method of the eventhorizon.EventBus interface.
func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.EventHandler) error {
if m == nil {
return eh.ErrMissingMatcher
}
if h == nil {
return eh.ErrMissingHandler
}
// Check handler existence.
b.registeredMu.Lock()
defer b.registeredMu.Unlock()
if _, ok := b.registered[h.HandlerType()]; ok {
return eh.ErrHandlerAlreadyAdded
}
// Get or create the channel.
id := h.HandlerType().String()
ch := b.group.channel(id)
// Register handler.
b.registered[h.HandlerType()] = struct{}{}
// Handle until context is cancelled.
b.wg.Add(1)
go b.handle(ctx, m, h, ch)
return nil
}
// Errors implements the Errors method of the eventhorizon.EventBus interface.
func (b *EventBus) Errors() <-chan eh.EventBusError {
return b.errCh
}
// Wait for all channels to close in the event bus group
func (b *EventBus) Wait() {
b.wg.Wait()
b.group.close()
}
type evt struct {
ctxVals map[string]interface{}
event eh.Event
}
// Handles all events coming in on the channel.
func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, ch <-chan []byte) {
defer b.wg.Done()
for {
select {
case data := <-ch:
// Artificial delay to simulate network.
time.Sleep(time.Millisecond)
event, ctx, err := b.codec.UnmarshalEvent(ctx, data)
if err != nil {
err = fmt.Errorf("could not unmarshal event: %w", err)
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}:
default:
log.Printf("eventhorizon: missed error in local event bus: %s", err)
}
return
}
// Ignore non-matching events.
if !m.Match(event) {
continue
}
// Handle the event if it did match.
if err := h.HandleEvent(ctx, event); err != nil {
err = fmt.Errorf("could not handle event (%s): %s", h.HandlerType(), err.Error())
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
default:
log.Printf("eventhorizon: missed error in local event bus: %s", err)
}
}
case <-ctx.Done():
return
}
}
}
// Group is a publishing group shared by multiple event busses locally, if needed.
type Group struct {
bus map[string]chan []byte
busMu sync.RWMutex
}
// NewGroup creates a Group.
func NewGroup() *Group {
return &Group{
bus: map[string]chan []byte{},
}
}
func (g *Group) channel(id string) <-chan []byte {
g.busMu.Lock()
defer g.busMu.Unlock()
if ch, ok := g.bus[id]; ok {
return ch
}
ch := make(chan []byte, DefaultQueueSize)
g.bus[id] = ch
return ch
}
func (g *Group) publish(ctx context.Context, b []byte) error {
g.busMu.RLock()
defer g.busMu.RUnlock()
for _, ch := range g.bus {
// Marshal and unmarshal the context to both simulate only sending data
// that would be sent over a network bus and also break any relationship
// with the old context.
select {
case ch <- b:
default:
log.Printf("eventhorizon: publish queue full in local event bus")
}
}
return nil
}
// Closes all the open channels after handling is done.
func (g *Group) close() {
for _, ch := range g.bus {
close(ch)
}
g.bus = nil
}