Skip to content

Commit

Permalink
Intel Support
Browse files Browse the repository at this point in the history
  • Loading branch information
stronk-dev committed Oct 27, 2022
1 parent e1b25b2 commit 6343f37
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 2 deletions.
1 change: 1 addition & 0 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.CurrentManifest = flag.Bool("currentManifest", *cfg.CurrentManifest, "Expose the currently active ManifestID as \"/stream/current.m3u8\"")
cfg.Nvidia = flag.String("nvidia", *cfg.Nvidia, "Comma-separated list of Nvidia GPU device IDs (or \"all\" for all available devices)")
cfg.Netint = flag.String("netint", *cfg.Netint, "Comma-separated list of NetInt device GUIDs (or \"all\" for all available devices)")
cfg.Intel = flag.String("intel", *cfg.Intel, "Comma-separated list of Intel device GUIDs (or \"all\" for all available devices)")
cfg.TestTranscoder = flag.Bool("testTranscoder", *cfg.TestTranscoder, "Test Nvidia GPU transcoding at startup")
cfg.SceneClassificationModelPath = flag.String("sceneClassificationModelPath", *cfg.SceneClassificationModelPath, "Path to scene classification model")
cfg.DetectContent = flag.Bool("detectContent", *cfg.DetectContent, "Set to true to enable content type detection")
Expand Down
21 changes: 19 additions & 2 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type LivepeerConfig struct {
CurrentManifest *bool
Nvidia *string
Netint *string
Intel *string
TestTranscoder *bool
SceneClassificationModelPath *string
DetectContent *bool
Expand Down Expand Up @@ -151,6 +152,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultCurrentManifest := false
defaultNvidia := ""
defaultNetint := ""
defaultIntel := ""
defaultTestTranscoder := true
defaultDetectContent := false
defaultSceneClassificationModelPath := "tasmodel.pb"
Expand Down Expand Up @@ -222,6 +224,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
CurrentManifest: &defaultCurrentManifest,
Nvidia: &defaultNvidia,
Netint: &defaultNetint,
Intel: &defaultIntel,
TestTranscoder: &defaultTestTranscoder,
SceneClassificationModelPath: &defaultSceneClassificationModelPath,
DetectContent: &defaultDetectContent,
Expand Down Expand Up @@ -278,8 +281,18 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
return
}

