/
tracks_manager.go
129 lines (103 loc) · 3.31 KB
/
tracks_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package sfu
import (
"sync"
"github.com/juju/errors"
"github.com/peer-calls/peer-calls/v4/server/identifiers"
"github.com/peer-calls/peer-calls/v4/server/logger"
"github.com/peer-calls/peer-calls/v4/server/pubsub"
"github.com/peer-calls/peer-calls/v4/server/transport"
)
// DataChannelName is the fixed name "data".
// NOTE(review): presumably the label of the WebRTC data channel; it is not
// referenced anywhere in this file - confirm against its users.
const DataChannelName = "data"
// TracksManager keeps one PeerManager per room and routes transports and
// subscription requests to the PeerManager of the room they belong to.
type TracksManager struct {
	// log is the logger handed to NewTracksManager with the "tracks_manager"
	// namespace appended.
	log logger.Logger
	// mu is held by Add, Sub, Unsub and by the cleanup goroutine started in
	// Add while they read or modify peerManagers.
	mu sync.RWMutex
	// peerManagers maps a room to its PeerManager. Entries are created in Add
	// and deleted by the cleanup goroutine once a PeerManager reports size 0.
	peerManagers map[identifiers.RoomID]*PeerManager
	// jitterBufferEnabled is passed to NewJitterHandler whenever a new
	// PeerManager is created.
	jitterBufferEnabled bool
}
// NewTracksManager returns a TracksManager that has no rooms registered yet.
//
// The "tracks_manager" namespace is appended to log, and jitterBufferEnabled
// is remembered so it can be passed to NewJitterHandler for every PeerManager
// created later.
func NewTracksManager(log logger.Logger, jitterBufferEnabled bool) *TracksManager {
	manager := new(TracksManager)

	manager.log = log.WithNamespaceAppended("tracks_manager")
	manager.peerManagers = make(map[identifiers.RoomID]*PeerManager)
	manager.jitterBufferEnabled = jitterBufferEnabled

	return manager
}
// Add registers a transport with the PeerManager of the given room. If the
// room has no PeerManager yet, one is created first.
//
// NOTE: rooms are created when the peer joins the room over the WebSocket
// connection. The component in charge of this is the RoomManager.
//
// Add is called from two places:
//   - When WebRTCTransports are created and peers join the room, or
//   - When the RoomManager emits an event that a room was created: a server
//     transport will be created for each configured node.
//
// On success Add returns the channel obtained from PeerManager.Add. When
// PeerManager.Add fails, the returned channel is nil and the error is
// annotated with "add transport".
//
// Add also starts a goroutine that blocks until tr.Done() fires, removes the
// transport from its PeerManager and, when that PeerManager is then empty and
// is still the one registered for the room, closes and unregisters it.
func (m *TracksManager) Add(room identifiers.RoomID, tr transport.Transport) (<-chan pubsub.PubTrackEvent, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	log := m.log.WithCtx(logger.Ctx{
		"room_id": room,
	})

	peerManager, ok := m.peerManagers[room]
	if !ok {
		log.Info("Add peer manager", nil)

		jitterHandler := NewJitterHandler(
			log,
			m.jitterBufferEnabled,
		)

		peerManager = NewPeerManager(room, log, jitterHandler)
		m.peerManagers[room] = peerManager
	}

	log = log.WithCtx(logger.Ctx{
		"client_id": tr.ClientID(),
	})

	log.Info("Add peer", nil)

	pubTrackEventsCh, err := peerManager.Add(tr)
	if err != nil {
		return nil, errors.Annotatef(err, "add transport")
	}

	go func() {
		<-tr.Done()

		m.mu.Lock()
		defer m.mu.Unlock()

		// Note: if this transport was already replaced in a previous call to
		// Add, Remove won't actually do anything - it will just return an
		// error, which is only logged.
		if err := peerManager.Remove(tr); err != nil {
			log.Error("Remove peer", errors.Trace(err), nil)
		} else {
			log.Info("Remove peer", nil)
		}

		// Since the server transports are created when a room is created, and
		// removed when a room is removed, we don't need to do anything special
		// to count the number of non-server peers here.
		//
		// It is fine to check for the size because peerManager is only ever
		// modified from this component, and we are under a lock.
		if peerManager.Size() != 0 {
			return
		}

		// This goroutine may outlive the PeerManager it captured: a replaced
		// transport can finish after the room was already torn down (and
		// possibly recreated). Only tear down the PeerManager that is still
		// registered, so a stale one is not closed twice and a newer one for
		// the same room is not dropped from the map.
		if current, registered := m.peerManagers[room]; !registered || current != peerManager {
			return
		}

		log.Info("Remove peer manager", nil)
		peerManager.Close()
		delete(m.peerManagers, room)
	}()

	return pubTrackEventsCh, nil
}
// Sub forwards the subscription request to the PeerManager registered for
// params.Room.
//
// It returns a "room not found" error when no PeerManager exists for that
// room; otherwise it returns the traced result of PeerManager.Sub. The
// manager's lock is held for the whole call.
func (m *TracksManager) Sub(params SubParams) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if pm, found := m.peerManagers[params.Room]; found {
		return errors.Trace(pm.Sub(params))
	}

	return errors.Errorf("room not found: %s", params.Room)
}
// Unsub forwards the unsubscribe request to the PeerManager registered for
// params.Room.
//
// It returns a "room not found" error when no PeerManager exists for that
// room; otherwise it returns the traced result of PeerManager.Unsub. The
// manager's lock is held for the whole call.
func (m *TracksManager) Unsub(params SubParams) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if pm, found := m.peerManagers[params.Room]; found {
		return errors.Trace(pm.Unsub(params))
	}

	return errors.Errorf("room not found: %s", params.Room)
}