Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/lksdk_output/lksdk_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
)

type SampleProvider interface {
QueueLength() int
Close() error
}
type KeyFrameEmitter interface {
Expand Down Expand Up @@ -343,6 +344,15 @@ func (s *LKSDKOutput) AddOutputs(o ...SampleProvider) {
s.lock.Unlock()
}

func (s *LKSDKOutput) GetOutputs() []SampleProvider {
s.lock.Lock()
ret := make([]SampleProvider, len(s.outputs))
copy(ret, s.outputs)
s.lock.Unlock()

return ret
}

func (s *LKSDKOutput) closeOutput() {
s.logger.Debugw("disconnecting from room")

Expand Down
108 changes: 78 additions & 30 deletions pkg/media/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
opusFrameSize = 20

pixelsPerEncoderThread = 640 * 480

queueCapacity = 5
)

// Output manages GStreamer elements that converts & encodes video to the specification that's
Expand All @@ -50,12 +52,15 @@ type Output struct {
enc *gst.Element
sink *app.Sink
outputSync *utils.TrackOutputSynchronizer
isPlayingTooSlow func() bool
trackStatsGatherer *stats.MediaTrackStatGatherer
queue *utils.BlockingQueue[*sample]

localTrack atomic.Pointer[lksdk_output.LocalTrack]
stopDropping func()

closed core.Fuse
closed core.Fuse
pipelineErr atomic.Pointer[error]
}

type sample struct {
Expand All @@ -76,8 +81,8 @@ type AudioOutput struct {
codec livekit.AudioCodec
}

func NewVideoOutput(codec livekit.VideoCodec, layer *livekit.VideoLayer, outputSync *utils.TrackOutputSynchronizer, statsGatherer *stats.LocalMediaStatsGatherer) (*VideoOutput, error) {
e, err := newVideoOutput(codec, outputSync)
func NewVideoOutput(codec livekit.VideoCodec, layer *livekit.VideoLayer, outputSync *utils.TrackOutputSynchronizer, isPlayingTooSlow func() bool, statsGatherer *stats.LocalMediaStatsGatherer) (*VideoOutput, error) {
e, err := newVideoOutput(codec, outputSync, isPlayingTooSlow)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -238,8 +243,8 @@ func NewVideoOutput(codec livekit.VideoCodec, layer *livekit.VideoLayer, outputS
return e, nil
}

func NewAudioOutput(options *livekit.IngressAudioEncodingOptions, outputSync *utils.TrackOutputSynchronizer, statsGatherer *stats.LocalMediaStatsGatherer) (*AudioOutput, error) {
e, err := newAudioOutput(options.AudioCodec, outputSync)
func NewAudioOutput(options *livekit.IngressAudioEncodingOptions, outputSync *utils.TrackOutputSynchronizer, isPlayingTooSlow func() bool, statsGatherer *stats.LocalMediaStatsGatherer) (*AudioOutput, error) {
e, err := newAudioOutput(options.AudioCodec, outputSync, isPlayingTooSlow)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -342,8 +347,8 @@ func NewAudioOutput(options *livekit.IngressAudioEncodingOptions, outputSync *ut
return e, nil
}

func newVideoOutput(codec livekit.VideoCodec, outputSync *utils.TrackOutputSynchronizer) (*VideoOutput, error) {
e, err := newOutput(outputSync)
func newVideoOutput(codec livekit.VideoCodec, outputSync *utils.TrackOutputSynchronizer, isPlayingTooSlow func() bool) (*VideoOutput, error) {
e, err := newOutput(outputSync, isPlayingTooSlow)
if err != nil {
return nil, err
}
Expand All @@ -361,8 +366,8 @@ func newVideoOutput(codec livekit.VideoCodec, outputSync *utils.TrackOutputSynch
return o, nil
}

func newAudioOutput(codec livekit.AudioCodec, outputSync *utils.TrackOutputSynchronizer) (*AudioOutput, error) {
e, err := newOutput(outputSync)
func newAudioOutput(codec livekit.AudioCodec, outputSync *utils.TrackOutputSynchronizer, isPlayingTooSlow func() bool) (*AudioOutput, error) {
e, err := newOutput(outputSync, isPlayingTooSlow)
if err != nil {
return nil, err
}
Expand All @@ -380,17 +385,21 @@ func newAudioOutput(codec livekit.AudioCodec, outputSync *utils.TrackOutputSynch
return o, nil
}

func newOutput(outputSync *utils.TrackOutputSynchronizer) (*Output, error) {
func newOutput(outputSync *utils.TrackOutputSynchronizer, isPlayingTooSlow func() bool) (*Output, error) {
sink, err := app.NewAppSink()
if err != nil {
return nil, err
}

e := &Output{
sink: sink,
outputSync: outputSync,
queue: utils.NewBlockingQueue[*sample](queueCapacity),
sink: sink,
outputSync: outputSync,
isPlayingTooSlow: isPlayingTooSlow,
}

e.start()

return e, nil
}

Expand Down Expand Up @@ -438,20 +447,19 @@ func (e *Output) handleEOS(_ *app.Sink) {
e.Close()
}

func (e *Output) writeSample(s *media.Sample, pts time.Duration) error {

func (e *Output) writeSample(s *sample) error {
if e.closed.IsBroken() {
return io.EOF
}

// Synchronize the outputs before the network jitter buffer to avoid old samples stuck
// in the channel from increasing the whole pipeline delay.
drop, err := e.outputSync.WaitForMediaTime(pts)
drop, err := e.outputSync.WaitForMediaTime(s.ts, e.isPlayingTooSlow())
if err != nil {
return err
}
if drop {
e.logger.Debugw("Dropping sample", "timestamp", pts)
e.logger.Debugw("Dropping sample", "timestamp", s.ts)
e.trackStatsGatherer.PacketLost(1)
return nil
}
Expand All @@ -465,25 +473,52 @@ func (e *Output) writeSample(s *media.Sample, pts time.Duration) error {

// WriteSample seems to return successfully even if the Peer Connection disconnected.
// We need to return success to the caller even if the PC is disconnected to allow for reconnections
err = localTrack.WriteSample(*s, nil)
err = localTrack.WriteSample(*s.s, nil)
if err != nil {
return err
}

e.trackStatsGatherer.MediaReceived(int64(len(s.Data)))
e.trackStatsGatherer.MediaReceived(int64(len(s.s.Data)))

return nil
}

func (e *Output) start() {
go func() {
for {
s, err := e.queue.PopFront()
if err != nil {
// Closing
return
}
err = e.writeSample(s)
if err != nil {
// Store the first write error
e.pipelineErr.CompareAndSwap(nil, &err)
}
}
}()
}

func (e *Output) QueueLength() int {
return e.queue.QueueLength()
}

func (e *Output) Close() error {

e.closed.Break()
e.outputSync.Close()
e.queue.Close()

return nil
}

func (e *VideoOutput) handleSample(sink *app.Sink) gst.FlowReturn {
// Return an error if the last write failed
if errPtr := e.pipelineErr.Load(); errPtr != nil {
return errors.ErrorToGstFlowReturn(*errPtr)
}

// Pull the sample that triggered this callback
s := sink.PullSample()
if s == nil {
Expand All @@ -506,15 +541,25 @@ func (e *VideoOutput) handleSample(sink *app.Sink) gst.FlowReturn {

ts := time.Duration(segment.ToRunningTime(gst.FormatTime, uint64(pts)))

err := e.writeSample(&media.Sample{
Data: buffer.Bytes(),
Duration: time.Duration(duration),
}, ts)
sample := &sample{
s: &media.Sample{
Data: buffer.Bytes(),
Duration: time.Duration(duration),
},
ts: ts,
}

return errors.ErrorToGstFlowReturn(err)
e.queue.PushBack(sample)

return gst.FlowOK
}

func (e *AudioOutput) handleSample(sink *app.Sink) gst.FlowReturn {
// Return an error if the last write failed
if errPtr := e.pipelineErr.Load(); errPtr != nil {
return errors.ErrorToGstFlowReturn(*errPtr)
}

// Pull the sample that triggered this callback
s := sink.PullSample()
if s == nil {
Expand All @@ -537,17 +582,20 @@ func (e *AudioOutput) handleSample(sink *app.Sink) gst.FlowReturn {

ts := time.Duration(segment.ToRunningTime(gst.FormatTime, uint64(pts)))

var err error

switch e.codec {
case livekit.AudioCodec_OPUS:
err = e.writeSample(&media.Sample{
Data: buffer.Bytes(),
Duration: time.Duration(duration),
}, ts)
sample := &sample{
s: &media.Sample{
Data: buffer.Bytes(),
Duration: time.Duration(duration),
},
ts: ts,
}

e.queue.PushBack(sample)
}

return errors.ErrorToGstFlowReturn(err)
return gst.FlowOK
}

func getVideoEncoderThreadCount(layer *livekit.VideoLayer) uint {
Expand Down
37 changes: 35 additions & 2 deletions pkg/media/webrtc_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package media

import (
"context"
"math"
"sync"

"github.com/frostbyte73/core"
Expand All @@ -33,6 +34,10 @@ import (
putils "github.com/livekit/protocol/utils"
)

const (
targetMinQueueLength = 2
)

type WebRTCSink struct {
params *params.Params
onFailure func()
Expand Down Expand Up @@ -92,7 +97,7 @@ func NewWebRTCSink(ctx context.Context, p *params.Params, onFailure func(), stat
}

func (s *WebRTCSink) addAudioTrack() (*Output, error) {
output, err := NewAudioOutput(s.params.AudioEncodingOptions, s.outputSync.AddTrack(), s.statsGatherer)
output, err := NewAudioOutput(s.params.AudioEncodingOptions, s.outputSync.AddTrack(), s.isPlayingTooSlow, s.statsGatherer)
if err != nil {
logger.Errorw("could not create output", err)
return nil, err
Expand Down Expand Up @@ -138,14 +143,42 @@ func (s *WebRTCSink) addAudioTrack() (*Output, error) {
return output.Output, nil
}

func (s *WebRTCSink) isPlayingTooSlow() bool {
s.lock.Lock()
sdkOut := s.sdkOut
s.lock.Unlock()

if sdkOut == nil {
return false
}

if !s.params.Live {
// output back pressure sets the play rate for VOD
return false
}

o := sdkOut.GetOutputs()
minQueueLength := math.MaxInt
for _, out := range o {
minQueueLength = min(minQueueLength, out.QueueLength())
}

if minQueueLength > targetMinQueueLength {
logger.Debugw("playing too slow", "minQueueLength", minQueueLength)
return true
}

return false
}

func (s *WebRTCSink) addVideoTrack(w, h int) ([]*Output, error) {
outputs := make([]*Output, 0)
sbArray := make([]lksdk_output.SampleProvider, 0)

sortedLayers := filterAndSortLayersByQuality(s.params.VideoEncodingOptions.Layers, w, h)

for _, layer := range sortedLayers {
output, err := NewVideoOutput(s.params.VideoEncodingOptions.VideoCodec, layer, s.outputSync.AddTrack(), s.statsGatherer)
output, err := NewVideoOutput(s.params.VideoEncodingOptions.VideoCodec, layer, s.outputSync.AddTrack(), s.isPlayingTooSlow, s.statsGatherer)
if err != nil {
return nil, err
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/params/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"os"
"path"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -50,6 +51,7 @@ type Params struct {

AudioEncodingOptions *livekit.IngressAudioEncodingOptions
VideoEncodingOptions *livekit.IngressVideoEncodingOptions
Live bool

// connection info
WsUrl string
Expand Down Expand Up @@ -151,6 +153,7 @@ func GetParams(ctx context.Context, psrpcClient rpc.IOInfoClient, conf *config.C
Config: conf,
AudioEncodingOptions: audioEncodingOptions,
VideoEncodingOptions: videoEncodingOptions,
Live: getLive(info),
Token: token,
WsUrl: wsUrl,
RelayToken: relayToken,
Expand Down Expand Up @@ -180,6 +183,19 @@ func UpdateTranscodingEnabled(info *livekit.IngressInfo) {
}
}

func getLive(info *livekit.IngressInfo) bool {
switch info.InputType {
case livekit.IngressInput_URL_INPUT:
if strings.HasPrefix(info.Url, "http://") || strings.HasPrefix(info.Url, "https://") {
return false
} else {
return true
}
default:
return true
}
}

func getLoggerFields(info *livekit.IngressInfo, loggingFields map[string]string) []interface{} {
fields := []interface{}{"ingressID", info.IngressId, "resourceID", info.State.ResourceId, "roomName", info.RoomName, "participantIdentity", info.ParticipantIdentity}
for k, v := range loggingFields {
Expand Down
Loading