Skip to content

Commit

Permalink
Move to new Track API
Browse files Browse the repository at this point in the history
See v2.0.0 Release Notes[0] for all changes

Resolves #405

[0] https://github.com/pions/webrtc/wiki/v2.0.0-Release-Notes#media-api
  • Loading branch information
Sean-Der committed Feb 26, 2019
1 parent f5d11df commit 6aeb342
Show file tree
Hide file tree
Showing 18 changed files with 711 additions and 533 deletions.
4 changes: 3 additions & 1 deletion datachannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,10 @@ func TestDataChannel_MessagesAreOrdered(t *testing.T) {
out := make(chan int)
inner := func(msg DataChannelMessage) {
// randomly sleep
// NB: The big.Int/crypto.Rand is overkill but makes the linter happy
// math/rand a weak RNG, but this does not need to be secure. Ignore with #nosec
/* #nosec */
randInt, err := rand.Int(rand.Reader, big.NewInt(int64(max)))
/* #nosec */
if err != nil {
t.Fatalf("Failed to get random sleep duration: %s", err)
}
Expand Down
17 changes: 11 additions & 6 deletions examples/gstreamer-receive/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,31 @@ func gstreamerReceiveMain() {

// Set a handler for when a new remote track starts, this handler creates a gstreamer pipeline
// for the given codec
peerConnection.OnTrack(func(track *webrtc.Track) {
peerConnection.OnTrack(func(track *webrtc.Track, receiver *webrtc.RTPReceiver) {
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
// This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
go func() {
ticker := time.NewTicker(time.Second * 3)
for range ticker.C {
err := peerConnection.SendRTCP(&rtcp.PictureLossIndication{MediaSSRC: track.SSRC})
err := peerConnection.SendRTCP(&rtcp.PictureLossIndication{MediaSSRC: track.SSRC()})
if err != nil {
fmt.Println(err)
}
}
}()

codec := track.Codec
fmt.Printf("Track has started, of type %d: %s \n", track.PayloadType, codec.Name)
codec := track.Codec()
fmt.Printf("Track has started, of type %d: %s \n", track.PayloadType(), codec.Name)
pipeline := gst.CreatePipeline(codec.Name)
pipeline.Start()
buf := make([]byte, 1400)
for {
p := <-track.Packets
pipeline.Push(p.Raw)
i, err := track.Read(buf)
if err != nil {
panic(err)
}

pipeline.Push(buf[:i])
}
})

Expand Down
9 changes: 5 additions & 4 deletions examples/gstreamer-send-offer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"math/rand"

"github.com/pions/webrtc"

Expand Down Expand Up @@ -34,7 +35,7 @@ func main() {
})

// Create a audio track
opusTrack, err := peerConnection.NewSampleTrack(webrtc.DefaultPayloadTypeOpus, "audio", "pion1")
opusTrack, err := peerConnection.NewTrack(webrtc.DefaultPayloadTypeOpus, rand.Uint32(), "audio", "pion1")
if err != nil {
panic(err)
}
Expand All @@ -44,7 +45,7 @@ func main() {
}

// Create a video track
vp8Track, err := peerConnection.NewSampleTrack(webrtc.DefaultPayloadTypeVP8, "video", "pion2")
vp8Track, err := peerConnection.NewTrack(webrtc.DefaultPayloadTypeVP8, rand.Uint32(), "video", "pion2")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -79,8 +80,8 @@ func main() {
}

// Start pushing buffers on these tracks
gst.CreatePipeline(webrtc.Opus, opusTrack.Samples, "audiotestsrc").Start()
gst.CreatePipeline(webrtc.VP8, vp8Track.Samples, "videotestsrc").Start()
gst.CreatePipeline(webrtc.Opus, opusTrack, "audiotestsrc").Start()
gst.CreatePipeline(webrtc.VP8, vp8Track, "videotestsrc").Start()

// Block forever
select {}
Expand Down
9 changes: 5 additions & 4 deletions examples/gstreamer-send/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"fmt"
"math/rand"

"github.com/pions/webrtc"

Expand Down Expand Up @@ -39,7 +40,7 @@ func main() {
})

// Create a audio track
opusTrack, err := peerConnection.NewSampleTrack(webrtc.DefaultPayloadTypeOpus, "audio", "pion1")
opusTrack, err := peerConnection.NewTrack(webrtc.DefaultPayloadTypeOpus, rand.Uint32(), "audio", "pion1")
if err != nil {
panic(err)
}
Expand All @@ -49,7 +50,7 @@ func main() {
}

// Create a video track
vp8Track, err := peerConnection.NewSampleTrack(webrtc.DefaultPayloadTypeVP8, "video", "pion2")
vp8Track, err := peerConnection.NewTrack(webrtc.DefaultPayloadTypeVP8, rand.Uint32(), "video", "pion2")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -84,8 +85,8 @@ func main() {
fmt.Println(signal.Encode(answer))

// Start pushing buffers on these tracks
gst.CreatePipeline(webrtc.Opus, opusTrack.Samples, *audioSrc).Start()
gst.CreatePipeline(webrtc.VP8, vp8Track.Samples, *videoSrc).Start()
gst.CreatePipeline(webrtc.Opus, opusTrack, *audioSrc).Start()
gst.CreatePipeline(webrtc.VP8, vp8Track, *videoSrc).Start()

// Block forever
select {}
Expand Down
11 changes: 5 additions & 6 deletions examples/internal/gstreamer-src/gst.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func init() {
// Pipeline is a wrapper for a GStreamer Pipeline
type Pipeline struct {
Pipeline *C.GstElement
in chan<- media.Sample
track *webrtc.Track
// stop acts as a signal that this pipeline is stopped
// any pending sends to Pipeline.in should be cancelled
stop chan interface{}
Expand All @@ -35,7 +35,7 @@ var pipelines = make(map[int]*Pipeline)
var pipelinesLock sync.Mutex

// CreatePipeline creates a GStreamer Pipeline
func CreatePipeline(codecName string, in chan<- media.Sample, pipelineSrc string) *Pipeline {
func CreatePipeline(codecName string, track *webrtc.Track, pipelineSrc string) *Pipeline {
pipelineStr := "appsink name=appsink"
switch codecName {
case webrtc.VP8:
Expand All @@ -60,7 +60,7 @@ func CreatePipeline(codecName string, in chan<- media.Sample, pipelineSrc string

pipeline := &Pipeline{
Pipeline: C.gstreamer_send_create_pipeline(pipelineStrUnsafe),
in: in,
track: track,
id: len(pipelines),
codecName: codecName,
}
Expand Down Expand Up @@ -105,9 +105,8 @@ func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.i
}
// We need to be able to cancel this function even f pipeline.in isn't being serviced
// When pipeline.stop is closed the sending of data will be cancelled.
select {
case pipeline.in <- media.Sample{Data: C.GoBytes(buffer, bufferLen), Samples: samples}:
case <-pipeline.stop:
if err := pipeline.track.WriteSample(media.Sample{Data: C.GoBytes(buffer, bufferLen), Samples: samples}); err != nil {
panic(err)
}
} else {
fmt.Printf("discarding buffer, no pipeline with id %d", int(pipelineID))
Expand Down
12 changes: 9 additions & 3 deletions examples/janus-gateway/streaming/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func main() {
fmt.Printf("Connection State has changed %s \n", connectionState.String())
})

peerConnection.OnTrack(func(track *webrtc.Track) {
if track.Codec.Name == webrtc.Opus {
peerConnection.OnTrack(func(track *webrtc.Track, receiver *webrtc.RTPReceiver) {
if track.Codec().Name == webrtc.Opus {
return
}

Expand All @@ -62,8 +62,14 @@ func main() {
if err != nil {
panic(err)
}

for {
err = i.AddPacket(<-track.Packets)
packet, err := track.ReadRTP()
if err != nil {
panic(err)
}

err = i.AddPacket(packet)
if err != nil {
panic(err)
}
Expand Down
9 changes: 5 additions & 4 deletions examples/janus-gateway/video-room/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"log"
"math/rand"

janus "github.com/notedit/janus-go"
"github.com/pions/webrtc"
Expand Down Expand Up @@ -54,7 +55,7 @@ func main() {
})

// Create a audio track
opusTrack, err := peerConnection.NewSampleTrack(webrtc.DefaultPayloadTypeOpus, "audio", "pion1")
opusTrack, err := peerConnection.NewTrack(webrtc.DefaultPayloadTypeOpus, rand.Uint32(), "audio", "pion1")
if err != nil {
panic(err)
}
Expand All @@ -64,7 +65,7 @@ func main() {
}

// Create a video track
vp8Track, err := peerConnection.NewSampleTrack(webrtc.DefaultPayloadTypeVP8, "video", "pion2")
vp8Track, err := peerConnection.NewTrack(webrtc.DefaultPayloadTypeVP8, rand.Uint32(), "video", "pion2")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -134,8 +135,8 @@ func main() {
}

// Start pushing buffers on these tracks
gst.CreatePipeline(webrtc.Opus, opusTrack.Samples, "audiotestsrc").Start()
gst.CreatePipeline(webrtc.VP8, vp8Track.Samples, "videotestsrc").Start()
gst.CreatePipeline(webrtc.Opus, opusTrack, "audiotestsrc").Start()
gst.CreatePipeline(webrtc.VP8, vp8Track, "videotestsrc").Start()
}

select {}
Expand Down
15 changes: 10 additions & 5 deletions examples/save-to-disk/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,27 +44,32 @@ func main() {
// Set a handler for when a new remote track starts, this handler saves buffers to disk as
// an ivf file, since we could have multiple video tracks we provide a counter.
// In your application this is where you would handle/process video
peerConnection.OnTrack(func(track *webrtc.Track) {
peerConnection.OnTrack(func(track *webrtc.Track, receiver *webrtc.RTPReceiver) {
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
// This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
go func() {
ticker := time.NewTicker(time.Second * 3)
for range ticker.C {
err := peerConnection.SendRTCP(&rtcp.PictureLossIndication{MediaSSRC: track.SSRC})
err := peerConnection.SendRTCP(&rtcp.PictureLossIndication{MediaSSRC: track.SSRC()})
if err != nil {
fmt.Println(err)
}
}
}()

if track.Codec.Name == webrtc.VP8 {
if track.Codec().Name == webrtc.VP8 {
fmt.Println("Got VP8 track, saving to disk as output.ivf")
i, err := ivfwriter.New("output.ivf")
if err != nil {
panic(err)
}

for {
err = i.AddPacket(<-track.Packets)
packet, err := track.ReadRTP()
if err != nil {
panic(err)
}

err = i.AddPacket(packet)
if err != nil {
panic(err)
}
Expand Down
54 changes: 19 additions & 35 deletions examples/sfu/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ import (
"io/ioutil"
"net/http"
"strconv"
"sync"
"time"

"github.com/pions/rtcp"
"github.com/pions/rtp"
"github.com/pions/webrtc"

"github.com/pions/webrtc/examples/internal/signal"
Expand Down Expand Up @@ -84,41 +82,38 @@ func main() {
panic(err)
}

inboundSSRC := make(chan uint32)
inboundPayloadType := make(chan uint8)

outboundRTP := []chan<- *rtp.Packet{}
var outboundRTPLock sync.RWMutex
localTrackChan := make(chan *webrtc.Track)
// Set a handler for when a new remote track starts, this just distributes all our packets
// to connected peers
peerConnection.OnTrack(func(track *webrtc.Track) {
peerConnection.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) {
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
// This can be less wasteful by processing incoming RTCP events, then we would emit a NACK/PLI when a viewer requests it
go func() {
ticker := time.NewTicker(rtcpPLIInterval)
for range ticker.C {
if err := peerConnection.SendRTCP(&rtcp.PictureLossIndication{MediaSSRC: track.SSRC}); err != nil {
if err := peerConnection.SendRTCP(&rtcp.PictureLossIndication{MediaSSRC: remoteTrack.SSRC()}); err != nil {
fmt.Println(err)
}
}
}()

inboundSSRC <- track.SSRC
inboundPayloadType <- track.PayloadType
// Create a local track, all our SFU clients will be fed via this track
localTrack, err := peerConnection.NewTrack(remoteTrack.PayloadType(), remoteTrack.SSRC(), "video", "pion")
if err != nil {
panic(err)
}
localTrackChan <- localTrack

rtpBuf := make([]byte, 1400)
for {
rtpPacket := <-track.Packets

outboundRTPLock.RLock()
for _, outChan := range outboundRTP {
outPacket := rtpPacket
outPacket.Payload = append([]byte{}, outPacket.Payload...)
select {
case outChan <- outPacket:
default:
}
i, err := remoteTrack.Read(rtpBuf)
if err != nil {
panic(err)
}

if _, err = localTrack.Write(rtpBuf[:i]); err != nil {
panic(err)
}
outboundRTPLock.RUnlock()
}
})

Expand All @@ -143,8 +138,7 @@ func main() {
// Get the LocalDescription and take it to base64 so we can paste in browser
fmt.Println(signal.Encode(answer))

outboundSSRC := <-inboundSSRC
outboundPayloadType := <-inboundPayloadType
localTrack := <-localTrackChan
for {
fmt.Println("")
fmt.Println("Curl an base64 SDP to start sendonly peer connection")
Expand All @@ -158,21 +152,11 @@ func main() {
panic(err)
}

// Create a single VP8 Track to send videa
vp8Track, err := peerConnection.NewRawRTPTrack(outboundPayloadType, outboundSSRC, "video", "pion")
if err != nil {
panic(err)
}

_, err = peerConnection.AddTrack(vp8Track)
_, err = peerConnection.AddTrack(localTrack)
if err != nil {
panic(err)
}

outboundRTPLock.Lock()
outboundRTP = append(outboundRTP, vp8Track.RawRTP)
outboundRTPLock.Unlock()

// Set the remote SessionDescription
err = peerConnection.SetRemoteDescription(recvOnlyOffer)
if err != nil {
Expand Down
Loading

0 comments on commit 6aeb342

Please sign in to comment.