Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop transcoding on first error #174

Merged
merged 3 commits into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"fmt"
"time"
)

var Version string
Expand All @@ -22,3 +23,5 @@ const DefaultCustomAPIUrl = "https://origin.livepeer.com/api/"
var RecordingCallback string = "http://127.0.0.1:8008/recording/status"

var TranscodingParallelJobs int = 5

var TranscodingParallelSleep time.Duration = 713 * time.Millisecond
51 changes: 17 additions & 34 deletions transcode/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"path"
"strconv"
"sync"
"time"

"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/config"
Expand Down Expand Up @@ -133,40 +132,24 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
// transcodedStats hold actual info from transcoded results within requested constraints (this usually differs from requested profiles)
transcodedStats := statsFromProfiles(transcodeProfiles)

// Iterate through the segment URLs and transcode them
// Use channel to queue segments
queue := make(chan segmentInfo, len(sourceSegmentURLs))
for segmentIndex, u := range sourceSegmentURLs {
queue <- segmentInfo{Input: u, Index: segmentIndex}
}
close(queue)
// Use channel for receiving the errors
errors := make(chan error, 100)
// Start number of workers in parallel
var completed sync.WaitGroup
completed.Add(config.TranscodingParallelJobs)
for index := 0; index < config.TranscodingParallelJobs; index++ {
go func() {
defer completed.Done()
for segment := range queue {
err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, targetTranscodedRenditionOutputURL, transcodedStats)
if err != nil {
errors <- err
return
}
var completedRatio = calculateCompletedRatio(len(sourceSegmentURLs), segment.Index+1)
if err = clients.DefaultCallbackClient.SendTranscodeStatus(transcodeRequest.CallbackURL, clients.TranscodeStatusTranscoding, completedRatio); err != nil {
log.LogError(transcodeRequest.RequestID, "failed to send transcode status callback", err, "url", transcodeRequest.CallbackURL)
}
var jobs *ParallelTranscoding
jobs = NewParallelTranscoding(sourceSegmentURLs, func(segment segmentInfo) error {
err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, targetTranscodedRenditionOutputURL, transcodedStats)
if err != nil {
return err
}
if jobs.IsRunning() {
// Sending callback only if we are still running
var completedRatio = calculateCompletedRatio(jobs.GetTotalCount(), jobs.GetCompletedCount()+1)
if err = clients.DefaultCallbackClient.SendTranscodeStatus(transcodeRequest.CallbackURL, clients.TranscodeStatusTranscoding, completedRatio); err != nil {
log.LogError(transcodeRequest.RequestID, "failed to send transcode status callback", err, "url", transcodeRequest.CallbackURL)
}
}()
// Add some desync interval to avoid load spikes on segment-encode-end
time.Sleep(713 * time.Millisecond)
}
// Wait for all segments to transcode or first error
select {
case <-channelFromWaitgroup(&completed):
case err = <-errors:
}
return nil
})
jobs.Start()
if err = jobs.Wait(); err != nil {
// return first error to caller
return outputs, err
}

Expand Down
106 changes: 106 additions & 0 deletions transcode/transcode_jobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package transcode

import (
"sync"
"time"

"github.com/livepeer/catalyst-api/config"
)

// ParallelTranscoding coordinates a pool of worker goroutines that transcode
// source segments in parallel and halts the whole run on the first error.
type ParallelTranscoding struct {
	queue     chan segmentInfo // buffered with all segments up front; closed by the constructor
	errors    chan error       // workers post errors here; buffer (100) keeps senders from blocking
	completed sync.WaitGroup   // counts live workers; Wait() converts it into a done signal
	work      func(segment segmentInfo) error // per-segment transcode callback supplied by the caller

	m                 sync.Mutex // guards the mutable fields below
	isRunning         bool       // cleared by Stop(); workers check it before taking new work
	totalSegments     int        // set once at construction, never written afterwards
	completedSegments int        // number of segments transcoded successfully so far
}

// NewParallelTranscoding prepares a transcoding run over the given source
// segments: every segment is queued up front on a buffered channel and the
// supplied work callback is recorded for the workers started later by Start.
func NewParallelTranscoding(sourceSegmentURLs []SourceSegment, work func(segment segmentInfo) error) *ParallelTranscoding {
	total := len(sourceSegmentURLs)
	t := &ParallelTranscoding{
		queue:         make(chan segmentInfo, total),
		errors:        make(chan error, 100),
		work:          work,
		isRunning:     true,
		totalSegments: total,
	}
	// Fill the queue completely, then close it: draining the closed channel
	// is how workers learn there is no more work.
	for i, u := range sourceSegmentURLs {
		t.queue <- segmentInfo{Input: u, Index: i}
	}
	close(t.queue)
	return t
}

// Start launches the configured number of worker goroutines to process
// queued segments in parallel.
func (t *ParallelTranscoding) Start() {
	workers := config.TranscodingParallelJobs
	t.completed.Add(workers)
	for started := 0; started < workers; started++ {
		go t.workerRoutine()
		// Stagger worker start-up to avoid load spikes when many segment
		// encodes would otherwise finish at the same moment.
		time.Sleep(config.TranscodingParallelSleep)
	}
}

// Stop marks the run as halted; workers observe the flag and exit before
// picking up further segments.
func (t *ParallelTranscoding) Stop() {
	t.m.Lock()
	t.isRunning = false
	t.m.Unlock()
}

