-
Notifications
You must be signed in to change notification settings - Fork 0
/
dispatcher.go
117 lines (102 loc) · 3.11 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package gotgbot
import (
"encoding/json"
"runtime/debug"
"sort"
"go.uber.org/zap"
"github.com/veerpratap6870/gotgbot/ext"
)
// RawUpdate is a defined type whose underlying type is json.RawMessage. It is
// not a type alias, so it does not carry json.RawMessage's MarshalJSON and
// UnmarshalJSON methods; convert explicitly where those are needed.
// The dispatcher receives *RawUpdate values on its updates channel and passes
// them to initUpdate to obtain an Update.
type RawUpdate json.RawMessage
// Dispatcher holds the state needed to dispatch updates: the incoming update
// channel, the handlers registered per group, and the maximum number of
// goroutines allowed to process updates at once.
//
// Its methods use value receivers, so every call works on a copy of the
// struct. handlers (a map) and handlerGroups (a pointer to a slice) are shared
// between those copies, which is what makes a registration performed through
// one copy visible to the others.
type Dispatcher struct {
// Bot is the bot this dispatcher serves; its Logger is used for the
// dispatcher's log output.
Bot *ext.Bot
// MaxRoutines is used by Start as the capacity of its limiter channel, i.e.
// the number of updates that may be processed concurrently.
MaxRoutines int
// updates is the channel Start ranges over to receive raw updates.
updates chan *RawUpdate
// handlers maps a group number to the handlers registered in that group, in
// registration order.
handlers map[int][]Handler
// handlerGroups points to the list of group numbers that have at least one
// handler. AddHandlerToGroup keeps it sorted ascending and ProcessUpdate
// iterates it to decide the order in which groups are visited.
handlerGroups *[]int
}
// DefaultMaxDispatcherRoutines is the MaxRoutines value that NewDispatcher
// assigns to a freshly created Dispatcher.
const DefaultMaxDispatcherRoutines = 50
// NewDispatcher builds a Dispatcher for bot that reads raw updates from the
// updates channel. MaxRoutines starts at DefaultMaxDispatcherRoutines, and no
// handlers or handler groups are registered yet.
func NewDispatcher(bot *ext.Bot, updates chan *RawUpdate) *Dispatcher {
	// The group list lives behind a pointer so that copies of the Dispatcher
	// (its methods use value receivers) all observe the same slice.
	groups := []int{}

	dispatcher := new(Dispatcher)
	dispatcher.Bot = bot
	dispatcher.MaxRoutines = DefaultMaxDispatcherRoutines
	dispatcher.updates = updates
	dispatcher.handlers = make(map[int][]Handler)
	dispatcher.handlerGroups = &groups
	return dispatcher
}
// Start begins dispatching updates. It blocks, receiving from the updates
// channel until that channel is closed, and processes each update in its own
// goroutine via ProcessRawUpdate.
//
// At most MaxRoutines updates are processed concurrently; once that limit is
// reached, Start waits for a running goroutine to finish before launching the
// next one. A MaxRoutines value of zero or less falls back to
// DefaultMaxDispatcherRoutines: zero would create an unbuffered limiter that
// nothing receives from (blocking forever on the first update), and a negative
// value would make the channel allocation panic.
//
// Start does not wait for in-flight goroutines when the updates channel closes.
func (d Dispatcher) Start() {
	maxRoutines := d.MaxRoutines
	if maxRoutines <= 0 {
		maxRoutines = DefaultMaxDispatcherRoutines
	}

	// limiter acts as a counting semaphore: one slot is held per running goroutine.
	limiter := make(chan struct{}, maxRoutines)
	for upd := range d.updates {
		select {
		case limiter <- struct{}{}:
		default:
			// There is value in having this as a warn, but it's also causing logspam... so let's not.
			d.Bot.Logger.Debugf("update dispatcher has reached limit of %d", maxRoutines)
			limiter <- struct{}{} // block until a slot frees up, then proceed anyway
		}

		go func(upd *RawUpdate) {
			// Give the slot back however the processing call exits.
			defer func() { <-limiter }()
			d.ProcessRawUpdate(upd)
		}(upd)
	}
}
// EndGroups and ContinueGroups are error values that a handler can return from
// HandleUpdate to steer ProcessUpdate rather than to report a failure.
type (
	// EndGroups makes ProcessUpdate stop handling the current update.
	EndGroups struct{}

	// ContinueGroups makes ProcessUpdate carry on with the next handler
	// instead of leaving the current group.
	ContinueGroups struct{}
)

