-
Notifications
You must be signed in to change notification settings - Fork 1
/
hub.go
287 lines (254 loc) · 10.8 KB
/
hub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
package sessions
import (
"context"
"errors"
"time"
"github.com/google/uuid"
"github.com/pion/webrtc/v3"
"github.com/shigde/sfu/internal/metric"
"github.com/shigde/sfu/internal/rtp"
"golang.org/x/exp/slog"
)
var (
	// errHubAlreadyClosed is returned when a request reaches a Hub whose context is already done.
	errHubAlreadyClosed = errors.New("Hub was already closed")
	// errHubDispatchTimeOut is returned when a request cannot be handed to the hub loop in time.
	errHubDispatchTimeOut = errors.New("Hub dispatch timeout")
	// hubDispatchTimeout bounds how long dispatching to (or receiving an answer from) the hub loop may take.
	hubDispatchTimeout = 3 * time.Second
)
// liveStreamSender is the subset of the live-stream output the Hub needs:
// it receives (and releases) the tracks with purpose "main" that should be
// forwarded to the live stream (see onAddTrack / onRemoveTrack).
type liveStreamSender interface {
	AddTrack(track webrtc.TrackLocal)
	RemoveTrack(track webrtc.TrackLocal)
}
// Hub fans the tracks of one live stream out to all sessions and to the
// live-stream sender. The mutable state (tracks, metricNodes, hubMetricNode)
// is only touched by the run() goroutine; all mutations are funneled through
// reqChan, so no locking is needed.
type Hub struct {
	ctx           context.Context
	LiveStreamId  uuid.UUID
	sessionRepo   *SessionRepository
	sender        liveStreamSender
	reqChan       chan *hubRequest            // serializes all hub mutations into run()
	tracks        map[string]*rtp.TrackInfo   // trackID --> TrackInfo
	metricNodes   map[string]metric.GraphNode // sessionId --> metric Node
	hubMetricNode metric.GraphNode
}
// NewHub creates the Hub for one live stream and starts its event loop.
// It also registers a metric graph node for the hub and spawns a watcher
// that deletes that node once ctx is cancelled (which is also what stops
// the event loop).
func NewHub(ctx context.Context, sessionRepo *SessionRepository, liveStream uuid.UUID, sender liveStreamSender) *Hub {
	tracks := make(map[string]*rtp.TrackInfo)
	metricNodes := make(map[string]metric.GraphNode)
	requests := make(chan *hubRequest)
	hubMetricNode := metric.GraphNodeUpdate(metric.BuildNode(liveStream.String(), liveStream.String(), "Hub"))
	// Use named fields instead of a positional literal so the constructor
	// does not silently break when Hub's field order changes.
	hub := &Hub{
		ctx:           ctx,
		LiveStreamId:  liveStream,
		sessionRepo:   sessionRepo,
		sender:        sender,
		reqChan:       requests,
		tracks:        tracks,
		metricNodes:   metricNodes,
		hubMetricNode: hubMetricNode,
	}
	go hub.run()
	// Remove the hub's metric node when the hub shuts down.
	go func(done <-chan struct{}, node metric.GraphNode) {
		<-done
		metric.GraphNodeDelete(node)
	}(ctx.Done(), hubMetricNode)
	return hub
}
// run is the hub's single event loop. Every track mutation arrives as a
// hubRequest on reqChan and is handled here, so h.tracks and h.metricNodes
// are only ever accessed from this goroutine. The loop terminates when the
// hub's context is cancelled.
func (h *Hub) run() {
	slog.Info("lobby.Hub: run")
	for {
		select {
		case trackEvent := <-h.reqChan:
			switch trackEvent.kind {
			case addTrack:
				h.onAddTrack(trackEvent)
			case removeTrack:
				h.onRemoveTrack(trackEvent)
			case getTrackList:
				h.onGetTrackList(trackEvent)
			case muteTrack:
				h.onMuteTrack(trackEvent)
			}
		case <-h.ctx.Done():
			slog.Info("lobby.Hub: closed Hub")
			return
		}
	}
}
// DispatchAddTrack hands a freshly added track over to the hub event loop.
// The dispatch is abandoned (with a log entry) when the hub is already
// closed or the loop does not accept the request within hubDispatchTimeout.
func (h *Hub) DispatchAddTrack(ctx context.Context, track *rtp.TrackInfo) {
	req := &hubRequest{ctx: ctx, kind: addTrack, track: track}
	select {
	case h.reqChan <- req:
		local := track.GetTrackLocal()
		slog.Debug("lobby.Hub: dispatch add track", "streamId", local.StreamID(), "track", local.ID(), "kind", local.Kind(), "purpose", track.Purpose.ToString())
	case <-h.ctx.Done():
		slog.Warn("lobby.Hub: dispatch add track even on closed Hub")
	case <-time.After(hubDispatchTimeout):
		slog.Error("lobby.Hub: dispatch add track - interrupted because dispatch timeout")
	}
}
// DispatchRemoveTrack hands a track removal over to the hub event loop.
// The dispatch is abandoned (with a log entry) when the hub is already
// closed or the loop does not accept the request within hubDispatchTimeout.
func (h *Hub) DispatchRemoveTrack(ctx context.Context, track *rtp.TrackInfo) {
	req := &hubRequest{ctx: ctx, kind: removeTrack, track: track}
	select {
	case h.reqChan <- req:
		local := track.GetTrackLocal()
		slog.Debug("lobby.Hub: dispatch remove track", "streamId", local.StreamID(), "track", local.ID(), "kind", local.Kind(), "purpose", track.Purpose.ToString())
	case <-h.ctx.Done():
		slog.Warn("lobby.Hub: dispatch remove track even on closed Hub")
	case <-time.After(hubDispatchTimeout):
		slog.Error("lobby.Hub: dispatch remove track - interrupted because dispatch timeout")
	}
}
// DispatchMuteTrack hands a mute-state change over to the hub event loop.
// The dispatch is abandoned (with a log entry) when the hub is already
// closed or the loop does not accept the request within hubDispatchTimeout.
func (h *Hub) DispatchMuteTrack(ctx context.Context, track *rtp.TrackInfo) {
	req := &hubRequest{ctx: ctx, kind: muteTrack, track: track}
	select {
	case h.reqChan <- req:
		slog.Debug("lobby.Hub: dispatch mute track", "id", track.GetId(), "purpose", track.Purpose.ToString())
	case <-h.ctx.Done():
		slog.Warn("lobby.Hub: dispatch mute track even on closed Hub")
	case <-time.After(hubDispatchTimeout):
		slog.Error("lobby.Hub: dispatch mute track - interrupted because dispatch timeout")
	}
}
// getTrackList is called from the egress endpoints when the connection is
// established. In this way the egress endpoints can receive the current
// tracks of the lobby. The session sets this method as callback for the
// egress endpoint.
//
// Every filter must accept a track for it to appear in the result. For each
// returned track the session's egress metric node is incremented. Returns
// errHubAlreadyClosed when the hub context is done and errHubDispatchTimeOut
// when the hub loop does not accept or answer within hubDispatchTimeout.
func (h *Hub) getTrackList(ctx context.Context, sessionId uuid.UUID, filters ...filterHubTracks) ([]*rtp.TrackInfo, error) {
	var hubList []*rtp.TrackInfo
	trackListChan := make(chan []*rtp.TrackInfo)
	select {
	case h.reqChan <- &hubRequest{ctx: ctx, kind: getTrackList, trackListChan: trackListChan}:
	case <-h.ctx.Done():
		slog.Warn("lobby.Hub: get track list on closed Hub")
		return nil, errHubAlreadyClosed
	case <-time.After(hubDispatchTimeout):
		slog.Error("lobby.Hub: get track list - interrupted because dispatch timeout")
		return nil, errHubDispatchTimeOut
	}
	// Bug fix: the original fell through on Done/timeout here and returned an
	// empty list with a nil error; surface the failure to the caller instead.
	select {
	case hubList = <-trackListChan:
	case <-h.ctx.Done():
		slog.Warn("lobby.Hub: get track list on closed Hub")
		return nil, errHubAlreadyClosed
	case <-time.After(hubDispatchTimeout):
		slog.Error("lobby.Hub: get track list - interrupted because dispatch timeout")
		return nil, errHubDispatchTimeOut
	}
	list := make([]*rtp.TrackInfo, 0, len(hubList))
	for _, track := range hubList {
		if trackPassesFilters(track, filters) {
			list = append(list, track)
			h.increaseNodeGraphStats(sessionId.String(), rtp.EgressEndpoint, track.Purpose)
		}
	}
	return list, nil
}

