Skip to content

Commit

Permalink
parallel segment push (#129)
Browse files Browse the repository at this point in the history
* avoid load spikes on segment-encode-end
  • Loading branch information
AlexKordic committed Nov 2, 2022
1 parent 491de10 commit fe520dd
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 55 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ var DefaultBroadcasterURL = fmt.Sprintf("http://127.0.0.1:%d", DefaultBroadcaste
const DefaultCustomAPIUrl = "https://origin.livepeer.com/api/"

var RecordingCallback string = "http://127.0.0.1:8008/recording/status"

var TranscodingParallelJobs int = 5
151 changes: 96 additions & 55 deletions transcode/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bytes"
"fmt"
"net/url"
"sync"
"time"

"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/config"
Expand Down Expand Up @@ -80,12 +82,10 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
// Grab some useful parameters to be used later from the TranscodeSegmentRequest
sourceManifestOSURL := transcodeRequest.UploadURL
transcodeProfiles := transcodeRequest.Profiles
callbackURL := transcodeRequest.CallbackURL

// If Profiles haven't been overridden, use the default set
if len(transcodeProfiles) == 0 {
transcodeProfiles = defaultTranscodeProfiles

if isInputVideoBiggerThanDefaults(inputInfo) {
videoTrack, err := inputInfo.GetVideoTrack()
if err != nil {
Expand Down Expand Up @@ -115,59 +115,40 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st

// Generate a unique ID to use when talking to the Broadcaster
manifestID := "manifest-" + config.RandomTrailer(8)

// Iterate through the segment URLs and transcode them
// Use channel to queue segments
queue := make(chan segmentInfo, len(sourceSegmentURLs))
for segmentIndex, u := range sourceSegmentURLs {
rc, err := clients.DownloadOSURL(u.URL)
if err != nil {
return outputs, fmt.Errorf("failed to download source segment %q: %s", u, err)
}

var tr clients.TranscodeResult
// If an AccessToken is provided via the request for transcode, then use remote Broadcasters.
// Otherwise, use the local harcoded Broadcaster.
if transcodeRequest.AccessToken != "" {
creds := clients.Credentials{
AccessToken: transcodeRequest.AccessToken,
CustomAPIURL: transcodeRequest.TranscodeAPIUrl,
}
broadcasterClient, _ := clients.NewRemoteBroadcasterClient(creds)
// Get renditions from remote broadcaster
tr, err = broadcasterClient.TranscodeSegmentWithRemoteBroadcaster(rc, int64(segmentIndex), transcodeProfiles, streamName, u.DurationMillis)
if err != nil {
return outputs, fmt.Errorf("failed to run TranscodeSegmentWithRemoteBroadcaster: %s", err)
}
} else {
// Get renditions from local broadcaster
tr, err = localBroadcasterClient.TranscodeSegment(rc, int64(segmentIndex), transcodeProfiles, u.DurationMillis, manifestID)
if err != nil {
return outputs, fmt.Errorf("failed to run TranscodeSegment: %s", err)
}
}
// Store renditions
for _, transcodedSegment := range tr.Renditions {
renditionIndex := getProfileIndex(transcodeProfiles, transcodedSegment.Name)
if renditionIndex == -1 {
return outputs, fmt.Errorf("failed to find profile with name %q while parsing rendition segment", transcodedSegment.Name)
}

relativeRenditionPath := fmt.Sprintf("rendition-%d/", renditionIndex)
relativeRenditionURL, err := url.Parse(relativeRenditionPath)
if err != nil {
return outputs, fmt.Errorf("error building rendition segment URL %q: %s", relativeRenditionPath, err)
}
renditionURL := targetOSURL.ResolveReference(relativeRenditionURL)

err = clients.UploadToOSURL(renditionURL.String(), fmt.Sprintf("%d.ts", segmentIndex), bytes.NewReader(transcodedSegment.MediaData))
if err != nil {
return outputs, fmt.Errorf("failed to upload master playlist: %s", err)
queue <- segmentInfo{Input: u, Index: segmentIndex}
}
close(queue)
// Use channel for recieving the errors
errors := make(chan error, 100)
// Start number of workers in parallel
var completed sync.WaitGroup
completed.Add(config.TranscodingParallelJobs)
for index := 0; index < config.TranscodingParallelJobs; index++ {
go func() {
defer completed.Done()
for segment := range queue {
err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, targetOSURL)
if err != nil {
errors <- err
return
}
var completedRatio = calculateCompletedRatio(len(sourceSegmentURLs), segment.Index+1)
if err = clients.DefaultCallbackClient.SendTranscodeStatus(transcodeRequest.CallbackURL, clients.TranscodeStatusTranscoding, completedRatio); err != nil {
log.LogError(transcodeRequest.RequestID, "failed to send transcode status callback", err, "url", transcodeRequest.CallbackURL)
}
}
}

var completedRatio = calculateCompletedRatio(len(sourceSegmentURLs), segmentIndex+1)
if err = clients.DefaultCallbackClient.SendTranscodeStatus(callbackURL, clients.TranscodeStatusTranscoding, completedRatio); err != nil {
log.LogError(transcodeRequest.RequestID, "failed to send transcode status callback", err, "url", callbackURL)
}
}()
// Add some desync interval to avoid load spikes on segment-encode-end
time.Sleep(713 * time.Millisecond)
}
// Wait for all segments to transcode or first error
select {
case <-channelFromWaitgroup(&completed):
case err = <-errors:
return outputs, err
}

// Build the manifests and push them to storage
Expand All @@ -183,14 +164,60 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
},
}
// Send the success callback
err = clients.DefaultCallbackClient.SendTranscodeStatusCompleted(callbackURL, inputInfo, outputs)
err = clients.DefaultCallbackClient.SendTranscodeStatusCompleted(transcodeRequest.CallbackURL, inputInfo, outputs)
if err != nil {
log.LogError(transcodeRequest.RequestID, "Failed to send TranscodeStatusCompleted callback", err, "url", callbackURL)
log.LogError(transcodeRequest.RequestID, "Failed to send TranscodeStatusCompleted callback", err, "url", transcodeRequest.CallbackURL)
}
// Return outputs for .dtsh file creation
return outputs, nil
}

func transcodeSegment(segment segmentInfo, streamName, manifestID string, transcodeRequest TranscodeSegmentRequest, transcodeProfiles []clients.EncodedProfile, targetOSURL *url.URL) error {
rc, err := clients.DownloadOSURL(segment.Input.URL)
if err != nil {
return fmt.Errorf("failed to download source segment %q: %s", segment.Input, err)
}
var tr clients.TranscodeResult
// If an AccessToken is provided via the request for transcode, then use remote Broadcasters.
// Otherwise, use the local harcoded Broadcaster.
if transcodeRequest.AccessToken != "" {
creds := clients.Credentials{
AccessToken: transcodeRequest.AccessToken,
CustomAPIURL: transcodeRequest.TranscodeAPIUrl,
}
broadcasterClient, _ := clients.NewRemoteBroadcasterClient(creds)
// TODO: failed to run TranscodeSegmentWithRemoteBroadcaster: CreateStream(): http POST(https://origin.livepeer.com/api/stream) returned 422 422 Unprocessable Entity
tr, err = broadcasterClient.TranscodeSegmentWithRemoteBroadcaster(rc, int64(segment.Index), transcodeProfiles, streamName, segment.Input.DurationMillis)
if err != nil {
return fmt.Errorf("failed to run TranscodeSegmentWithRemoteBroadcaster: %s", err)
}
} else {
tr, err = localBroadcasterClient.TranscodeSegment(rc, int64(segment.Index), transcodeProfiles, segment.Input.DurationMillis, manifestID)
if err != nil {
return fmt.Errorf("failed to run TranscodeSegment: %s", err)
}
}
for _, transcodedSegment := range tr.Renditions {
renditionIndex := getProfileIndex(transcodeProfiles, transcodedSegment.Name)
if renditionIndex == -1 {
return fmt.Errorf("failed to find profile with name %q while parsing rendition segment", transcodedSegment.Name)
}

relativeRenditionPath := fmt.Sprintf("rendition-%d/", renditionIndex)
relativeRenditionURL, err := url.Parse(relativeRenditionPath)
if err != nil {
return fmt.Errorf("error building rendition segment URL %q: %s", relativeRenditionPath, err)
}
renditionURL := targetOSURL.ResolveReference(relativeRenditionURL)

err = clients.UploadToOSURL(renditionURL.String(), fmt.Sprintf("%d.ts", segment.Index), bytes.NewReader(transcodedSegment.MediaData))
if err != nil {
return fmt.Errorf("failed to upload master playlist: %s", err)
}
}
return nil
}

func getProfileIndex(transcodeProfiles []clients.EncodedProfile, profile string) int {
for i, p := range transcodeProfiles {
if p.Name == profile {
Expand All @@ -217,3 +244,17 @@ func isInputVideoBiggerThanDefaults(iv clients.InputVideo) bool {
}
return false
}

func channelFromWaitgroup(wg *sync.WaitGroup) chan bool {
completed := make(chan bool)
go func() {
wg.Wait()
close(completed)
}()
return completed
}

type segmentInfo struct {
Input SourceSegment
Index int
}

0 comments on commit fe520dd

Please sign in to comment.