// IsRunning reports whether the run is still active, i.e. Stop has not
// been called.
func (t *ParallelTranscoding) IsRunning() bool {
	t.m.Lock()
	running := t.isRunning
	t.m.Unlock()
	return running
}

// GetTotalCount returns the number of source segments in this run.
func (t *ParallelTranscoding) GetTotalCount() int {
	// totalSegments is never written after construction, so no lock needed here
	return t.totalSegments
}

// GetCompletedCount returns how many segments have transcoded successfully
// so far.
func (t *ParallelTranscoding) GetCompletedCount() int {
	t.m.Lock()
	n := t.completedSegments
	t.m.Unlock()
	return n
}

// Wait blocks until either every worker has finished or a worker reports
// an error, whichever happens first; it returns that first error, or nil.
func (t *ParallelTranscoding) Wait() error {
	done := channelFromWaitgroup(&t.completed)
	select {
	case err := <-t.errors:
		return err
	case <-done:
		return nil
	}
}

// segmentCompleted records one successfully transcoded segment. Progress is
// frozen once Stop has been called, so a failed run keeps the count it had
// at the moment of the error.
func (t *ParallelTranscoding) segmentCompleted() {
	t.m.Lock()
	defer t.m.Unlock()
	if t.isRunning {
		t.completedSegments++
	}
}

// workerRoutine drains the segment queue, running the work callback on each
// segment until the queue empties, an error occurs, or the run is stopped.
func (t *ParallelTranscoding) workerRoutine() {
	defer t.completed.Done()
	for segment := range t.queue {
		// Bail out without taking new work once another worker failed.
		if !t.IsRunning() {
			return
		}
		if err := t.work(segment); err != nil {
			// First error stops all other goroutines, then is reported
			// to Wait via the buffered errors channel.
			t.Stop()
			t.errors <- err
			return
		}
		t.segmentCompleted()
	}
}
60 changes: 60 additions & 0 deletions transcode/transcode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"path"
"path/filepath"
"strconv"
"sync"
"testing"
"time"

"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/config"
Expand Down Expand Up @@ -157,3 +159,61 @@ func TestItCalculatesTheTranscodeCompletionPercentageCorrectly(t *testing.T) {
require.Equal(t, 0.01, calculateCompletedRatio(100, 1))
require.Equal(t, 0.6, calculateCompletedRatio(100, 60))
}

// TestParallelJobFailureStopsNextBatch verifies that the first handler error
// halts the run: the second batch of segments is never processed, Wait
// returns that first error, and progress counters reflect only completed work.
func TestParallelJobFailureStopsNextBatch(t *testing.T) {
	config.TranscodingParallelJobs = 3
	config.TranscodingParallelSleep = 0
	sourceSegmentURLs := []SourceSegment{
		// First 3 jobs run in parallel, second one fails
		{"1.ts", 1000}, {"2.ts", 1000}, {"3.ts", 1000},
		// Rest of jobs should not be processed
		{"4.ts", 1000}, {"5.ts", 1000}, {"6.ts", 1000},
	}
	halted := fmt.Errorf("halted")
	m := sync.Mutex{}
	var handlerIndex int = 0
	jobs := NewParallelTranscoding(sourceSegmentURLs, func(segment segmentInfo) error {
		time.Sleep(50 * time.Millisecond) // simulate processing
		m.Lock()
		defer m.Unlock()
		defer func() { handlerIndex += 1 }()
		if handlerIndex == 0 {
			return nil
		}
		if handlerIndex == 1 {
			return halted
		}
		return fmt.Errorf("failure detected late")
	})
	jobs.Start()
	err := jobs.Wait()
	// Check we got first error
	require.Error(t, err)
	// Fix: require.Error treats extra arguments as a log message, so the old
	// `require.Error(t, err, halted)` never compared the two errors.
	// ErrorIs actually asserts that err is (or wraps) halted.
	require.ErrorIs(t, err, halted)
	// Check progress state is properly set
	require.Equal(t, 6, jobs.GetTotalCount())
	require.Equal(t, 1, jobs.GetCompletedCount())
	time.Sleep(10 * time.Millisecond) // wait for other workers to exit
}

// TestParallelJobSaveTime checks that segments really run concurrently:
// six 50ms jobs on three workers should finish in about two batches
// (~100ms), far below the ~300ms a serial run would need.
func TestParallelJobSaveTime(t *testing.T) {
	config.TranscodingParallelJobs = 3
	config.TranscodingParallelSleep = 0
	sourceSegmentURLs := []SourceSegment{
		// First 3 jobs should end at ~51ms mark
		{"1.ts", 1000}, {"2.ts", 1000}, {"3.ts", 1000},
		// Second 3 jobs should end at ~101ms mark
		{"4.ts", 1000}, {"5.ts", 1000}, {"6.ts", 1000},
	}
	began := time.Now()
	jobs := NewParallelTranscoding(sourceSegmentURLs, func(segment segmentInfo) error {
		time.Sleep(50 * time.Millisecond)
		return nil
	})
	jobs.Start()
	require.NoError(t, jobs.Wait())
	took := time.Since(began)
	require.Greater(t, took, 60*time.Millisecond)
	require.Less(t, took, 160*time.Millisecond) // usually takes less than 101ms on idle machine
	time.Sleep(10 * time.Millisecond) // wait for other workers to exit
}