Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notify studio when recording completes #50

Merged
merged 13 commits into from
Sep 30, 2022
Merged
34 changes: 34 additions & 0 deletions handlers/misttriggers/push_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package misttriggers

import (
"fmt"
"log"
"net/http"
"net/url"
"strings"
"time"

"github.com/livepeer/catalyst-api/cache"
"github.com/livepeer/catalyst-api/clients"
Expand Down Expand Up @@ -40,6 +43,8 @@ func (d *MistCallbackHandlersCollection) TriggerPushEnd(w http.ResponseWriter, r
d.TranscodingPushEnd(w, req, streamName, destination, actualDestination, pushStatus)
case Segmenting:
d.SegmentingPushEnd(w, req, streamName)
case Recording:
d.RecordingPushEnd(w, req, streamName, actualDestination, pushStatus)
default:
// Not related to API logic
}
Expand Down Expand Up @@ -82,6 +87,23 @@ func (d *MistCallbackHandlersCollection) TranscodingPushEnd(w http.ResponseWrite
}
}

func (d *MistCallbackHandlersCollection) RecordingPushEnd(w http.ResponseWriter, req *http.Request, streamName, actualDestination, pushStatus string) {
var err error
pushSuccess := pushStatus == "null"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the expected behavior from Studio if this is false? Should it show an error to the user somehow or can it still recover the recording somehow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fail in our first version. Improve by recovering in later tasks. Same applies for transcoding stages where some of them can fail independently. To be optimal we can just re-run failed stages. For now we re-run entire process.

event := &clients.RecordingEvent{
Event: "end",
Timestamp: time.Now().UnixMilli(),
StreamName: streamName,
Hostname: req.Host,
Success: &pushSuccess,
}
if event.RecordingId, err = uuidFromPushUrl(actualDestination); err != nil {
log.Printf("RecordingPushEnd extract uuid failed %v", err)
return
}
go clients.DefaultCallbackClient.SendRecordingEvent(event)
}

func (d *MistCallbackHandlersCollection) SegmentingPushEnd(w http.ResponseWriter, req *http.Request, streamName string) {
// when uploading is done, remove trigger and stream from Mist
defer cache.DefaultStreamCache.Segmenting.Remove(streamName)
Expand Down Expand Up @@ -109,3 +131,15 @@ func (d *MistCallbackHandlersCollection) SegmentingPushEnd(w http.ResponseWriter
// TODO: Start Transcoding (stubbed for now with below method)
stubTranscodingCallbacksForStudio(callbackUrl)
}

func uuidFromPushUrl(uri string) (string, error) {
pushUrl, err := url.Parse(uri)
if err != nil {
return "", err
}
path := strings.Split(pushUrl.EscapedPath(), "/")
if len(path) < 4 {
return "", fmt.Errorf("push url path malformed: element count %d %s", len(path), pushUrl.EscapedPath())
}
return path[len(path)-2], nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a regex here instead? Perhaps validating some other parts of this URL as well. (I particularly don't remember what it is, from this. is it the s3+https:// uri?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. We now have plans to expand on URL path prefix with region and bucket name later. I figured its safer to grab path element before filename which should stay in this format in near future.

}
39 changes: 39 additions & 0 deletions handlers/misttriggers/triggers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,45 @@ func TestRecordingStart(t *testing.T) {
}
}

func TestRecordingCompleted(t *testing.T) {
testStartTime := time.Now().UnixMilli()
mistCallbackHandlers := &MistCallbackHandlersCollection{MistClient: clients.StubMistClient{}}
callbackHappened := make(chan bool, 10)
callbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
payload, err := io.ReadAll(r.Body)
require.NoError(t, err)
w.WriteHeader(200)
message := clients.RecordingEvent{}
err = json.Unmarshal(payload, &message)
require.NoError(t, err)
require.Equal(t, "videoSomeStreamName", message.StreamName)
require.Equal(t, "0b152108-0bee-4333-8cb7-e859b800c57f", message.RecordingId)
require.Equal(t, "end", message.Event)
require.NotNil(t, message.Success)
require.True(t, *message.Success)
require.GreaterOrEqual(t, message.Timestamp, testStartTime)
require.Less(t, message.Timestamp, testStartTime+2)
callbackHappened <- true
}))
defer callbackServer.Close()
config.RecordingCallback = callbackServer.URL

router := httprouter.New()
router.POST("/api/mist/trigger", mistCallbackHandlers.Trigger())
pushOutTriggerPayload := "123\nvideoSomeStreamName\ns3+https://creds:passwd@s3.storage.com/region/livepeer-recordings-bucket/$stream/0b152108-0bee-4333-8cb7-e859b800c57f/index.m3u8\ns3+https://creds:passwd@s3.storage.com/region/livepeer-recordings-bucket/videoSomeStreamName/0b152108-0bee-4333-8cb7-e859b800c57f/index.m3u8\n[]\nnull"
req, _ := http.NewRequest("POST", "/api/mist/trigger", bytes.NewBuffer([]byte(pushOutTriggerPayload)))
req.Header.Set("X-Trigger", "PUSH_END")
req.Header.Set("Host", "test.livepeer.monster")
rr := httptest.NewRecorder()
router.ServeHTTP(rr, req)
require.Equal(t, 200, rr.Result().StatusCode)
select {
case <-callbackHappened:
case <-time.After(1 * time.Second):
require.FailNow(t, "no callback happened")
}
}

type StreamSample struct {
streamName string
expected PipelineId
Expand Down