Skip to content

Commit

Permalink
Merge cec9b4e into bb83935
Browse files Browse the repository at this point in the history
  • Loading branch information
maxhawkins committed Apr 4, 2019
2 parents bb83935 + cec9b4e commit 3bed23a
Show file tree
Hide file tree
Showing 6 changed files with 828 additions and 0 deletions.
103 changes: 103 additions & 0 deletions pkg/media/rtpdump/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package rtpdump

import (
"bufio"
"io"
"regexp"
"sync"
)

// The file starts with #!rtpplay1.0 address/port\n
var preambleRegexp = regexp.MustCompile(`#\!rtpplay1\.0 \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\/\d{1,5}\n`)

// Reader reads the RTPDump file format
type Reader struct {
readerMu sync.Mutex
reader io.Reader
}

// NewReader opens a new Reader and immediately reads the Header from the start
// of the input stream.
func NewReader(r io.Reader) (*Reader, Header, error) {
var hdr Header

bio := bufio.NewReader(r)

// Look ahead to see if there's a valid preamble
peek, err := bio.Peek(preambleLen)
if err == io.EOF {
return nil, hdr, errMalformed
}
if err != nil {
return nil, hdr, err
}
if !preambleRegexp.Match(peek) {
return nil, hdr, errMalformed
}

// consume the preamble
_, _, err = bio.ReadLine()
if err == io.EOF {
return nil, hdr, errMalformed
}
if err != nil {
return nil, hdr, err
}

hBuf := make([]byte, headerLen)
_, err = io.ReadFull(bio, hBuf)
if err == io.ErrUnexpectedEOF || err == io.EOF {
return nil, hdr, errMalformed
}
if err != nil {
return nil, hdr, err
}

if err := hdr.Unmarshal(hBuf); err != nil {
return nil, hdr, err
}

return &Reader{
reader: bio,
}, hdr, nil
}

// Next returns the next Packet in the Reader input stream
func (r *Reader) Next() (Packet, error) {
r.readerMu.Lock()
defer r.readerMu.Unlock()

hBuf := make([]byte, pktHeaderLen)

_, err := io.ReadFull(r.reader, hBuf)
if err == io.ErrUnexpectedEOF {
return Packet{}, errMalformed
}
if err != nil {
return Packet{}, err
}

var h packetHeader
if err = h.Unmarshal(hBuf); err != nil {
return Packet{}, err
}

if h.Length == 0 {
return Packet{}, errMalformed
}

payload := make([]byte, h.Length-pktHeaderLen)
_, err = io.ReadFull(r.reader, payload)
if err == io.ErrUnexpectedEOF {
return Packet{}, errMalformed
}
if err != nil {
return Packet{}, err
}

return Packet{
Offset: h.offset(),
IsRTCP: h.PacketLength == 0,
Payload: payload,
}, nil
}
282 changes: 282 additions & 0 deletions pkg/media/rtpdump/reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
package rtpdump

import (
"bytes"
"io"
"net"
"reflect"
"testing"
"time"
)

var validPreamble = []byte("#!rtpplay1.0 224.2.0.1/3456\n")