// trackPassesFilters reports whether every filter accepts the track.
// With no filters every track passes.
func trackPassesFilters(track *rtp.TrackInfo, filters []filterHubTracks) bool {
	for _, f := range filters {
		if !f(track) {
			return false
		}
	}
	return true
}
// onAddTrack handles an addTrack request inside the hub event loop: it
// updates the metric graph, forwards main tracks to the live-stream sender,
// registers the track in h.tracks and offers it to every other session.
func (h *Hub) onAddTrack(event *hubRequest) {
	slog.Debug("lobby.Hub: add track", "sourceSessionId", event.track.SessionId, "streamId", event.track.GetTrackLocal().StreamID(), "track", event.track.GetTrackLocal().ID(), "kind", event.track.GetTrackLocal().Kind(), "purpose", event.track.Purpose.ToString())
	h.increaseNodeGraphStats(event.track.SessionId.String(), rtp.IngressEndpoint, event.track.Purpose)
	h.hubMetricNode = metric.GraphNodeUpdateInc(h.hubMetricNode, event.track.Purpose.ToString())
	// Tracks with purpose "main" are additionally forwarded to the live stream.
	if event.track.GetPurpose() == rtp.PurposeMain {
		slog.Debug("lobby.Hub: add live track ro sender", "streamId", event.track.GetTrackLocal().StreamID(), "track", event.track.GetTrackLocal().ID(), "kind", event.track.GetTrackLocal().Kind(), "purpose", event.track.Purpose.ToString())
		h.sender.AddTrack(event.track.GetTrackLocal())
	}
	h.tracks[event.track.GetTrackLocal().ID()] = event.track
	// Offer the track to every session except the one it originated from
	// (filterForSession filters out the source session).
	h.sessionRepo.Iter(func(s *Session) {
		// If a session has just been created, this call blocks for seconds.
		// This is because the ice gathering sometimes takes seconds. That's why we don't block the call
		if !s.initComplete() {
			return
		}
		slog.Debug("bug-1: hub-add", "session", s.Id, "trackId", event.track.GetTrackLocal().ID(), "kind", event.track.GetTrackLocal().Kind())
		if filterForSession(s.Id)(event.track) {
			slog.Debug("lobby.Hub: add egress track to session", "sessionId", s.Id, "sourceSessionId", event.track.SessionId, "streamId", event.track.GetTrackLocal().StreamID(), "track", event.track.GetTrackLocal().ID(), "kind", event.track.GetTrackLocal().Kind(), "purpose", event.track.Purpose.ToString())
			s.addTrack(event.ctx, event.track)
		}
	})
}
// onRemoveTrack handles a removeTrack request inside the hub event loop: it
// updates the metric graph, detaches main tracks from the live-stream sender,
// drops the track from h.tracks and removes the egress track from every
// other fully initialized session.
func (h *Hub) onRemoveTrack(event *hubRequest) {
	slog.Debug("lobby.Hub: remove track", "sourceSessionId", event.track.SessionId, "streamId", event.track.GetTrackLocal().StreamID(), "track", event.track.GetTrackLocal().ID(), "kind", event.track.GetTrackLocal().Kind(), "purpose", event.track.Purpose.ToString())
	h.hubMetricNode = metric.GraphNodeUpdateDec(h.hubMetricNode, event.track.Purpose.ToString())
	h.decreaseNodeGraphStats(event.track.SessionId.String(), rtp.IngressEndpoint, event.track.Purpose)
	if event.track.GetPurpose() == rtp.PurposeMain {
		h.sender.RemoveTrack(event.track.GetTrackLocal())
	}
	// delete is a no-op for missing keys; no existence check needed.
	delete(h.tracks, event.track.GetTrackLocal().ID())
	h.sessionRepo.Iter(func(s *Session) {
		// If a session has just been created, this call blocks for seconds.
		// This is because the ice gathering sometimes takes seconds. That's why we don't block the call
		if !s.initComplete() {
			return
		}
		if filterForSession(s.Id)(event.track) {
			slog.Debug("lobby.Hub: remove egress track from session", "sessionId", s.Id, "sourceSessionId", event.track.SessionId, "streamId", event.track.GetTrackLocal().StreamID(), "track", event.track.GetTrackLocal().ID(), "kind", event.track.GetTrackLocal().Kind(), "purpose", event.track.Purpose.ToString())
			slog.Debug("bug-1: hub-remove", "session", s.Id, "trackId", event.track.GetTrackLocal().ID(), "kind", event.track.GetTrackLocal().Kind())
			s.removeTrack(event.ctx, event.track)
			h.decreaseNodeGraphStats(s.Id.String(), rtp.EgressEndpoint, event.track.Purpose)
		}
	})
}
// onGetTrackList answers a getTrackList request with a snapshot of all
// tracks currently registered in the hub. The snapshot is sent on the
// request's trackListChan; the send is abandoned when the hub closes or the
// receiver does not read within hubDispatchTimeout.
func (h *Hub) onGetTrackList(event *hubRequest) {
	snapshot := make([]*rtp.TrackInfo, 0, len(h.tracks))
	for _, info := range h.tracks {
		snapshot = append(snapshot, info)
	}
	select {
	case event.trackListChan <- snapshot:
	case <-h.ctx.Done():
		slog.Warn("lobby.Hub: onGetTrackList on closed Hub")
	case <-time.After(hubDispatchTimeout):
		slog.Error("lobby.Hub: onGetTrackList - interrupted because dispatch timeout")
	}
}
// onMuteTrack handles a muteTrack request inside the hub event loop and
// propagates the mute state to every other session. The per-session call is
// made asynchronously because muteTrack can block for seconds right after a
// session was created (ICE gathering).
func (h *Hub) onMuteTrack(event *hubRequest) {
	// Bug fix: the original passed "streamId" without a value, which broke
	// slog's alternating key/value pairing for all following arguments.
	slog.Debug("lobby.Hub: mute track", "sourceSessionId", event.track.SessionId, "purpose", event.track.Purpose.ToString())
	h.sessionRepo.Iter(func(s *Session) {
		if filterForSession(s.Id)(event.track) {
			// Bug fix: the purpose value was passed without its "purpose" key.
			slog.Debug("lobby.Hub: mute egress track from session", "sessionId", s.Id, "sourceSessionId", event.track.SessionId, "purpose", event.track.Purpose.ToString())
			go func(session *Session) {
				// If a session has just been created, this call blocks for seconds.
				// This is because the ice gathering sometimes takes seconds. That's why we don't block the call
				session.muteTrack(event.ctx, event.track)
			}(s)
		}
	})
}
// increaseNodeGraphStats increments the per-session metric node for the
// given endpoint type and track purpose. When the node does not exist yet it
// is created together with the edge from the endpoint to the live stream.
func (h *Hub) increaseNodeGraphStats(sessionId string, endpointType rtp.EndpointType, purpose rtp.Purpose) {
	key := endpointType.ToString() + sessionId
	node, found := h.metricNodes[key]
	if !found {
		// if metric not found create the egress and the edge from egress to lobby
		node = metric.BuildNode(sessionId, h.LiveStreamId.String(), endpointType.ToString())
		metric.GraphAddEdge(sessionId, h.LiveStreamId.String(), endpointType.ToString())
	}
	h.metricNodes[key] = metric.GraphNodeUpdateInc(node, purpose.ToString())
}
// decreaseNodeGraphStats decrements the per-session metric node for the
// given endpoint type and track purpose. When the node reaches zero tracks
// (both regular and main) its edge and map entry are removed. A missing node
// is silently ignored.
func (h *Hub) decreaseNodeGraphStats(sessionId string, endpointType rtp.EndpointType, purpose rtp.Purpose) {
	key := endpointType.ToString() + sessionId
	node, found := h.metricNodes[key]
	if !found {
		return
	}
	node = metric.GraphNodeUpdateDec(node, purpose.ToString())
	if node.Tracks == 0 && node.MainTracks == 0 {
		metric.GraphDeleteEdge(sessionId, h.LiveStreamId.String(), endpointType.ToString())
		delete(h.metricNodes, key)
		return
	}
	h.metricNodes[key] = node
}
// filterHubTracks decides whether a track may be handed to a session/caller.
type filterHubTracks func(*rtp.TrackInfo) bool

// filterForSession builds a filter that rejects tracks originating from the
// given session (a session must not receive its own tracks back).
func filterForSession(sessionId uuid.UUID) filterHubTracks {
	return func(info *rtp.TrackInfo) bool {
		return info.GetSessionId().String() != sessionId.String()
	}
}
// filterForNotMain builds a filter that rejects tracks with purpose "main".
func filterForNotMain() filterHubTracks {
	return func(info *rtp.TrackInfo) bool {
		return info.Purpose != rtp.PurposeMain
	}
}