Skip to content

Commit

Permalink
Fix for recordings
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 committed Mar 27, 2024
1 parent e5a312b commit 88620cf
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 22 deletions.
15 changes: 8 additions & 7 deletions handlers/ffmpeg/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,14 @@ func (h *HandlersCollection) NewFile() httprouter.Handle {
return
}

if job.ThumbnailsTargetURL != nil {
go func() {
if err := thumbnails.GenerateThumb(filename, content, job.ThumbnailsTargetURL); err != nil {
log.LogError(job.RequestID, "generate thumb failed", err, "in", path.Join(targetURLBase, filename), "out", job.ThumbnailsTargetURL)
}
}()
}
go func() {
if job.ThumbnailsTargetURL == nil {
return
}
if err := thumbnails.GenerateThumb(filename, content, job.ThumbnailsTargetURL); err != nil {
log.LogError(job.RequestID, "generate thumb failed", err, "in", path.Join(targetURLBase, filename), "out", job.ThumbnailsTargetURL)
}
}()
}

if err := backoff.Retry(func() error {
Expand Down
10 changes: 10 additions & 0 deletions pipeline/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) {
}
} else {
job.SegmentingTargetURL = job.SourceFile

go func() {
if job.ThumbnailsTargetURL == nil {
return
}
err := thumbnails.GenerateThumbsFromManifest(job.RequestID, job.SegmentingTargetURL, job.ThumbnailsTargetURL)
if err != nil {
log.LogError(job.RequestID, "generate thumbs failed", err, "in", job.SegmentingTargetURL, "out", job.ThumbnailsTargetURL)
}
}()
}
job.SegmentingDone = time.Now()
if job.HlsTargetURL != nil {
Expand Down
69 changes: 58 additions & 11 deletions thumbnails/thumbnails.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/go-tools/drivers"
ffmpeg "github.com/u2takey/ffmpeg-go"
"golang.org/x/sync/errgroup"
)

const resolution = "854:480"
Expand All @@ -27,8 +28,7 @@ const outputDir = "thumbnails"
// Wait a maximum of 5 mins for thumbnails to finish
var thumbWaitBackoff = backoff.WithMaxRetries(backoff.NewConstantBackOff(30*time.Second), 10)

func GenerateThumbsVTT(requestID string, input string, output *url.URL) error {
// download and parse the manifest
func getMediaManifest(requestID string, input string) (*m3u8.MediaPlaylist, error) {
var (
rc io.ReadCloser
err error
Expand All @@ -38,19 +38,28 @@ func GenerateThumbsVTT(requestID string, input string, output *url.URL) error {
return err
}, clients.DownloadRetryBackoff())
if err != nil {
return fmt.Errorf("error downloading manifest: %w", err)
return nil, fmt.Errorf("error downloading manifest: %w", err)
}
manifest, playlistType, err := m3u8.DecodeFrom(rc, true)
if err != nil {
return fmt.Errorf("failed to decode manifest: %w", err)
return nil, fmt.Errorf("failed to decode manifest: %w", err)
}

if playlistType != m3u8.MEDIA {
return fmt.Errorf("received non-Media manifest, but currently only Media playlists are supported")
return nil, fmt.Errorf("received non-Media manifest, but currently only Media playlists are supported")
}
mediaPlaylist, ok := manifest.(*m3u8.MediaPlaylist)
if !ok || mediaPlaylist == nil {
return fmt.Errorf("failed to parse playlist as MediaPlaylist")
return nil, fmt.Errorf("failed to parse playlist as MediaPlaylist")
}
return mediaPlaylist, nil
}

func GenerateThumbsVTT(requestID string, input string, output *url.URL) error {
// download and parse the manifest
mediaPlaylist, err := getMediaManifest(requestID, input)
if err != nil {
return err
}

const layout = "15:04:05.000"
Expand All @@ -60,12 +69,10 @@ func GenerateThumbsVTT(requestID string, input string, output *url.URL) error {
if err != nil {
return err
}
var (
currentTime time.Time
segments = mediaPlaylist.GetAllSegments()
)

var currentTime time.Time
// loop through each segment, generate a vtt entry for it
for _, segment := range segments {
for _, segment := range mediaPlaylist.GetAllSegments() {
filename, err := thumbFilename(segment.URI)
if err != nil {
return err
Expand Down Expand Up @@ -140,6 +147,46 @@ func GenerateThumb(segmentURI string, input []byte, output *url.URL) error {
return nil
}

func GenerateThumbsFromManifest(requestID, input string, output *url.URL) error {
// parse manifest and generate one thumbnail per segment
mediaPlaylist, err := getMediaManifest(requestID, input)
if err != nil {
return err
}
inputURL, err := url.Parse(input)
if err != nil {
return err
}

// parallelise the thumb uploads
uploadGroup, _ := errgroup.WithContext(context.Background())
uploadGroup.SetLimit(5)
for _, segment := range mediaPlaylist.GetAllSegments() {
segment := segment
uploadGroup.Go(func() error {
segURL := inputURL.JoinPath("..", segment.URI)
var (
rc io.ReadCloser
err error
)
err = backoff.Retry(func() error {
rc, err = clients.GetFile(context.Background(), requestID, segURL.String(), nil)
return err
}, clients.DownloadRetryBackoff())
if err != nil {
return fmt.Errorf("error downloading manifest: %w", err)
}
bs, err := io.ReadAll(rc)
if err != nil {
return err
}

return GenerateThumb(segment.URI, bs, output)
})
}
return uploadGroup.Wait()
}

func processSegment(input string, thumbOut string) error {
// generate thumbnail
var ffmpegErr bytes.Buffer
Expand Down
31 changes: 27 additions & 4 deletions thumbnails/thumbnails_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,43 @@ func generateThumb(t *testing.T, filename string, out *url.URL) {
}

func TestGenerateThumbs(t *testing.T) {
segmentPrefix = "seg-"
wd, err := os.Getwd()
require.NoError(t, err)

// Test the non-recording flow where GenerateThumb is called by handlers/ffmpeg/ffmpeg.go
outDir, err := os.MkdirTemp(os.TempDir(), "thumbs*")
require.NoError(t, err)
defer os.RemoveAll(outDir)
out, err := url.Parse(outDir)
require.NoError(t, err)

wd, err := os.Getwd()
require.NoError(t, err)

segmentPrefix = "seg-"
generateThumb(t, path.Join(wd, "..", "test/fixtures/seg-0.ts"), out)
generateThumb(t, path.Join(wd, "..", "test/fixtures/seg-1.ts"), out)
generateThumb(t, path.Join(wd, "..", "test/fixtures/seg-2.ts"), out)

testGenerateThumbsRun(t, outDir)

// Test the recording flow
outDir, err = os.MkdirTemp(os.TempDir(), "thumbs*")
require.NoError(t, err)
defer os.RemoveAll(outDir)
out, err = url.Parse(outDir)
require.NoError(t, err)

err = GenerateThumbsFromManifest("req ID", path.Join(wd, "..", "test/fixtures/tiny.m3u8"), out)
require.NoError(t, err)

testGenerateThumbsRun(t, outDir)
}

func testGenerateThumbsRun(t *testing.T, outDir string) {
out, err := url.Parse(outDir)
require.NoError(t, err)

wd, err := os.Getwd()
require.NoError(t, err)

err = GenerateThumbsVTT("req ID", path.Join(wd, "..", "test/fixtures/tiny.m3u8"), out)
require.NoError(t, err)

Expand Down

0 comments on commit 88620cf

Please sign in to comment.