Skip to content

Commit

Permalink
rpc: Get StreamIDs from orchestrator.
Browse files Browse the repository at this point in the history
  • Loading branch information
j0sh committed Jun 12, 2018
1 parent b87c42a commit 2bf4378
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 0 deletions.
12 changes: 12 additions & 0 deletions server/mediaserver.go
Expand Up @@ -353,6 +353,18 @@ func gotRTMPStreamHandler(s *LivepeerServer) func(url *url.URL, rtmpStrm stream.
if err != nil {
return // XXX feed back error?
}
// Update the master playlist based on the streamids from the transcoder
for strmID, tProfile := range rpcBcast.tinfo.StreamIds {
vParams := ffmpeg.VideoProfileToVariantParams(ffmpeg.VideoProfileLookup[tProfile])
pl, _ := m3u8.NewMediaPlaylist(stream.DefaultHLSStreamWin, stream.DefaultHLSStreamCap)
variant := &m3u8.Variant{URI: fmt.Sprintf("%v.m3u8", strmID), Chunklist: pl, VariantParams: vParams}
manifest.Append(variant.URI, variant.Chunklist, variant.VariantParams)
}
// Update the master playlist on the network
if err = s.LivepeerNode.VideoNetwork.UpdateMasterPlaylist(string(mid), manifest); err != nil {
glog.Errorf("Error updating master playlist on network: %v", err)
return // XXX feed back error?
}
}()
}
return nil
Expand Down
26 changes: 26 additions & 0 deletions server/rpc.go
Expand Up @@ -42,6 +42,7 @@ type Orchestrator interface {
Sign(string) ([]byte, error)
GetJob(int64) (*lpTypes.Job, error)
TranscodeSeg(*lpTypes.Job, *core.SignedSegment) error
StreamIDs(*lpTypes.Job) ([]core.StreamID, error)
}

// Orchestator interface methods
Expand All @@ -67,6 +68,21 @@ func (orch *orchestrator) Address() ethcommon.Address {
return orch.address
}

func (orch *orchestrator) StreamIDs(job *lpTypes.Job) ([]core.StreamID, error) {
streamIds := make([]core.StreamID, len(job.Profiles))
sid := core.StreamID(job.StreamId)
vid := sid.GetVideoID()
for i, p := range job.Profiles {
strmId, err := core.MakeStreamID(orch.node.Identity, vid, p.Name)
if err != nil {
glog.Error("Error making stream ID: ", err)
return []core.StreamID{}, err
}
streamIds[i] = strmId
}
return streamIds, nil
}

func (orch *orchestrator) TranscodeSeg(job *lpTypes.Job, ss *core.SignedSegment) error {
orch.node.TranscodeSegment(job, ss)
return nil
Expand Down Expand Up @@ -224,10 +240,20 @@ func GetTranscoder(context context.Context, orch Orchestrator, req *TranscoderRe
if err != nil {
return nil, err
}
sids, err := orch.StreamIDs(job)
if err != nil {
return nil, err
}
stringStreamIds := make(map[string]string)
for i, s := range sids {
stringStreamIds[s.String()] = job.Profiles[i].Name
}

tr := TranscoderInfo{
Transcoder: orch.Transcoder(),
AuthType: AuthType_LPE,
Credentials: creds,
StreamIds: stringStreamIds,
}
return &tr, nil
}
Expand Down
3 changes: 3 additions & 0 deletions server/rpc_test.go
Expand Up @@ -44,6 +44,9 @@ func (r *stubOrchestrator) Address() ethcommon.Address {
func (r *stubOrchestrator) TranscodeSeg(job *lpTypes.Job, seg *core.SignedSegment) error {
return nil
}
func (r *stubOrchestrator) StreamIDs(job *lpTypes.Job) ([]core.StreamID, error) {
return []core.StreamID{}, nil
}

func StubOrchestrator() *stubOrchestrator {
pk, err := ethcrypto.GenerateKey()
Expand Down

0 comments on commit 2bf4378

Please sign in to comment.