Skip to content

Commit

Permalink
Implement RTPDump Reader/Writer/Player
Browse files Browse the repository at this point in the history
RTPDump is a widely-implemented file format for saving RTP
packet dumps without the overhead of UDP and IP headers found
in pcap dumps. libWebRTC, Wireshark, and RTPTools all have
an implementation.

For more information see:
https://www.cs.columbia.edu/irt/software/rtptools

Relates to #549
  • Loading branch information
maxhawkins committed Mar 25, 2019
1 parent 5ee8b1a commit 7c61ba2
Show file tree
Hide file tree
Showing 6 changed files with 818 additions and 0 deletions.
103 changes: 103 additions & 0 deletions pkg/media/rtpdump/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package rtpdump

import (
"bufio"
"io"
"regexp"
"sync"
)

// The file starts with #!rtpplay1.0 address/port\n
var preambleRegexp = regexp.MustCompile(`#\!rtpplay1\.0 \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\/\d{1,5}\n`)

// Reader reads the RTPDump file format
type Reader struct {
readerMu sync.Mutex
reader io.Reader
}

// NewReader opens a new Reader and immediately reads the Header from the start
// of the input stream.
func NewReader(r io.Reader) (*Reader, Header, error) {
var hdr Header

bio := bufio.NewReader(r)

// Look ahead to see if there's a valid preamble
peek, err := bio.Peek(preambleLen)
if err == io.EOF {
return nil, hdr, errMalformed
}
if err != nil {
return nil, hdr, err
}
if !preambleRegexp.Match(peek) {
return nil, hdr, errMalformed
}

// consume the preamble
_, _, err = bio.ReadLine()
if err == io.EOF {
return nil, hdr, errMalformed
}
if err != nil {
return nil, hdr, err
}

hBuf := make([]byte, headerLen)
_, err = io.ReadFull(bio, hBuf)
if err == io.ErrUnexpectedEOF || err == io.EOF {
return nil, hdr, errMalformed
}
if err != nil {
return nil, hdr, err
}

if err := hdr.Unmarshal(hBuf); err != nil {
return nil, hdr, err
}

return &Reader{
reader: bio,
}, hdr, nil
}

// Next returns the next Packet in the Reader input stream
func (r *Reader) Next() (Packet, error) {
r.readerMu.Lock()
defer r.readerMu.Unlock()

hBuf := make([]byte, pktHeaderLen)

_, err := io.ReadFull(r.reader, hBuf)
if err == io.ErrUnexpectedEOF {
return Packet{}, errMalformed
}
if err != nil {
return Packet{}, err
}

var h packetHeader
if err := h.Unmarshal(hBuf); err != nil {
return Packet{}, err
}

if h.Length == 0 {
return Packet{}, errMalformed
}

payload := make([]byte, h.Length-pktHeaderLen)
_, err = io.ReadFull(r.reader, payload)
if err == io.ErrUnexpectedEOF {
return Packet{}, errMalformed
}
if err != nil {
return Packet{}, err
}

return Packet{
Offset: h.Offset,
IsRTCP: h.PacketLength == 0,
Payload: payload,
}, nil
}
282 changes: 282 additions & 0 deletions pkg/media/rtpdump/reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
package rtpdump

import (
"bytes"
"io"
"net"
"reflect"
"testing"
"time"
)

var validPreamble = []byte("#!rtpplay1.0 224.2.0.1/3456\n")

