/
hub.go
125 lines (100 loc) · 2.53 KB
/
hub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package marvin
import (
"context"
"log/slog"
"os"
"os/signal"
"syscall"
"golang.org/x/sync/errgroup"
)
// Hub connects buses and reactors: events coming from buses are fanned out to
// every reactor, and replies coming from reactors are routed back to the bus
// they name.
type Hub struct {
	// Registered components, keyed by name. Each one is started by
	// startComponents.
	buses    map[BusName]Bus
	reactors map[ReactorName]Reactor

	// Per-component channels, created by startComponents. ioLoop sends every
	// event to each entry of reactorChs, and sends each reply to the busChs
	// entry selected by the reply's Bus field.
	busChs     map[BusName]chan Reply
	reactorChs map[ReactorName]chan Event

	// Shared channels drained by ioLoop. events is handed to every bus,
	// replies to every reactor, and errs to both.
	events  chan Event
	replies chan Reply
	errs    chan error
}
// New returns an empty Hub: no buses or reactors are registered, every map is
// allocated and ready for writes, and the shared channels are unbuffered.
func New() *Hub {
	h := new(Hub)

	// Component registries and their per-component channel tables.
	h.buses = make(map[BusName]Bus)
	h.reactors = make(map[ReactorName]Reactor)
	h.busChs = make(map[BusName]chan Reply)
	h.reactorChs = make(map[ReactorName]chan Event)

	// Shared channels read by the hub's dispatch loop.
	h.events = make(chan Event)
	h.replies = make(chan Reply)
	h.errs = make(chan error)

	return h
}
// Run starts every registered bus and reactor, then blocks until all of them
// have returned. The value returned is whatever the error group's Wait
// reports.
func (h *Hub) Run() error {
	// Root context: cancelled by the signal watcher, and unconditionally when
	// Run returns so the helper goroutines below are told to stop.
	root, stop := context.WithCancel(context.Background())
	defer stop()

	// The group's context is derived from root, so it ends either when root
	// is cancelled or when any component returns an error.
	group, runCtx := errgroup.WithContext(root)
	h.startComponents(runCtx, group)

	// Helpers that live alongside the components; they are not part of the
	// group, so Run does not wait for them.
	go h.ioLoop(runCtx)
	go h.sigChan(runCtx, stop)

	return group.Wait()
}
// startComponents launches every registered bus and reactor on eg, giving
// each one a dedicated channel plus the hub's shared channels.
//
// A bus receives its own reply channel (recorded in h.busChs) and publishes
// on the shared events channel. A reactor receives its own event channel
// (recorded in h.reactorChs) and publishes on the shared replies channel.
// Both kinds get the shared error channel.
func (h *Hub) startComponents(ctx context.Context, eg *errgroup.Group) {
	for busName, b := range h.buses {
		out := make(chan Reply)
		h.busChs[busName] = out

		slog.Info("starting bus", "name", busName)
		eg.Go(h.wrapBusFunc(ctx, b.Run, BusBundle{
			Events:  h.events,
			Replies: out,
			Errors:  h.errs,
		}))
	}

	for reactorName, r := range h.reactors {
		slog.Info("starting reactor", "name", reactorName)

		in := make(chan Event)
		h.reactorChs[reactorName] = in

		eg.Go(h.wrapReactorFunc(ctx, r.Run, ReactorBundle{
			Events:  in,
			Replies: h.replies,
			Errors:  h.errs,
		}))
	}
}
// sigChan blocks until the process receives SIGINT (os.Interrupt) or SIGTERM,
// or until ctx is done, whichever comes first. On a signal it logs the signal
// and calls cancel; if ctx finishes first it returns without calling cancel.
//
// The signal registration is removed before returning, so this function's
// channel no longer captures SIGINT/SIGTERM once it has exited. Without that,
// later signals would be queued on a channel nobody reads and silently
// swallowed.
func (h *Hub) sigChan(ctx context.Context, cancel context.CancelFunc) {
	// Buffered: os/signal never blocks when delivering, so an unbuffered
	// channel could miss a signal that arrives before the select below.
	sigCh := make(chan os.Signal, 2)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(sigCh)

	select {
	case sig := <-sigCh:
		slog.Info("shutting after catching signal", "signal", sig)
		cancel()
	case <-ctx.Done():
		// Shutdown was triggered elsewhere; nothing left to watch for.
	}
}
// ioLoop is the hub's dispatch loop. Until ctx is done it:
//
//   - logs errors arriving on h.errs at debug level,
//   - arms each incoming event's watchdog with the replies channel and then
//     sends the event to every reactor channel,
//   - sends each incoming reply to the channel of the bus it names.
//
// Every outbound send also watches ctx, so the loop can exit during shutdown
// even if a receiver has stopped reading. A reply naming a bus that has no
// channel is logged and dropped: sending on the nil channel the map lookup
// would yield blocks forever and would wedge the whole loop.
func (h *Hub) ioLoop(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return

		case err := <-h.errs:
			slog.Debug("caught non-fatal error", "err", err)

		case event := <-h.events:
			slog.LogAttrs(ctx, slog.LevelDebug,
				"dispatching event",
				slog.Uint64("id", event.ID()),
				slog.String("text", event.Text),
			)
			event.setWatchdog(h.replies)
			for _, ch := range h.reactorChs {
				select {
				case ch <- event:
				case <-ctx.Done():
					return
				}
			}

		case reply := <-h.replies:
			ch, ok := h.busChs[reply.Bus]
			if !ok {
				slog.Warn("dropping reply for unknown bus", "bus", reply.Bus)
				continue
			}
			select {
			case ch <- reply:
			case <-ctx.Done():
				return
			}
		}
	}
}