if *cfg.Netint != "" && *cfg.Nvidia != "" {
glog.Fatal("both -netint and -nvidia arguments specified, this is not supported")
configuredCards := 0
if *cfg.Netint != "" {
configuredCards++
}
if *cfg.Nvidia != "" {
configuredCards++
}
if *cfg.Intel != "" {
configuredCards++
}
if configuredCards > 1 {
glog.Fatal("Only one of the `-netint`, `-intel` and `-nvidia` arguments is allowed, but multiple are specified, this is not supported")
return
}

Expand Down Expand Up @@ -392,6 +405,10 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
accel = ffmpeg.Netint
devicesStr = *cfg.Netint
}
if *cfg.Intel != "" {
accel = ffmpeg.Intel
devicesStr = *cfg.Intel
}
if accel != ffmpeg.Software {
accelName := ffmpeg.AccelerationNameLookup[accel]
tf, dtf, err := core.GetTranscoderFactoryByAccel(accel)
Expand Down
30 changes: 30 additions & 0 deletions cmd/livepeer_bench/livepeer_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func main() {
transcodingOptions := flag.String("transcodingOptions", "P240p30fps16x9,P360p30fps16x9,P720p30fps16x9", "Transcoding options for broadcast job, or path to json config")
nvidia := flag.String("nvidia", "", "Comma-separated list of Nvidia GPU device IDs (or \"all\" for all available devices)")
netint := flag.String("netint", "", "Comma-separated list of NetInt device GUIDs (or \"all\" for all available devices)")
intel := flag.String("intel", "", "Comma-separated list of Intel device GUIDs (or \"all\" for all available devices)")
outPrefix := flag.String("outPrefix", "", "Output segments' prefix (no segments are generated by default)")
detectionFreq := flag.Int("detectionFreq", 0, "Run content-detection on every nth segment. No detection occurs for default frequency of 0.")
detectionSampleRate := flag.Uint("detectionSampleRate", 1, "Run content-detection on every nth frame of a particular segment, if detectionFreq > 0.")
Expand All @@ -50,6 +51,22 @@ func main() {
os.Exit(1)
}

configuredCards := 0
if *netint != "" {
configuredCards++
}
if *nvidia != "" {
configuredCards++
}
if *intel != "" {
configuredCards++
}
if configuredCards > 1 {
glog.Errorf("Only one of the `-netint`, `-intel` and `-nvidia` arguments is allowed, but multiple are specified, this is not supported")
flag.Usage()
os.Exit(1)
}

profiles := parseVideoProfiles(*transcodingOptions)

f, err := os.Open(*in)
Expand Down Expand Up @@ -85,6 +102,15 @@ func main() {
}
}

if *intel != "" {
var err error
accel = ffmpeg.Intel
devices, err = common.ParseAccelDevices(*intel, accel)
if err != nil {
glog.Fatalf("Error while parsing '-intel %v' flag: %v", *intel, err)
}
}

glog.Infof("log level is: %d", ffmpeg.LogLevel(*log*8))
ffmpeg.InitFFmpegWithLogLevel(ffmpeg.LogLevel(*log * 8))

Expand All @@ -108,6 +134,10 @@ func main() {
data = append(data, []string{"Netint GUIDs", fmt.Sprintf("%v", strings.Join(devices, ","))})
}

if accel == ffmpeg.Intel && len(devices) > 0 {
data = append(data, []string{"Intel GUIDs", fmt.Sprintf("%v", strings.Join(devices, ","))})
}

fmt.Printf("data %s \n", data)

if *detectionFreq > 0 {
Expand Down
38 changes: 38 additions & 0 deletions cmd/livepeer_bench/transcodingOptions-intel.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
[
{
"name": "240p0",
"fps": 0,
"bitrate": 250000,
"width": 426,
"height": 240,
"gop": "1",
"encoder": "AV1"
},
{
"name": "360p0",
"fps": 0,
"bitrate": 800000,
"width": 640,
"height": 360,
"gop": "1",
"encoder": "AV1"
},
{
"name": "480p0",
"fps": 0,
"bitrate": 1600000,
"width": 854,
"height": 480,
"gop": "1",
"encoder": "AV1"
},
{
"name": "720p0",
"fps": 0,
"bitrate": 3000000,
"width": 1280,
"height": 720,
"gop": "1",
"encoder": "AV1"
}
]
1 change: 1 addition & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ func detectNvidiaDevices() ([]string, error) {
}

func ParseAccelDevices(devices string, acceleration ffmpeg.Acceleration) ([]string, error) {
// TODO: create detectIntelDevices ?
if acceleration == ffmpeg.Nvidia && devices == "all" {
return detectNvidiaDevices()
}
Expand Down
56 changes: 56 additions & 0 deletions core/transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,45 @@ type NetintTranscoder struct {
session *ffmpeg.Transcoder
}

type IntelTranscoder struct {
device string
session *ffmpeg.Transcoder
}

func (nv *IntelTranscoder) Transcode(ctx context.Context, md *SegTranscodingMetadata) (td *TranscodeData, retErr error) {
// Returns UnrecoverableError instead of panicking to gracefully notify orchestrator about transcoder's failure
defer recoverFromPanic(&retErr)

in := &ffmpeg.TranscodeOptionsIn{
Fname: md.Fname,
Accel: ffmpeg.Intel,
Device: nv.device,
}
profiles := md.Profiles
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Intel, profiles, md.CalcPerceptualHash, md.SegmentParameters)
if md.DetectorEnabled {
out = append(out, detectorsToTranscodeOptions(WorkDir, ffmpeg.Intel, md.DetectorProfiles)...)
}

_, seqNo, parseErr := parseURI(md.Fname)
start := time.Now()

res, err := nv.session.Transcode(in, out)
if err != nil {
return nil, err
}

if monitor.Enabled && parseErr == nil {
// This will run only when fname is actual URL and contains seqNo in it.
// When orchestrator works as transcoder, `fname` will be relative path to file in local
// filesystem and will not contain seqNo in it. For that case `SegmentTranscoded` will
// be called in orchestrator.go
monitor.SegmentTranscoded(ctx, 0, seqNo, md.Duration, time.Since(start), common.ProfilesNames(profiles), true, true)
}

return resToTranscodeData(ctx, res, out)
}

func (nv *NetintTranscoder) Transcode(ctx context.Context, md *SegTranscodingMetadata) (td *TranscodeData, retErr error) {
// Returns UnrecoverableError instead of panicking to gracefully notify orchestrator about transcoder's failure
defer recoverFromPanic(&retErr)
Expand Down Expand Up @@ -173,6 +212,10 @@ func (nt *NetintTranscoder) EndTranscodingSession(sessionId string) {
nt.Stop()
}

func (nt *IntelTranscoder) EndTranscodingSession(sessionId string) {
nt.Stop()
}

type transcodeTestParams struct {
TestAvailable bool
Cap Capability
Expand Down Expand Up @@ -397,6 +440,8 @@ func GetTranscoderFactoryByAccel(acceleration ffmpeg.Acceleration) (func(device
return NewNvidiaTranscoder, NewNvidiaTranscoderWithDetector, nil
case ffmpeg.Netint:
return NewNetintTranscoder, nil, nil
case ffmpeg.Intel:
return NewIntelTranscoder, nil, nil
default:
return nil, nil, ffmpeg.ErrTranscoderHw
}
Expand All @@ -416,6 +461,13 @@ func NewNetintTranscoder(gpu string) TranscoderSession {
}
}

func NewIntelTranscoder(gpu string) TranscoderSession {
return &IntelTranscoder{
device: gpu,
session: ffmpeg.NewTranscoder(),
}
}

func NewNvidiaTranscoderWithDetector(detector ffmpeg.DetectorProfile, gpu string) (TranscoderSession, error) {
// Hardcode detection to device 0 for now
// Transcoding can still run on a separate GPU as we copy frames to CPU before detection
Expand All @@ -434,6 +486,10 @@ func (nv *NetintTranscoder) Stop() {
nv.session.StopTranscoder()
}

func (nv *IntelTranscoder) Stop() {
nv.session.StopTranscoder()
}

func parseURI(uri string) (string, uint64, error) {
var mid string
var seqNo uint64
Expand Down

0 comments on commit 6343f37

Please sign in to comment.