Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run MistInHLS mist binary on push end for each rendition #55

Merged
merged 3 commits into from
Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ var Version string
// Used so that we can generate fixed timestamps in tests
var Clock TimestampGenerator = RealTimestampGenerator{}

// Path to Mist's "Livepeer" process that we shell out to for the transcoding
const PathMistProcLivepeer = "/usr/local/bin/MistProcLivepeer"
// Path to Mist's binaries that we shell out to for transcoding and header file creation
var PathMistDir = "/usr/local/bin"

// Port that the local Broadcaster runs on
const DefaultBroadcasterPort = 8935
Expand Down
31 changes: 30 additions & 1 deletion handlers/misttriggers/push_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
"log"
"net/http"
"net/url"
"os/exec"
"path"
"strings"
"time"

"github.com/livepeer/catalyst-api/cache"
"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/subprocess"
)

// TriggerPushEnd responds to PUSH_END trigger
Expand Down Expand Up @@ -63,7 +66,7 @@ func (d *MistCallbackHandlersCollection) TranscodingPushEnd(w http.ResponseWrite
return
}

uploadSuccess := pushStatus == "null"
uploadSuccess := pushStatus != "null"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testing shows we actually get null on error and json object on success.

if uploadSuccess {
// TODO: Do some maths so that we don't always send 0.5
if err := clients.DefaultCallbackClient.SendTranscodeStatus(info.CallbackUrl, clients.TranscodeStatusTranscoding, 0.5); err != nil {
Expand All @@ -76,6 +79,10 @@ func (d *MistCallbackHandlersCollection) TranscodingPushEnd(w http.ResponseWrite
}
}

if err := createDtsh(actualDestination); err != nil {
_ = config.Logger.Log("msg", "createDtsh failed", "err", err, "destination", actualDestination)
}

// We do not delete triggers as source stream is wildcard stream: RENDITION_PREFIX
cache.DefaultStreamCache.Transcoding.RemovePushDestination(streamName, destination)
if cache.DefaultStreamCache.Transcoding.AreDestinationsEmpty(streamName) {
Expand Down Expand Up @@ -143,3 +150,25 @@ func uuidFromPushUrl(uri string) (string, error) {
}
return path[len(path)-2], nil
}

func createDtsh(destination string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a unit test for this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea

url, err := url.Parse(destination)
if err != nil {
return err
}
url.RawQuery = ""
url.Fragment = ""
headerPrepare := exec.Command(path.Join(config.PathMistDir, "MistInHLS"), "-H", url.String())
if err = subprocess.LogOutputs(headerPrepare); err != nil {
return err
}
if err = headerPrepare.Start(); err != nil {
return err
}
go func() {
if err := headerPrepare.Wait(); err != nil {
_ = config.Logger.Log("msg", "createDtsh return code", "code", err, "destination", destination)
}
}()
return nil
}
21 changes: 21 additions & 0 deletions handlers/misttriggers/triggers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ import (
"bytes"
"encoding/json"
"io"
"io/fs"
"net/http"
"net/http/httptest"
"net/url"
"os"
"path"
"testing"
"time"

Expand Down Expand Up @@ -107,6 +111,23 @@ func TestRecordingCompleted(t *testing.T) {
}
}

func TestMistInHLSStart(t *testing.T) {
dir := t.TempDir()
config.PathMistDir = dir
destination := "unused"
err := createDtsh("invalid://user:abc{DEf1=lp@example.com:5432/db?sslmode=require")
require.IsType(t, &url.Error{}, err)
err = createDtsh(destination)
require.IsType(t, &fs.PathError{}, err)

script := path.Join(dir, "MistInHLS")
_ = os.WriteFile(script, []byte("#!/bin/sh\necho livepeer\n"), 0744)

err = createDtsh(destination)
require.NoError(t, err)
time.Sleep(50 * time.Millisecond)
}

type StreamSample struct {
streamName string
expected PipelineId
Expand Down
55 changes: 6 additions & 49 deletions handlers/transcode.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
package handlers

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"os/exec"
"path"

"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/cache"
"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/subprocess"
"github.com/xeipuuv/gojsonschema"
)

Expand Down Expand Up @@ -74,30 +73,6 @@ func (d *CatalystAPIHandlersCollection) TranscodeSegment() httprouter.Handle {
}
}

