Skip to content

Commit

Permalink
livepeer: Warn if broadcaster does not specify an orchestrator.
Browse files Browse the repository at this point in the history
  • Loading branch information
j0sh committed Nov 27, 2018
1 parent b65b4de commit e56658a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 17 deletions.
12 changes: 8 additions & 4 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,14 @@ func main() {
n.NodeType = core.BroadcasterNode
}

if *orchAddr != "" {
n.OrchestratorSelector = discovery.NewOffchainOrchestrator(n, *orchAddr)
if n.OrchestratorSelector == nil {
return
if n.NodeType == core.BroadcasterNode {
if *orchAddr == "" {
glog.Info("No orchestrator specified; transcoding will not happen")
} else {
n.OrchestratorSelector = discovery.NewOffchainOrchestrator(n, *orchAddr)
if n.OrchestratorSelector == nil {
return
}
}
}

Expand Down
23 changes: 10 additions & 13 deletions server/mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"sync"
"time"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/livepeer/go-livepeer/drivers"
"github.com/livepeer/go-livepeer/monitor"
Expand All @@ -42,6 +41,7 @@ var ErrHLSPlay = errors.New("ErrHLSPlay")
var ErrRTMPPlay = errors.New("ErrRTMPPlay")
var ErrRoundInit = errors.New("ErrRoundInit")
var ErrStorage = errors.New("ErrStorage")
var ErrDiscovery = errors.New("ErrDiscovery")

const HLSWaitInterval = time.Second
const HLSBufferCap = uint(43200) //12 hrs assuming 1s segment
Expand Down Expand Up @@ -148,16 +148,12 @@ func createRTMPStreamIDHandler(s *LivepeerServer) func(url *url.URL) (strmID str
}
}

func (s *LivepeerServer) startBroadcast(transcoderAddress ethcommon.Address, jobId string, nonce uint64) (Broadcaster, error) {
tca := transcoderAddress
serviceUri, err := s.LivepeerNode.Eth.GetServiceURI(tca)
if err != nil || serviceUri == "" {
glog.Errorf("Unable to retrieve the Service URI for %v: %v", tca.Hex(), err)
if err == nil {
err = fmt.Errorf("Empty Service URI")
}
return nil, err
func (s *LivepeerServer) startBroadcast(jobId string) (Broadcaster, error) {

if s.LivepeerNode.OrchestratorSelector == nil {
return nil, ErrDiscovery
}

rpcBcast := core.NewBroadcaster(s.LivepeerNode, jobId)

tinfos, err := s.LivepeerNode.OrchestratorSelector.GetOrchestrators(1)
Expand Down Expand Up @@ -481,9 +477,10 @@ func gotRTMPStreamHandler(s *LivepeerServer) func(url *url.URL, rtmpStrm stream.
// as the RTMP stream is alive; maybe the orchestrator hasn't
// received the block containing the job yet
broadcastFunc := func() error {
transcoderAddress := ethcommon.BytesToAddress([]byte("111 Transcoder Address 1"))
rpcBcast, err = s.startBroadcast(transcoderAddress, jobId, nonce) // ANGIE - TRANSCODER ADDRESS
if err != nil {
rpcBcast, err = s.startBroadcast(jobId)
if err == ErrDiscovery {
return err // discovery disabled, don't retry
} else if err != nil {
// Should be logged upstream
} else {
rpcBcast.SetBroadcasterOS(storage)
Expand Down

0 comments on commit e56658a

Please sign in to comment.