-
Notifications
You must be signed in to change notification settings - Fork 1
/
signal.go
148 lines (126 loc) · 4.48 KB
/
signal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package sessions
import (
"context"
"errors"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/pion/webrtc/v3"
"github.com/shigde/sfu/internal/lobby/clients"
"github.com/shigde/sfu/internal/rtp"
"github.com/shigde/sfu/pkg/message"
"golang.org/x/exp/slog"
)
// Sentinel errors and timing used while a signal waits for its messenger.
var (
	// errTimeoutByWaitingForMessenger is delivered by waitForMessengerSetupFinished
	// when waitingTimeOut elapsed before a messenger was received.
	errTimeoutByWaitingForMessenger = errors.New("timeout by waiting for messenger")

	// errSessionClosedByWaitingForMessenger is delivered by
	// waitForMessengerSetupFinished when the session context ended first.
	errSessionClosedByWaitingForMessenger = errors.New("session closed by waiting for messenger")

	// waitingTimeOut is the maximum time waitForMessengerSetupFinished waits.
	waitingTimeOut = 10 * time.Second
)
// signal holds the signaling state of one user inside one session: the
// endpoints taking part in offer/answer exchanges, the messenger used to talk
// to the client, and the bookkeeping that matches answers to offers.
type signal struct {
	// id is generated in newSignal and returned by GetId.
	id uuid.UUID
	// sessionCtx ends the wait for the messenger when the session is done.
	sessionCtx context.Context
	session    uuid.UUID
	user       uuid.UUID

	offerer  *rtp.Endpoint // The offerer is always an egress endpoint or nil
	answerer *rtp.Endpoint // The answerer is always an ingress endpoint or nil

	// onMuteCbk is optional; OnMute calls it only when it is non-nil.
	onMuteCbk func(*message.Mute)
	// messenger is created in OnSenderChannel once a data channel is received.
	messenger *clients.Messenger

	// offerNumber is incremented for every offer sent and compared against the
	// number carried by incoming answers.
	offerNumber atomic.Uint32
	// receivedMessenger is closed once the messenger has been set up.
	receivedMessenger chan struct{}
}
// newSignal builds a signal for the given session and user with a freshly
// generated id. The offerer, answerer and messenger are left unset; they are
// supplied later through setOfferer, setAnswerer and OnSenderChannel.
func newSignal(sessionCtx context.Context, session uuid.UUID, user uuid.UUID) *signal {
	sig := new(signal)
	sig.id = uuid.New()
	sig.sessionCtx = sessionCtx
	sig.session = session
	sig.user = user
	// Closed by stopWaitingForMessenger to announce that a messenger exists.
	sig.receivedMessenger = make(chan struct{})
	return sig
}
// OnSenderChannel wraps the received data channel in a messenger, registers
// this signal with it, and then releases anyone waiting for the messenger.
func (s *signal) OnSenderChannel(channel *webrtc.DataChannel) {
	slog.Debug("signal: get signal channel and create messenger", "sessionId", s.session, "userId", s.user)

	messenger := clients.NewMessenger(channel)
	s.messenger = messenger

	// Registration for data-channel messages is only possible once the WebRTC
	// channel has actually been received.
	messenger.Register(s)

	s.stopWaitingForMessenger()
}
// OnSilentChannel ignores the given data channel; the body is intentionally empty.
func (s *signal) OnSilentChannel(_ *webrtc.DataChannel) {
	// We create an egress data channel because we do not want to munge the SDP
	// when no tracks have been added to the egress.
}
// stopWaitingForMessenger closes receivedMessenger unless it is already
// closed, so calling it repeatedly (sequentially) does not panic.
func (s *signal) stopWaitingForMessenger() {
	select {
	case <-s.receivedMessenger:
		// Already closed by an earlier call; nothing left to do.
		return
	default:
	}
	close(s.receivedMessenger)
}
// setAnswerer stores the endpoint that OnOffer uses to answer incoming offers.
func (s *signal) setAnswerer(endpoint *rtp.Endpoint) {
s.answerer = endpoint
}
// setOfferer stores the endpoint that OnAnswer applies incoming answers to
// and resets the offer counter to zero.
func (s *signal) setOfferer(endpoint *rtp.Endpoint) {
s.offerer = endpoint
// Offer numbering starts over for the newly set offerer.
s.offerNumber.Store(0)
}
// OnNegotiationNeeded sends the given offer to the client through the
// messenger, tagged with the next offer number. A send failure is only logged.
func (s *signal) OnNegotiationNeeded(offer webrtc.SessionDescription) {
	number := s.nextOffer()
	_, err := s.messenger.SendOffer(&offer, number)
	if err == nil {
		return
	}
	slog.Error("lobby.sessionEgressHandler: on negotiated was trigger with error", "err", err, "sessionId", s.session, "user", s.user)
}
// OnAnswer applies an answer from the client to the offerer endpoint.
// The answer is dropped when no offerer is set or when number does not match
// the number of the most recently sent offer.
func (s *signal) OnAnswer(sdp *webrtc.SessionDescription, number uint32) {
	if s.offerer == nil {
		slog.Warn("lobby.signal: no offerer exists to get this answer onAnswer", "number", number, "sessionId", s.session, "user", s.user)
		return
	}

	// Answers to anything but the latest offer are outdated and ignored.
	latest := s.currentOffer()
	if number != latest {
		slog.Debug("lobby.signal: onAnswer ignore", "number", number, "currentNumber", latest, "sessionId", s.session, "userId", s.user)
		return
	}

	slog.Debug("lobby.signal: onAnswer set", "number", number, "currentNumber", latest, "sessionId", s.session, "user", s.user)
	err := s.offerer.SetAnswer(sdp)
	if err != nil {
		slog.Error("lobby.signal: on answer was trigger with error", "err", err, "sessionId", s.session, "userId", s.user)
	}

	// NOTE(review): init is marked complete even when SetAnswer failed —
	// confirm this is intended.
	s.offerer.SetInitComplete()
}
// OnOffer handles an offer received from the client: it hands the offer to
// the answerer endpoint and sends the resulting answer back through the
// messenger, echoing responseId and number.
//
// Nothing is sent when no answerer is set or when the answerer rejects the
// offer; both cases are only logged.
func (s *signal) OnOffer(sdp *webrtc.SessionDescription, responseId uint32, number uint32) {
	slog.Debug("lobby.signal: onOffer set", "number", number, "sessionId", s.session, "user", s.user)
	if s.answerer == nil {
		slog.Warn("lobby.signal: no answerer exists to answer this offer onOffer", "number", number, "sessionId", s.session, "user", s.user)
		return
	}

	answer, err := s.answerer.SetNewOffer(sdp)
	if err != nil {
		slog.Error("lobby.signal: on offer was trigger with error", "err", err, "sessionId", s.session, "userId", s.user)
		// Without a valid answer there is nothing meaningful to send back;
		// forwarding the (likely nil) answer would only propagate the failure.
		return
	}

	if _, err := s.messenger.SendAnswer(answer, responseId, number); err != nil {
		slog.Error("lobby.signal: on answer was trigger with error", "err", err, "sessionId", s.session, "userId", s.user)
	}
}
// OnMute forwards the mute message to the registered callback, if there is one.
func (s *signal) OnMute(mute *message.Mute) {
	if s.onMuteCbk == nil {
		// No callback registered: the message is dropped.
		return
	}
	s.onMuteCbk(mute)
}
// nextOffer atomically increments the offer counter and returns the new value.
func (s *signal) nextOffer() uint32 {
	next := s.offerNumber.Add(1)
	return next
}
// currentOffer atomically reads the offer counter without changing it.
func (s *signal) currentOffer() uint32 {
	current := s.offerNumber.Load()
	return current
}
// GetId returns the id that was generated for this signal in newSignal.
func (s *signal) GetId() uuid.UUID {
return s.id
}
// waitForMessengerSetupFinished waits in the background until the messenger
// has been received, the session context is done, or waitingTimeOut elapses.
//
// The returned channel is closed without a value when the messenger arrived.
// Otherwise it yields exactly one error before being closed:
// errSessionClosedByWaitingForMessenger or errTimeoutByWaitingForMessenger.
func (s *signal) waitForMessengerSetupFinished() <-chan error {
	// Buffered with capacity one so the goroutine can always deliver its
	// result and exit, even if the caller has stopped receiving. With an
	// unbuffered channel the goroutine would block on the send forever.
	result := make(chan error, 1)

	go func() {
		defer close(result)

		// An explicit timer (instead of time.After) can be stopped as soon as
		// one of the other cases wins.
		timeout := time.NewTimer(waitingTimeOut)
		defer timeout.Stop()

		select {
		case <-s.receivedMessenger:
			// Messenger is ready: closing the channel signals success.
		case <-s.sessionCtx.Done():
			result <- errSessionClosedByWaitingForMessenger
		case <-timeout.C:
			result <- errTimeoutByWaitingForMessenger
		}
	}()

	return result
}