Skip to content

Commit

Permalink
Send Studio callbacks during + after transcoding process
Browse files Browse the repository at this point in the history
  • Loading branch information
thomshutt committed Oct 27, 2022
1 parent a4b0edf commit 7788923
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 4 deletions.
19 changes: 18 additions & 1 deletion handlers/misttriggers/recording_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (d *MistCallbackHandlersCollection) triggerRecordingEndSegmenting(w http.Re
transcodedManifestURL := segmentedUploadURL.ResolveReference(relativeTranscodeURL)

go func() {
err := transcode.RunTranscodeProcess(transcodeRequest.UploadURL, transcodedManifestURL.String(), transcodeRequest.Profiles)
err := transcode.RunTranscodeProcess(transcodeRequest.UploadURL, transcodedManifestURL.String(), transcodeRequest.Profiles, callbackUrl)
if err != nil {
_ = config.Logger.Log(
"msg", "RunTranscodeProcess returned an error",
Expand All @@ -121,6 +121,23 @@ func (d *MistCallbackHandlersCollection) triggerRecordingEndSegmenting(w http.Re
"source", transcodeRequest.SourceFile,
"target", transcodeRequest.UploadURL,
)

if err := clients.DefaultCallbackClient.SendTranscodeStatusError(callbackUrl, "Transcoding Failed: "+err.Error()); err != nil {
_ = config.Logger.Log("msg", "Failed to send Error callback", "err", err.Error(), "stream_name", p.StreamName)
}
} else {
// TODO: Fill in with real values once we have them back from the transcoder
err = clients.DefaultCallbackClient.SendTranscodeStatusCompleted(
callbackUrl,
clients.InputVideo{
Format: "unknown",
},
[]clients.OutputVideo{},
)

if err != nil {
_ = config.Logger.Log("msg", "Failed to send Completed callback", "err", err.Error(), "stream_name", p.StreamName)
}
}
}()
}
Expand Down
3 changes: 2 additions & 1 deletion handlers/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func (d *CatalystAPIHandlersCollection) TranscodeSegment() httprouter.Handle {
}

// TODO: Do this asynchronously
if err := transcode.RunTranscodeProcess(transcodeRequest.SourceFile, transcodeRequest.UploadURL, transcodeRequest.Profiles); err != nil {
err = transcode.RunTranscodeProcess(transcodeRequest.SourceFile, transcodeRequest.UploadURL, transcodeRequest.Profiles, transcodeRequest.CallbackURL)
if err != nil {
errors.WriteHTTPInternalServerError(w, "Error running Transcode process", err)
}

Expand Down
13 changes: 11 additions & 2 deletions transcode/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var defaultTranscodeProfiles = []cache.EncodedProfile{
},
}

func RunTranscodeProcess(sourceManifestOSURL, targetManifestOSURL string, transcodeProfiles []cache.EncodedProfile) error {
func RunTranscodeProcess(sourceManifestOSURL, targetManifestOSURL string, transcodeProfiles []cache.EncodedProfile, callbackURL string) error {
_ = config.Logger.Log("msg", "RunTranscodeProcess (v2) Beginning", "source", sourceManifestOSURL, "target", targetManifestOSURL)

// If Profiles haven't been overridden, use the default set
Expand All @@ -49,7 +49,7 @@ func RunTranscodeProcess(sourceManifestOSURL, targetManifestOSURL string, transc
}

// Iterate through the segment URLs and transcode them
for _, u := range sourceSegmentURLs {
for i, u := range sourceSegmentURLs {
rc, err := clients.DownloadOSURL(u)
if err != nil {
return fmt.Errorf("failed to download source segment %q: %s", u, err)
Expand All @@ -64,6 +64,11 @@ func RunTranscodeProcess(sourceManifestOSURL, targetManifestOSURL string, transc
return fmt.Errorf("failed to read source segment data %q: %s", u, err)
}
_ = config.Logger.Log("msg", "downloaded source segment", "url", u, "size_bytes", nRead, "error", err)

var completedRatio = calculateCompletedRatio(len(sourceSegmentURLs), i+1)
if err = clients.DefaultCallbackClient.SendTranscodeStatus(callbackURL, clients.TranscodeStatusTranscoding, completedRatio); err != nil {
_ = config.Logger.Log("msg", "failed to send transcode status callback", "url", callbackURL, "error", err)
}
}

// Build the manifests and push them to storage
Expand All @@ -74,3 +79,7 @@ func RunTranscodeProcess(sourceManifestOSURL, targetManifestOSURL string, transc

return nil
}

func calculateCompletedRatio(totalSegments, completedSegments int) float64 {
return (1 / float64(totalSegments)) * float64(completedSegments)
}
32 changes: 32 additions & 0 deletions transcode/transcode_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package transcode

import (
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -45,6 +49,20 @@ func TestItCanTranscode(t *testing.T) {
outputDir := os.TempDir()
outputMasterManifest := filepath.Join(outputDir, "output-master.m3u8")

// Set up a server to receive callbacks and store them in an array for future verification
var callbacks []map[string]interface{}
callbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Check that we got the callback we're expecting
body, err := io.ReadAll(r.Body)
require.NoError(t, err)

var callback map[string]interface{}
err = json.Unmarshal(body, &callback)
require.NoError(t, err)
callbacks = append(callbacks, callback)
}))
defer callbackServer.Close()

// Check we don't get an error downloading or parsing it
err = RunTranscodeProcess(
manifestFile.Name(),
Expand All @@ -63,6 +81,7 @@ func TestItCanTranscode(t *testing.T) {
Height: 720,
},
},
callbackServer.URL,
)
require.NoError(t, err)

Expand All @@ -72,4 +91,17 @@ func TestItCanTranscode(t *testing.T) {
require.Greater(t, len(masterManifestBytes), 0)
require.Contains(t, string(masterManifestBytes), "#EXTM3U")
require.Contains(t, string(masterManifestBytes), "#EXT-X-STREAM-INF")

// Check we received a progress callback for each segment
require.Equal(t, 2, len(callbacks))
require.Equal(t, 0.7, callbacks[0]["completion_ratio"])
require.Equal(t, 1.0, callbacks[1]["completion_ratio"])
}

func TestItCalculatesTheTranscodeCompletionPercentageCorrectly(t *testing.T) {
require.Equal(t, 0.5, calculateCompletedRatio(2, 1))
require.Equal(t, 0.5, calculateCompletedRatio(4, 2))
require.Equal(t, 0.1, calculateCompletedRatio(10, 1))
require.Equal(t, 0.01, calculateCompletedRatio(100, 1))
require.Equal(t, 0.6, calculateCompletedRatio(100, 60))
}

0 comments on commit 7788923

Please sign in to comment.