// stream from a source to a destination buffer while also printing
func streamOutput(src io.Reader, dst *bytes.Buffer, out io.Writer) error {
mw := io.MultiWriter(dst, out)
s := bufio.NewReader(src)
for {
var line []byte
line, err := s.ReadSlice('\n')
if err == io.EOF && len(line) == 0 {
break
}
if err == io.EOF {
return fmt.Errorf("Improper termination: %v", line)
}
if err != nil {
return err
}
_, err = mw.Write(line)
if err != nil {
return err
}
}
return nil
}
Comment on lines -78 to -99
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@emranemran moved this to logging.go. Please check for sanity. Removed multiwriter - deemed unnecessary


// RunTranscodeProcess starts `MistLivepeeerProc` as a subprocess to transcode inputStream into renditionsStream.
func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegmentRequest) error {

Expand All @@ -117,36 +92,18 @@ func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegm
}
args := string(configPayload)

transcodeCommand := exec.Command(config.PathMistProcLivepeer, args, "--debug", "8", "--kickoff")

var stdout, stderr bytes.Buffer
stderrPipe, err := transcodeCommand.StderrPipe()
if err != nil {
return fmt.Errorf("Failed to open stderr pipe: %s", err)
}
stdoutPipe, err := transcodeCommand.StdoutPipe()
if err != nil {
return fmt.Errorf("Failed to open stdout pipe: %s", err)
transcodeCommand := exec.Command(path.Join(config.PathMistDir, "MistProcLivepeer"), args, "--debug", "8", "--kickoff")
if err = subprocess.LogOutputs(transcodeCommand); err != nil {
return err
}

// Start the Transcode Command asynchronously - we call Wait() later in this method
fmt.Printf("Starting transcode via: %s\n", transcodeCommand.String())
err = transcodeCommand.Start()
if err != nil {
return fmt.Errorf("Failed to start MistProcLivepeer: %s", err)
return fmt.Errorf("failed to start MistProcLivepeer: %s", err)
}

go func() {
if streamOutput(stdoutPipe, &stdout, os.Stdout) != nil {
_ = fmt.Errorf("Failed to stream output from stdout")
}
}()
go func() {
if streamOutput(stderrPipe, &stderr, os.Stderr) != nil {
_ = fmt.Errorf("Failed to stream output from stderr")
}
}()

dir, _ := url.Parse(".")
uploadDir := inputUrl.ResolveReference(dir)
// Cache the stream data, later used in the trigger handlers called by Mist
Expand Down
50 changes: 50 additions & 0 deletions subprocess/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package subprocess

import (
"bufio"
"fmt"
"io"
"os"
"os/exec"

"github.com/livepeer/catalyst-api/config"
)

func streamOutput(src io.Reader, out io.Writer) {
s := bufio.NewReader(src)
for {
var line []byte
line, err := s.ReadSlice('\n')
if err == io.EOF && len(line) == 0 {
break
}
if err == io.EOF {
_ = config.Logger.Log("msg", "streamOutput() improper termination", "line", line)
return
}
if err != nil {
_ = config.Logger.Log("msg", "streamOutput ReadSlice error", "err", err)
return
}
_, err = out.Write(line)
if err != nil {
_ = config.Logger.Log("msg", "streamOutput out.Write error", "err", err)
return
}
}
}

// LogOutputs starts new goroutines to print cmd's stdout & stderr to our stdout & stderr
func LogOutputs(cmd *exec.Cmd) error {
stderrPipe, err := cmd.StderrPipe()
if err != nil {
return fmt.Errorf("Failed to open stderr pipe: %s", err)
}
stdoutPipe, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("Failed to open stdout pipe: %s", err)
}
go streamOutput(stderrPipe, os.Stderr)
go streamOutput(stdoutPipe, os.Stdout)
return nil
}