Skip to content

Commit

Permalink
Implement Naive PLI Generator
Browse files Browse the repository at this point in the history
The Naive PLI Generator sends a PLI packet for each new track
that supports PLI, and then keep sending packets at
a constant interval.
  • Loading branch information
Antonito authored and Sean-Der committed Apr 24, 2023
1 parent 585f01c commit 049f4cd
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 2 deletions.
3 changes: 2 additions & 1 deletion AUTHORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Adam Kiss <masterada@gmail.com>
adamroach <adam@nostrum.com>
Aditya Kumar <k.aditya00@gmail.com>
aler9 <46489434+aler9@users.noreply.github.com>
Antoine <antoine@tenten.app>
Antoine Baché <antoine@tenten.app>
Atsushi Watanabe <atsushi.w@ieee.org>
Bobby Peck <rpeck@mux.com>
Expand All @@ -18,8 +19,8 @@ Kevin Caffrey <kcaffrey@gmail.com>
Maksim Nesterov <msnesterov@avito.ru>
Mathis Engelbart <mathis.engelbart@gmail.com>
Sean DuBois <sean@siobud.com>
ziminghua <565209960@qq.com>
Steffen Vogel <post@steffenvogel.de>
ziminghua <565209960@qq.com>

# List of contributors not appearing in Git history

