Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor thumbs to avoid storage egress #1196

Merged
merged 5 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 16 additions & 5 deletions handlers/ffmpeg/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/log"
"github.com/livepeer/catalyst-api/pipeline"
"github.com/livepeer/catalyst-api/thumbnails"
)

type HandlersCollection struct {
Expand Down Expand Up @@ -43,6 +44,12 @@ func (h *HandlersCollection) NewFile() httprouter.Handle {
err error
)
reg := regexp.MustCompile(`[^/]+.m3u8$`)
// job.SegmentingTargetURL comes in the format the Mist wants, looking like:
// protocol://abc@123:s3.com/a/b/c/<something>.m3u8
// but since this endpoint receives both .ts segments and m3u8 updates, we strip off the filename
// and pass the one ffmpeg gives us to UploadToOSURL instead
targetURLBase := reg.ReplaceAllString(job.SegmentingTargetURL, "")

if reg.MatchString(filename) {
// ensure that playlist type in the manifest is set to vod
buf := bytes.Buffer{}
Expand Down Expand Up @@ -77,12 +84,16 @@ func (h *HandlersCollection) NewFile() httprouter.Handle {
errors.WriteHTTPInternalServerError(w, "Error reading body", err)
return
}

go func() {
if job.ThumbnailsTargetURL == nil {
return
}
if err := thumbnails.GenerateThumb(filename, content, job.ThumbnailsTargetURL); err != nil {
log.LogError(job.RequestID, "generate thumb failed", err, "in", path.Join(targetURLBase, filename), "out", job.ThumbnailsTargetURL)
}
}()
}
// job.SegmentingTargetURL comes in the format the Mist wants, looking like:
// protocol://abc@123:s3.com/a/b/c/<something>.m3u8
// but since this endpoint receives both .ts segments and m3u8 updates, we strip off the filename
// and pass the one ffmpeg gives us to UploadToOSURL instead
targetURLBase := reg.ReplaceAllString(job.SegmentingTargetURL, "")

if err := backoff.Retry(func() error {
err := clients.UploadToOSURL(targetURLBase, filename, bytes.NewReader(content), config.SEGMENT_WRITE_TIMEOUT)
Expand Down
22 changes: 10 additions & 12 deletions pipeline/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,22 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) {
}
} else {
job.SegmentingTargetURL = job.SourceFile
}
job.SegmentingDone = time.Now()
if job.HlsTargetURL != nil {
f.sendSourcePlayback(job)
}
job.ReportProgress(clients.TranscodeStatusPreparingCompleted, 1)

if job.ThumbnailsTargetURL != nil {
go func() {
log.Log(job.RequestID, "generating thumbs VTT")
err := thumbnails.GenerateThumbs(job.RequestID, job.SegmentingTargetURL, job.ThumbnailsTargetURL)
if job.ThumbnailsTargetURL == nil {
return
}
err := thumbnails.GenerateThumbsFromManifest(job.RequestID, job.SegmentingTargetURL, job.ThumbnailsTargetURL)
if err != nil {
log.LogError(job.RequestID, "generate thumbs failed", err, "in", job.SegmentingTargetURL, "out", job.ThumbnailsTargetURL)
} else {
log.Log(job.RequestID, "generate thumbs succeeded", "in", job.SegmentingTargetURL, "out", job.ThumbnailsTargetURL)
}
}()
}
job.SegmentingDone = time.Now()
if job.HlsTargetURL != nil {
f.sendSourcePlayback(job)
}
job.ReportProgress(clients.TranscodeStatusPreparingCompleted, 1)

// Transcode Beginning
log.Log(job.RequestID, "Beginning transcoding via FFMPEG/Livepeer pipeline")
Expand Down Expand Up @@ -168,7 +166,7 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) {

// wait for thumbs background process
if job.ThumbnailsTargetURL != nil {
err := thumbnails.WaitForThumbs(job.RequestID, job.ThumbnailsTargetURL)
err := thumbnails.GenerateThumbsVTT(job.RequestID, job.SegmentingTargetURL, job.ThumbnailsTargetURL)
if err != nil {
log.LogError(job.RequestID, "waiting for thumbs failed", err, "out", job.ThumbnailsTargetURL)
} else {
Expand Down
199 changes: 134 additions & 65 deletions thumbnails/thumbnails.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"net/url"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/cenkalti/backoff/v4"
Expand All @@ -22,115 +25,180 @@ const resolution = "854:480"
const vttFilename = "thumbnails.vtt"
const outputDir = "thumbnails"

func GenerateThumbs(requestID, input string, output *url.URL) error {
inputURL, err := url.Parse(input)
if err != nil {
return err
}
// download and parse the manifest
var rc io.ReadCloser
// Wait a maximum of 5 mins for thumbnails to finish
var thumbWaitBackoff = backoff.WithMaxRetries(backoff.NewConstantBackOff(30*time.Second), 10)

func getMediaManifest(requestID string, input string) (*m3u8.MediaPlaylist, error) {
var (
rc io.ReadCloser
err error
)
err = backoff.Retry(func() error {
rc, err = clients.GetFile(context.Background(), requestID, input, nil)
return err
}, clients.DownloadRetryBackoff())
if err != nil {
return fmt.Errorf("error downloading manifest: %w", err)
return nil, fmt.Errorf("error downloading manifest: %w", err)
}
manifest, playlistType, err := m3u8.DecodeFrom(rc, true)
if err != nil {
return fmt.Errorf("failed to decode manifest: %w", err)
return nil, fmt.Errorf("failed to decode manifest: %w", err)
}

if playlistType != m3u8.MEDIA {
return fmt.Errorf("received non-Media manifest, but currently only Media playlists are supported")
return nil, fmt.Errorf("received non-Media manifest, but currently only Media playlists are supported")
}
mediaPlaylist, ok := manifest.(*m3u8.MediaPlaylist)
if !ok || mediaPlaylist == nil {
return fmt.Errorf("failed to parse playlist as MediaPlaylist")
return nil, fmt.Errorf("failed to parse playlist as MediaPlaylist")
}
return mediaPlaylist, nil
}

tempDir, err := os.MkdirTemp(os.TempDir(), "thumbs-*")
func GenerateThumbsVTT(requestID string, input string, output *url.URL) error {
// download and parse the manifest
mediaPlaylist, err := getMediaManifest(requestID, input)
if err != nil {
return fmt.Errorf("failed to make temp dir: %w", err)
return err
}
defer os.RemoveAll(tempDir)

const layout = "15:04:05.000"
outputLocation := output.JoinPath(outputDir).String()
outputLocation := output.JoinPath(outputDir)
builder := &bytes.Buffer{}
_, err = builder.WriteString("WEBVTT\n")
if err != nil {
return err
}
var (
currentTime time.Time
segments = mediaPlaylist.GetAllSegments()
thumbOuts = make([]string, len(segments))
)
// loop through each segment, generate a thumbnail image and upload it to storage
for i, segment := range segments {
thumbOut, err := processSegment(inputURL, segment, tempDir, outputLocation)

var currentTime time.Time
// loop through each segment, generate a vtt entry for it
for _, segment := range mediaPlaylist.GetAllSegments() {
filename, err := thumbFilename(path.Base(segment.URI))
if err != nil {
return err
}
thumbOuts[i] = thumbOut
// check thumbnail file exists on storage
err = backoff.Retry(func() error {
_, err := clients.GetFile(context.Background(), requestID, outputLocation.JoinPath(filename).String(), nil)
return err
}, thumbWaitBackoff)
if err != nil {
return fmt.Errorf("failed to find thumb %s: %w", filename, err)
}

start := currentTime.Format(layout)
currentTime = currentTime.Add(time.Duration(segment.Duration) * time.Second)
end := currentTime.Format(layout)
_, err = builder.WriteString(fmt.Sprintf("%s --> %s\n%s\n\n", start, end, path.Base(thumbOut)))
_, err = builder.WriteString(fmt.Sprintf("%s --> %s\n%s\n\n", start, end, filename))
if err != nil {
return err
}
}

// parallelise the thumb uploads
uploadGroup, _ := errgroup.WithContext(context.Background())
uploadGroup.SetLimit(5)
for _, thumbOut := range thumbOuts {
thumbOut := thumbOut
uploadGroup.Go(func() error {
return backoff.Retry(func() error {
// upload thumbnail to storage
fileReader, err := os.Open(thumbOut)
if err != nil {
return err
}
defer fileReader.Close()
err = clients.UploadToOSURL(outputLocation, path.Base(thumbOut), fileReader, 2*time.Minute)
if err != nil {
return fmt.Errorf("failed to upload thumbnail %s: %w", thumbOut, err)
}
return nil
}, clients.UploadRetryBackoff())
})
// upload VTT file
vttContent := builder.Bytes()
err = backoff.Retry(func() error {
return clients.UploadToOSURLFields(outputLocation.String(), vttFilename, bytes.NewReader(vttContent), time.Minute, &drivers.FileProperties{ContentType: "text/vtt"})
}, clients.UploadRetryBackoff())
if err != nil {
return fmt.Errorf("failed to upload vtt: %w", err)
}
return nil
}

func GenerateThumb(segmentURI string, input []byte, output *url.URL) error {
tempDir, err := os.MkdirTemp(os.TempDir(), "thumbs-*")
if err != nil {
return fmt.Errorf("failed to make temp dir: %w", err)
}
defer os.RemoveAll(tempDir)
outputLocation := output.JoinPath(outputDir)

inFilename := filepath.Join(tempDir, segmentURI)
if err := os.WriteFile(inFilename, input, 0644); err != nil {
return err
}
err = uploadGroup.Wait()

filename, err := thumbFilename(segmentURI)
if err != nil {
return err
}

err = clients.UploadToOSURLFields(outputLocation, vttFilename, builder, time.Minute, &drivers.FileProperties{ContentType: "text/vtt"})
thumbOut := path.Join(tempDir, filename)
if err := processSegment(inFilename, thumbOut); err != nil {
return err
}

err = backoff.Retry(func() error {
// upload thumbnail to storage
fileReader, err := os.Open(thumbOut)
if err != nil {
return err
}
defer fileReader.Close()
err = clients.UploadToOSURL(outputLocation.String(), path.Base(thumbOut), fileReader, 2*time.Minute)
if err != nil {
return fmt.Errorf("failed to upload thumbnail %s: %w", thumbOut, err)
}
return nil
}, clients.UploadRetryBackoff())
if err != nil {
return fmt.Errorf("failed to upload vtt: %w", err)
return err
}

return nil
}

func processSegment(inputURL *url.URL, segment *m3u8.MediaSegment, tempDir string, outputLocation string) (string, error) {
segURL := inputURL.JoinPath("..", segment.URI)
signed, err := clients.SignURL(segURL)
func GenerateThumbsFromManifest(requestID, input string, output *url.URL) error {
// parse manifest and generate one thumbnail per segment
mediaPlaylist, err := getMediaManifest(requestID, input)
if err != nil {
return "", fmt.Errorf("error signing segment url %s: %w", segURL, err)
return err
}
inputURL, err := url.Parse(input)
if err != nil {
return err
}

// parallelise the thumb uploads
uploadGroup, _ := errgroup.WithContext(context.Background())
uploadGroup.SetLimit(5)
for _, segment := range mediaPlaylist.GetAllSegments() {
segment := segment
uploadGroup.Go(func() error {
segURL := inputURL.JoinPath("..", segment.URI)
var (
rc io.ReadCloser
err error
)
// save the segment to memory
err = backoff.Retry(func() error {
rc, err = clients.GetFile(context.Background(), requestID, segURL.String(), nil)
return err
}, clients.DownloadRetryBackoff())
if err != nil {
return fmt.Errorf("error downloading manifest: %w", err)
}
bs, err := io.ReadAll(rc)
if err != nil {
return err
}

// generate thumbnail for the segment
return GenerateThumb(path.Base(segment.URI), bs, output)
})
}
return uploadGroup.Wait()
}

func processSegment(input string, thumbOut string) error {
// generate thumbnail
var ffmpegErr bytes.Buffer
thumbOut := path.Join(tempDir, fmt.Sprintf("keyframes_%d.jpg", segment.SeqId))
err = backoff.Retry(func() error {

err := backoff.Retry(func() error {
ffmpegErr = bytes.Buffer{}
return ffmpeg.
Input(signed, ffmpeg.KwArgs{"skip_frame": "nokey"}). // only extract key frames
Input(input, ffmpeg.KwArgs{"skip_frame": "nokey"}). // only extract key frames
Output(
thumbOut,
ffmpeg.KwArgs{
Expand All @@ -142,19 +210,20 @@ func processSegment(inputURL *url.URL, segment *m3u8.MediaSegment, tempDir strin
).OverWriteOutput().WithErrorOutput(&ffmpegErr).Run()
}, clients.DownloadRetryBackoff())
if err != nil {
return "", fmt.Errorf("error running ffmpeg for thumbnails %s [%s]: %w", segURL, ffmpegErr.String(), err)
return fmt.Errorf("error running ffmpeg for thumbnails %s [%s]: %w", input, ffmpegErr.String(), err)
}

return thumbOut, nil
return nil
}

// Wait a maximum of 5 mins for thumbnails to finish
var vttBackoff = backoff.WithMaxRetries(backoff.NewConstantBackOff(30*time.Second), 10)
var segmentPrefix = "index"

func WaitForThumbs(requestID string, output *url.URL) error {
vtt := output.JoinPath(outputDir, vttFilename).String()
return backoff.Retry(func() error {
_, err := clients.GetFile(context.Background(), requestID, vtt, nil)
return err
}, vttBackoff)
func thumbFilename(segmentURI string) (string, error) {
// segmentURI will be index%d.ts
index := strings.TrimSuffix(strings.TrimPrefix(segmentURI, segmentPrefix), ".ts")
i, err := strconv.ParseInt(index, 10, 32)
if err != nil {
return "", fmt.Errorf("thumbFilename failed for %s: %w", segmentURI, err)
}
return fmt.Sprintf("keyframes_%d.jpg", i), nil
}