-
Notifications
You must be signed in to change notification settings - Fork 42
/
websocket.go
95 lines (80 loc) · 2.69 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// Copyright (c) 2019-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package simulcontroller
import (
"errors"
"fmt"
"runtime"
"sync"
"github.com/mattermost/mattermost-server/v6/model"
)
// wsEventHandler listens for WebSocket events to be handled.
// This is used to model user behaviour by responding to certain events with
// the appropriate actions. It differs from userentity.wsEventHandler which is
// instead used to manage the internal user state.
func (c *SimulController) wsEventHandler(wg *sync.WaitGroup) {
semCount := runtime.NumCPU() * 8
semaphore := make(chan struct{}, semCount)
defer func() {
for i := 0; i < semCount; i++ {
semaphore <- struct{}{}
}
wg.Done()
}()
for ev := range c.user.Events() {
switch ev.EventType() {
case model.WebsocketEventTyping:
userId, ok := ev.GetData()["user_id"].(string)
if !ok || userId == "" {
c.status <- c.newErrorStatus(errors.New("simulcontroller: invalid data found in event data"))
break
}
user, err := c.user.Store().GetUser(userId)
if err != nil {
c.status <- c.newErrorStatus(fmt.Errorf("simulcontroller: GetUser failed %w", err))
break
}
// The user was found, we check if we have the status for it.
if user.Id != "" {
status, err := c.user.Store().Status(userId)
if err != nil {
c.status <- c.newErrorStatus(fmt.Errorf("simulcontroller: Status failed %w", err))
break
}
// If we can't find the user status in the store we fetch it.
if status.UserId == "" {
select {
case semaphore <- struct{}{}:
go fetchStatus(c, semaphore, user.Id)
default:
c.status <- c.newErrorStatus(errors.New("simulcontroller: dropping call"))
}
}
break
}
// We couldn't find the user so we fetch it and its status.
select {
case semaphore <- struct{}{}:
go fetchUserAndStatus(c, semaphore, userId)
default:
c.status <- c.newErrorStatus(errors.New("simulcontroller: dropping call"))
}
}
}
}
func fetchStatus(c *SimulController, sem chan struct{}, id string) {
defer func() { <-sem }()
if err := c.user.GetUsersStatusesByIds([]string{id}); err != nil {
c.status <- c.newErrorStatus(fmt.Errorf("simulcontroller: GetUsersStatusesByIds failed %w", err))
}
}
func fetchUserAndStatus(c *SimulController, sem chan struct{}, id string) {
defer func() { <-sem }()
if _, err := c.user.GetUsersByIds([]string{id}); err != nil {
c.status <- c.newErrorStatus(fmt.Errorf("simulcontroller: GetUsersByIds failed %w", err))
return
}
if err := c.user.GetUsersStatusesByIds([]string{id}); err != nil {
c.status <- c.newErrorStatus(fmt.Errorf("simulcontroller: GetUsersStatusesByIds failed %w", err))
}
}