diff --git a/config/config.go b/config/config.go index 7bcd9b162..6311f4451 100644 --- a/config/config.go +++ b/config/config.go @@ -2,6 +2,7 @@ package config import ( "fmt" + "time" ) var Version string @@ -22,3 +23,5 @@ const DefaultCustomAPIUrl = "https://origin.livepeer.com/api/" var RecordingCallback string = "http://127.0.0.1:8008/recording/status" var TranscodingParallelJobs int = 5 + +var TranscodingParallelSleep time.Duration = 713 * time.Millisecond diff --git a/transcode/transcode.go b/transcode/transcode.go index e612bb1de..da7195cc0 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -7,7 +7,6 @@ import ( "path" "strconv" "sync" - "time" "github.com/livepeer/catalyst-api/clients" "github.com/livepeer/catalyst-api/config" @@ -133,40 +132,24 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st // transcodedStats hold actual info from transcoded results within requested constraints (this usually differs from requested profiles) transcodedStats := statsFromProfiles(transcodeProfiles) - // Iterate through the segment URLs and transcode them - // Use channel to queue segments - queue := make(chan segmentInfo, len(sourceSegmentURLs)) - for segmentIndex, u := range sourceSegmentURLs { - queue <- segmentInfo{Input: u, Index: segmentIndex} - } - close(queue) - // Use channel for recieving the errors - errors := make(chan error, 100) - // Start number of workers in parallel - var completed sync.WaitGroup - completed.Add(config.TranscodingParallelJobs) - for index := 0; index < config.TranscodingParallelJobs; index++ { - go func() { - defer completed.Done() - for segment := range queue { - err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, targetTranscodedRenditionOutputURL, transcodedStats) - if err != nil { - errors <- err - return - } - var completedRatio = calculateCompletedRatio(len(sourceSegmentURLs), segment.Index+1) - if err = clients.DefaultCallbackClient.SendTranscodeStatus(transcodeRequest.CallbackURL, clients.TranscodeStatusTranscoding, completedRatio); err != nil { - log.LogError(transcodeRequest.RequestID, "failed to send transcode status callback", err, "url", transcodeRequest.CallbackURL) - } + var jobs *ParallelTranscoding + jobs = NewParallelTranscoding(sourceSegmentURLs, func(segment segmentInfo) error { + err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, targetTranscodedRenditionOutputURL, transcodedStats) + if err != nil { + return err + } + if jobs.IsRunning() { + // Sending callback only if we are still running + var completedRatio = calculateCompletedRatio(jobs.GetTotalCount(), jobs.GetCompletedCount()+1) + if err = clients.DefaultCallbackClient.SendTranscodeStatus(transcodeRequest.CallbackURL, clients.TranscodeStatusTranscoding, completedRatio); err != nil { + log.LogError(transcodeRequest.RequestID, "failed to send transcode status callback", err, "url", transcodeRequest.CallbackURL) } - }() - // Add some desync interval to avoid load spikes on segment-encode-end - time.Sleep(713 * time.Millisecond) - } - // Wait for all segments to transcode or first error - select { - case <-channelFromWaitgroup(&completed): - case err = <-errors: + } + return nil + }) + jobs.Start() + if err = jobs.Wait(); err != nil { + // return first error to caller return outputs, err } diff --git a/transcode/transcode_jobs.go b/transcode/transcode_jobs.go new file mode 100644 index 000000000..f2fd89c1e --- /dev/null +++ b/transcode/transcode_jobs.go @@ 
-0,0 +1,106 @@ +package transcode + +import ( + "sync" + "time" + + "github.com/livepeer/catalyst-api/config" +) + +type ParallelTranscoding struct { + queue chan segmentInfo + errors chan error + completed sync.WaitGroup + work func(segment segmentInfo) error + + m sync.Mutex + isRunning bool + totalSegments int + completedSegments int +} + +func NewParallelTranscoding(sourceSegmentURLs []SourceSegment, work func(segment segmentInfo) error) *ParallelTranscoding { + jobs := &ParallelTranscoding{ + queue: make(chan segmentInfo, len(sourceSegmentURLs)), + errors: make(chan error, 100), + work: work, + isRunning: true, + totalSegments: len(sourceSegmentURLs), + } + // post all jobs on buffered queue for goroutines to process + for segmentIndex, u := range sourceSegmentURLs { + jobs.queue <- segmentInfo{Input: u, Index: segmentIndex} + } + close(jobs.queue) + return jobs +} + +// Start spawns configured number of goroutines to process segments in parallel +func (t *ParallelTranscoding) Start() { + t.completed.Add(config.TranscodingParallelJobs) + for index := 0; index < config.TranscodingParallelJobs; index++ { + go t.workerRoutine() + // Add some desync interval to avoid load spikes on segment-encode-end + time.Sleep(config.TranscodingParallelSleep) + } +} + +func (t *ParallelTranscoding) Stop() { + t.m.Lock() + defer t.m.Unlock() + t.isRunning = false +} + +func (t *ParallelTranscoding) IsRunning() bool { + t.m.Lock() + defer t.m.Unlock() + return t.isRunning +} + +func (t *ParallelTranscoding) GetTotalCount() int { + // not updating totalSegments, no lock needed here + return t.totalSegments +} + +func (t *ParallelTranscoding) GetCompletedCount() int { + t.m.Lock() + defer t.m.Unlock() + return t.completedSegments +} + +// Wait waits for all segments to transcode or first error +func (t *ParallelTranscoding) Wait() error { + select { + case <-channelFromWaitgroup(&t.completed): + return nil + case err := <-t.errors: + return err + } +} + +func (t *ParallelTranscoding) segmentCompleted() { + t.m.Lock() + defer t.m.Unlock() + if !t.isRunning { + // in case of error further progress is denied + return + } + t.completedSegments += 1 +} + +func (t *ParallelTranscoding) workerRoutine() { + defer t.completed.Done() + for segment := range t.queue { + if !t.IsRunning() { + return + } + err := t.work(segment) + if err != nil { + // stop all other goroutines on first error + t.Stop() + t.errors <- err + return + } + t.segmentCompleted() + } +} diff --git a/transcode/transcode_test.go b/transcode/transcode_test.go index 02177c8aa..e613df354 100644 --- a/transcode/transcode_test.go +++ b/transcode/transcode_test.go @@ -10,7 +10,9 @@ import ( "path" "path/filepath" "strconv" + "sync" "testing" + "time" "github.com/livepeer/catalyst-api/clients" "github.com/livepeer/catalyst-api/config" @@ -157,3 +159,61 @@ func TestItCalculatesTheTranscodeCompletionPercentageCorrectly(t *testing.T) { require.Equal(t, 0.01, calculateCompletedRatio(100, 1)) require.Equal(t, 0.6, calculateCompletedRatio(100, 60)) } + +func TestParallelJobFailureStopsNextBatch(t *testing.T) { + config.TranscodingParallelJobs = 3 + config.TranscodingParallelSleep = 0 + sourceSegmentURLs := []SourceSegment{ + // First 3 jobs run in parallel, second one fails + {"1.ts", 1000}, {"2.ts", 1000}, {"3.ts", 1000}, + // Rest of jobs should not be processed + {"4.ts", 1000}, {"5.ts", 1000}, {"6.ts", 1000}, + } + halted := fmt.Errorf("halted") + m := sync.Mutex{} + var handlerIndex int = 0 + jobs := NewParallelTranscoding(sourceSegmentURLs, func(segment 
segmentInfo) error {
+		time.Sleep(50 * time.Millisecond) // simulate processing
+		m.Lock()
+		defer m.Unlock()
+		defer func() { handlerIndex += 1 }()
+		if handlerIndex == 0 {
+			return nil
+		}
+		if handlerIndex == 1 {
+			return halted
+		}
+		return fmt.Errorf("failure detected late")
+	})
+	jobs.Start()
+	err := jobs.Wait()
+	// Check that we got the first error
+	require.Error(t, err)
+	require.ErrorIs(t, err, halted)
+	// Check that progress state is properly set
+	require.Equal(t, 6, jobs.GetTotalCount())
+	require.Equal(t, 1, jobs.GetCompletedCount())
+	time.Sleep(10 * time.Millisecond) // wait for other workers to exit
+}
+
+func TestParallelJobSaveTime(t *testing.T) {
+	config.TranscodingParallelJobs = 3
+	config.TranscodingParallelSleep = 0
+	sourceSegmentURLs := []SourceSegment{
+		// First 3 jobs should end at the ~51ms mark
+		{"1.ts", 1000}, {"2.ts", 1000}, {"3.ts", 1000},
+		// Second 3 jobs should end at the ~101ms mark
+		{"4.ts", 1000}, {"5.ts", 1000}, {"6.ts", 1000},
+	}
+	start := time.Now()
+	jobs := NewParallelTranscoding(sourceSegmentURLs, func(segment segmentInfo) error {
+		time.Sleep(50 * time.Millisecond)
+		return nil
+	})
+	jobs.Start()
+	require.NoError(t, jobs.Wait())
+	elapsed := time.Since(start)
+	require.Greater(t, elapsed, 60*time.Millisecond)
+	require.Less(t, elapsed, 160*time.Millisecond) // usually takes less than 101ms on an idle machine
+	time.Sleep(10 * time.Millisecond) // wait for other workers to exit
+}
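
The new ParallelTranscoding type is a small worker pool: segments are posted to a closed, buffered channel, a fixed number of goroutines drain it, a mutex-guarded running flag stops remaining work after the first failure, and Wait() selects between a WaitGroup-backed completion channel and a buffered error channel. The standalone Go sketch below illustrates the same pattern with generic names; pool, newPool, start and wait are illustrative stand-ins, not part of the catalyst-api package.

// Standalone sketch of the worker-pool pattern used by ParallelTranscoding.
// All names here are illustrative; only the structure mirrors the patch.
package main

import (
	"fmt"
	"sync"
	"time"
)

type pool struct {
	queue   chan int   // buffered, closed after posting all jobs
	errs    chan error // buffered so a failing worker never blocks
	done    sync.WaitGroup
	mu      sync.Mutex
	running bool
}

func newPool(jobs []int) *pool {
	p := &pool{
		queue:   make(chan int, len(jobs)),
		errs:    make(chan error, len(jobs)),
		running: true,
	}
	for _, j := range jobs {
		p.queue <- j
	}
	close(p.queue) // workers drain the queue with range and then exit
	return p
}

func (p *pool) isRunning() bool { p.mu.Lock(); defer p.mu.Unlock(); return p.running }
func (p *pool) stop()           { p.mu.Lock(); defer p.mu.Unlock(); p.running = false }

func (p *pool) start(workers int, work func(int) error) {
	p.done.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer p.done.Done()
			for job := range p.queue {
				if !p.isRunning() {
					return // another worker already failed; skip remaining jobs
				}
				if err := work(job); err != nil {
					p.stop()
					p.errs <- err
					return
				}
			}
		}()
	}
}

// wait blocks until every worker exits or the first error arrives.
func (p *pool) wait() error {
	finished := make(chan struct{})
	go func() { p.done.Wait(); close(finished) }()
	select {
	case <-finished:
		return nil
	case err := <-p.errs:
		return err
	}
}

func main() {
	p := newPool([]int{1, 2, 3, 4, 5, 6})
	p.start(3, func(job int) error {
		if job == 4 {
			return fmt.Errorf("job %d failed", job) // fails while others are still busy
		}
		time.Sleep(50 * time.Millisecond) // simulate per-segment work
		return nil
	})
	fmt.Println("result:", p.wait()) // prints the first error: "job 4 failed"
}

Closing the queue up front is what lets each worker use a plain range loop and exit once the channel drains; in the patch, the per-worker config.TranscodingParallelSleep delay additionally staggers worker start-up so segment-encode-end callbacks and uploads do not spike at the same moment.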