diff --git a/transcode/transcode.go b/transcode/transcode.go index dc09774f..b6b077a8 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -8,6 +8,7 @@ import ( "net/url" "os" "path/filepath" + "strconv" "strings" "sync" "time" @@ -18,11 +19,12 @@ import ( "github.com/livepeer/catalyst-api/log" "github.com/livepeer/catalyst-api/metrics" "github.com/livepeer/catalyst-api/video" + "golang.org/x/sync/errgroup" ) const ( - UploadTimeout = 5 * time.Minute - SegmentChannelSize = 10 + UploadTimeout = 5 * time.Minute + SegmentWritersCount = 10 ) type TranscodeSegmentRequest struct { @@ -139,29 +141,32 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st } } renditionList.AddRenditionSegment(maxProfile.Name, - &video.TSegmentList{ - SegmentDataTable: make(map[int][]byte), - }) + &video.TSegmentList{}) } else { for _, profile := range transcodeProfiles { renditionList.AddRenditionSegment(profile.Name, - &video.TSegmentList{ - SegmentDataTable: make(map[int][]byte), - }) + &video.TSegmentList{}) } } } - // Create a buffered channel where transcoded segments are sent to be written to disk - segmentChannel := make(chan video.TranscodedSegmentInfo, SegmentChannelSize) - - // Create a waitgroup to synchronize when the disk writing goroutine finishes - var wg sync.WaitGroup + var TransmuxStorageDir string + if transcodeRequest.GenerateMP4 { + var err error + // Create folder to hold transmux-ed files in local storage temporarily + TransmuxStorageDir, err = os.MkdirTemp(os.TempDir(), "transmux_stage_"+transcodeRequest.RequestID+"_") + if err != nil && !os.IsExist(err) { + log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err) + return outputs, segmentsCount, err + } + defer os.RemoveAll(TransmuxStorageDir) + } + segFileWriter := newSegmentFileWriter(TransmuxStorageDir) // Setup parallel transcode sessions var jobs *ParallelTranscoding jobs = NewParallelTranscoding(sourceSegmentURLs, func(segment segmentInfo) error { - err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster, segmentChannel) + err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster, segFileWriter) segmentsCount++ if err != nil { return err @@ -174,45 +179,6 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st return nil }) - var TransmuxStorageDir string - if transcodeRequest.GenerateMP4 { - var err error - // Create folder to hold transmux-ed files in local storage temporarily - TransmuxStorageDir, err = os.MkdirTemp(os.TempDir(), "transmux_stage_"+transcodeRequest.RequestID+"_") - if err != nil && !os.IsExist(err) { - log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err) - return outputs, segmentsCount, err - } - defer os.RemoveAll(TransmuxStorageDir) - - // Start the disk-writing (consumer) goroutine - wg.Add(1) - go func(transmuxTopLevelDir string, renditionList *video.TRenditionList) { - var segmentBatch []video.TranscodedSegmentInfo - defer wg.Done() - - // Keep checking for new segments in the buffered channel - for segInfo := range segmentChannel { - segmentBatch = append(segmentBatch, segInfo) - // Begin writing to disk if at-least 50% of buffered channel is full - if len(segmentBatch) >= SegmentChannelSize/2 { - err := video.WriteSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch) - if err != nil { - return - } - segmentBatch = nil - } - } - // Handle any remaining segments after the channel is closed - if len(segmentBatch) > 0 { - err := video.WriteSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch) - if err != nil { - return - } - } - }(TransmuxStorageDir, &renditionList) - } - // Start the transcoding (producer) goroutines jobs.Start() if err = jobs.Wait(); err != nil { @@ -220,11 +186,10 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st return outputs, segmentsCount, err } - // If the disk-writing gorouine was started, then close the segment channel to - // signal that no more segments will be sent. This will be a no-op if MP4s are not requested. - close(segmentChannel) // Wait for disk-writing goroutine to finish. This will be a no-op if MP4s are not requested. - wg.Wait() + if err = segFileWriter.wait(); err != nil { + return outputs, segmentsCount, fmt.Errorf("error writing segments to disk: %w", err) + } // Build the manifests and push them to storage manifestURL, err := clients.GenerateAndUploadManifests(sourceManifest, hlsTargetURL.String(), transcodedStats, transcodeRequest.IsClip) @@ -416,6 +381,37 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st return outputs, segmentsCount, nil } +type segmentFileWriter struct { + group errgroup.Group + transmuxTopLevelDir string +} + +func newSegmentFileWriter(transmuxTopLevelDir string) *segmentFileWriter { + s := &segmentFileWriter{transmuxTopLevelDir: transmuxTopLevelDir} + s.group.SetLimit(SegmentWritersCount) + return s +} + +func (s *segmentFileWriter) wait() error { + return s.group.Wait() +} + +func (s *segmentFileWriter) writeSegment(requestID string, renditionName string, segIndex int, segmentData []byte) { + s.group.Go(func() error { + segmentFilename := filepath.Join(s.transmuxTopLevelDir, requestID+"_"+renditionName+"_"+strconv.Itoa(segIndex)+".ts") + segmentFile, err := os.Create(segmentFilename) + if err != nil { + return fmt.Errorf("error creating .ts file to write transcoded segment data err: %w", err) + } + defer segmentFile.Close() + _, err = segmentFile.Write(segmentData) + if err != nil { + return fmt.Errorf("error writing segment err: %w", err) + } + return nil + }) +} + func uploadMp4Files(basePath *url.URL, mp4OutputFiles []string, prefix string) ([]video.OutputVideoFile, error) { var mp4OutputsPre []video.OutputVideoFile // e. Upload all mp4 related output files @@ -489,7 +485,7 @@ func transcodeSegment( transcodedStats []*video.RenditionStats, renditionList *video.TRenditionList, broadcaster clients.BroadcasterClient, - segmentChannel chan<- video.TranscodedSegmentInfo, + segFilewriter *segmentFileWriter, ) error { start := time.Now() @@ -569,20 +565,13 @@ func transcodeSegment( // get inner segments table from outer rendition table segmentsList := renditionList.GetSegmentList(transcodedSegment.Name) if segmentsList != nil { - // add new entry for segment # and corresponding byte stream if the profile + // add new entry for segment # if the profile // exists in the renditionList which contains only profiles for which mp4s will // be generated i.e. all profiles for mp4 inputs and only highest quality // rendition for hls inputs like recordings. - segmentsList.AddSegmentData(segment.Index, transcodedSegment.MediaData) - - // send this transcoded segment to the segment channel so that it can be written - // to disk in parallel - segmentChannel <- video.TranscodedSegmentInfo{ - RequestID: transcodeRequest.RequestID, - RenditionName: transcodedSegment.Name, // Use actual rendition name - SegmentIndex: segment.Index, // Use actual segment index - } + segmentsList.AddSegment(segment.Index) + segFilewriter.writeSegment(transcodeRequest.RequestID, transcodedSegment.Name, segment.Index, transcodedSegment.MediaData) } } diff --git a/video/media.go b/video/media.go index 7f959e81..874e96e1 100644 --- a/video/media.go +++ b/video/media.go @@ -1,11 +1,7 @@ package video import ( - "fmt" - "os" - "path/filepath" "sort" - "strconv" "sync" ) @@ -33,36 +29,19 @@ import ( */ type TSegmentList struct { - mu sync.Mutex - SegmentDataTable map[int][]byte + mu sync.Mutex + SegmentList []int } -func (s *TSegmentList) AddSegmentData(segIdx int, data []byte) { - s.mu.Lock() - s.SegmentDataTable[segIdx] = data - s.mu.Unlock() -} - -func (s *TSegmentList) RemoveSegmentData(segIdx int) { - s.mu.Lock() - s.SegmentDataTable[segIdx] = []byte{} - s.mu.Unlock() -} - -func (s *TSegmentList) GetSegment(segIdx int) []byte { +func (s *TSegmentList) AddSegment(segIdx int) { s.mu.Lock() defer s.mu.Unlock() - return s.SegmentDataTable[segIdx] + s.SegmentList = append(s.SegmentList, segIdx) } func (s *TSegmentList) GetSortedSegments() []int { - segmentsTable := s.SegmentDataTable - segments := make([]int, 0, len(segmentsTable)) - for k := range segmentsTable { - segments = append(segments, k) - } - sort.Ints(segments) - return segments + sort.Ints(s.SegmentList) + return s.SegmentList } type TRenditionList struct { @@ -72,8 +51,8 @@ type TRenditionList struct { func (r *TRenditionList) AddRenditionSegment(rendName string, sList *TSegmentList) { r.mu.Lock() + defer r.mu.Unlock() r.RenditionSegmentTable[rendName] = sList - r.mu.Unlock() } func (r *TRenditionList) GetSegmentList(rendName string) *TSegmentList { @@ -92,32 +71,3 @@ type RenditionStats struct { ManifestLocation string BitsPerSecond uint32 } - -type TranscodedSegmentInfo struct { - RequestID string - RenditionName string - SegmentIndex int -} - -func WriteSegmentsToDisk(transmuxTopLevelDir string, renditionList *TRenditionList, segmentBatch []TranscodedSegmentInfo) error { - for _, segInfo := range segmentBatch { - - // All accesses to renditionList and segmentList is protected by a mutex behind the scenes - segmentList := renditionList.GetSegmentList(segInfo.RenditionName) - segmentData := segmentList.GetSegment(segInfo.SegmentIndex) - segmentFilename := filepath.Join(transmuxTopLevelDir, segInfo.RequestID+"_"+segInfo.RenditionName+"_"+strconv.Itoa(segInfo.SegmentIndex)+".ts") - segmentFile, err := os.Create(segmentFilename) - if err != nil { - return fmt.Errorf("error creating .ts file to write transcoded segment data err: %w", err) - } - defer segmentFile.Close() - _, err = segmentFile.Write(segmentData) - if err != nil { - return fmt.Errorf("error writing segment err: %w", err) - } - // "Delete" buffered segment data from memory in hopes the garbage-collector releases it - segmentList.RemoveSegmentData(segInfo.SegmentIndex) - - } - return nil -} diff --git a/video/transmux_test.go b/video/transmux_test.go index 2803e86d..f8516369 100644 --- a/video/transmux_test.go +++ b/video/transmux_test.go @@ -5,6 +5,7 @@ import ( "github.com/stretchr/testify/require" "os" "path/filepath" + "strconv" "strings" "testing" ) @@ -42,41 +43,17 @@ const weirdManifest = `#EXTM3U func TestItConcatsStreams(t *testing.T) { // setup pre-reqs for testing stream concatenation - tr := populateRenditionSegmentList() - segmentList := tr.GetSegmentList(rendition) concatDir, err := os.MkdirTemp(os.TempDir(), "concat_stage_") require.NoError(t, err) concatTsFileName := filepath.Join(concatDir, request+"_"+rendition+".ts") - // setup a fake struct to simulate what will be sent in the channel - sb := []TranscodedSegmentInfo{ - { - RequestID: request, - RenditionName: rendition, - SegmentIndex: 0, - }, - { - RequestID: request, - RenditionName: rendition, - SegmentIndex: 1, - }, - { - RequestID: request, - RenditionName: rendition, - SegmentIndex: 2, - }, - } + tr := populateRenditionSegmentList(t, concatDir) + segmentList := tr.GetSegmentList(rendition) // setup a fake playlist sourceManifest, _, err := m3u8.DecodeFrom(strings.NewReader(normalManifest), true) require.NoError(t, err) pl := *sourceManifest.(*m3u8.MediaPlaylist) - // write segments to disk to test stream-based concatenation - err = WriteSegmentsToDisk(concatDir, tr, sb) require.NoError(t, err) - // verify segments are not held in memory anymore - for _, v := range segmentList.SegmentDataTable { - require.Equal(t, int(0), len(v)) - } // verify stream-based concatenation totalBytesW, err := ConcatTS(concatTsFileName, segmentList, pl, true) @@ -87,41 +64,17 @@ func TestItConcatsStreams(t *testing.T) { func TestItConcatsFiles(t *testing.T) { // setup pre-reqs for testing stream concatenation - tr := populateRenditionSegmentList() - segmentList := tr.GetSegmentList(rendition) concatDir, err := os.MkdirTemp(os.TempDir(), "concat_stage_") require.NoError(t, err) concatTsFileName := filepath.Join(concatDir, request+"_"+rendition+".ts") - // setup a fake struct to simulate what will be sent in the channel - sb := []TranscodedSegmentInfo{ - { - RequestID: request, - RenditionName: rendition, - SegmentIndex: 0, - }, - { - RequestID: request, - RenditionName: rendition, - SegmentIndex: 1, - }, - { - RequestID: request, - RenditionName: rendition, - SegmentIndex: 2, - }, - } + tr := populateRenditionSegmentList(t, concatDir) + segmentList := tr.GetSegmentList(rendition) // setup a fake playlist sourceManifest, _, err := m3u8.DecodeFrom(strings.NewReader(normalManifest), true) require.NoError(t, err) pl := *sourceManifest.(*m3u8.MediaPlaylist) - // write segments to disk to test stream-based concatenation - err = WriteSegmentsToDisk(concatDir, tr, sb) require.NoError(t, err) - // verify segments are not held in memory anymore - for _, v := range segmentList.SegmentDataTable { - require.Equal(t, int(0), len(v)) - } // verify file-based concatenation totalBytesWritten, err := ConcatTS(concatTsFileName, segmentList, pl, false) require.NoError(t, err) @@ -131,35 +84,15 @@ func TestItConcatsFiles(t *testing.T) { func TestItConcatsFilesOnlyUptoMP4DurationLimit(t *testing.T) { // setup pre-reqs for testing stream concatenation - tr := populateRenditionSegmentList() - segmentList := tr.GetSegmentList(rendition) concatDir, err := os.MkdirTemp(os.TempDir(), "concat_stage_") require.NoError(t, err) concatTsFileName := filepath.Join(concatDir, request+"_"+rendition+".ts") - // setup a fake struct to simulate what will be sent in the channel - sb := []TranscodedSegmentInfo{ - { - RequestID: request, - RenditionName: rendition, - SegmentIndex: 0, - }, - { - RequestID: request, - RenditionName: rendition, - SegmentIndex: 1, - }, - { - RequestID: request, - RenditionName: rendition, - SegmentIndex: 2, - }, - } + tr := populateRenditionSegmentList(t, concatDir) + segmentList := tr.GetSegmentList(rendition) // setup a fake playlist sourceManifest, _, err := m3u8.DecodeFrom(strings.NewReader(weirdManifest), true) require.NoError(t, err) pl := *sourceManifest.(*m3u8.MediaPlaylist) - // write segments to disk to test stream-based concatenation - err = WriteSegmentsToDisk(concatDir, tr, sb) require.NoError(t, err) // verify file-based concatenation totalBytesW, err := ConcatTS(concatTsFileName, segmentList, pl, false) @@ -173,35 +106,15 @@ func TestItConcatsFilesOnlyUptoMP4DurationLimit(t *testing.T) { func TestItConcatsStreamsOnlyUptoMP4DurationLimit(t *testing.T) { // setup pre-reqs for testing stream concatenation - tr := populateRenditionSegmentList() - segmentList := tr.GetSegmentList(rendition) concatDir, err := os.MkdirTemp(os.TempDir(), "concat_stage_") require.NoError(t, err) concatTsFileName := filepath.Join(concatDir, request+"_"+rendition+".ts") - // setup a fake struct to simulate what will be sent in the channel - sb := []TranscodedSegmentInfo{ - { - RequestID: request, - RenditionName: rendition, - SegmentIndex: 0, - }, - { - RequestID: request, - RenditionName: rendition, - SegmentIndex: 1, - }, - { - RequestID: request, - RenditionName: rendition, - SegmentIndex: 2, - }, - } + tr := populateRenditionSegmentList(t, concatDir) + segmentList := tr.GetSegmentList(rendition) // setup a fake playlist sourceManifest, _, err := m3u8.DecodeFrom(strings.NewReader(weirdManifest), true) require.NoError(t, err) pl := *sourceManifest.(*m3u8.MediaPlaylist) - // write segments to disk to test stream-based concatenation - err = WriteSegmentsToDisk(concatDir, tr, sb) require.NoError(t, err) // verify stream-based concatenation totalBytesW, err := ConcatTS(concatTsFileName, segmentList, pl, true) @@ -213,19 +126,24 @@ func TestItConcatsStreamsOnlyUptoMP4DurationLimit(t *testing.T) { require.Equal(t, int64(406268), totalBytesW) } -func populateRenditionSegmentList() *TRenditionList { +func populateRenditionSegmentList(t *testing.T, concatDir string) *TRenditionList { segmentFiles := []string{"../test/fixtures/seg-0.ts", "../test/fixtures/seg-1.ts", "../test/fixtures/seg-2.ts"} renditionList := &TRenditionList{ RenditionSegmentTable: make(map[string]*TSegmentList), } - segmentList := &TSegmentList{ - SegmentDataTable: make(map[int][]byte), - } + segmentList := &TSegmentList{} for i, filePath := range segmentFiles { data := readSegmentData(filePath) - segmentList.AddSegmentData(i, data) + segmentList.AddSegment(i) + + segmentFilename := filepath.Join(concatDir, request+"_"+rendition+"_"+strconv.Itoa(i)+".ts") + segmentFile, err := os.Create(segmentFilename) + require.NoError(t, err) + defer segmentFile.Close() + _, err = segmentFile.Write(data) + require.NoError(t, err) } renditionList.AddRenditionSegment(rendition, segmentList)