From ce3f3b3e49b9fd94231bdfc88f8c43881aac4ebf Mon Sep 17 00:00:00 2001 From: Alex Kordic Date: Fri, 7 Oct 2022 19:06:53 +0200 Subject: [PATCH 1/3] Run MistInHLS mist binary on push end for each rendition Add subprocess/logging.go --- config/config.go | 4 +-- handlers/misttriggers/push_end.go | 30 ++++++++++++++++- handlers/transcode.go | 55 ++++--------------------------- subprocess/logging.go | 50 ++++++++++++++++++++++++++++ 4 files changed, 87 insertions(+), 52 deletions(-) create mode 100644 subprocess/logging.go diff --git a/config/config.go b/config/config.go index f1506d794..3a2ddb541 100644 --- a/config/config.go +++ b/config/config.go @@ -11,8 +11,8 @@ var Version string // Used so that we can generate fixed timestamps in tests var Clock TimestampGenerator = RealTimestampGenerator{} -// Path to Mist's "Livepeer" process that we shell out to for the transcoding -const PathMistProcLivepeer = "/usr/local/bin/MistProcLivepeer" +// Path to Mist's binaries that we shell out to for transcoding and header file creation +const PathMistDir = "/usr/local/bin" // Port that the local Broadcaster runs on const DefaultBroadcasterPort = 8935 diff --git a/handlers/misttriggers/push_end.go b/handlers/misttriggers/push_end.go index 18405b091..53d03d0c6 100644 --- a/handlers/misttriggers/push_end.go +++ b/handlers/misttriggers/push_end.go @@ -5,6 +5,8 @@ import ( "log" "net/http" "net/url" + "os/exec" + "path" "strings" "time" @@ -12,6 +14,7 @@ import ( "github.com/livepeer/catalyst-api/clients" "github.com/livepeer/catalyst-api/config" "github.com/livepeer/catalyst-api/errors" + "github.com/livepeer/catalyst-api/subprocess" ) // TriggerPushEnd responds to PUSH_END trigger @@ -63,7 +66,7 @@ func (d *MistCallbackHandlersCollection) TranscodingPushEnd(w http.ResponseWrite return } - uploadSuccess := pushStatus == "null" + uploadSuccess := pushStatus != "null" if uploadSuccess { // TODO: Do some maths so that we don't always send 0.5 if err := clients.DefaultCallbackClient.SendTranscodeStatus(info.CallbackUrl, clients.TranscodeStatusTranscoding, 0.5); err != nil { @@ -76,6 +79,10 @@ func (d *MistCallbackHandlersCollection) TranscodingPushEnd(w http.ResponseWrite } } + if err := createDtsh(actualDestination); err != nil { + _ = config.Logger.Log("msg", "createDtsh failed", "err", err) + } + // We do not delete triggers as source stream is wildcard stream: RENDITION_PREFIX cache.DefaultStreamCache.Transcoding.RemovePushDestination(streamName, destination) if cache.DefaultStreamCache.Transcoding.AreDestinationsEmpty(streamName) { @@ -143,3 +150,24 @@ func uuidFromPushUrl(uri string) (string, error) { } return path[len(path)-2], nil } + +func createDtsh(destination string) error { + url, err := url.Parse(destination) + if err != nil { + return err + } + url.RawQuery = "" + url.Fragment = "" + headerPrepare := exec.Command(path.Join(config.PathMistDir, "MistInHLS"), "-H", url.String()) + if err = subprocess.LogOutputs(headerPrepare); err != nil { + return err + } + if err = headerPrepare.Start(); err != nil { + return err + } + go func() { + err := headerPrepare.Wait() + fmt.Println("exec headerPrepare:", err) + }() + return nil +} diff --git a/handlers/transcode.go b/handlers/transcode.go index 44236f56c..2bf8e2a71 100644 --- a/handlers/transcode.go +++ b/handlers/transcode.go @@ -1,22 +1,21 @@ package handlers import ( - "bufio" - "bytes" "encoding/json" "fmt" "io" "log" "net/http" "net/url" - "os" "os/exec" + "path" "github.com/julienschmidt/httprouter" "github.com/livepeer/catalyst-api/cache" "github.com/livepeer/catalyst-api/clients" "github.com/livepeer/catalyst-api/config" "github.com/livepeer/catalyst-api/errors" + "github.com/livepeer/catalyst-api/subprocess" "github.com/xeipuuv/gojsonschema" ) @@ -74,30 +73,6 @@ func (d *CatalystAPIHandlersCollection) TranscodeSegment() httprouter.Handle { } } -// stream from a source to a destination buffer while also printing -func streamOutput(src io.Reader, dst *bytes.Buffer, out io.Writer) error { - mw := io.MultiWriter(dst, out) - s := bufio.NewReader(src) - for { - var line []byte - line, err := s.ReadSlice('\n') - if err == io.EOF && len(line) == 0 { - break - } - if err == io.EOF { - return fmt.Errorf("Improper termination: %v", line) - } - if err != nil { - return err - } - _, err = mw.Write(line) - if err != nil { - return err - } - } - return nil -} - // RunTranscodeProcess starts `MistLivepeeerProc` as a subprocess to transcode inputStream into renditionsStream. func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegmentRequest) error { @@ -117,36 +92,18 @@ func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegm } args := string(configPayload) - transcodeCommand := exec.Command(config.PathMistProcLivepeer, args, "--debug", "8", "--kickoff") - - var stdout, stderr bytes.Buffer - stderrPipe, err := transcodeCommand.StderrPipe() - if err != nil { - return fmt.Errorf("Failed to open stderr pipe: %s", err) - } - stdoutPipe, err := transcodeCommand.StdoutPipe() - if err != nil { - return fmt.Errorf("Failed to open stdout pipe: %s", err) + transcodeCommand := exec.Command(path.Join(config.PathMistDir, "MistProcLivepeer"), args, "--debug", "8", "--kickoff") + if err = subprocess.LogOutputs(transcodeCommand); err != nil { + return err } // Start the Transcode Command asynchronously - we call Wait() later in this method fmt.Printf("Starting transcode via: %s\n", transcodeCommand.String()) err = transcodeCommand.Start() if err != nil { - return fmt.Errorf("Failed to start MistProcLivepeer: %s", err) + return fmt.Errorf("failed to start MistProcLivepeer: %s", err) } - go func() { - if streamOutput(stdoutPipe, &stdout, os.Stdout) != nil { - _ = fmt.Errorf("Failed to stream output from stdout") - } - }() - go func() { - if streamOutput(stderrPipe, &stderr, os.Stderr) != nil { - _ = fmt.Errorf("Failed to stream output from stderr") - } - }() - dir, _ := url.Parse(".") uploadDir := inputUrl.ResolveReference(dir) // Cache the stream data, later used in the trigger handlers called by Mist diff --git a/subprocess/logging.go b/subprocess/logging.go new file mode 100644 index 000000000..3ad9da07b --- /dev/null +++ b/subprocess/logging.go @@ -0,0 +1,50 @@ +package subprocess + +import ( + "bufio" + "fmt" + "io" + "os" + "os/exec" + + "github.com/livepeer/catalyst-api/config" +) + +func streamOutput(src io.Reader, out io.Writer) { + s := bufio.NewReader(src) + for { + var line []byte + line, err := s.ReadSlice('\n') + if err == io.EOF && len(line) == 0 { + break + } + if err == io.EOF { + _ = config.Logger.Log("msg", "streamOutput() improper termination", "line", line) + return + } + if err != nil { + _ = config.Logger.Log("msg", "streamOutput ReadSlice error", "err", err) + return + } + _, err = out.Write(line) + if err != nil { + _ = config.Logger.Log("msg", "streamOutput out.Write error", "err", err) + return + } + } +} + +// LogOutputs starts new goroutines to print cmd's stdout & stderr to our stdout & stderr +func LogOutputs(cmd *exec.Cmd) error { + stderrPipe, err := cmd.StderrPipe() + if err != nil { + return fmt.Errorf("Failed to open stderr pipe: %s", err) + } + stdoutPipe, err := cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("Failed to open stdout pipe: %s", err) + } + go streamOutput(stderrPipe, os.Stderr) + go streamOutput(stdoutPipe, os.Stdout) + return nil +} From 5d51f79ed7a77a0b796a50cd207c07e01fc3d58f Mon Sep 17 00:00:00 2001 From: Alex Kordic Date: Mon, 10 Oct 2022 12:37:50 +0200 Subject: [PATCH 2/3] Unit test + CR changes --- config/config.go | 2 +- handlers/misttriggers/push_end.go | 7 ++++--- handlers/misttriggers/triggers_test.go | 21 +++++++++++++++++++++ 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/config/config.go b/config/config.go index 3a2ddb541..092d7b6e1 100644 --- a/config/config.go +++ b/config/config.go @@ -12,7 +12,7 @@ var Version string var Clock TimestampGenerator = RealTimestampGenerator{} // Path to Mist's binaries that we shell out to for transcoding and header file creation -const PathMistDir = "/usr/local/bin" +var PathMistDir = "/usr/local/bin" // Port that the local Broadcaster runs on const DefaultBroadcasterPort = 8935 diff --git a/handlers/misttriggers/push_end.go b/handlers/misttriggers/push_end.go index 53d03d0c6..6b79837ff 100644 --- a/handlers/misttriggers/push_end.go +++ b/handlers/misttriggers/push_end.go @@ -80,7 +80,7 @@ func (d *MistCallbackHandlersCollection) TranscodingPushEnd(w http.ResponseWrite } if err := createDtsh(actualDestination); err != nil { - _ = config.Logger.Log("msg", "createDtsh failed", "err", err) + _ = config.Logger.Log("msg", "createDtsh failed", "err", err, "destination", actualDestination) } // We do not delete triggers as source stream is wildcard stream: RENDITION_PREFIX @@ -166,8 +166,9 @@ func createDtsh(destination string) error { return err } go func() { - err := headerPrepare.Wait() - fmt.Println("exec headerPrepare:", err) + if err := headerPrepare.Wait(); err != nil { + _ = config.Logger.Log("msg", "createDtsh return code", "code", err, "destination", destination) + } }() return nil } diff --git a/handlers/misttriggers/triggers_test.go b/handlers/misttriggers/triggers_test.go index 1ff7d3870..e6942ff9e 100644 --- a/handlers/misttriggers/triggers_test.go +++ b/handlers/misttriggers/triggers_test.go @@ -4,8 +4,12 @@ import ( "bytes" "encoding/json" "io" + "io/fs" "net/http" "net/http/httptest" + "net/url" + "os" + "path" "testing" "time" @@ -107,6 +111,23 @@ func TestRecordingCompleted(t *testing.T) { } } +func TestMistInHLSStart(t *testing.T) { + dir := t.TempDir() + config.PathMistDir = dir + destination := "unused" + err := createDtsh("invalid://user:abc{DEf1=lp@example.com:5432/db?sslmode=require") + require.IsType(t, &url.Error{}, err) + err = createDtsh(destination) + require.IsType(t, &fs.PathError{}, err) + + script := path.Join(dir, "MistInHLS") + os.WriteFile(script, []byte("#!/bin/sh\necho livepeer\n"), 0744) + + err = createDtsh(destination) + require.NoError(t, err) + time.Sleep(50 * time.Millisecond) +} + type StreamSample struct { streamName string expected PipelineId From e139f46399df8ce453927401004f60cc5f3f2bfc Mon Sep 17 00:00:00 2001 From: Alex Kordic Date: Mon, 10 Oct 2022 13:29:29 +0200 Subject: [PATCH 3/3] lint fix --- handlers/misttriggers/triggers_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/handlers/misttriggers/triggers_test.go b/handlers/misttriggers/triggers_test.go index e6942ff9e..53a45fe75 100644 --- a/handlers/misttriggers/triggers_test.go +++ b/handlers/misttriggers/triggers_test.go @@ -121,7 +121,7 @@ func TestMistInHLSStart(t *testing.T) { require.IsType(t, &fs.PathError{}, err) script := path.Join(dir, "MistInHLS") - os.WriteFile(script, []byte("#!/bin/sh\necho livepeer\n"), 0744) + _ = os.WriteFile(script, []byte("#!/bin/sh\necho livepeer\n"), 0744) err = createDtsh(destination) require.NoError(t, err)