-
Notifications
You must be signed in to change notification settings - Fork 1
/
static_sender.go
95 lines (80 loc) · 2.37 KB
/
static_sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package runner
import (
"context"
"fmt"
"github.com/google/uuid"
"github.com/pion/webrtc/v3"
"github.com/shigde/sfu/internal/rtp"
"github.com/shigde/sfu/internal/sample"
"github.com/shigde/sfu/pkg/client"
)
// StaticSender holds everything Run needs to publish a pair of media files
// to a stream through a WHIP client.
type StaticSender struct {
	// conf is handed to rtp.NewEngine when the WebRTC engine is created.
	conf *rtp.RtpConfig
	// audioFile is the source given to the audio track constructor.
	audioFile string
	// videoFile is the source given to the video track constructor.
	videoFile string
	// spaceId and streamId address the WHIP request that exchanges the offer
	// for an answer.
	spaceId  string
	streamId string
	// session is attached to the WHIP client via client.WithSession.
	session *client.Session
	// isMainStream, when true, makes Run pass the local offer through
	// rtp.MarkStreamAsMain before it is sent.
	isMainStream bool
}
// NewStaticSender returns a StaticSender populated with the given
// configuration, media files, WHIP target (spaceId/streamId), client session
// and main-stream flag. It performs no validation and starts nothing; call
// Run to begin sending.
func NewStaticSender(conf *rtp.RtpConfig, audioFile string, videoFile string, spaceId string, streamId string, session *client.Session, isMainStream bool) *StaticSender {
	sender := &StaticSender{
		conf:         conf,
		audioFile:    audioFile,
		videoFile:    videoFile,
		spaceId:      spaceId,
		streamId:     streamId,
		session:      session,
		isMainStream: isMainStream,
	}
	return sender
}
// Run publishes the configured video and audio files and then blocks until
// ctx is cancelled.
//
// Steps, in order:
//  1. create a local track for the video file and one for the audio file via
//     sample.NewLocalFileLooperTrack, both tagged with the same freshly
//     generated stream ID;
//  2. create an rtp engine from the sender's config and establish a static
//     ingress endpoint carrying both tracks;
//  3. fetch the local offer and, if isMainStream is set, pass it through
//     rtp.MarkStreamAsMain;
//  4. exchange the offer for an answer through the WHIP client and apply the
//     answer to the endpoint.
//
// onEstablished receives an empty struct whenever the endpoint invokes its
// on-established listener, unless ctx has already been cancelled. The send
// is abandoned if ctx is cancelled while waiting for a receiver.
//
// Run returns a wrapped error if any setup step fails, and nil once ctx is
// done.
func (mr *StaticSender) Run(ctx context.Context, onEstablished chan<- struct{}) error {
	localTracks := make([]webrtc.TrackLocal, 0, 2)

	// One stream ID is shared by both tracks; it is also the ID later passed
	// to rtp.MarkStreamAsMain.
	streamID := uuid.NewString()

	videoTrack, err := sample.NewLocalFileLooperTrack(mr.videoFile, sample.WithStreamID(streamID))
	if err != nil {
		return fmt.Errorf("creating video track: %w", err)
	}
	localTracks = append(localTracks, videoTrack)

	audioTrack, err := sample.NewLocalFileLooperTrack(mr.audioFile, sample.WithStreamID(streamID))
	if err != nil {
		return fmt.Errorf("creating audio track: %w", err)
	}
	localTracks = append(localTracks, audioTrack)

	engine, err := rtp.NewEngine(mr.conf)
	if err != nil {
		return fmt.Errorf("setup webrtc engine: %w", err)
	}

	withOnEstablished := rtp.EndpointWithOnEstablishedListener(func() {
		// Never signal once the context is already cancelled.
		if ctx.Err() != nil {
			return
		}
		// Race the send against cancellation so this callback cannot stay
		// blocked forever when the receiver stops listening after ctx ends.
		select {
		case <-ctx.Done():
		case onEstablished <- struct{}{}:
		}
	})

	endpoint, err := rtp.EstablishStaticIngressEndpoint(ctx, engine, localTracks, withOnEstablished)
	if err != nil {
		return fmt.Errorf("building new webrtc endpoint: %w", err)
	}

	offer, err := endpoint.GetLocalDescription(ctx)
	if err != nil {
		return fmt.Errorf("creating local offer: %w", err)
	}

	if mr.isMainStream {
		if offer, err = rtp.MarkStreamAsMain(offer, streamID); err != nil {
			return fmt.Errorf("marking stream as main stream: %w", err)
		}
	}

	whipClient := client.NewWhip(client.WithSession(mr.session))
	answer, err := whipClient.GetAnswer(mr.spaceId, mr.streamId, offer)
	if err != nil {
		return fmt.Errorf("getting answer from whip endpoint: %w", err)
	}

	if err = endpoint.SetAnswer(answer); err != nil {
		return fmt.Errorf("setting answer: %w", err)
	}

	// Setup is complete; stay alive until the caller cancels.
	<-ctx.Done()
	return nil
}