Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split orchestrator and transcoder #575

Merged
merged 12 commits into from
Nov 26, 2018
77 changes: 58 additions & 19 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ func main() {
rtmpAddr := flag.String("rtmpAddr", "127.0.0.1:"+RtmpPort, "Address to bind for RTMP commands")
cliAddr := flag.String("cliAddr", "127.0.0.1:"+CliPort, "Address to bind for CLI commands")
httpAddr := flag.String("httpAddr", "", "Address to bind for HTTP commands")
transcoder := flag.Bool("transcoder", false, "Set to true to be a transcoder")
orchAndTrans := flag.Bool("transcoder", false, "Set to true to be a orchestrator+transcoder")
orchestrator := flag.Bool("orchestrator", false, "Set to true to be a standalone orchestrator")
maxPricePerSegment := flag.String("maxPricePerSegment", "1", "Max price per segment for a broadcast job")
transcodingOptions := flag.String("transcodingOptions", "P240p30fps16x9,P360p30fps16x9", "Transcoding options for broadcast job")
currentManifest := flag.Bool("currentManifest", false, "Expose the currently active ManifestID as \"/stream/current.m3u8\"")
Expand All @@ -84,6 +85,9 @@ func main() {
s3creds := flag.String("s3creds", "", "S3 credentials (in form ACCESSKEYID/ACCESSKEY)")

version := flag.Bool("version", false, "Print out the version")
orchAddr := flag.String("orchAddr", "", "Orchestrator to connect to as a standalone transcoder")
orchSecret := flag.String("orchSecret", "", "Shared secret with the orchestrator as a standalone transcoder")
var transcoder bool

flag.Parse()

Expand All @@ -92,8 +96,17 @@ func main() {
return
}

if *orchAddr != "" {
if *orchSecret == "" {
glog.Error("Running a standalone transcoder requires both -orchAddr and -orchSecret")
return
}
*orchAddr = defaultAddr(*orchAddr, "127.0.0.1", RpcPort)
transcoder = true
}

if *rinkeby {
if !*offchain {
if !*offchain && !transcoder {
if *ethUrl == "" {
*ethUrl = "wss://rinkeby.infura.io/ws"
}
Expand All @@ -112,7 +125,7 @@ func main() {
} else if *devenv {
*datadir = *datadir + "/devenv"
} else {
if !*offchain {
if !*offchain && !transcoder {
if *ethUrl == "" {
*ethUrl = "wss://mainnet.infura.io/ws"
}
Expand All @@ -129,6 +142,11 @@ func main() {
}
}

if *orchAndTrans {
transcoder = true
*orchestrator = true
}

//Make sure datadir is present
if _, err := os.Stat(*datadir); os.IsNotExist(err) {
glog.Infof("Creating data dir: %v", *datadir)
Expand All @@ -149,10 +167,24 @@ func main() {
if err != nil {
glog.Errorf("Error creating livepeer node: %v", err)
}
if *transcoder {
n.NodeType = core.Transcoder

if *orchSecret != "" {
n.OrchSecret = *orchSecret
}

if transcoder {
n.Transcoder = core.NewLocalTranscoder(*datadir)
}

if *orchestrator {
n.NodeType = core.OrchestratorNode
} else if transcoder {
n.NodeType = core.TranscoderNode
glog.Info("***Livepeer is in transcoder mode ***")
server.RunTranscoder(n, *orchAddr)
return
} else {
n.NodeType = core.Broadcaster
n.NodeType = core.BroadcasterNode
}

if *offchain {
Expand Down Expand Up @@ -219,12 +251,12 @@ func main() {
// Setup unbonding service to manage unbonding locks
n.EthServices["UnbondingService"] = eventservices.NewUnbondingService(n.Eth, dbh)

if *transcoder {
if *orchestrator {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := setupTranscoder(ctx, n, em, *ipfsPath, *initializeRound, *serviceAddr); err != nil {
glog.Errorf("Error setting up transcoder: %v", err)
glog.Errorf("Error setting up orchestrator: %v", err)
return
}
}
Expand Down Expand Up @@ -254,12 +286,12 @@ func main() {
drivers.NodeStorage = drivers.NewS3Driver(br[0], br[1], cr[0], cr[1])
}

if n.NodeType == core.Broadcaster {
if n.NodeType == core.BroadcasterNode {
// default lpms listener for broadcaster; same as default rpc port
// TODO provide an option to disable this?
*rtmpAddr = defaultAddr(*rtmpAddr, "127.0.0.1", RtmpPort)
*httpAddr = defaultAddr(*httpAddr, "127.0.0.1", RpcPort)
} else if n.NodeType == core.Transcoder {
} else if n.NodeType == core.OrchestratorNode {
// if http addr is not provided, listen to all ifaces
// take the port to listen to from the service URI
*httpAddr = defaultAddr(*httpAddr, "", n.ServiceURI.Port())
Expand Down Expand Up @@ -323,7 +355,7 @@ func main() {
}()

go func() {
if core.Transcoder != n.NodeType {
if core.OrchestratorNode != n.NodeType {
return
}
orch := core.NewOrchestrator(s.LivepeerNode)
Expand All @@ -344,11 +376,13 @@ func main() {
}()

switch n.NodeType {
case core.Transcoder:
glog.Infof("***Livepeer Running in Transcoder Mode***")
case core.Broadcaster:
case core.OrchestratorNode:
glog.Infof("***Livepeer Running in Orchestrator Mode***")
case core.BroadcasterNode:
glog.Infof("***Livepeer Running in Broadcaster Mode***")
glog.Infof("Video Ingest Endpoint - rtmp://%v", *rtmpAddr)
case core.TranscoderNode:
glog.Infof("**Liveepeer Running in Transcoder Mode***")
}

c := make(chan os.Signal)
Expand All @@ -373,7 +407,7 @@ func main() {
}

func setupTranscoder(ctx context.Context, n *core.LivepeerNode, em eth.EventMonitor, ipfsPath string, initializeRound bool, serviceUri string) error {
//Check if transcoder is active
//Check if orchestrator is active
active, err := n.Eth.IsActiveTranscoder()
if err != nil {
return err
Expand All @@ -385,6 +419,7 @@ func setupTranscoder(ctx context.Context, n *core.LivepeerNode, em eth.EventMoni
glog.Infof("Transcoder %v is active", n.Eth.Account().Address.Hex())
}

serviceUriSpecified := false
if serviceUri == "" {
// TODO probably should put this (along w wizard GETs) into common code
resp, err := http.Get("https://api.ipify.org?format=text")
Expand All @@ -397,6 +432,7 @@ func setupTranscoder(ctx context.Context, n *core.LivepeerNode, em eth.EventMoni
serviceUri = "https://" + strings.TrimSpace(string(body)) + ":" + RpcPort
} else {
serviceUri = "https://" + serviceUri
serviceUriSpecified = true
}
suri, err := url.ParseRequestURI(serviceUri)
if err != nil {
Expand All @@ -405,21 +441,24 @@ func setupTranscoder(ctx context.Context, n *core.LivepeerNode, em eth.EventMoni
}
uriStr, err := n.Eth.GetServiceURI(n.Eth.Account().Address)
if err != nil {
glog.Error("Could not get service URI; transcoder may be unreachable")
glog.Error("Could not get service URI; orchestrator may be unreachable")
return err
}
uri, err := url.ParseRequestURI(uriStr)
if err != nil {
glog.Error("Could not parse service URI; transcoder may be unreachable")
glog.Error("Could not parse service URI; orchestrator may be unreachable")
uri, _ = url.ParseRequestURI("http://127.0.0.1:" + RpcPort)
}
if uri.Hostname() != suri.Hostname() || uri.Port() != suri.Port() {
glog.Errorf("Service address %v did not match discovered address %v; set the correct address in livepeer_cli or use -serviceAddr", uri, suri)
// TODO remove '&& false' after all transcoders have set a service URI
// TODO remove '&& false' after all orchestrators have set a service URI
if active && false {
return fmt.Errorf("Mismatched service address")
}
}
if serviceUriSpecified {
uri = suri
}

// Set up IPFS
ipfsApi, err := ipfs.StartIpfs(ctx, ipfsPath)
Expand Down Expand Up @@ -451,7 +490,7 @@ func setupTranscoder(ctx context.Context, n *core.LivepeerNode, em eth.EventMoni
// Restart jobs as necessary
err = js.RestartTranscoder()
if err != nil {
glog.Errorf("Unable to restart transcoder: %v", err)
glog.Errorf("Unable to restart orchestrator: %v", err)
// non-fatal, so continue
}

Expand Down
44 changes: 22 additions & 22 deletions cmd/livepeer_cli/livepeer_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func main() {
host: c.String("host"),
in: bufio.NewReader(os.Stdin),
}
w.transcoder = w.isTranscoder()
w.orchestrator = w.isOrchestrator()
w.testnet = w.onTestnet()
w.run()

Expand All @@ -62,27 +62,27 @@ func main() {
}

type wizard struct {
endpoint string // Local livepeer node
httpPort string
host string
transcoder bool
testnet bool
in *bufio.Reader // Wrapper around stdin to allow reading user input
endpoint string // Local livepeer node
httpPort string
host string
orchestrator bool
testnet bool
in *bufio.Reader // Wrapper around stdin to allow reading user input
}

type wizardOpt struct {
desc string
invoke func()
testnet bool
transcoder bool
notTranscoder bool
desc string
invoke func()
testnet bool
orchestrator bool
notOrchestrator bool
}

func (w *wizard) initializeOptions() []wizardOpt {
options := []wizardOpt{
{desc: "Get node status", invoke: func() { w.stats(w.transcoder) }},
{desc: "Get node status", invoke: func() { w.stats(w.orchestrator) }},
{desc: "View protocol parameters", invoke: w.protocolStats},
{desc: "List registered transcoders", invoke: func() { w.registeredTranscoderStats() }},
{desc: "List registered orchestrators", invoke: func() { w.registeredOrchestratorStats() }},
{desc: "Print latest jobs", invoke: w.printLast5Jobs},
{desc: "Invoke \"initialize round\"", invoke: w.initializeRound},
{desc: "Invoke \"bond\"", invoke: w.bond},
Expand All @@ -92,12 +92,12 @@ func (w *wizard) initializeOptions() []wizardOpt {
{desc: "Invoke \"withdraw fees\" (ETH)", invoke: w.withdrawFees},
{desc: "Invoke \"claim\" (for rewards and fees)", invoke: w.claimRewardsAndFees},
{desc: "Invoke \"transfer\" (LPT)", invoke: w.transferTokens},
{desc: "Invoke \"reward\"", invoke: w.callReward, transcoder: true},
{desc: "Invoke multi-step \"become a transcoder\"", invoke: w.activateTranscoder, transcoder: true},
{desc: "Set transcoder config", invoke: w.setTranscoderConfig, transcoder: true},
{desc: "Invoke \"deposit\" (ETH)", invoke: w.deposit, notTranscoder: true},
{desc: "Invoke \"withdraw deposit\" (ETH)", invoke: w.withdraw, notTranscoder: true},
{desc: "Set broadcast config", invoke: w.setBroadcastConfig, notTranscoder: true},
{desc: "Invoke \"reward\"", invoke: w.callReward, orchestrator: true},
{desc: "Invoke multi-step \"become an orchestrator\"", invoke: w.activateOrchestrator, orchestrator: true},
{desc: "Set orchestrator config", invoke: w.setOrchestratorConfig, orchestrator: true},
{desc: "Invoke \"deposit\" (ETH)", invoke: w.deposit, notOrchestrator: true},
{desc: "Invoke \"withdraw deposit\" (ETH)", invoke: w.withdraw, notOrchestrator: true},
{desc: "Set broadcast config", invoke: w.setBroadcastConfig, notOrchestrator: true},
{desc: "Set Eth gas price", invoke: w.setGasPrice},
{desc: "Get test LPT", invoke: w.requestTokens, testnet: true},
{desc: "Get test ETH", invoke: func() {
Expand All @@ -114,7 +114,7 @@ func (w *wizard) filterOptions(options []wizardOpt) []wizardOpt {
if opt.testnet && !w.testnet {
continue
}
if !opt.transcoder && !opt.notTranscoder || w.transcoder && opt.transcoder || !w.transcoder && opt.notTranscoder {
if !opt.orchestrator && !opt.notOrchestrator || w.orchestrator && opt.orchestrator || !w.orchestrator && opt.notOrchestrator {
filtered = append(filtered, opt)
}
}
Expand All @@ -139,7 +139,7 @@ func (w *wizard) run() {
fmt.Println("+-----------------------------------------------------------+")
fmt.Println()

w.stats(w.transcoder)
w.stats(w.orchestrator)
options := w.filterOptions(w.initializeOptions())

// Basics done, loop ad infinitum about what to do
Expand Down
Loading