func TestReader(t *testing.T) {
for _, test := range []struct {
Name string
Data []byte
WantHeader Header
WantPackets []Packet
WantErr error
}{
{
Name: "empty",
Data: nil,
WantErr: errMalformed,
},
{
Name: "hashbang missing ip/port",
Data: append(
[]byte("#!rtpplay1.0 \n"),
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
),
WantErr: errMalformed,
},
{
Name: "hashbang missing port",
Data: append(
[]byte("#!rtpplay1.0 0.0.0.0\n"),
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
),
WantErr: errMalformed,
},
{
Name: "valid empty file",
Data: append(
validPreamble,
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00,
0x01, 0x01, 0x01, 0x01,
0x22, 0xB8, 0x00, 0x00,
),
WantHeader: Header{
Start: time.Unix(1, 0).UTC(),
Source: net.IPv4(1, 1, 1, 1),
Port: 8888,
},
},
{
Name: "malformed packet header",
Data: append(
validPreamble,
// header
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// packet header
0x00,
),
WantHeader: Header{
Start: time.Unix(0, 0).UTC(),
Source: net.IPv4(0, 0, 0, 0),
Port: 0,
},
WantErr: errMalformed,
},
{
Name: "short packet payload",
Data: append(
validPreamble,
// header
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// packet header len=1048575
0xFF, 0xFF, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// packet paylaod
0x00,
),
WantHeader: Header{
Start: time.Unix(0, 0).UTC(),
Source: net.IPv4(0, 0, 0, 0),
Port: 0,
},
WantErr: errMalformed,
},
{
Name: "empty packet payload",
Data: append(
validPreamble,
// header
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// packet header len=0
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
),
WantHeader: Header{
Start: time.Unix(0, 0).UTC(),
Source: net.IPv4(0, 0, 0, 0),
Port: 0,
},
WantErr: errMalformed,
},
{
Name: "valid rtcp packet",
Data: append(
validPreamble,
// header
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// packet header len=20, pLen=0, off=1
0x00, 0x14, 0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
// packet payload (BYE)
0x81, 0xcb, 0x00, 0x0c,
0x90, 0x2f, 0x9e, 0x2e,
0x03, 0x46, 0x4f, 0x4f,
),
WantHeader: Header{
Start: time.Unix(0, 0).UTC(),
Source: net.IPv4(0, 0, 0, 0),
Port: 0,
},
WantPackets: []Packet{
{
Offset: 1,
IsRTCP: true,
Payload: []byte{
0x81, 0xcb, 0x00, 0x0c,
0x90, 0x2f, 0x9e, 0x2e,
0x03, 0x46, 0x4f, 0x4f,
},
},
},
WantErr: nil,
},
{
Name: "truncated rtcp packet",
Data: append(
validPreamble,
// header
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// packet header len=9, pLen=0, off=1
0x00, 0x09, 0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
// invalid payload
0x81,
),
WantHeader: Header{
Start: time.Unix(0, 0).UTC(),
Source: net.IPv4(0, 0, 0, 0),
Port: 0,
},
WantPackets: []Packet{
{
Offset: 1,
IsRTCP: true,
Payload: []byte{0x81},
},
},
},
{
Name: "two valid packets",
Data: append(
validPreamble,
// header
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// packet header len=20, pLen=0, off=1
0x00, 0x14, 0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
// packet payload (BYE)
0x81, 0xcb, 0x00, 0x0c,
0x90, 0x2f, 0x9e, 0x2e,
0x03, 0x46, 0x4f, 0x4f,
// packet header len=33, pLen=25, off=2
0x00, 0x21, 0x19, 0x00,
0x00, 0x00, 0x00, 0x02,
// packet payload (RTP)
0x90, 0x60, 0x69, 0x8f,
0xd9, 0xc2, 0x93, 0xda,
0x1c, 0x64, 0x27, 0x82,
0x00, 0x01, 0x00, 0x01,
0xFF, 0xFF, 0xFF, 0xFF,
0x98, 0x36, 0xbe, 0x88,
0x9e,
),
WantHeader: Header{
Start: time.Unix(0, 0).UTC(),
Source: net.IPv4(0, 0, 0, 0),
Port: 0,
},
WantPackets: []Packet{
{
Offset: 1,
IsRTCP: true,
Payload: []byte{
0x81, 0xcb, 0x00, 0x0c,
0x90, 0x2f, 0x9e, 0x2e,
0x03, 0x46, 0x4f, 0x4f,
},
},
{
Offset: 2,
IsRTCP: false,
Payload: []byte{
0x90, 0x60, 0x69, 0x8f,
0xd9, 0xc2, 0x93, 0xda,
0x1c, 0x64, 0x27, 0x82,
0x00, 0x01, 0x00, 0x01,
0xFF, 0xFF, 0xFF, 0xFF,
0x98, 0x36, 0xbe, 0x88,
0x9e,
},
},
},
WantErr: nil,
},
} {
r, hdr, err := NewReader(bytes.NewReader(test.Data))
if err != nil {
if got, want := err, test.WantErr; got != want {
t.Fatalf("NewReader(%s) err=%v want %v", test.Name, got, want)
}
continue
}

if got, want := hdr, test.WantHeader; !reflect.DeepEqual(got, want) {
t.Fatalf("%q Header = %#v, want %#v", test.Name, got, want)
}

var nextErr error
var packets []Packet
for {
pkt, err := r.Next()
if err == io.EOF {
break
}
if err != nil {
nextErr = err
break
}

packets = append(packets, pkt)
}

if got, want := nextErr, test.WantErr; got != want {
t.Fatalf("%s err=%v want %v", test.Name, got, want)
}
if got, want := packets, test.WantPackets; !reflect.DeepEqual(got, want) {
t.Fatalf("%q packets=%#v, want %#v", test.Name, got, want)
}
}
}

0 comments on commit 7c61ba2

Please sign in to comment.