// Error implements the error interface for EndGroups.
func (EndGroups) Error() string {
	return "Group iteration ended"
}

// Error implements the error interface for ContinueGroups.
func (ContinueGroups) Error() string {
	return "Group iteration has continued"
}
// ProcessRawUpdate converts a raw update into an Update with initUpdate and
// passes the result to ProcessUpdate. If initUpdate fails, the error is logged
// and the update is dropped.
//
// A panic raised anywhere during this call is recovered, logged through the
// bot's logger, and followed by a stack trace on standard error, so it does
// not propagate to the caller.
func (d Dispatcher) ProcessRawUpdate(upd *RawUpdate) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		d.Bot.Logger.Error(recovered)
		debug.PrintStack()
	}()

	parsed, initErr := initUpdate(*upd, *d.Bot)
	if initErr != nil {
		d.Bot.Logger.Errorw("failed to init update while processing", zap.Error(initErr))
		return
	}

	d.ProcessUpdate(parsed)
}
// ProcessUpdate runs update through the registered handlers. Groups are
// visited in the order stored in handlerGroups (kept sorted ascending by
// AddHandlerToGroup); within a group, handlers are tried in registration order.
//
// For each handler, CheckUpdate decides whether it applies:
//   - not applicable, no error: the next handler in the group is tried;
//   - not applicable, with an error: the error is logged and processing of
//     this update stops entirely;
//   - applicable: HandleUpdate is called and its result decides what follows.
//
// HandleUpdate results:
//   - nil: processing moves on to the next group;
//   - EndGroups: processing of this update stops;
//   - ContinueGroups: the next handler in the same group is tried;
//   - any other error: it is logged as a warning, then processing moves on to
//     the next group.
func (d Dispatcher) ProcessUpdate(update *Update) {
	for _, groupNum := range *d.handlerGroups {
		for _, handler := range d.handlers[groupNum] {
			matched, checkErr := handler.CheckUpdate(update)
			if !matched {
				if checkErr != nil {
					d.Bot.Logger.Errorw("failed to check update while processing", zap.Error(checkErr))
					return
				}
				continue
			}

			if handleErr := handler.HandleUpdate(update, d); handleErr != nil {
				switch handleErr.(type) {
				case EndGroups:
					return
				case ContinueGroups:
					// continue targets the handler loop: try the next handler of this group.
					continue
				default:
					// Warnw takes key/value pairs or zap fields; a bare string would be
					// reported as a key without a value instead of being logged as the error.
					d.Bot.Logger.Warnw("error handling update", zap.Error(handleErr))
				}
			}
			break // a handler ran: move to next group
		}
	}
}
// AddHandler registers handler with the dispatcher in group 0 by delegating to
// AddHandlerToGroup. When an update is processed, the dispatcher calls the
// handler's CheckUpdate() to decide whether it should run and, if so, its
// HandleUpdate() to execute it.
func (d Dispatcher) AddHandler(handler Handler) {
	const defaultGroup = 0
	d.AddHandlerToGroup(handler, defaultGroup)
}
// AddHandlerToGroup appends handler to the given group; groups with the lowest
// number are processed first. The first time a group number is used it is also
// recorded in the sorted list of known groups.
func (d Dispatcher) AddHandlerToGroup(handler Handler, group int) {
	existing, known := d.handlers[group]
	if !known {
		// New group: record its number and keep the list in ascending order.
		groups := append(*d.handlerGroups, group)
		sort.Ints(groups)
		*d.handlerGroups = groups
	}
	d.handlers[group] = append(existing, handler)
}