-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Run MistInHLS mist binary on push end for each rendition #55
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,13 +5,16 @@ import ( | |
"log" | ||
"net/http" | ||
"net/url" | ||
"os/exec" | ||
"path" | ||
"strings" | ||
"time" | ||
|
||
"github.com/livepeer/catalyst-api/cache" | ||
"github.com/livepeer/catalyst-api/clients" | ||
"github.com/livepeer/catalyst-api/config" | ||
"github.com/livepeer/catalyst-api/errors" | ||
"github.com/livepeer/catalyst-api/subprocess" | ||
) | ||
|
||
// TriggerPushEnd responds to PUSH_END trigger | ||
|
@@ -63,7 +66,7 @@ func (d *MistCallbackHandlersCollection) TranscodingPushEnd(w http.ResponseWrite | |
return | ||
} | ||
|
||
uploadSuccess := pushStatus == "null" | ||
uploadSuccess := pushStatus != "null" | ||
if uploadSuccess { | ||
// TODO: Do some maths so that we don't always send 0.5 | ||
if err := clients.DefaultCallbackClient.SendTranscodeStatus(info.CallbackUrl, clients.TranscodeStatusTranscoding, 0.5); err != nil { | ||
|
@@ -76,6 +79,10 @@ func (d *MistCallbackHandlersCollection) TranscodingPushEnd(w http.ResponseWrite | |
} | ||
} | ||
|
||
if err := createDtsh(actualDestination); err != nil { | ||
_ = config.Logger.Log("msg", "createDtsh failed", "err", err, "destination", actualDestination) | ||
} | ||
|
||
// We do not delete triggers as source stream is wildcard stream: RENDITION_PREFIX | ||
cache.DefaultStreamCache.Transcoding.RemovePushDestination(streamName, destination) | ||
if cache.DefaultStreamCache.Transcoding.AreDestinationsEmpty(streamName) { | ||
|
@@ -143,3 +150,25 @@ func uuidFromPushUrl(uri string) (string, error) { | |
} | ||
return path[len(path)-2], nil | ||
} | ||
|
||
func createDtsh(destination string) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add a unit test for this method? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea |
||
url, err := url.Parse(destination) | ||
if err != nil { | ||
return err | ||
} | ||
url.RawQuery = "" | ||
url.Fragment = "" | ||
headerPrepare := exec.Command(path.Join(config.PathMistDir, "MistInHLS"), "-H", url.String()) | ||
if err = subprocess.LogOutputs(headerPrepare); err != nil { | ||
return err | ||
} | ||
if err = headerPrepare.Start(); err != nil { | ||
return err | ||
} | ||
go func() { | ||
if err := headerPrepare.Wait(); err != nil { | ||
_ = config.Logger.Log("msg", "createDtsh return code", "code", err, "destination", destination) | ||
} | ||
}() | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,22 +1,21 @@ | ||
package handlers | ||
|
||
import ( | ||
"bufio" | ||
"bytes" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"log" | ||
"net/http" | ||
"net/url" | ||
"os" | ||
"os/exec" | ||
"path" | ||
|
||
"github.com/julienschmidt/httprouter" | ||
"github.com/livepeer/catalyst-api/cache" | ||
"github.com/livepeer/catalyst-api/clients" | ||
"github.com/livepeer/catalyst-api/config" | ||
"github.com/livepeer/catalyst-api/errors" | ||
"github.com/livepeer/catalyst-api/subprocess" | ||
"github.com/xeipuuv/gojsonschema" | ||
) | ||
|
||
|
@@ -74,30 +73,6 @@ func (d *CatalystAPIHandlersCollection) TranscodeSegment() httprouter.Handle { | |
} | ||
} | ||
|
||
// stream from a source to a destination buffer while also printing | ||
func streamOutput(src io.Reader, dst *bytes.Buffer, out io.Writer) error { | ||
mw := io.MultiWriter(dst, out) | ||
s := bufio.NewReader(src) | ||
for { | ||
var line []byte | ||
line, err := s.ReadSlice('\n') | ||
if err == io.EOF && len(line) == 0 { | ||
break | ||
} | ||
if err == io.EOF { | ||
return fmt.Errorf("Improper termination: %v", line) | ||
} | ||
if err != nil { | ||
return err | ||
} | ||
_, err = mw.Write(line) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
Comment on lines
-78
to
-99
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @emranemran moved this to |
||
|
||
// RunTranscodeProcess starts `MistLivepeeerProc` as a subprocess to transcode inputStream into renditionsStream. | ||
func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegmentRequest) error { | ||
|
||
|
@@ -117,36 +92,18 @@ func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegm | |
} | ||
args := string(configPayload) | ||
|
||
transcodeCommand := exec.Command(config.PathMistProcLivepeer, args, "--debug", "8", "--kickoff") | ||
|
||
var stdout, stderr bytes.Buffer | ||
stderrPipe, err := transcodeCommand.StderrPipe() | ||
if err != nil { | ||
return fmt.Errorf("Failed to open stderr pipe: %s", err) | ||
} | ||
stdoutPipe, err := transcodeCommand.StdoutPipe() | ||
if err != nil { | ||
return fmt.Errorf("Failed to open stdout pipe: %s", err) | ||
transcodeCommand := exec.Command(path.Join(config.PathMistDir, "MistProcLivepeer"), args, "--debug", "8", "--kickoff") | ||
if err = subprocess.LogOutputs(transcodeCommand); err != nil { | ||
return err | ||
} | ||
|
||
// Start the Transcode Command asynchronously - we call Wait() later in this method | ||
fmt.Printf("Starting transcode via: %s\n", transcodeCommand.String()) | ||
err = transcodeCommand.Start() | ||
if err != nil { | ||
return fmt.Errorf("Failed to start MistProcLivepeer: %s", err) | ||
return fmt.Errorf("failed to start MistProcLivepeer: %s", err) | ||
} | ||
|
||
go func() { | ||
if streamOutput(stdoutPipe, &stdout, os.Stdout) != nil { | ||
_ = fmt.Errorf("Failed to stream output from stdout") | ||
} | ||
}() | ||
go func() { | ||
if streamOutput(stderrPipe, &stderr, os.Stderr) != nil { | ||
_ = fmt.Errorf("Failed to stream output from stderr") | ||
} | ||
}() | ||
|
||
dir, _ := url.Parse(".") | ||
uploadDir := inputUrl.ResolveReference(dir) | ||
// Cache the stream data, later used in the trigger handlers called by Mist | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package subprocess | ||
|
||
import ( | ||
"bufio" | ||
"fmt" | ||
"io" | ||
"os" | ||
"os/exec" | ||
|
||
"github.com/livepeer/catalyst-api/config" | ||
) | ||
|
||
func streamOutput(src io.Reader, out io.Writer) { | ||
s := bufio.NewReader(src) | ||
for { | ||
var line []byte | ||
line, err := s.ReadSlice('\n') | ||
if err == io.EOF && len(line) == 0 { | ||
break | ||
} | ||
if err == io.EOF { | ||
_ = config.Logger.Log("msg", "streamOutput() improper termination", "line", line) | ||
return | ||
} | ||
if err != nil { | ||
_ = config.Logger.Log("msg", "streamOutput ReadSlice error", "err", err) | ||
return | ||
} | ||
_, err = out.Write(line) | ||
if err != nil { | ||
_ = config.Logger.Log("msg", "streamOutput out.Write error", "err", err) | ||
return | ||
} | ||
} | ||
} | ||
|
||
// LogOutputs starts new goroutines to print cmd's stdout & stderr to our stdout & stderr | ||
func LogOutputs(cmd *exec.Cmd) error { | ||
stderrPipe, err := cmd.StderrPipe() | ||
if err != nil { | ||
return fmt.Errorf("Failed to open stderr pipe: %s", err) | ||
} | ||
stdoutPipe, err := cmd.StdoutPipe() | ||
if err != nil { | ||
return fmt.Errorf("Failed to open stdout pipe: %s", err) | ||
} | ||
go streamOutput(stderrPipe, os.Stderr) | ||
go streamOutput(stdoutPipe, os.Stdout) | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Testing shows we actually get
null
on error and json object on success.