This repository has been archived by the owner on Feb 22, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
handle.go
374 lines (307 loc) · 9.41 KB
/
handle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
package client
import (
"bytes"
"encoding/json"
"errors"
"net/http"
"strings"
"time"
"github.com/seventv/api/data/events"
"github.com/seventv/common/utils"
"go.uber.org/zap"
"github.com/seventv/eventapi/internal/global"
)
func NewHandler(conn Connection) Handler {
return handler{conn}
}
type Handler interface {
Subscribe(gctx global.Context, m events.Message[json.RawMessage]) (error, bool)
Unsubscribe(gctx global.Context, m events.Message[json.RawMessage]) error
OnDispatch(gctx global.Context, msg events.Message[events.DispatchPayload])
OnResume(gctx global.Context, msg events.Message[json.RawMessage]) error
OnBridge(gctx global.Context, msg events.Message[json.RawMessage]) error
}
type handler struct {
conn Connection
}
const (
EVENT_TYPE_MAX_LENGTH = 64
SUBSCRIPTION_CONDITION_MAX = 10
SUBSCRIPTION_CONDITION_KEY_MAX_LENGTH = 64
SUBSCRIPTION_CONDITION_VALUE_MAX_LENGTH = 128
)
func (h handler) OnDispatch(gctx global.Context, msg events.Message[events.DispatchPayload]) {
var matches []uint32
if msg.Data.Whisper == "" {
// Filter by subscribed event types
ev, ok := h.conn.Events().Get(msg.Data.Type)
if !ok {
return // skip if not subscribed to this
}
matches = ev.Match(msg.Data.Conditions)
if len(matches) == 0 {
return
}
} else if msg.Data.Whisper != h.conn.SessionID() {
return // skip if event is whisper not for this session
}
// Dedupe
if msg.Data.Hash != nil {
ha := *msg.Data.Hash
if !h.conn.Cache().AddDispatch(ha) {
return // skip if already dispatched
}
}
// Handle effect
if msg.Data.Effect != nil {
for _, e := range msg.Data.Effect.AddSubscriptions {
_, ids, err := h.conn.Events().Subscribe(gctx, h.conn.Context(), e.Type, e.Condition, EventSubscriptionProperties{
TTL: utils.Ternary(e.TTL > 0, time.Now().Add(e.TTL), time.Time{}),
Auto: true,
})
if err != nil && !errors.Is(err, ErrAlreadySubscribed) {
zap.S().Errorw("failed to add subscription from dispatch",
"error", err,
)
}
// Handle TTL: remove the subscription after TTL
if e.TTL > 0 {
go func(ttl time.Duration, typ events.EventType, cond events.EventCondition) {
select {
case <-h.conn.Context().Done():
return
case <-time.After(ttl):
}
err = h.conn.Events().UnsubscribeWithID(ids)
if err != nil && !errors.Is(err, ErrNotSubscribed) {
zap.S().Errorw("failed to remove subscription from dispatch after TTL expire",
"error", err,
"ttl", ttl.Milliseconds(),
"type", typ,
"condition", cond,
)
}
}(e.TTL, e.Type, e.Condition)
}
}
for _, e := range msg.Data.Effect.RemoveSubscriptions {
_, err := h.conn.Events().Unsubscribe(gctx, e.Type, e.Condition)
if err != nil && !errors.Is(err, ErrNotSubscribed) {
zap.S().Errorw("failed to remove subscription from dispatch",
"error", err,
)
}
}
for _, ha := range msg.Data.Effect.RemoveHashes {
h.conn.Cache().ExpireDispatch(ha)
}
}
// connections with a buffer are dead connections where dispatches
// are being saved to allow for a graceful recovery via resuming
if h.conn.Buffer() != nil {
if err := h.conn.Buffer().Push(gctx, msg); err != nil {
zap.S().Errorw("failed to push dispatch to buffer",
"error", err,
)
}
return
}
msg.Data.Conditions = nil
msg.Data.Effect = nil
msg.Data.Hash = nil
msg.Data.Whisper = ""
msg.Data.Matches = matches
if err := h.conn.Write(msg.ToRaw()); err != nil {
zap.S().Errorw("failed to write dispatch to connection",
"error", err,
)
}
}
func (h handler) Subscribe(gctx global.Context, m events.Message[json.RawMessage]) (error, bool) {
msg, err := events.ConvertMessage[events.SubscribePayload](m)
if err != nil {
return err, false
}
t := msg.Data.Type
path := strings.Split(string(t), ".")
// Empty subscription event type
if t == "" {
h.conn.SendError("Missing event type", nil)
h.conn.SendClose(events.CloseCodeInvalidPayload, 0)
return nil, false
}
if len(path) < 2 {
h.conn.SendError("Bad event type path", nil)
h.conn.SendClose(events.CloseCodeInvalidPayload, 0)
return nil, false
}
// No targets: this requires authentication
if len(msg.Data.Condition) == 0 && h.conn.Actor() == nil {
h.conn.SendError("Wildcard event target subscription requires authentication", nil)
h.conn.SendClose(events.CloseCodeInsufficientPrivilege, 0)
return nil, false
}
// Too many subscriptions?
if h.conn.Events().Count() >= gctx.Config().API.SubscriptionLimit {
h.conn.SendError("Too Many Active Subscriptions!", nil)
h.conn.SendClose(events.CloseCodeRateLimit, 0)
return nil, false
}
// Validate: event type
if len(msg.Data.Type) > EVENT_TYPE_MAX_LENGTH {
h.conn.SendError("Event Type Too Large", map[string]any{
"event_type": msg.Data.Type,
"event_type_length": len(msg.Data.Type),
"event_type_length_most": EVENT_TYPE_MAX_LENGTH,
})
h.conn.SendClose(events.CloseCodeRateLimit, 0)
return nil, false
}
// Validate: condition
pos := -1
for k, v := range msg.Data.Condition {
pos++
if pos > SUBSCRIPTION_CONDITION_MAX {
h.conn.SendError("Subscription Condition Too Large", map[string]any{
"condition_keys": len(msg.Data.Condition),
"condition_keys_most": SUBSCRIPTION_CONDITION_MAX,
})
h.conn.SendClose(events.CloseCodeRateLimit, 0)
return nil, false
}
kL := len(k)
vL := len(v)
if kL > SUBSCRIPTION_CONDITION_KEY_MAX_LENGTH || vL > SUBSCRIPTION_CONDITION_VALUE_MAX_LENGTH {
h.conn.SendError("Subscription Condition Key Too Large", map[string]any{
"key": k,
"key_index": pos,
"value": v,
"key_length": kL,
"key_length_most": SUBSCRIPTION_CONDITION_KEY_MAX_LENGTH,
"value_length": vL,
"value_length_most": SUBSCRIPTION_CONDITION_VALUE_MAX_LENGTH,
})
h.conn.SendClose(events.CloseCodeRateLimit, 0)
return nil, false
}
}
// Add the event subscription
_, id, err := h.conn.Events().Subscribe(gctx, h.conn.Context(), t, msg.Data.Condition, EventSubscriptionProperties{})
if err != nil {
switch err {
case ErrAlreadySubscribed:
h.conn.SendError("Already subscribed to this event", nil)
h.conn.SendClose(events.CloseCodeAlreadySubscribed, 0)
return nil, false
default:
return err, false
}
}
_ = h.conn.SendAck(events.OpcodeSubscribe, utils.ToJSON(struct {
ID uint32 `json:"id"`
Type string `json:"type"`
Condition map[string]string `json:"condition"`
}{
ID: id,
Type: string(msg.Data.Type),
Condition: msg.Data.Condition,
}))
return nil, true
}
func (h handler) Unsubscribe(gctx global.Context, m events.Message[json.RawMessage]) error {
msg, err := events.ConvertMessage[events.UnsubscribePayload](m)
if err != nil {
return err
}
t := msg.Data.Type
if _, err = h.conn.Events().Unsubscribe(gctx, t, msg.Data.Condition); err != nil {
if err == ErrNotSubscribed {
h.conn.SendClose(events.CloseCodeNotSubscribed, 0)
return nil
}
return err
}
_ = h.conn.SendAck(events.OpcodeUnsubscribe, utils.ToJSON(struct {
Type string `json:"type"`
Condition map[string]string `json:"condition"`
}{
Type: string(msg.Data.Type),
Condition: msg.Data.Condition,
}))
return nil
}
func (h handler) OnResume(gctx global.Context, m events.Message[json.RawMessage]) error {
/*
msg, err := events.ConvertMessage[events.ResumePayload](m)
if err != nil {
return err
}
// Set up a new event buffer with the specified session ID
buf := NewEventBuffer(h.conn, msg.Data.SessionID, time.Minute)
messages, subs, err := buf.Recover(gctx)
subCount := 0
if err == nil {
// Reinstate subscriptions
for _, s := range subs {
for i := range s.Channel.ID {
cond := s.Channel.Conditions[i]
props := s.Channel.Properties[i]
_, _, err := h.conn.Events().Subscribe(gctx, h.conn.Context(), s.Type, cond, props)
if err != nil {
return err
}
subCount++
}
}
// Replay dispatches
for _, m := range messages {
h.OnDispatch(gctx, m)
}
} else {
h.conn.SendError("Resume Failed", map[string]any{
"error": err.Error(),
})
}*/
// Send ACK
_ = h.conn.SendAck(events.OpcodeResume, utils.ToJSON(struct {
Success bool `json:"success"`
DispatchesReplayed int `json:"dispatches_replayed"`
SubscriptionsRestored int `json:"subscriptions_restored"`
}{
Success: false,
DispatchesReplayed: 0,
SubscriptionsRestored: 0,
}))
/*
// Cleanup the redis data
if err = buf.Cleanup(gctx); err != nil {
zap.S().Errorw("failed to cleanup event buffer", "error", err)
}
*/
return nil
}
func (h handler) OnBridge(gctx global.Context, m events.Message[json.RawMessage]) error {
msg, err := events.ConvertMessage[events.BridgedCommandPayload[json.RawMessage]](m)
if err != nil {
return err
}
msg.Data.SessionID = h.conn.SessionID()
b, err := json.Marshal(msg.Data)
if err != nil {
return err
}
res, err := http.DefaultClient.Post(gctx.Config().API.BridgeURL, "application/json", bytes.NewReader(b))
if err != nil {
zap.S().Errorw("failed to bridge event", "error", err)
return err
}
var messages []events.Message[events.DispatchPayload]
if err = json.NewDecoder(res.Body).Decode(&messages); err != nil {
zap.S().Errorw("failed to decode bridged event", "error", err)
return err
}
for _, m := range messages {
h.OnDispatch(gctx, m)
}
return nil
}