Add callback heartbeats for in-flight transcodes #65

Merged
2 commits, merged Oct 17, 2022
31 changes: 31 additions & 0 deletions cache/transcoding.go
@@ -2,8 +2,10 @@ package cache

import (
"sync"
"time"

"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/config"
)

type TranscodingCache struct {
@@ -34,6 +36,29 @@ type SegmentInfo struct {
Outputs []clients.OutputVideo // Information about the final transcoded outputs we've created
}

// Send "keepalive" callbacks to ensure the caller (Studio) knows transcoding is still ongoing and hasn't failed
func (t *TranscodingCache) SendTranscodingHeartbeats(interval time.Duration, quit chan bool) {
for {
// Stop the infinite loop if we receive a quit message
select {
case <-quit:
return
default:
}

jobs := t.GetAll()
Contributor:
Note: it seems possible to have a race between this thread and the transcode thread, so on the Studio side a heartbeat callback could arrive after the transcode-complete callback. We should notify the Studio team about this quirk.

Contributor (Author):
Good point! I think Studio already has logic to throw away messages whose "Completion %" is lower than one they've already received, which is quite a nice way of getting some thread safety around this without implementing a full-on state machine.
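
A minimal illustrative sketch (not Studio's actual code; the type and method names here are hypothetical) of the monotonic guard described above: the receiver keeps the highest completion value seen per job and discards any callback reporting a lower value, so a heartbeat at 0.5 that arrives after a transcode-complete callback at 1.0 is ignored.

```go
package studio

import "sync"

// statusTracker is a hypothetical sketch of the guard described above: it
// records the highest completion value seen per job and rejects callbacks
// that report a lower value (e.g. a late 0.5 heartbeat after a 1.0 complete).
type statusTracker struct {
	mu      sync.Mutex
	maxSeen map[string]float64 // job ID -> highest completion value received so far
}

func newStatusTracker() *statusTracker {
	return &statusTracker{maxSeen: map[string]float64{}}
}

// Accept reports whether the callback should be processed; stale callbacks
// (completion lower than already seen) return false and should be dropped.
func (s *statusTracker) Accept(jobID string, completion float64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if completion < s.maxSeen[jobID] {
		return false
	}
	s.maxSeen[jobID] = completion
	return true
}
```

With a guard like this, the heartbeat loop's fixed 0.5 value can never overwrite a later status such as a completed transcode at 1.0.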

for id, job := range jobs {
err := clients.DefaultCallbackClient.SendTranscodeStatus(job.CallbackUrl, clients.TranscodeStatusTranscoding, 0.5)
if err == nil {
_ = config.Logger.Log("msg", "Sent Transcode Status heartbeat", "id", id, "callback_url", job.CallbackUrl)
} else {
_ = config.Logger.Log("msg", "failed to send Transcode Status heartbeat", "id", id, "callback_url", job.CallbackUrl, "error", err)
}
}
time.Sleep(interval)
}
}

func (si SegmentInfo) ContainsDestination(destination string) bool {
for _, existing := range si.Destinations {
if existing == destination {
@@ -93,6 +118,12 @@ func (c *TranscodingCache) Get(streamName string) *SegmentInfo {
return nil
}

func (c *TranscodingCache) GetAll() map[string]*SegmentInfo {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.pushes
}

func (c *TranscodingCache) Store(streamName string, info SegmentInfo) {
c.mutex.Lock()
c.pushes[streamName] = &info
69 changes: 69 additions & 0 deletions cache/transcoding_test.go
@@ -1,8 +1,16 @@
package cache

import (
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"

"github.com/livepeer/catalyst-api/clients"
"github.com/stretchr/testify/require"
)

@@ -42,3 +50,64 @@ func TestStoreAndRemoveTranscoding(t *testing.T) {
c.Transcoding.Remove("some-stream-name")
require.Nil(t, c.Transcoding.Get("some-stream-name"))
}

func TestHeartbeatsAreFiredWithInterval(t *testing.T) {
// Create a stub server to receive the callbacks and a variable to track how many we get
var requests = map[string]int{}
var requestsMutex = &sync.RWMutex{}

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Check the message is a valid TranscodeStatusMessage
body, err := io.ReadAll(r.Body)
require.NoError(t, err)
var tsm clients.TranscodeStatusMessage
require.NoError(t, json.Unmarshal(body, &tsm))

// Increment our counter for the stream ID, which comes on the final part of our URL
parts := strings.Split(r.URL.Path, "/")
require.NotZero(t, len(parts), "Expected "+r.URL.Path+" to have some slashes in it")
id := parts[len(parts)-1]

requestsMutex.Lock()
requests[id] += 1
requestsMutex.Unlock()
}))
defer ts.Close()

// Add 2 jobs into the stream cache with different names
c := NewStreamCache()
c.Transcoding.Store("some-stream-name", SegmentInfo{
CallbackUrl: ts.URL + "/some-stream-name",
Source: "s3://source",
UploadDir: "upload-dir",
Destinations: []string{
"s3://destination-1",
"s3://destination-2",
},
})
c.Transcoding.Store("some-stream-name-2", SegmentInfo{
CallbackUrl: ts.URL + "/some-stream-name-2",
Source: "s3://source",
UploadDir: "upload-dir",
Destinations: []string{
"s3://destination-1",
"s3://destination-2",
},
})

// Start the callback loop
heartbeatStop := make(chan bool)
go c.Transcoding.SendTranscodingHeartbeats(200*time.Millisecond, heartbeatStop)
defer func() { heartbeatStop <- true }()

// Wait for a few iterations
time.Sleep(time.Second)

// Check that we got roughly the amount of callbacks we'd expect
requestsMutex.RLock()
defer requestsMutex.RUnlock()
require.GreaterOrEqual(t, requests["some-stream-name"], 3)
require.LessOrEqual(t, requests["some-stream-name"], 10)
require.GreaterOrEqual(t, requests["some-stream-name-2"], 3)
require.LessOrEqual(t, requests["some-stream-name-2"], 10)
}
9 changes: 8 additions & 1 deletion main.go
@@ -3,8 +3,10 @@ package main
import (
"flag"
"log"
"time"

"github.com/livepeer/catalyst-api/api"
"github.com/livepeer/catalyst-api/cache"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/livepeer-data/pkg/mistconnector"
)
@@ -14,15 +16,20 @@ func main() {
mistPort := flag.Int("mist-port", 4242, "Port to listen on")
mistHttpPort := flag.Int("mist-http-port", 8080, "Port to listen on")
apiToken := flag.String("api-token", "IAmAuthorized", "Auth header value for API access")
flag.StringVar(&config.RecordingCallback, "recording", "http://recording.livepeer.com/recording/status", "Callback URL for recording start&stop events")
mistJson := flag.Bool("j", false, "Print application info as JSON. Used by Mist to present flags in its UI.")
flag.StringVar(&config.RecordingCallback, "recording", "http://recording.livepeer.com/recording/status", "Callback URL for recording start&stop events")
flag.Parse()

if *mistJson {
mistconnector.PrintMistConfigJson("catalyst-api", "HTTP API server for translating Catalyst API requests into Mist calls", "Catalyst API", config.Version, flag.CommandLine)
return
}

// Send "keepalive" heartbeats while transcodes are ongoing
heartbeatStop := make(chan bool)
go cache.DefaultStreamCache.Transcoding.SendTranscodingHeartbeats(15*time.Second, heartbeatStop)
defer func() { heartbeatStop <- true }()

if err := api.ListenAndServe(*port, *mistPort, *mistHttpPort, *apiToken); err != nil {
log.Fatal(err)
}