Video segment #13

Merged
merged 8 commits into from Apr 15, 2017
@@ -3,33 +3,22 @@ package segmenter
import (
"bytes"
"context"
+ "errors"
"fmt"
"io/ioutil"
"os"
"os/exec"
"strconv"
"time"
- "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/golang/glog"
"github.com/kz26/m3u8"
+ "github.com/livepeer/lpms/stream"
"github.com/nareix/joy4/av"
"github.com/nareix/joy4/format/rtmp"
)
-type VideoFormat uint32
-
-var (
- HLS = MakeVideoFormatType(avFormatTypeMagic + 1)
- RTMP = MakeVideoFormatType(avFormatTypeMagic + 1)
-)
-
-func MakeVideoFormatType(base uint32) (c VideoFormat) {
- c = VideoFormat(base) << videoFormatOtherBits
- return
-}
-
-const avFormatTypeMagic = 577777
-const videoFormatOtherBits = 1
+var ErrSegmenterTimeout = errors.New("SegmenterTimeout")
type SegmenterOptions struct {
EnforceKeyframe bool //Enforce each segment starts with a keyframe
@@ -38,14 +27,14 @@ type SegmenterOptions struct {
type VideoSegment struct {
Codec av.CodecType
- Format VideoFormat
+ Format stream.VideoFormat
Length time.Duration
Data []byte
Name string
}
type VideoPlaylist struct {
- Format VideoFormat
+ Format stream.VideoFormat
// Data []byte
Data *m3u8.MediaPlaylist
}
@@ -54,13 +43,14 @@ type VideoSegmenter interface{}
//FFMpegVideoSegmenter segments a RTMP stream by invoking FFMpeg and monitoring the file system.
type FFMpegVideoSegmenter struct {
- WorkDir string
- LocalRtmpUrl string
- StrmID string
- curSegment int
- curPlaylist *m3u8.MediaPlaylist
- curWaitTime time.Duration
- SegLen time.Duration
+ WorkDir string
+ LocalRtmpUrl string
+ StrmID string
+ curSegment int
+ curPlaylist *m3u8.MediaPlaylist
+ curPlWaitTime time.Duration
+ curSegWaitTime time.Duration
+ SegLen time.Duration
}
func NewFFMpegVideoSegmenter(workDir string, strmID string, localRtmpUrl string, segLen time.Duration) *FFMpegVideoSegmenter {
@@ -117,8 +107,9 @@ func (s *FFMpegVideoSegmenter) RTMPToHLS(ctx context.Context, opt SegmenterOptio
//PollSegment monitors the filesystem and returns a new segment as it becomes available
func (s *FFMpegVideoSegmenter) PollSegment(ctx context.Context) (*VideoSegment, error) {
var length time.Duration
- tsfn := s.WorkDir + "/" + s.StrmID + "_" + strconv.Itoa(s.curSegment) + ".ts"
- seg, err := pollSegment(ctx, tsfn, time.Millisecond*100, s.SegLen)
+ curTsfn := s.WorkDir + "/" + s.StrmID + "_" + strconv.Itoa(s.curSegment) + ".ts"
+ nextTsfn := s.WorkDir + "/" + s.StrmID + "_" + strconv.Itoa(s.curSegment+1) + ".ts"
+ seg, err := s.pollSegment(ctx, curTsfn, nextTsfn, time.Millisecond*100)
if err != nil {
return nil, err
}
@@ -132,7 +123,7 @@ func (s *FFMpegVideoSegmenter) PollSegment(ctx context.Context) (*VideoSegment,
s.curSegment = s.curSegment + 1
glog.Infof("Segment: %v, len:%v", name, len(seg))
- return &VideoSegment{Codec: av.H264, Format: HLS, Length: length, Data: seg, Name: name}, err
+ return &VideoSegment{Codec: av.H264, Format: stream.HLS, Length: length, Data: seg, Name: name}, err
}
//PollPlaylist monitors the filesystem and returns a new playlist as it becomes available
@@ -145,7 +136,7 @@ func (s *FFMpegVideoSegmenter) PollPlaylist(ctx context.Context) (*VideoPlaylist
lastPl = s.curPlaylist.Encode().Bytes()
}
- pl, err := pollPlaylist(ctx, plfn, time.Millisecond*100, lastPl)
+ pl, err := s.pollPlaylist(ctx, plfn, time.Millisecond*100, lastPl)
if err != nil {
return nil, err
}
@@ -157,10 +148,10 @@ func (s *FFMpegVideoSegmenter) PollPlaylist(ctx context.Context) (*VideoPlaylist
}
s.curPlaylist = p
- return &VideoPlaylist{Format: HLS, Data: p}, err
+ return &VideoPlaylist{Format: stream.HLS, Data: p}, err
}
-func pollPlaylist(ctx context.Context, fn string, sleepTime time.Duration, lastFile []byte) (f []byte, err error) {
+func (s *FFMpegVideoSegmenter) pollPlaylist(ctx context.Context, fn string, sleepTime time.Duration, lastFile []byte) (f []byte, err error) {
for {
if _, err := os.Stat(fn); err == nil {
if err != nil {
@@ -183,6 +174,7 @@ func pollPlaylist(ctx context.Context, fn string, sleepTime time.Duration, lastF
// fmt.Printf("p.Segments: %v\n", p.Segments[0])
// fmt.Printf("lf: %s \ncf: %s \ncomp:%v\n\n", lastFile, curFile, bytes.Compare(lastFile, curFile))
if lastFile == nil || bytes.Compare(lastFile, curFile) != 0 {
+ s.curPlWaitTime = 0
return content, nil
}
}
@@ -194,21 +186,25 @@ func pollPlaylist(ctx context.Context, fn string, sleepTime time.Duration, lastF
default:
}
+ if s.curPlWaitTime >= 10*s.SegLen {
+ return nil, ErrSegmenterTimeout
+ }
time.Sleep(sleepTime)
+ s.curPlWaitTime = s.curPlWaitTime + sleepTime
}
}
-func pollSegment(ctx context.Context, fn string, sleepTime time.Duration, segLen time.Duration) (f []byte, err error) {
+func (s *FFMpegVideoSegmenter) pollSegment(ctx context.Context, curFn string, nextFn string, sleepTime time.Duration) (f []byte, err error) {
for {
- if _, err := os.Stat(fn); err == nil {
- // fmt.Printf("FileName: %v, FileSize: %v \n\n", fn, info.Size())
- time.Sleep(segLen)
- // fmt.Printf("FileName: %v, FileSize: %v \n\n", fn, info.Size())
- content, err := ioutil.ReadFile(fn)
+ //Because FFMpeg keeps appending to the current segment until it's full before moving onto the next segment, we monitor the existance of
+ //the next file as a signal for the completion of the current segment.
+ if _, err := os.Stat(nextFn); err == nil {
+ content, err := ioutil.ReadFile(curFn)
if err != nil {
return nil, err
}
+ s.curSegWaitTime = 0
return content, err
}
@@ -217,7 +213,11 @@ func pollSegment(ctx context.Context, fn string, sleepTime time.Duration, segLen
return nil, ctx.Err()
default:
}
+ if s.curSegWaitTime > 10*s.SegLen {
+ return nil, ErrSegmenterTimeout
+ }
time.Sleep(sleepTime)
+ s.curSegWaitTime = s.curSegWaitTime + sleepTime
}
}
@@ -86,7 +86,7 @@ func TestSegmenter(t *testing.T) {
t.Errorf("Got error: %v", err)
}
- if pl.Format != HLS {
+ if pl.Format != stream.HLS {
t.Errorf("Expecting HLS Playlist, got %v", pl.Format)
}
@@ -115,7 +115,7 @@ func TestSegmenter(t *testing.T) {
t.Errorf("Expecting H264 segment, got: %v", seg.Codec)
}
- if seg.Format != HLS {
+ if seg.Format != stream.HLS {
t.Errorf("Expecting HLS segment, got %v", seg.Format)
}
@@ -201,3 +201,62 @@ func TestPollSegmentError(t *testing.T) {
t.Errorf("Expect to exceed deadline, but got: %v", err)
}
}
+
+func TestPollPlaylistTimeout(t *testing.T) {
+ wd, _ := os.Getwd()
+ workDir := wd + "/tmp"
+ os.RemoveAll(workDir)
+ os.Mkdir(workDir, 0700)
+
+ newPl := `#EXTM3U
+#EXT-X-VERSION:3
+#EXT-X-MEDIA-SEQUENCE:0
+#EXT-X-ALLOW-CACHE:YES
+#EXT-X-TARGETDURATION:7
+#EXTINF:2.066000,
+test_0.ts
+`
+ err := ioutil.WriteFile(workDir+"/test.m3u8", []byte(newPl), 0755)
+ if err != nil {
+ t.Errorf("Error writing playlist: %v", err)
+ }
+
+ vs := NewFFMpegVideoSegmenter(workDir, "test", "", time.Millisecond*100)
+ ctx := context.Background()
+ pl, err := vs.PollPlaylist(ctx)
+ if pl == nil {
+ t.Errorf("Expecting playlist, got nil")
+ }
+
+ pl, err = vs.PollPlaylist(ctx)
+ if err != ErrSegmenterTimeout {
+ t.Errorf("Expecting timeout error, got %v", err)
+ }
+}
+
+func TestPollSegTimeout(t *testing.T) {
+ wd, _ := os.Getwd()
+ workDir := wd + "/tmp"
+ os.RemoveAll(workDir)
+ os.Mkdir(workDir, 0700)
+
+ newSeg := `some random data`
+ err := ioutil.WriteFile(workDir+"/test_0.ts", []byte(newSeg), 0755)
+ err = ioutil.WriteFile(workDir+"/test_1.ts", []byte(newSeg), 0755)
+ if err != nil {
+ t.Errorf("Error writing playlist: %v", err)
+ }
+
+ vs := NewFFMpegVideoSegmenter(workDir, "test", "", time.Millisecond*100)
+ ctx := context.Background()
+ seg, err := vs.PollSegment(ctx)
+ if seg == nil {
+ t.Errorf("Expecting seg, got nil")
+ }
+
+ seg, err = vs.PollSegment(ctx)
+ if err != ErrSegmenterTimeout {
+ t.Errorf("Expecting timeout, got %v", err)
+ }
+
+}
View
@@ -13,7 +13,7 @@ var ErrNotFound = errors.New("Not Found")
type HLSDemuxer interface {
//This method should ONLY push a playlist onto a chan when it's a NEW playlist
- WaitAndPopPlaylist(ctx context.Context) (m3u8.MediaPlaylist, error)
+ PollPlaylist(ctx context.Context) (m3u8.MediaPlaylist, error)
//This method should ONLY push a segment onto a chan when it's a NEW segment
WaitAndPopSegment(ctx context.Context, name string) ([]byte, error)
}
@@ -40,7 +40,7 @@ func NewHLSBuffer() *HLSBuffer {
}
func (b *HLSBuffer) WritePlaylist(p m3u8.MediaPlaylist) error {
-
+ // fmt.Println("Writing playlist")
b.lock.Lock()
b.plCache = p
b.plCacheNew = true
@@ -58,11 +58,15 @@ func (b *HLSBuffer) WriteSegment(name string, s []byte) error {
func (b *HLSBuffer) WaitAndPopPlaylist(ctx context.Context) (m3u8.MediaPlaylist, error) {
for {
-
+ b.lock.Lock()
if b.plCacheNew {
- return b.plCache, nil
+ defer b.lock.Unlock()
+
b.plCacheNew = false
+ return b.plCache, nil
}
+ b.lock.Unlock()

This comment has been minimized.

@dob

dob Apr 14, 2017

Member

Why not just defer this immediately after locking? Because the 1 second sleep can cause a deadlock?

@dob

dob Apr 14, 2017

Member

Why not just defer this immediately after locking? Because the 1 second sleep can cause a deadlock?

+
time.Sleep(time.Second * 1)
select {
case <-ctx.Done():
@@ -75,6 +79,7 @@ func (b *HLSBuffer) WaitAndPopPlaylist(ctx context.Context) (m3u8.MediaPlaylist,
func (b *HLSBuffer) WaitAndPopSegment(ctx context.Context, name string) ([]byte, error) {
for {
+ // fmt.Printf("HLSBuffer %v: segment keys: %v. Current name: %v\n", &b, b.sq.Keys(), name)
seg, found := b.sq.Get(name)
// glog.Infof("GetSegment: %v, %v", name, found)
if found {
View
@@ -20,6 +20,21 @@ var ErrBufferItemType = errors.New("Buffer Item Type Not Recognized")
var ErrDroppedRTMPStream = errors.New("RTMP Stream Stopped Without EOF")
var ErrHttpReqFailed = errors.New("Http Request Failed")
+type VideoFormat uint32
+
+var (
+ HLS = MakeVideoFormatType(avFormatTypeMagic + 1)
+ RTMP = MakeVideoFormatType(avFormatTypeMagic + 2)
+)
+
+func MakeVideoFormatType(base uint32) (c VideoFormat) {
+ c = VideoFormat(base) << videoFormatOtherBits
+ return
+}
+
+const avFormatTypeMagic = 577777
+const videoFormatOtherBits = 1
+
type RTMPEOF struct{}
type streamBuffer struct {
@@ -70,7 +85,7 @@ type Stream interface {
WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) error
WriteHLSPlaylistToStream(pl m3u8.MediaPlaylist) error
WriteHLSSegmentToStream(seg HLSSegment) error
- ReadHLSFromStream(buffer HLSMuxer) error
+ ReadHLSFromStream(ctx context.Context, buffer HLSMuxer) error
}
type VideoStream struct {
@@ -133,6 +148,18 @@ func (s *VideoStream) ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser)
}
}
+func (s *VideoStream) WriteRTMPHeader(h []av.CodecData) {
+ s.buffer.push(h)
+}
+
+func (s *VideoStream) WriteRTMPPacket(p av.Packet) {
+ s.buffer.push(p)
+}
+
+func (s *VideoStream) WriteRTMPTrailer() {
+ s.buffer.push(RTMPEOF{})
+}
+
//WriteRTMPToStream writes a video stream from src into the stream.
func (s *VideoStream) WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) error {
defer src.Close()
@@ -191,18 +218,25 @@ func (s *VideoStream) WriteHLSSegmentToStream(seg HLSSegment) error {
}
//ReadHLSFromStream reads an HLS stream into an HLSBuffer
-func (s *VideoStream) ReadHLSFromStream(buffer HLSMuxer) error {
+func (s *VideoStream) ReadHLSFromStream(ctx context.Context, mux HLSMuxer) error {
for {
+ // fmt.Printf("Buffer len: %v\n", s.buffer.len())
item, err := s.buffer.poll(s.HLSTimeout)
if err != nil {
return err
}
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
switch item.(type) {
case m3u8.MediaPlaylist:
- buffer.WritePlaylist(item.(m3u8.MediaPlaylist))
+ mux.WritePlaylist(item.(m3u8.MediaPlaylist))
case HLSSegment:
- buffer.WriteSegment(item.(HLSSegment).Name, item.(HLSSegment).Data)
+ mux.WriteSegment(item.(HLSSegment).Name, item.(HLSSegment).Data)
default:
return ErrBufferItemType
}
Oops, something went wrong.