Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recording start callback #49

Merged
merged 7 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions clients/callback_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"time"

Expand Down Expand Up @@ -48,6 +49,22 @@ func (c CallbackClient) DoWithRetries(r *http.Request) error {
return nil
}

func (c CallbackClient) SendRecordingEvent(event *RecordingEvent) {
eventJson, err := json.Marshal(event)
if err != nil {
log.Printf("SendRecordingStarted json marshal %v", err)
return
}
req, err := http.NewRequest(http.MethodPost, config.RecordingCallback, bytes.NewReader(eventJson))
AlexKordic marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Printf("SendRecordingStarted http.NewRequest %v", err)
return
}
if err := c.DoWithRetries(req); err != nil {
log.Printf("SendRecordingStarted callback %v", err)
}
}

// Sends a Transcode Status message to the Client (initially just Studio)
// The status strings will be useful for debugging where in the workflow we got to, but everything
// in Studio will be driven off the overall "Completion Ratio".
Expand Down Expand Up @@ -181,6 +198,15 @@ func (ts TranscodeStatus) String() string {

// The various status messages we can send

type RecordingEvent struct {
Event string `json:"event"`
StreamName string `json:"stream_name"`
RecordingId string `json:"recording_id"`
Hostname string `json:"host_name"`
Timestamp int64 `json:"timestamp"`
Success *bool `json:"success,omitempty"`
}

type TranscodeStatusMessage struct {
CompletionRatio float64 `json:"completion_ratio"` // No omitempty or we lose this for 0% completion case
Error string `json:"error,omitempty"`
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ var Logger log.Logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
func init() {
Logger = log.With(Logger, "ts", log.DefaultTimestampUTC)
}

var RecordingCallback string = "http://127.0.0.1:8008/recording/status"
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/livepeer/catalyst-api
go 1.19

require (
github.com/google/uuid v1.3.0
github.com/julienschmidt/httprouter v1.3.0
github.com/livepeer/go-tools v0.0.0-20220926110222-2ebcbb5685b4
github.com/livepeer/livepeer-data v0.4.20
Expand All @@ -21,7 +22,6 @@ require (
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
github.com/googleapis/gax-go/v2 v2.4.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down
59 changes: 59 additions & 0 deletions handlers/misttriggers/push_out_start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package misttriggers

import (
"fmt"
"log"
"net/http"
"net/url"
"strings"
"time"

"github.com/google/uuid"
"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/errors"
)

// TriggerPushOutStart responds to PUSH_OUT_START trigger
// This trigger is run right before an outgoing push is started. This trigger is stream-specific and must be blocking.
// The payload for this trigger is multiple lines, each separated by a single newline character (without an ending newline), containing data:
//
// stream name (string)
// push target URI (string)
func (d *MistCallbackHandlersCollection) TriggerPushOutStart(w http.ResponseWriter, req *http.Request, payload []byte) {
lines := strings.Split(strings.TrimSuffix(string(payload), "\n"), "\n")
if len(lines) != 2 {
errors.WriteHTTPBadRequest(w, "Bad request payload", fmt.Errorf("unknown payload '%s'", string(payload)))
return
}
streamName := lines[0]
destination := lines[1]
var destinationToReturn string
switch streamNameToPipeline(streamName) {
case Recording:
destinationToReturn = d.RecordingPushOutStart(w, req, streamName, destination)
default:
destinationToReturn = destination
}
if _, err := w.Write([]byte(destinationToReturn)); err != nil {
log.Printf("TriggerPushOutStart failed to send rewritten url: %v", err)
}
}

func (d *MistCallbackHandlersCollection) RecordingPushOutStart(w http.ResponseWriter, req *http.Request, streamName, destination string) string {
event := &clients.RecordingEvent{
Event: "start",
Timestamp: time.Now().UnixMilli(),
StreamName: streamName,
RecordingId: uuid.New().String(),
Hostname: req.Host,
}
pushUrl, err := url.Parse(destination)
if err != nil {
log.Printf("RecordingPushOutStart url.Parse %v", err)
return destination
}
// Add uuid after stream name
pushUrl.Path = strings.Replace(pushUrl.Path, "$stream", "$stream/"+event.RecordingId, 1)
go clients.DefaultCallbackClient.SendRecordingEvent(event)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should make these hooks blocking instead. As in if they fail we should not let the stream go through either. Only opening for consideration tho, not sure what i prefer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good practice is to return as soon as possible so mistserver can continue without additional latency increase.

Lets consider a watchdog process that kills bad streams separately from our happy-path.

return pushUrl.String()
}
3 changes: 3 additions & 0 deletions handlers/misttriggers/triggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
)

const TRIGGER_PUSH_END = "PUSH_END"
const TRIGGER_PUSH_OUT_START = "PUSH_OUT_START"
const TRIGGER_LIVE_TRACK_LIST = "LIVE_TRACK_LIST"

type MistCallbackHandlersCollection struct {
Expand All @@ -34,6 +35,8 @@ func (d *MistCallbackHandlersCollection) Trigger() httprouter.Handle {

triggerName := req.Header.Get("X-Trigger")
switch triggerName {
case TRIGGER_PUSH_OUT_START:
d.TriggerPushOutStart(w, req, payload)
case TRIGGER_PUSH_END:
d.TriggerPushEnd(w, req, payload)
case TRIGGER_LIVE_TRACK_LIST:
Expand Down
50 changes: 50 additions & 0 deletions handlers/misttriggers/triggers_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
package misttriggers

import (
"bytes"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/config"
"github.com/stretchr/testify/require"
)

Expand All @@ -18,6 +27,47 @@ func TestPipelineId(t *testing.T) {
}
}

func TestRecordingStart(t *testing.T) {
testStartTime := time.Now().UnixMilli()
mistCallbackHandlers := &MistCallbackHandlersCollection{MistClient: clients.StubMistClient{}}
callbackHappened := make(chan bool, 10)
callbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
payload, err := io.ReadAll(r.Body)
require.NoError(t, err)
w.WriteHeader(200)
message := clients.RecordingEvent{}
err = json.Unmarshal(payload, &message)
require.NoError(t, err)
require.Equal(t, "videoSomeStreamName", message.StreamName)
require.Equal(t, "start", message.Event)
require.GreaterOrEqual(t, message.Timestamp, testStartTime)
require.Less(t, message.Timestamp, testStartTime+2)
require.NotEmpty(t, message.RecordingId)
callbackHappened <- true
}))
defer callbackServer.Close()
config.RecordingCallback = callbackServer.URL

router := httprouter.New()
router.POST("/api/mist/trigger", mistCallbackHandlers.Trigger())
pushOutTriggerPayload := "videoSomeStreamName\ns3+https://creds:passwd@s3.storage.com/region/livepeer-recordings-bucket/$stream/index.m3u8"
req, _ := http.NewRequest("POST", "/api/mist/trigger", bytes.NewBuffer([]byte(pushOutTriggerPayload)))
req.Header.Set("X-Trigger", "PUSH_OUT_START")
req.Header.Set("Host", "test.livepeer.monster")
rr := httptest.NewRecorder()
router.ServeHTTP(rr, req)
require.Equal(t, 200, rr.Result().StatusCode)
result := rr.Body.String()
require.Equal(t, "s3+https://creds:passwd@s3.storage.com/region/livepeer-recordings-bucket/$stream/", result[:81])
require.Greater(t, len(result), 92)
require.Equal(t, "/index.m3u8", result[len(result)-11:])
select {
case <-callbackHappened:
case <-time.After(1 * time.Second):
require.FailNow(t, "no callback happened")
}
}

type StreamSample struct {
streamName string
expected PipelineId
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
func main() {
port := flag.Int("port", 4949, "Port to listen on")
mistPort := flag.Int("mist-port", 4242, "Port to listen on")
flag.StringVar(&config.RecordingCallback, "recording", "http://recording.livepeer.com/recording/status", "Callback URL for recording start&stop events")
mistJson := flag.Bool("j", false, "Print application info as JSON. Used by Mist to present flags in its UI.")
flag.Parse()

Expand Down