diff --git a/config/config.go b/config/config.go index f1506d794..092d7b6e1 100644 --- a/config/config.go +++ b/config/config.go @@ -11,8 +11,8 @@ var Version string // Used so that we can generate fixed timestamps in tests var Clock TimestampGenerator = RealTimestampGenerator{} -// Path to Mist's "Livepeer" process that we shell out to for the transcoding -const PathMistProcLivepeer = "/usr/local/bin/MistProcLivepeer" +// Path to Mist's binaries that we shell out to for transcoding and header file creation +var PathMistDir = "/usr/local/bin" // Port that the local Broadcaster runs on const DefaultBroadcasterPort = 8935 diff --git a/handlers/misttriggers/push_end.go b/handlers/misttriggers/push_end.go index 18405b091..6b79837ff 100644 --- a/handlers/misttriggers/push_end.go +++ b/handlers/misttriggers/push_end.go @@ -5,6 +5,8 @@ import ( "log" "net/http" "net/url" + "os/exec" + "path" "strings" "time" @@ -12,6 +14,7 @@ import ( "github.com/livepeer/catalyst-api/clients" "github.com/livepeer/catalyst-api/config" "github.com/livepeer/catalyst-api/errors" + "github.com/livepeer/catalyst-api/subprocess" ) // TriggerPushEnd responds to PUSH_END trigger @@ -63,7 +66,7 @@ func (d *MistCallbackHandlersCollection) TranscodingPushEnd(w http.ResponseWrite return } - uploadSuccess := pushStatus == "null" + uploadSuccess := pushStatus != "null" if uploadSuccess { // TODO: Do some maths so that we don't always send 0.5 if err := clients.DefaultCallbackClient.SendTranscodeStatus(info.CallbackUrl, clients.TranscodeStatusTranscoding, 0.5); err != nil { @@ -76,6 +79,10 @@ func (d *MistCallbackHandlersCollection) TranscodingPushEnd(w http.ResponseWrite } } + if err := createDtsh(actualDestination); err != nil { + _ = config.Logger.Log("msg", "createDtsh failed", "err", err, "destination", actualDestination) + } + // We do not delete triggers as source stream is wildcard stream: RENDITION_PREFIX cache.DefaultStreamCache.Transcoding.RemovePushDestination(streamName, destination) if cache.DefaultStreamCache.Transcoding.AreDestinationsEmpty(streamName) { @@ -143,3 +150,25 @@ func uuidFromPushUrl(uri string) (string, error) { } return path[len(path)-2], nil } + +func createDtsh(destination string) error { + url, err := url.Parse(destination) + if err != nil { + return err + } + url.RawQuery = "" + url.Fragment = "" + headerPrepare := exec.Command(path.Join(config.PathMistDir, "MistInHLS"), "-H", url.String()) + if err = subprocess.LogOutputs(headerPrepare); err != nil { + return err + } + if err = headerPrepare.Start(); err != nil { + return err + } + go func() { + if err := headerPrepare.Wait(); err != nil { + _ = config.Logger.Log("msg", "createDtsh return code", "code", err, "destination", destination) + } + }() + return nil +} diff --git a/handlers/misttriggers/triggers_test.go b/handlers/misttriggers/triggers_test.go index 1ff7d3870..53a45fe75 100644 --- a/handlers/misttriggers/triggers_test.go +++ b/handlers/misttriggers/triggers_test.go @@ -4,8 +4,12 @@ import ( "bytes" "encoding/json" "io" + "io/fs" "net/http" "net/http/httptest" + "net/url" + "os" + "path" "testing" "time" @@ -107,6 +111,23 @@ func TestRecordingCompleted(t *testing.T) { } } +func TestMistInHLSStart(t *testing.T) { + dir := t.TempDir() + config.PathMistDir = dir + destination := "unused" + err := createDtsh("invalid://user:abc{DEf1=lp@example.com:5432/db?sslmode=require") + require.IsType(t, &url.Error{}, err) + err = createDtsh(destination) + require.IsType(t, &fs.PathError{}, err) + + script := path.Join(dir, "MistInHLS") + _ = os.WriteFile(script, []byte("#!/bin/sh\necho livepeer\n"), 0744) + + err = createDtsh(destination) + require.NoError(t, err) + time.Sleep(50 * time.Millisecond) +} + type StreamSample struct { streamName string expected PipelineId diff --git a/handlers/transcode.go b/handlers/transcode.go index 44236f56c..2bf8e2a71 100644 --- a/handlers/transcode.go +++ b/handlers/transcode.go @@ -1,22 +1,21 @@ package handlers import ( - "bufio" - "bytes" "encoding/json" "fmt" "io" "log" "net/http" "net/url" - "os" "os/exec" + "path" "github.com/julienschmidt/httprouter" "github.com/livepeer/catalyst-api/cache" "github.com/livepeer/catalyst-api/clients" "github.com/livepeer/catalyst-api/config" "github.com/livepeer/catalyst-api/errors" + "github.com/livepeer/catalyst-api/subprocess" "github.com/xeipuuv/gojsonschema" ) @@ -74,30 +73,6 @@ func (d *CatalystAPIHandlersCollection) TranscodeSegment() httprouter.Handle { } } -// stream from a source to a destination buffer while also printing -func streamOutput(src io.Reader, dst *bytes.Buffer, out io.Writer) error { - mw := io.MultiWriter(dst, out) - s := bufio.NewReader(src) - for { - var line []byte - line, err := s.ReadSlice('\n') - if err == io.EOF && len(line) == 0 { - break - } - if err == io.EOF { - return fmt.Errorf("Improper termination: %v", line) - } - if err != nil { - return err - } - _, err = mw.Write(line) - if err != nil { - return err - } - } - return nil -} - // RunTranscodeProcess starts `MistLivepeeerProc` as a subprocess to transcode inputStream into renditionsStream. func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegmentRequest) error { @@ -117,36 +92,18 @@ func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegm } args := string(configPayload) - transcodeCommand := exec.Command(config.PathMistProcLivepeer, args, "--debug", "8", "--kickoff") - - var stdout, stderr bytes.Buffer - stderrPipe, err := transcodeCommand.StderrPipe() - if err != nil { - return fmt.Errorf("Failed to open stderr pipe: %s", err) - } - stdoutPipe, err := transcodeCommand.StdoutPipe() - if err != nil { - return fmt.Errorf("Failed to open stdout pipe: %s", err) + transcodeCommand := exec.Command(path.Join(config.PathMistDir, "MistProcLivepeer"), args, "--debug", "8", "--kickoff") + if err = subprocess.LogOutputs(transcodeCommand); err != nil { + return err } // Start the Transcode Command asynchronously - we call Wait() later in this method fmt.Printf("Starting transcode via: %s\n", transcodeCommand.String()) err = transcodeCommand.Start() if err != nil { - return fmt.Errorf("Failed to start MistProcLivepeer: %s", err) + return fmt.Errorf("failed to start MistProcLivepeer: %s", err) } - go func() { - if streamOutput(stdoutPipe, &stdout, os.Stdout) != nil { - _ = fmt.Errorf("Failed to stream output from stdout") - } - }() - go func() { - if streamOutput(stderrPipe, &stderr, os.Stderr) != nil { - _ = fmt.Errorf("Failed to stream output from stderr") - } - }() - dir, _ := url.Parse(".") uploadDir := inputUrl.ResolveReference(dir) // Cache the stream data, later used in the trigger handlers called by Mist diff --git a/subprocess/logging.go b/subprocess/logging.go new file mode 100644 index 000000000..3ad9da07b --- /dev/null +++ b/subprocess/logging.go @@ -0,0 +1,50 @@ +package subprocess + +import ( + "bufio" + "fmt" + "io" + "os" + "os/exec" + + "github.com/livepeer/catalyst-api/config" +) + +func streamOutput(src io.Reader, out io.Writer) { + s := bufio.NewReader(src) + for { + var line []byte + line, err := s.ReadSlice('\n') + if err == io.EOF && len(line) == 0 { + break + } + if err == io.EOF { + _ = config.Logger.Log("msg", "streamOutput() improper termination", "line", line) + return + } + if err != nil { + _ = config.Logger.Log("msg", "streamOutput ReadSlice error", "err", err) + return + } + _, err = out.Write(line) + if err != nil { + _ = config.Logger.Log("msg", "streamOutput out.Write error", "err", err) + return + } + } +} + +// LogOutputs starts new goroutines to print cmd's stdout & stderr to our stdout & stderr +func LogOutputs(cmd *exec.Cmd) error { + stderrPipe, err := cmd.StderrPipe() + if err != nil { + return fmt.Errorf("Failed to open stderr pipe: %s", err) + } + stdoutPipe, err := cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("Failed to open stdout pipe: %s", err) + } + go streamOutput(stderrPipe, os.Stderr) + go streamOutput(stdoutPipe, os.Stdout) + return nil +}