-
Notifications
You must be signed in to change notification settings - Fork 10
/
call.go
225 lines (195 loc) · 6.11 KB
/
call.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
// Copyright (c) 2022-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package rtc
import (
"fmt"
"sync"
"github.com/pion/webrtc/v3"
"github.com/mattermost/mattermost/server/public/shared/mlog"
)
// call holds the state shared by the sessions taking part in one call: the
// registered sessions and the session, if any, that is currently screen
// sharing.
type call struct {
// id is presumably the call's unique identifier; it is not read by any
// method visible in this file.
id string
// sessions maps a session ID (SessionConfig.SessionID) to its session.
sessions map[string]*session
// screenSession is the session currently sharing its screen, or nil when
// nobody is sharing.
screenSession *session
// metrics receives RTP track counter updates (see handleSessionClose).
metrics Metrics
// mut guards sessions and screenSession.
mut sync.RWMutex
}
// getSession looks up the session registered under sessionID while holding
// the call's read lock. It returns nil when no session is stored for that ID.
func (c *call) getSession(sessionID string) *session {
	c.mut.RLock()
	found := c.sessions[sessionID]
	c.mut.RUnlock()

	return found
}
// addSession registers a session for cfg.SessionID unless a non-nil one is
// already stored under that ID.
//
// It returns the session stored for the ID and a flag telling whether it was
// created by this invocation. When the flag is false the pre-existing session
// is returned and rtcConn, closeCb and log are left unused.
func (c *call) addSession(cfg SessionConfig, rtcConn *webrtc.PeerConnection, closeCb func() error, log mlog.LoggerIFace) (*session, bool) {
	c.mut.Lock()
	defer c.mut.Unlock()

	// Never replace a session that is already registered.
	if existing := c.sessions[cfg.SessionID]; existing != nil {
		return existing, false
	}

	created := &session{
		cfg:     cfg,
		rtcConn: rtcConn,
		log:     log,
		call:    c,

		// Signaling channels.
		iceInCh:       make(chan []byte, signalChSize*2),
		sdpOfferInCh:  make(chan webrtc.SessionDescription, signalChSize),
		sdpAnswerInCh: make(chan webrtc.SessionDescription, signalChSize),

		// Lifecycle.
		closeCh: make(chan struct{}),
		closeCb: closeCb,

		// Track bookkeeping.
		tracksCh:           make(chan trackActionContext, tracksChSize),
		outScreenTracks:    make(map[string]*webrtc.TrackLocalStaticRTP),
		remoteScreenTracks: make(map[string]*webrtc.TrackRemote),
		screenRateMonitors: make(map[string]*RateMonitor),
	}
	c.sessions[cfg.SessionID] = created

	return created, true
}
// getScreenSession returns the session currently recorded as the call's
// screen sharing session, or nil if none is set. The value is read under the
// call's read lock.
func (c *call) getScreenSession() *session {
	c.mut.RLock()
	current := c.screenSession
	c.mut.RUnlock()

	return current
}
// setScreenSession records s as the call's screen sharing session, but only
// when no screen session is currently set. It reports whether the assignment
// took place.
func (c *call) setScreenSession(s *session) bool {
	c.mut.Lock()
	defer c.mut.Unlock()

	// Someone is already sharing: leave the current session in place.
	if c.screenSession != nil {
		return false
	}

	c.screenSession = s
	return true
}
// iterSessions invokes cb once for every session registered in the call.
// The call's read lock is held for the whole iteration, including while cb
// runs; iteration order follows Go's map ordering and is therefore
// unspecified.
func (c *call) iterSessions(cb func(s *session)) {
	c.mut.RLock()
	defer c.mut.RUnlock()

	for _, s := range c.sessions {
		cb(s)
	}
}
// clearScreenState ends the screen sharing performed by screenSession.
//
// It returns an error, leaving all state untouched, when screenSession is
// nil, when the call has no screen session, or when the call's screen session
// is a different one. Otherwise it walks every registered session while
// holding both the call lock and that session's lock:
//   - the sharing session has its own screen state cleared and the call's
//     screenSession is reset to nil;
//   - any other session with a screen track sender gets a track removal
//     queued on its tracksCh (non-blocking; a full channel is logged) and its
//     screenTrackSender reset to nil.
func (c *call) clearScreenState(screenSession *session) error {
	c.mut.Lock()
	defer c.mut.Unlock()

	switch {
	case screenSession == nil:
		return fmt.Errorf("screenSession should not be nil")
	case c.screenSession == nil:
		return fmt.Errorf("call.screenSession should not be nil")
	case c.screenSession != screenSession:
		return fmt.Errorf("screenSession mismatch, call.screenSession=%s, screenSession=%s",
			c.screenSession.cfg.SessionID, screenSession.cfg.SessionID)
	}

	for _, sess := range c.sessions {
		sess.mut.Lock()

		switch {
		case sess == c.screenSession:
			// This is the session that was sharing.
			sess.clearScreenState()
			c.screenSession = nil

		case sess.screenTrackSender != nil:
			// This session was receiving the screen track: ask for its removal
			// without blocking while locks are held.
			removal := trackActionContext{action: trackActionRemove, track: sess.screenTrackSender.Track()}
			select {
			case sess.tracksCh <- removal:
			default:
				sess.log.Error("failed to send screen track: channel is full", mlog.String("sessionID", sess.cfg.SessionID))
			}
			sess.screenTrackSender = nil
		}

		sess.mut.Unlock()
	}

	return nil
}
// handleSessionClose cleans up resources such as senders or receivers for the
// closing session us. It proceeds in three steps:
//  1. if us was the call's screen sharing session, the call's screenSession
//     and every other session's screenTrackSender are reset to nil;
//  2. every sender on us' own peer connection that carries a track is
//     detached and stopped;
//  3. every track us was sending (voice, screen audio, screen) is cleaned up
//     on the other sessions' peer connections: video tracks are queued for
//     removal on the receiving session's tracksCh, anything else has its
//     sender detached and stopped directly.
//
// us.mut is held for the whole function; each other session's mut is held
// while that session is being touched.
//
// NOTE: this is expected to always be called under lock (call.mut).
func (c *call) handleSessionClose(us *session) {
	us.log.Debug("handleSessionClose", mlog.String("sessionID", us.cfg.SessionID))

	us.mut.Lock()
	defer us.mut.Unlock()

	// cleanUp updates the out track metric (only for tracks with a valid ID),
	// then detaches the track from sender and stops it. Failures are logged,
	// including the underlying error, and do not interrupt the cleanup.
	cleanUp := func(sessionID string, sender *webrtc.RTPSender, track webrtc.TrackLocal) {
		if isValidTrackID(track.ID()) {
			c.metrics.DecRTPTracks(us.cfg.GroupID, "out", getTrackType(track.Kind()))
		} else {
			us.log.Warn("invalid track ID",
				mlog.String("sessionID", sessionID),
				mlog.String("trackID", track.ID()),
				mlog.Any("trackKind", track.Kind()))
		}

		if err := sender.ReplaceTrack(nil); err != nil {
			us.log.Error("failed to replace track on sender",
				mlog.String("sessionID", sessionID),
				mlog.String("trackID", track.ID()),
				mlog.Err(err))
		}

		if err := sender.Stop(); err != nil {
			us.log.Error("failed to stop sender for track",
				mlog.String("sessionID", sessionID),
				mlog.String("trackID", track.ID()),
				mlog.Err(err))
		}
	}

	// If the session getting closed was screen sharing we need to do some extra
	// cleanup.
	if us == c.screenSession {
		c.screenSession = nil
		for _, ss := range c.sessions {
			// us.mut is already held; skip it to avoid locking it twice.
			if ss.cfg.SessionID == us.cfg.SessionID {
				continue
			}
			ss.mut.Lock()
			ss.screenTrackSender = nil
			ss.mut.Unlock()
		}
	}

	// First we cleanup any track the closing session may have been receiving and stop
	// the associated senders.
	for _, sender := range us.rtcConn.GetSenders() {
		if track := sender.Track(); track != nil {
			us.log.Debug("cleaning up out track on receiver",
				mlog.String("sessionID", us.cfg.SessionID),
				mlog.String("trackID", track.ID()),
			)
			cleanUp(us.cfg.SessionID, sender, track)
		}
	}

	// We check whether the closing session was also sending any track
	// (e.g. voice, screen). The map is keyed by track ID.
	outTracks := map[string]bool{}
	if us.outVoiceTrack != nil {
		outTracks[us.outVoiceTrack.ID()] = true
	}
	if us.outScreenAudioTrack != nil {
		outTracks[us.outScreenAudioTrack.ID()] = true
	}
	for _, track := range us.outScreenTracks {
		outTracks[track.ID()] = true
	}

	// Nothing left to do if the closing session wasn't sending anything.
	if len(outTracks) == 0 {
		us.log.Debug("no out tracks to cleanup, returning",
			mlog.String("sessionID", us.cfg.SessionID))
		return
	}

	// We finally go ahead and cleanup any tracks that the closing session may
	// have been sending to other connected sessions.
	for _, ss := range c.sessions {
		// us.mut is already held; skip it to avoid locking it twice.
		if ss.cfg.SessionID == us.cfg.SessionID {
			continue
		}

		ss.mut.Lock()
		for _, sender := range ss.rtcConn.GetSenders() {
			if track := sender.Track(); track != nil && outTracks[track.ID()] {
				us.log.Debug("cleaning up out track on sender",
					mlog.String("senderID", us.cfg.SessionID),
					mlog.String("sessionID", ss.cfg.SessionID),
					mlog.String("trackID", track.ID()),
				)

				// If it's a screen sharing track we should remove it as we normally would when
				// sharing ends. The send is non-blocking since locks are held here.
				if track.Kind() == webrtc.RTPCodecTypeVideo {
					select {
					case ss.tracksCh <- trackActionContext{action: trackActionRemove, track: track}:
					default:
						ss.log.Error("failed to send screen track: channel is full", mlog.String("sessionID", ss.cfg.SessionID))
					}
				} else {
					cleanUp(ss.cfg.SessionID, sender, track)
				}
			}
		}
		ss.mut.Unlock()
	}
}