func TestReader(t *testing.T) {
for _, test := range []struct {
Name string
Data []byte
WantHeader Header
WantPackets []Packet
WantErr error
}{
{
Name: "empty",
Data: nil,
WantErr: errMalformed,
},
{
Name: "hashbang missing ip/port",
Data: append(
[]byte("#!rtpplay1.0 \n"),
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
),
WantErr: errMalformed,
},
{
Name: "hashbang missing port",
Data: append(
[]byte("#!rtpplay1.0 0.0.0.0\n"),
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
),
WantErr: errMalformed,
},
{
Name: "valid empty file",
Data: append(
validPreamble,
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00,
0x01, 0x01, 0x01, 0x01,
0x22, 0xB8, 0x00, 0x00,
),
WantHeader: Header{
Start: time.Unix(1, 0).UTC(),
Source: net.IPv4(1, 1, 1, 1),
Port: 8888,
},
},
{
Name: "malformed packet header",
Data: append(
validPreamble,
// header
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// packet header
0x00,
),
WantHeader: Header{
Start: time.Unix(0, 0).UTC(),
Source: net.IPv4(0, 0, 0, 0),
Port: 0,
},
WantErr: errMalformed,
},
{
Name: "short packet payload",
Data: append(
validPreamble,
// header
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// packet header len=1048575
0xFF, 0xFF, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// packet paylaod
0x00,
),
WantHeader: Header{
Start: time.Unix(0, 0).UTC(),
Source: net.IPv4(0, 0, 0, 0),
Port: 0,
},
WantErr: errMalformed,
},
{
Name: "empty packet payload",
Data: append(
validPreamble,
// header
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// packet header len=0
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
),
WantHeader: Header{
Start: time.Unix(0, 0).UTC(),
Source: net.IPv4(0, 0, 0, 0),
Port: 0,
},
WantErr: errMalformed,
},
{
Name: "valid rtcp packet",
Data: append(
validPreamble,
// header
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// packet header len=20, pLen=0, off=1
0x00, 0x14, 0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
// packet payload (BYE)
0x81, 0xcb, 0x00, 0x0c,
0x90, 0x2f, 0x9e, 0x2e,
0x03, 0x46, 0x4f, 0x4f,
),
WantHeader: Header{
Start: time.Unix(0, 0).UTC(),
Source: net.IPv4(0, 0, 0, 0),
Port: 0,
},
WantPackets: []Packet{
{
Offset: time.Millisecond,
IsRTCP: true,
Payload: []byte{
0x81, 0xcb, 0x00, 0x0c,
0x90, 0x2f, 0x9e, 0x2e,
0x03, 0x46, 0x4f, 0x4f,
},
},
},
WantErr: nil,
},
{
Name: "truncated rtcp packet",
Data: append(
validPreamble,
// header
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// packet header len=9, pLen=0, off=1
0x00, 0x09, 0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
// invalid payload
0x81,
),
WantHeader: Header{
Start: time.Unix(0, 0).UTC(),
Source: net.IPv4(0, 0, 0, 0),
Port: 0,
},
WantPackets: []Packet{
{
Offset: time.Millisecond,
IsRTCP: true,
Payload: []byte{0x81},
},
},
},
{
Name: "two valid packets",
Data: append(
validPreamble,
// header
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// packet header len=20, pLen=0, off=1
0x00, 0x14, 0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
// packet payload (BYE)
0x81, 0xcb, 0x00, 0x0c,
0x90, 0x2f, 0x9e, 0x2e,
0x03, 0x46, 0x4f, 0x4f,
// packet header len=33, pLen=25, off=2
0x00, 0x21, 0x19, 0x00,
0x00, 0x00, 0x00, 0x02,
// packet payload (RTP)
0x90, 0x60, 0x69, 0x8f,
0xd9, 0xc2, 0x93, 0xda,
0x1c, 0x64, 0x27, 0x82,
0x00, 0x01, 0x00, 0x01,
0xFF, 0xFF, 0xFF, 0xFF,
0x98, 0x36, 0xbe, 0x88,
0x9e,
),
WantHeader: Header{
Start: time.Unix(0, 0).UTC(),
Source: net.IPv4(0, 0, 0, 0),
Port: 0,
},
WantPackets: []Packet{
{
Offset: time.Millisecond,
IsRTCP: true,
Payload: []byte{
0x81, 0xcb, 0x00, 0x0c,
0x90, 0x2f, 0x9e, 0x2e,
0x03, 0x46, 0x4f, 0x4f,
},
},
{
Offset: 2 * time.Millisecond,
IsRTCP: false,
Payload: []byte{
0x90, 0x60, 0x69, 0x8f,
0xd9, 0xc2, 0x93, 0xda,
0x1c, 0x64, 0x27, 0x82,
0x00, 0x01, 0x00, 0x01,
0xFF, 0xFF, 0xFF, 0xFF,
0x98, 0x36, 0xbe, 0x88,
0x9e,
},
},
},
WantErr: nil,
},
} {
r, hdr, err := NewReader(bytes.NewReader(test.Data))
if err != nil {
if got, want := err, test.WantErr; got != want {
t.Fatalf("NewReader(%s) err=%v want %v", test.Name, got, want)
}
continue
}

if got, want := hdr, test.WantHeader; !reflect.DeepEqual(got, want) {
t.Fatalf("%q Header = %#v, want %#v", test.Name, got, want)
}

var nextErr error
var packets []Packet
for {
pkt, err := r.Next()
if err == io.EOF {
break
}
if err != nil {
nextErr = err
break
}

packets = append(packets, pkt)
}

if got, want := nextErr, test.WantErr; got != want {
t.Fatalf("%s err=%v want %v", test.Name, got, want)
}
if got, want := packets, test.WantPackets; !reflect.DeepEqual(got, want) {
t.Fatalf("%q packets=%#v, want %#v", test.Name, got, want)
}
}
}

0 comments on commit 3bed23a

Please sign in to comment.