-
Notifications
You must be signed in to change notification settings - Fork 41
/
wsHub.go
169 lines (151 loc) · 4.83 KB
/
wsHub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package api
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/getsentry/sentry-go"
"github.com/gin-gonic/gin"
"github.com/joschahenningsen/TUM-Live/dao"
"github.com/joschahenningsen/TUM-Live/tools"
"github.com/joschahenningsen/TUM-Live/tools/realtime"
log "github.com/sirupsen/logrus"
"strconv"
"strings"
"sync"
"time"
)
var (
	// wsMapLock serializes access to sessionsMap.
	wsMapLock sync.RWMutex

	// sessionsMap holds, per stream ID, the websocket sessions that
	// registered for that stream via connHandler.
	sessionsMap = make(map[uint][]*sessionWrapper)
)
// Values for the "type" field of server messages sent to clients
// (see sendServerMessage and sendServerMessageWithBackoff).
const (
// TypeServerInfo marks an informational server message.
TypeServerInfo = "info"
// TypeServerWarn marks a warning server message.
TypeServerWarn = "warn"
// TypeServerErr marks an error server message.
TypeServerErr = "error"
)
// sessionWrapper pairs a client's realtime session with the admin status that
// was determined when the client connected (see connHandler).
type sessionWrapper struct {
// session is the realtime context used to send messages to the client.
session *realtime.Context
// isAdminOfCourse is true when the connecting user was an admin of the
// stream's course; broadcastStreamToAdmins only sends to such sessions.
isAdminOfCourse bool
}
// connHandler is called with the realtime context of a newly connected client.
// It registers the session in sessionsMap under the stream's ID, sends the
// current viewer count to the client and, if chat is enabled for the course,
// sends the (rate-limited) welcome/warning server messages.
//
// If the "TUMLiveContext" value is missing, has an unexpected type or carries
// no course, the problem is reported to sentry and the session is not
// registered.
var connHandler = func(rtCtx *realtime.Context) {
	foundContext, exists := rtCtx.Get("TUMLiveContext") // get gin context
	if !exists {
		sentry.CaptureException(errors.New("context should exist but doesn't"))
		return
	}
	// Checked assertion and nil guard: a malformed context must not panic the
	// connection handler.
	tumLiveContext, ok := foundContext.(tools.TUMLiveContext)
	if !ok || tumLiveContext.Course == nil {
		sentry.CaptureException(errors.New("TUMLiveContext has unexpected type or no course"))
		return
	}

	isAdmin := false
	var uid uint
	if tumLiveContext.User != nil {
		isAdmin = tumLiveContext.User.IsAdminOfCourse(*tumLiveContext.Course)
		uid = tumLiveContext.User.ID
	}
	streamID := tumLiveContext.Stream.ID
	sessionData := sessionWrapper{rtCtx, isAdmin}

	// Read the viewer count while still holding the lock; reading the map
	// after Unlock would race with concurrent registrations and cleanups.
	wsMapLock.Lock()
	sessionsMap[streamID] = append(sessionsMap[streamID], &sessionData)
	viewers := len(sessionsMap[streamID])
	wsMapLock.Unlock()

	msg, err := json.Marshal(gin.H{"viewers": viewers})
	if err != nil {
		log.WithError(err).Error("can't marshal initial stats")
	} else if err = rtCtx.Send(msg); err != nil {
		log.WithError(err).Error("can't write initial stats to session")
	}

	if !tumLiveContext.Course.ChatEnabled {
		return
	}
	sendServerMessageWithBackoff(rtCtx, uid, streamID, "Welcome to the chatroom! Please be nice to each other and stay on topic if you want this feature to stay active.", TypeServerInfo)
	if !tumLiveContext.Course.AnonymousChatEnabled {
		sendServerMessageWithBackoff(rtCtx, uid, streamID, "The broadcaster disabled anonymous messaging for this stream.", TypeServerWarn)
	}
}
// sendServerMessageWithBackoff sends the server message msg of type t to the
// given session, unless the user is not logged in (userId == 0) or a server
// message was already delivered to this user for this stream within the last
// 10 minutes.
//
// The 10 minute backoff is only recorded after a successful send, so a failed
// delivery does not suppress the next attempt.
func sendServerMessageWithBackoff(session *realtime.Context, userId uint, streamId uint, msg string, t string) {
	if userId == 0 {
		return
	}
	cacheKey := fmt.Sprintf("shouldSendServerMsg_%d_%d", userId, streamId)
	// A cache entry means the server already messaged this user for this
	// stream within the last 10 minutes, so skip.
	if _, shouldSkip := tools.GetCacheItem(cacheKey); shouldSkip {
		return
	}
	msgBytes, err := json.Marshal(gin.H{"server": msg, "type": t})
	if err != nil {
		log.WithError(err).Error("can't marshal server message")
		return
	}
	if err = session.Send(msgBytes); err != nil {
		log.WithError(err).Error("can't write server message to session")
		// Nothing was delivered, so don't start the backoff window.
		return
	}
	// set cache item with ttl, so the user won't get a message for 10 Minutes
	tools.SetCacheItem(cacheKey, true, time.Minute*10)
}
// sendServerMessage delivers one server message (text msg, type t) to every
// session passed in. A failed send is logged and does not stop delivery to the
// remaining sessions.
func sendServerMessage(msg string, t string, sessions ...*realtime.Context) {
	// The marshal error is ignored: the payload consists of two strings only.
	payload, _ := json.Marshal(gin.H{"server": msg, "type": t})
	for i := range sessions {
		if err := sessions[i].Send(payload); err != nil {
			log.WithError(err).Error("can't write server message to session")
		}
	}
}
// BroadcastStats sends the current viewer count of every stream that has at
// least one registered session to all sessions of that stream. Streams that
// cannot be loaded through streamsDao, or that are recordings, are skipped.
func BroadcastStats(streamsDao dao.StreamsDao) {
	// Snapshot the counts under the read lock. Ranging over sessionsMap
	// without it races with connHandler/cleanupSessions, and the lock must be
	// released before broadcastStream is called because that function
	// acquires wsMapLock itself.
	wsMapLock.RLock()
	viewers := make(map[uint]int, len(sessionsMap))
	for sID, sessions := range sessionsMap {
		if len(sessions) > 0 {
			viewers[sID] = len(sessions)
		}
	}
	wsMapLock.RUnlock()

	for sID, count := range viewers {
		stream, err := streamsDao.GetStreamByID(context.Background(), fmt.Sprintf("%d", sID))
		if err != nil || stream.Recording {
			continue
		}
		msg, err := json.Marshal(gin.H{"viewers": count})
		if err != nil {
			log.WithError(err).Error("can't marshal viewer stats")
			continue
		}
		broadcastStream(sID, msg)
	}
}
func cleanupSessions() {
for id, sessions := range sessionsMap {
roomName := strings.Replace(ChatRoomName, ":streamID", strconv.Itoa(int(id)), -1)
var newSessions []*sessionWrapper
for i, session := range sessions {
if RealtimeInstance.IsSubscribed(roomName, session.session.Client.Id) {
newSessions = append(newSessions, sessions[i])
}
}
wsMapLock.Lock()
sessionsMap[id] = newSessions
wsMapLock.Unlock()
}
}
func broadcastStream(streamID uint, msg []byte) {
sessions, f := sessionsMap[streamID]
if !f {
return
}
wsMapLock.Lock()
sessions = removeClosed(sessions)
wsMapLock.Unlock()
for _, wrapper := range sessions {
_ = wrapper.session.Send(msg) // ignore "session closed" error, nothing we can do about it at this point
}
}
func broadcastStreamToAdmins(streamID uint, msg []byte) {
sessions, f := sessionsMap[streamID]
if !f {
return
}
wsMapLock.Lock()
sessions = removeClosed(sessions)
wsMapLock.Unlock()
for _, wrapper := range sessions {
if wrapper.isAdminOfCourse {
_ = wrapper.session.Send(msg)
}
}
}
// removeClosed returns the sessions whose client RealtimeInstance still
// reports as connected. The input slice is left untouched; the result is nil
// when no session qualifies.
func removeClosed(sessions []*sessionWrapper) []*sessionWrapper {
	var connected []*sessionWrapper
	for i := range sessions {
		if !RealtimeInstance.IsConnected(sessions[i].session.Client.Id) {
			continue
		}
		connected = append(connected, sessions[i])
	}
	return connected
}