3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ by anyone. With the following tenets in mind.
* [Transport Wide Congestion Control Feedback](https://github.com/pion/interceptor/tree/master/pkg/twcc)
* [Packet Dump](https://github.com/pion/interceptor/tree/master/pkg/packetdump)
* [Google Congestion Control](https://github.com/pion/interceptor/tree/master/pkg/gcc)
* [Stats](https://github.com/pion/interceptor/tree/master/pkg/stats) A [webrtc-stats](https://www.w3.org/TR/webrtc-stats/) compliant statistics generation
* [Interval PLI](https://github.com/pion/interceptor/tree/master/pkg/intervalpli) Generate PLI on a interval. Useful when no decoder is available.

### Planned Interceptors
* Bandwidth Estimation
- [NADA](https://tools.ietf.org/html/rfc8698)
* JitterBuffer, re-order packets and wait for arrival
* [FlexFec](https://tools.ietf.org/html/draft-ietf-payload-flexible-fec-scheme-20)
* [webrtc-stats](https://www.w3.org/TR/webrtc-stats/) compliant statistics generation
* [RTCP Feedback for Congestion Control](https://datatracker.ietf.org/doc/html/rfc8888) the standardized alternative to TWCC.

### Interceptor Public API
Expand Down
175 changes: 175 additions & 0 deletions pkg/intervalpli/generator_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package intervalpli

import (
"sync"
"time"

"github.com/pion/interceptor"
"github.com/pion/logging"
"github.com/pion/rtcp"
)

// GeneratorInterceptor interceptor sends PLI packets.
// Implements PLI in a naive way: sends a PLI for each new track that support PLI, periodically.
type GeneratorInterceptor struct {
interceptor.NoOp

interval time.Duration
streams sync.Map
immediatePLINeeded chan []uint32

log logging.LeveledLogger
m sync.Mutex
wg sync.WaitGroup

close chan struct{}
}

// NewGeneratorInterceptor returns a new GeneratorInterceptor interceptor.
func NewGeneratorInterceptor(opts ...GeneratorOption) (*GeneratorInterceptor, error) {
r := &GeneratorInterceptor{
interval: 3 * time.Second,
log: logging.NewDefaultLoggerFactory().NewLogger("pli_generator"),
immediatePLINeeded: make(chan []uint32, 1),
close: make(chan struct{}),
}

for _, opt := range opts {
if err := opt(r); err != nil {
return nil, err
}
}

return r, nil
}

func (r *GeneratorInterceptor) isClosed() bool {
select {
case <-r.close:
return true
default:
return false
}
}

// Close closes the interceptor.
func (r *GeneratorInterceptor) Close() error {
defer r.wg.Wait()
r.m.Lock()
defer r.m.Unlock()

if !r.isClosed() {
close(r.close)
}

return nil
}

// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
// will be called once per packet batch.
func (r *GeneratorInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
r.m.Lock()
defer r.m.Unlock()

if r.isClosed() {
return writer
}

r.wg.Add(1)

go r.loop(writer)

return writer
}

func (r *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
defer r.wg.Done()

ticker, tickerChan := r.createLoopTicker()

defer func() {
if ticker != nil {
ticker.Stop()
}
}()

for {
select {
case ssrcs := <-r.immediatePLINeeded:
r.writePLIs(rtcpWriter, ssrcs)

case <-tickerChan:
ssrcs := make([]uint32, 0)

r.streams.Range(func(k, value interface{}) bool {
key, ok := k.(uint32)
if !ok {
return false
}

ssrcs = append(ssrcs, key)
return true
})

r.writePLIs(rtcpWriter, ssrcs)

case <-r.close:
return
}
}
}

func (r *GeneratorInterceptor) createLoopTicker() (*time.Ticker, <-chan time.Time) {
if r.interval > 0 {
ticker := time.NewTicker(r.interval)
return ticker, ticker.C
}

return nil, make(chan time.Time)
}

func (r *GeneratorInterceptor) writePLIs(rtcpWriter interceptor.RTCPWriter, ssrcs []uint32) {
if len(ssrcs) == 0 {
return
}

pkts := []rtcp.Packet{}

for _, ssrc := range ssrcs {
pkts = append(pkts, &rtcp.PictureLossIndication{MediaSSRC: ssrc})
}

if _, err := rtcpWriter.Write(pkts, interceptor.Attributes{}); err != nil {
r.log.Warnf("failed sending: %+v", err)
}
}

// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
func (r *GeneratorInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
if !streamSupportPli(info) {
return reader
}

r.streams.Store(info.SSRC, nil)
// New streams need to receive a PLI as soon as possible.
r.ForcePLI(info.SSRC)

return reader
}

// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
func (r *GeneratorInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
r.streams.Delete(info.SSRC)
}

// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
// change in the future. The returned method will be called once per packet batch.
func (r *GeneratorInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
return reader
}

// ForcePLI sends a PLI request to the tracks matching the provided SSRCs.
func (r *GeneratorInterceptor) ForcePLI(ssrc ...uint32) {
r.immediatePLINeeded <- ssrc
}
84 changes: 84 additions & 0 deletions pkg/intervalpli/generator_interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package intervalpli

import (
"testing"
"time"

"github.com/pion/interceptor"
"github.com/pion/interceptor/internal/test"
"github.com/pion/logging"
"github.com/pion/rtcp"
"github.com/stretchr/testify/assert"
)

func TestPLIGeneratorInterceptor_Unsupported(t *testing.T) {
i, err := NewGeneratorInterceptor(
GeneratorInterval(time.Millisecond*10),
GeneratorLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
)
assert.Nil(t, err)

streamSSRC := uint32(123456)
stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: streamSSRC,
MimeType: "video/h264",
}, i)
defer func() {
assert.NoError(t, stream.Close())
}()

timeout := time.NewTimer(100 * time.Millisecond)
defer timeout.Stop()
select {
case <-timeout.C:
return
case <-stream.WrittenRTCP():
assert.FailNow(t, "should not receive any PIL")
}
}

func TestPLIGeneratorInterceptor(t *testing.T) {
i, err := NewGeneratorInterceptor(
GeneratorInterval(time.Second*1),
GeneratorLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
)
assert.Nil(t, err)

streamSSRC := uint32(123456)
stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: streamSSRC,
ClockRate: 90000,
MimeType: "video/h264",
RTCPFeedback: []interceptor.RTCPFeedback{
{Type: "nack", Parameter: "pli"},
},
}, i)
defer func() {
assert.NoError(t, stream.Close())
}()

pkts := <-stream.WrittenRTCP()
assert.Equal(t, len(pkts), 1)
sr, ok := pkts[0].(*rtcp.PictureLossIndication)
assert.True(t, ok)
assert.Equal(t, &rtcp.PictureLossIndication{MediaSSRC: streamSSRC}, sr)

// Should not have another packet immediately...
func() {
timeout := time.NewTimer(100 * time.Millisecond)
defer timeout.Stop()
select {
case <-timeout.C:
return
case <-stream.WrittenRTCP():
assert.FailNow(t, "should not receive any PIL")
}
}()

// ... but should receive one 1sec later.
pkts = <-stream.WrittenRTCP()
assert.Equal(t, len(pkts), 1)
sr, ok = pkts[0].(*rtcp.PictureLossIndication)
assert.True(t, ok)
assert.Equal(t, &rtcp.PictureLossIndication{MediaSSRC: streamSSRC}, sr)
}
26 changes: 26 additions & 0 deletions pkg/intervalpli/generator_option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package intervalpli

import (
"time"

"github.com/pion/logging"
)

// GeneratorOption can be used to configure GeneratorInterceptor.
type GeneratorOption func(r *GeneratorInterceptor) error

// GeneratorLog sets a logger for the interceptor.
func GeneratorLog(log logging.LeveledLogger) GeneratorOption {
return func(r *GeneratorInterceptor) error {
r.log = log
return nil
}
}

// GeneratorInterval sets send interval for the interceptor.
func GeneratorInterval(interval time.Duration) GeneratorOption {
return func(r *GeneratorInterceptor) error {
r.interval = interval
return nil
}
}
14 changes: 14 additions & 0 deletions pkg/intervalpli/pli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Package intervalpli is an interceptor that requests PLI on a static interval. Useful when bridging protocols that don't have receiver feedback
package intervalpli

import "github.com/pion/interceptor"

func streamSupportPli(info *interceptor.StreamInfo) bool {
for _, fb := range info.RTCPFeedback {
if fb.Type == "nack" && fb.Parameter == "pli" {
return true
}
}

return false
}

0 comments on commit 049f4cd

Please sign in to comment.