From 4ce03fbc3a57682234abc4f14623fdfffdd7e7a1 Mon Sep 17 00:00:00 2001
From: Thom Shutt
Date: Mon, 17 Oct 2022 14:59:19 +0100
Subject: [PATCH 1/2] Add callback heartbeats for in-flight transcodes

---
 cache/transcoding.go      | 52 ++++++++++++++++++++++++++++++++
 cache/transcoding_test.go | 62 +++++++++++++++++++++++++++++++++++++++
 main.go                   |  9 +++++-
 3 files changed, 122 insertions(+), 1 deletion(-)

diff --git a/cache/transcoding.go b/cache/transcoding.go
index f1b7d4805..46930e74b 100644
--- a/cache/transcoding.go
+++ b/cache/transcoding.go
@@ -2,8 +2,10 @@ package cache
 
 import (
 	"sync"
+	"time"
 
 	"github.com/livepeer/catalyst-api/clients"
+	"github.com/livepeer/catalyst-api/config"
 )
 
 type TranscodingCache struct {
@@ -34,6 +36,42 @@ type SegmentInfo struct {
 	Outputs []clients.OutputVideo // Information about the final transcoded outputs we've created
 }
 
+// SendTranscodingHeartbeats sends "keepalive" callbacks for every job currently in the cache, so that
+// the caller (Studio) knows transcoding is still ongoing and hasn't failed.
+// It sends one round immediately and then one every interval, and blocks until a message arrives on
+// quit, so it is meant to be run in its own goroutine.
+func (c *TranscodingCache) SendTranscodingHeartbeats(interval time.Duration, quit chan bool) {
+	for {
+		// Stop before doing any work if a quit message is already waiting
+		select {
+		case <-quit:
+			return
+		default:
+		}
+
+		// GetAll returns a snapshot, so jobs stored or removed while the callbacks are in flight
+		// cannot race with this iteration
+		for id, job := range c.GetAll() {
+			err := clients.DefaultCallbackClient.SendTranscodeStatus(job.CallbackUrl, clients.TranscodeStatusTranscoding, 0.5)
+			if err == nil {
+				_ = config.Logger.Log("msg", "Sent Transcode Status heartbeat", "id", id, "callback_url", job.CallbackUrl)
+			} else {
+				_ = config.Logger.Log("msg", "failed to send Transcode Status heartbeat", "id", id, "callback_url", job.CallbackUrl, "error", err)
+			}
+		}
+
+		// Wait for the next round, but wake up straight away on quit so that the (unbuffered) send
+		// in the caller's shutdown path is not held up for a whole interval
+		timer := time.NewTimer(interval)
+		select {
+		case <-quit:
+			timer.Stop()
+			return
+		case <-timer.C:
+		}
+	}
+}
+
 func (si SegmentInfo) ContainsDestination(destination string) bool {
 	for _, existing := range si.Destinations {
 		if existing == destination {
@@ -93,6 +131,20 @@ func (c *TranscodingCache) Get(streamName string) *SegmentInfo {
 	return nil
 }
 
+// GetAll returns a snapshot of every job in the cache, keyed by stream name.
+// The returned map is a copy, so callers can range over it without holding the lock while other
+// goroutines call Store or Remove. The *SegmentInfo values are shared with the cache, not copied.
+func (c *TranscodingCache) GetAll() map[string]*SegmentInfo {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+
+	snapshot := make(map[string]*SegmentInfo, len(c.pushes))
+	for streamName, info := range c.pushes {
+		snapshot[streamName] = info
+	}
+	return snapshot
+}
+
 func (c *TranscodingCache) Store(streamName string, info SegmentInfo) {
 	c.mutex.Lock()
 	c.pushes[streamName] = &info
diff --git a/cache/transcoding_test.go b/cache/transcoding_test.go
index 28d9a5ae2..eddbd6c73 100644
--- a/cache/transcoding_test.go
+++ b/cache/transcoding_test.go
@@ -1,8 +1,15 @@
 package cache
 
 import (
+	"encoding/json"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"strings"
 	"testing"
+	"time"
 
+	"github.com/livepeer/catalyst-api/clients"
 	"github.com/stretchr/testify/require"
 )
 
@@ -42,3 +49,58 @@ func TestStoreAndRemoveTranscoding(t *testing.T) {
 	c.Transcoding.Remove("some-stream-name")
 	require.Nil(t, c.Transcoding.Get("some-stream-name"))
 }
+
+func TestHeartbeatsAreFiredWithInterval(t *testing.T) {
+	// Create a stub server to receive the callbacks and a variable to track how many we get
+	var requests = map[string]int{}
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		// Check the message is a valid TranscodeStatusMessage
+		body, err := io.ReadAll(r.Body)
+		require.NoError(t, err)
+		var tsm clients.TranscodeStatusMessage
+		require.NoError(t, json.Unmarshal(body, &tsm))
+
+		// Increment our counter for the stream ID, which comes on the final part of our URL
+		parts := strings.Split(r.URL.Path, "/")
+		require.Greater(t, len(parts), 1, "Expected "+r.URL.Path+" to have some slashes in")
+		id := parts[len(parts)-1]
+		requests[id] += 1
+	}))
+	defer ts.Close()
+
+	// Add 2 jobs into the stream cache with different names
+	c := NewStreamCache()
+	c.Transcoding.Store("some-stream-name", SegmentInfo{
+		CallbackUrl: ts.URL + "/some-stream-name",
+		Source:      "s3://source",
+		UploadDir:   "upload-dir",
+		Destinations: []string{
+			"s3://destination-1",
+			"s3://destination-2",
+		},
+	})
+	c.Transcoding.Store("some-stream-name-2", SegmentInfo{
+		CallbackUrl: ts.URL + "/some-stream-name-2",
+		Source:      "s3://source",
+		UploadDir:   "upload-dir",
+		Destinations: []string{
+			"s3://destination-1",
+			"s3://destination-2",
+		},
+	})
+
+	// Start the callback loop
+	heartbeatStop := make(chan bool)
+	go c.Transcoding.SendTranscodingHeartbeats(200*time.Millisecond, heartbeatStop)
+	defer func() { heartbeatStop <- true }()
+
+	// Wait for a few iterations
+	time.Sleep(time.Second)
+
+	// Check that we got roughly the amount of callbacks we'd expect
+	require.GreaterOrEqual(t, requests["some-stream-name"], 3)
+	require.LessOrEqual(t, requests["some-stream-name"], 10)
+
+	require.GreaterOrEqual(t, requests["some-stream-name-2"], 3)
+	require.LessOrEqual(t, requests["some-stream-name-2"], 10)
+}
diff --git a/main.go b/main.go
index 47fae38b4..017e8be8a 100644
--- a/main.go
+++ b/main.go
@@ -3,8 +3,10 @@ package main
 import (
 	"flag"
 	"log"
+	"time"
 
 	"github.com/livepeer/catalyst-api/api"
+	"github.com/livepeer/catalyst-api/cache"
 	"github.com/livepeer/catalyst-api/config"
 	"github.com/livepeer/livepeer-data/pkg/mistconnector"
 )
@@ -14,8 +16,8 @@ func main() {
 	mistPort := flag.Int("mist-port", 4242, "Port to listen on")
 	mistHttpPort := flag.Int("mist-http-port", 8080, "Port to listen on")
 	apiToken := flag.String("api-token", "IAmAuthorized", "Auth header value for API access")
-	flag.StringVar(&config.RecordingCallback, "recording", "http://recording.livepeer.com/recording/status", "Callback URL for recording start&stop events")
 	mistJson := flag.Bool("j", false, "Print application info as JSON. Used by Mist to present flags in its UI.")
+	flag.StringVar(&config.RecordingCallback, "recording", "http://recording.livepeer.com/recording/status", "Callback URL for recording start&stop events")
 	flag.Parse()
 
 	if *mistJson {
@@ -23,6 +25,11 @@ func main() {
 		return
 	}
 
+	// Send "keepalive" heartbeats while transcodes are ongoing
+	heartbeatStop := make(chan bool)
+	go cache.DefaultStreamCache.Transcoding.SendTranscodingHeartbeats(15*time.Second, heartbeatStop)
+	defer func() { heartbeatStop <- true }()
+
 	if err := api.ListenAndServe(*port, *mistPort, *mistHttpPort, *apiToken); err != nil {
 		log.Fatal(err)
 	}

From 7f4d520856bc6ae6bd9aa0f9d50b5f37e3aeaad5 Mon Sep 17 00:00:00 2001
From: Thom Shutt
Date: Mon, 17 Oct 2022 15:08:39 +0100
Subject: [PATCH 2/2] Make test map access thread safe

---
 cache/transcoding_test.go | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/cache/transcoding_test.go b/cache/transcoding_test.go
index eddbd6c73..b00e8c63c 100644
--- a/cache/transcoding_test.go
+++ b/cache/transcoding_test.go
@@ -6,6 +6,7 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -53,6 +54,8 @@ func TestStoreAndRemoveTranscoding(t *testing.T) {
 func TestHeartbeatsAreFiredWithInterval(t *testing.T) {
 	// Create a stub server to receive the callbacks and a variable to track how many we get
 	var requests = map[string]int{}
+	var requestsMutex = &sync.RWMutex{}
+
 	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		// Check the message is a valid TranscodeStatusMessage
 		body, err := io.ReadAll(r.Body)
@@ -64,7 +67,10 @@ func TestHeartbeatsAreFiredWithInterval(t *testing.T) {
 		parts := strings.Split(r.URL.Path, "/")
 		require.Greater(t, len(parts), 1, "Expected "+r.URL.Path+" to have some slashes in")
 		id := parts[len(parts)-1]
+
+		requestsMutex.Lock()
 		requests[id] += 1
+		requestsMutex.Unlock()
 	}))
 	defer ts.Close()
 
@@ -98,9 +104,10 @@ func TestHeartbeatsAreFiredWithInterval(t *testing.T) {
 	time.Sleep(time.Second)
 
 	// Check that we got roughly the amount of callbacks we'd expect
+	requestsMutex.RLock()
+	defer requestsMutex.RUnlock()
 	require.GreaterOrEqual(t, requests["some-stream-name"], 3)
 	require.LessOrEqual(t, requests["some-stream-name"], 10)
-
 	require.GreaterOrEqual(t, requests["some-stream-name-2"], 3)
 	require.LessOrEqual(t, requests["some-stream-name-2"], 10)
 }