Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add automatic session limit and set max sessions with livepeer_cli #2781

Merged
merged 28 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
526da05
update max sessions with transcoder connect and close
ad-astra-video Feb 10, 2023
9b4701b
Dynamic session limit
eliteprox Mar 21, 2023
3dc57c7
Added -maxSessions 'auto' option
eliteprox Mar 22, 2023
a4a5aa2
Remove commented code
eliteprox Mar 22, 2023
275c5a0
Update MaxSessions description
eliteprox Mar 22, 2023
0ae4b59
update log
eliteprox Mar 22, 2023
e18df3d
Added SetMaxSessions from livepeernode.go (#25)
eliteprox Mar 22, 2023
1a9dc2f
Fix session limit counting
eliteprox Mar 26, 2023
b79aae8
Merge branch 'av/maxsessions-set-with-t-capacity' into av/maxsessions…
eliteprox Mar 26, 2023
99e0519
-maxSessions auto for orchestrators only
eliteprox Mar 29, 2023
ac1e138
Add maximum session to livepeer_cli.
eliteprox Apr 3, 2023
3af2486
Session capacity count fix
eliteprox Apr 4, 2023
2d87fb5
Merge branch 'master' into av/maxsessions-set-with-t-capacity
eliteprox Apr 5, 2023
4f3333b
Updates to cli for max sessions auto, calculate capacity after discon…
eliteprox Apr 5, 2023
29e776c
Simplify GetCurrentCapacity to use existing code and RTmutex.Lock()/R…
eliteprox Apr 5, 2023
3f7243c
Configure dynamic session limit with '0' instead of `auto`
eliteprox Apr 21, 2023
ecb5de5
Default to auto-limit when maxSessions not provided by orchestrator
eliteprox Apr 27, 2023
9c9732a
Tidying up
eliteprox Apr 27, 2023
98ca988
Tidy up more, add changelog
eliteprox Apr 27, 2023
d08ac91
Merge branch 'master' into av/maxsessions-set-with-t-capacity
eliteprox May 23, 2023
580311c
Merge branch 'master' into av/maxsessions-set-with-t-capacity
eliteprox Jun 20, 2023
8fe8b0c
go fmt
eliteprox Aug 18, 2023
dae8b38
Merge branch 'master' of https://github.com/livepeer/go-livepeer into…
eliteprox Aug 19, 2023
3d1d428
Merge branch 'master' into av/maxsessions-set-with-t-capacity
eliteprox Aug 30, 2023
98bb55d
Revert "Configure dynamic session limit with '0' instead of `auto`" -…
eliteprox Aug 30, 2023
7f7b65c
nit fixes
eliteprox Aug 31, 2023
18726e3
Update cmd/livepeer/starter/starter.go
eliteprox Aug 31, 2023
e626817
Merge branch 'master' into av/maxsessions-set-with-t-capacity
eliteprox Aug 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- \#2839 Add Orchestrator blocklist CLI parameter (@mjh1)

#### Orchestrator
- \#2781 Add automatic session limit and set max sessions with livepeer_cli

#### Transcoder

Expand Down
2 changes: 1 addition & 1 deletion cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
flag.CommandLine.SetOutput(os.Stdout)

// Help & Log
mistJSON := flag.Bool("j", false, "Print application info as json")

Check warning on line 37 in cmd/livepeer/livepeer.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/livepeer.go#L37

Added line #L37 was not covered by tests
version := flag.Bool("version", false, "Print out the version")
verbosity := flag.String("v", "3", "Log verbosity. {4|5|6}")

Expand All @@ -53,7 +53,7 @@

vFlag.Value.Set(*verbosity)

if *mistJSON {

Check warning on line 56 in cmd/livepeer/livepeer.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/livepeer.go#L56

Added line #L56 was not covered by tests
mistconnector.PrintMistConfigJson(
"livepeer",
"Official implementation of the Livepeer video processing protocol. Can play all roles in the network.",
Expand Down Expand Up @@ -133,7 +133,7 @@
cfg.TranscodingOptions = flag.String("transcodingOptions", *cfg.TranscodingOptions, "Transcoding options for broadcast job, or path to json config")
cfg.MaxAttempts = flag.Int("maxAttempts", *cfg.MaxAttempts, "Maximum transcode attempts")
cfg.SelectRandFreq = flag.Float64("selectRandFreq", *cfg.SelectRandFreq, "Frequency to randomly select unknown orchestrators (on-chain mode only)")
cfg.MaxSessions = flag.Int("maxSessions", *cfg.MaxSessions, "Maximum number of concurrent transcoding sessions for Orchestrator, maximum number or RTMP streams for Broadcaster, or maximum capacity for transcoder")
cfg.MaxSessions = flag.String("maxSessions", *cfg.MaxSessions, "Maximum number of concurrent transcoding sessions for Orchestrator or 'auto' for dynamic limit, maximum number of RTMP streams for Broadcaster, or maximum capacity for transcoder.")
cfg.CurrentManifest = flag.Bool("currentManifest", *cfg.CurrentManifest, "Expose the currently active ManifestID as \"/stream/current.m3u8\"")
cfg.Nvidia = flag.String("nvidia", *cfg.Nvidia, "Comma-separated list of Nvidia GPU device IDs (or \"all\" for all available devices)")
cfg.Netint = flag.String("netint", *cfg.Netint, "Comma-separated list of NetInt device GUIDs (or \"all\" for all available devices)")
Expand Down
26 changes: 19 additions & 7 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"os"
"os/user"
"path/filepath"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -81,7 +82,7 @@
TranscodingOptions *string
MaxAttempts *int
SelectRandFreq *float64
MaxSessions *int
MaxSessions *string
CurrentManifest *bool
Nvidia *string
Netint *string
Expand Down Expand Up @@ -134,7 +135,7 @@
func DefaultLivepeerConfig() LivepeerConfig {
// Network & Addresses:
defaultNetwork := "offchain"
defaultRtmpAddr := "127.0.0.1:" + RTMPPort

Check warning on line 138 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L138

Added line #L138 was not covered by tests
defaultCliAddr := "127.0.0.1:" + CliPort
defaultHttpAddr := ""
defaultServiceAddr := ""
Expand All @@ -150,7 +151,7 @@
defaultTranscodingOptions := "P240p30fps16x9,P360p30fps16x9"
defaultMaxAttempts := 3
defaultSelectRandFreq := 0.3
defaultMaxSessions := 10
defaultMaxSessions := strconv.Itoa(10)

Check warning on line 154 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L154

Added line #L154 was not covered by tests
defaultCurrentManifest := false
defaultNvidia := ""
defaultNetint := ""
Expand Down Expand Up @@ -290,9 +291,20 @@
}

func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if *cfg.MaxSessions <= 0 {
glog.Fatal("-maxSessions must be greater than zero")
return
if *cfg.MaxSessions == "auto" && *cfg.Orchestrator {
if *cfg.Transcoder {
glog.Fatal("-maxSessions 'auto' cannot be used when both -orchestrator and -transcoder are specified")
return
}
core.MaxSessions = 0
} else {
intMaxSessions, err := strconv.Atoi(*cfg.MaxSessions)
if err != nil || intMaxSessions <= 0 {
glog.Fatal("-maxSessions must be 'auto' or greater than zero")
return
}

Check warning on line 305 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L294-L305

Added lines #L294 - L305 were not covered by tests

core.MaxSessions = intMaxSessions

Check warning on line 307 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L307

Added line #L307 was not covered by tests
}

if *cfg.Netint != "" && *cfg.Nvidia != "" {
Expand Down Expand Up @@ -746,6 +758,7 @@
}
}

n.AutoSessionLimit = *cfg.MaxSessions == "auto"

Check warning on line 761 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L761

Added line #L761 was not covered by tests
eliteprox marked this conversation as resolved.
Show resolved Hide resolved
n.AutoAdjustPrice = *cfg.AutoAdjustPrice

ev, _ := new(big.Int).SetString(*cfg.TicketEV, 10)
Expand All @@ -769,7 +782,7 @@

var sm pm.SenderMonitor
if *cfg.RedeemerAddr != "" {
*cfg.RedeemerAddr = defaultAddr(*cfg.RedeemerAddr, "127.0.0.1", RPCPort)

Check warning on line 785 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L785

Added line #L785 was not covered by tests
rc, err := server.NewRedeemerClient(*cfg.RedeemerAddr, senderWatcher, timeWatcher)
if err != nil {
glog.Error("Unable to start redeemer client: ", err)
Expand Down Expand Up @@ -865,7 +878,7 @@
return
}

*cfg.HttpAddr = defaultAddr(*cfg.HttpAddr, "127.0.0.1", RPCPort)

Check warning on line 881 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L881

Added line #L881 was not covered by tests
url, err := url.ParseRequestURI("https://" + *cfg.HttpAddr)
if err != nil {
glog.Error("Could not parse redeemer URI: ", err)
Expand Down Expand Up @@ -981,7 +994,6 @@
}
}

core.MaxSessions = *cfg.MaxSessions
if lpmon.Enabled {
lpmon.MaxSessions(core.MaxSessions)
}
Expand All @@ -1008,8 +1020,8 @@
if n.NodeType == core.BroadcasterNode {
// default lpms listener for broadcaster; same as default rpc port
// TODO provide an option to disable this?
*cfg.RtmpAddr = defaultAddr(*cfg.RtmpAddr, "127.0.0.1", RTMPPort)
*cfg.HttpAddr = defaultAddr(*cfg.HttpAddr, "127.0.0.1", RPCPort)

Check warning on line 1024 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L1023-L1024

Added lines #L1023 - L1024 were not covered by tests

bcast := core.NewBroadcaster(n)

Expand Down Expand Up @@ -1197,7 +1209,7 @@
glog.Fatal("Missing -orchAddr")
}

go server.RunTranscoder(n, orchURLs[0].Host, *cfg.MaxSessions, transcoderCaps)
go server.RunTranscoder(n, orchURLs[0].Host, core.MaxSessions, transcoderCaps)

Check warning on line 1212 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L1212

Added line #L1212 was not covered by tests
}

switch n.NodeType {
Expand Down Expand Up @@ -1244,7 +1256,7 @@
if len(addrs) > 0 {
for _, addr := range strings.Split(addrs, ",") {
addr = strings.TrimSpace(addr)
addr = defaultAddr(addr, "127.0.0.1", RPCPort)

Check warning on line 1259 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L1259

Added line #L1259 was not covered by tests
if !strings.HasPrefix(addr, "http") {
addr = "https://" + addr
}
Expand Down Expand Up @@ -1320,7 +1332,7 @@
glog.Errorf("Could not look up public IP err=%q", err)
return nil, err
}
addr := "https://" + strings.TrimSpace(string(body)) + ":" + RPCPort

Check warning on line 1335 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L1335

Added line #L1335 was not covered by tests
inferredUri, err := url.ParseRequestURI(addr)
if err != nil {
glog.Errorf("Could not look up public IP err=%q", err)
Expand All @@ -1340,7 +1352,7 @@
ethUri, err := url.ParseRequestURI(addr)
if err != nil {
glog.Errorf("Could not parse service URI; orchestrator may be unreachable err=%q", err)
ethUri, _ = url.ParseRequestURI("http://127.0.0.1:" + RPCPort)

Check warning on line 1355 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L1355

Added line #L1355 was not covered by tests
}
if ethUri.Hostname() != inferredUri.Hostname() || ethUri.Port() != inferredUri.Port() {
glog.Errorf("Service address %v did not match discovered address %v; set the correct address in livepeer_cli or use -serviceAddr", ethUri, inferredUri)
Expand Down
1 change: 1 addition & 0 deletions cmd/livepeer_cli/livepeer_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (w *wizard) initializeOptions() []wizardOpt {
{desc: "Vote in a poll", invoke: w.vote, orchestrator: true},
{desc: "Set max ticket face value", invoke: w.setMaxFaceValue, orchestrator: true},
{desc: "Set price for broadcaster", invoke: w.setPriceForBroadcaster, orchestrator: true},
{desc: "Set maximum sessions", invoke: w.setMaxSessions, orchestrator: true, notOrchestrator: false},
{desc: "Exit", invoke: func() {
fmt.Println("Goodbye, my friend")
os.Exit(0)
Expand Down
24 changes: 24 additions & 0 deletions cmd/livepeer_cli/wizard_transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,27 @@
}

}

func (w *wizard) setMaxSessions() {
fmt.Println("Enter the maximum # of sessions")
maxSessions := w.readStringAndValidate(func(in string) (string, error) {
intVal, err := strconv.Atoi(in)
if "" == in || (in != "auto" && intVal <= 0 && err != nil) {
return "", fmt.Errorf("Max Sessions must be 'auto' or greater than zero")
}

Check warning on line 348 in cmd/livepeer_cli/wizard_transcoder.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer_cli/wizard_transcoder.go#L342-L348

Added lines #L342 - L348 were not covered by tests

return in, nil

Check warning on line 350 in cmd/livepeer_cli/wizard_transcoder.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer_cli/wizard_transcoder.go#L350

Added line #L350 was not covered by tests
})

data := url.Values{
"maxSessions": {fmt.Sprintf("%v", maxSessions)},
}
result, ok := httpPostWithParams(fmt.Sprintf("http://%v:%v/setMaxSessions", w.host, w.httpPort), data)
if ok {
fmt.Printf(result)
return
} else {
fmt.Printf("Error setting max sessions: %v", result)
return
}

Check warning on line 363 in cmd/livepeer_cli/wizard_transcoder.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer_cli/wizard_transcoder.go#L353-L363

Added lines #L353 - L363 were not covered by tests
}
23 changes: 23 additions & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
"sync"
"time"

"github.com/golang/glog"
"github.com/livepeer/go-livepeer/pm"

"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/eth"
lpmon "github.com/livepeer/go-livepeer/monitor"
)

var ErrTranscoderAvail = errors.New("ErrTranscoderUnavailable")
Expand Down Expand Up @@ -80,6 +82,7 @@
Balances *AddressBalances
Capabilities *Capabilities
AutoAdjustPrice bool
AutoSessionLimit bool
// Broadcaster public fields
Sender pm.Sender

Expand Down Expand Up @@ -154,3 +157,23 @@

n.Recipient.SetMaxFaceValue(maxfacevalue)
}

func (n *LivepeerNode) SetMaxSessions(s int) {
n.mu.Lock()
defer n.mu.Unlock()
MaxSessions = s

//update metrics reporting
if lpmon.Enabled {
lpmon.MaxSessions(MaxSessions)
}

Check warning on line 169 in core/livepeernode.go

View check run for this annotation

Codecov / codecov/patch

core/livepeernode.go#L161-L169

Added lines #L161 - L169 were not covered by tests

glog.Infof("Updated session limit to %d", MaxSessions)

Check warning on line 171 in core/livepeernode.go

View check run for this annotation

Codecov / codecov/patch

core/livepeernode.go#L171

Added line #L171 was not covered by tests
}

func (n *LivepeerNode) GetCurrentCapacity() int {
n.TranscoderManager.RTmutex.Lock()
defer n.TranscoderManager.RTmutex.Unlock()
_, totalCapacity, _ := n.TranscoderManager.totalLoadAndCapacity()
return totalCapacity

Check warning on line 178 in core/livepeernode.go

View check run for this annotation

Codecov / codecov/patch

core/livepeernode.go#L174-L178

Added lines #L174 - L178 were not covered by tests
}
9 changes: 9 additions & 0 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,9 +708,18 @@
coreCaps := CapabilitiesFromNetCapabilities(capabilities)
n.Capabilities.AddCapacity(coreCaps)
defer n.Capabilities.RemoveCapacity(coreCaps)

if n.AutoSessionLimit {
n.SetMaxSessions(n.GetCurrentCapacity() + capacity)
leszko marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 714 in core/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

core/orchestrator.go#L713-L714

Added lines #L713 - L714 were not covered by tests

// Manage blocks while transcoder is connected
n.TranscoderManager.Manage(stream, capacity, capabilities)
glog.V(common.DEBUG).Infof("Closing transcoder=%s channel", from)

if n.AutoSessionLimit {
defer n.SetMaxSessions(n.GetCurrentCapacity())
}

Check warning on line 722 in core/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

core/orchestrator.go#L721-L722

Added lines #L721 - L722 were not covered by tests
}

func (rtm *RemoteTranscoderManager) transcoderResults(tcID int64, res *RemoteTranscoderResult) {
Expand Down
21 changes: 21 additions & 0 deletions server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,27 @@
})
}

func (s *LivepeerServer) setMaxSessions() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
frmMaxSessions := r.FormValue("maxSessions")
if frmMaxSessions == "auto" {
s.LivepeerNode.AutoSessionLimit = true
s.LivepeerNode.SetMaxSessions(s.LivepeerNode.GetCurrentCapacity())
respondOk(w, []byte("Max sessions set to auto\n"))
} else if maxSessions, err := strconv.Atoi(frmMaxSessions); err == nil {
if maxSessions > 0 {
s.LivepeerNode.AutoSessionLimit = false
s.LivepeerNode.SetMaxSessions(maxSessions)
respondOk(w, []byte(fmt.Sprintf("Max sessions set to %d\n", maxSessions)))
} else {
respond400(w, "Max Sessions must be 'auto' or greater than zero")
}
} else {
respond400(w, err.Error())
}

Check warning on line 598 in server/handlers.go

View check run for this annotation

Codecov / codecov/patch

server/handlers.go#L583-L598

Added lines #L583 - L598 were not covered by tests
})
}

// Bond, withdraw, reward
func bondHandler(client eth.LivepeerEthClient) http.Handler {
return mustHaveClient(client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down
1 change: 1 addition & 0 deletions server/webserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (s *LivepeerServer) cliWebServerHandlers(bindAddr string) *http.ServeMux {
mux.Handle("/setOrchestratorConfig", mustHaveFormParams(s.setOrchestratorConfigHandler(client)))
mux.Handle("/setMaxFaceValue", mustHaveFormParams(s.setMaxFaceValueHandler(), "maxfacevalue"))
mux.Handle("/setPriceForBroadcaster", mustHaveFormParams(s.setPriceForBroadcaster(), "pricePerUnit", "pixelsPerUnit", "broadcasterEthAddr"))
mux.Handle("/setMaxSessions", mustHaveFormParams(s.setMaxSessions(), "maxSessions"))

// Bond, withdraw, reward
mux.Handle("/bond", mustHaveFormParams(bondHandler(client), "amount", "toAddr"))
Expand Down