diff --git a/cmd/livepeer/livepeer.go b/cmd/livepeer/livepeer.go
index 9c4ba121a..89124d5bf 100644
--- a/cmd/livepeer/livepeer.go
+++ b/cmd/livepeer/livepeer.go
@@ -103,8 +103,11 @@ func main() {
 	initializeRound := flag.Bool("initializeRound", false, "Set to true if running as a transcoder and the node should automatically initialize new rounds")
 	ticketEV := flag.String("ticketEV", "1000000000", "The expected value for PM tickets")
-	// Price Info
+	// Orchestrator base pricing info
 	pricePerUnit := flag.Int("pricePerUnit", 0, "The price per 'pixelsPerUnit' amount pixels")
+	// Broadcaster max acceptable price
+	maxPricePerUnit := flag.Int("maxPricePerUnit", 0, "The maximum transcoding price (in wei) per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, the broadcaster is willing to accept ANY price")
+	// Unit of pixels for both O's basePriceInfo and B's MaxBroadcastPrice
 	pixelsPerUnit := flag.Int("pixelsPerUnit", 1, "Amount of pixels per unit. Set to '> 1' to have smaller price granularity than 1 wei / pixel")
 	// Metrics & logging:
@@ -321,13 +324,11 @@ func main() {
 		// Set price per pixel base info
 		if *pixelsPerUnit <= 0 {
 			// Can't divide by 0
-			glog.Fatalf("The amount of pixels per unit must be greater than 0, provided %d instead\n", *pixelsPerUnit)
-			return
+			panic(fmt.Errorf("The amount of pixels per unit must be greater than 0, provided %d instead", *pixelsPerUnit))
 		}
 		if *pricePerUnit <= 0 {
 			// Prevent orchestrator from unknowingly provide free transcoding
-			glog.Fatalf("Price per unit of pixels must be greater than 0, provided %d instead\n", *pricePerUnit)
-			return
+			panic(fmt.Errorf("Price per unit of pixels must be greater than 0, provided %d instead", *pricePerUnit))
 		}
 		n.PriceInfo = big.NewRat(int64(*pricePerUnit), int64(*pixelsPerUnit))
 		glog.Infof("Price: %d wei for %d pixels\n ", *pricePerUnit, *pixelsPerUnit)
@@ -387,6 +388,17 @@ func main() {
 		// TODO: Initialize Sender with an implementation
 		// of RoundsManager that reads from a cache
 		n.Sender = pm.NewSender(n.Eth, n.Eth)
+
+		if *pixelsPerUnit <= 0 {
+			// Can't divide by 0
+			panic(fmt.Errorf("The amount of pixels per unit must be greater than 0, provided %d instead", *pixelsPerUnit))
+		}
+		if *maxPricePerUnit > 0 {
+			server.BroadcastCfg.SetMaxPrice(big.NewRat(int64(*maxPricePerUnit), int64(*pixelsPerUnit)))
+		} else {
+			glog.Infof("Maximum transcoding price per pixel is not greater than 0 (%v); the broadcaster is currently set to accept ANY price", *maxPricePerUnit)
+			glog.Infoln("To update the broadcaster's maximum acceptable transcoding price per pixel, use the CLI or restart the broadcaster with the appropriate 'maxPricePerUnit' and 'pixelsPerUnit' values")
+		}
 	}
 
 	// Start services
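Note on the flags above: the broadcaster's price ceiling is simply the ratio maxPricePerUnit / pixelsPerUnit, so a pixelsPerUnit greater than 1 gives sub-wei granularity that an integer wei price could not express. A minimal sketch of that arithmetic (illustrative only, not part of this diff):

```go
package main

import (
	"fmt"
	"math/big"
)

func main() {
	// Mirrors the flag handling above: maxPricePerUnit wei buys pixelsPerUnit pixels.
	maxPricePerUnit := int64(1)
	pixelsPerUnit := int64(10)

	// 1 wei / 10 pixels = 0.1 wei per pixel: representable as a big.Rat,
	// but not as an integer number of wei.
	maxPrice := big.NewRat(maxPricePerUnit, pixelsPerUnit)
	fmt.Println(maxPrice)                // 1/10
	fmt.Println(maxPrice.FloatString(1)) // 0.1
}
```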
diff --git a/cmd/livepeer_cli/wizard_broadcast.go b/cmd/livepeer_cli/wizard_broadcast.go
index 0807fb8fe..498476aef 100644
--- a/cmd/livepeer_cli/wizard_broadcast.go
+++ b/cmd/livepeer_cli/wizard_broadcast.go
@@ -51,8 +51,14 @@ func (w *wizard) allTranscodingOptions() map[int]string {
 }
 
 func (w *wizard) setBroadcastConfig() {
-	fmt.Printf("Enter broadcast max price per segment in Wei - ")
-	maxPricePerSegment := w.readBigInt()
+	fmt.Printf("Enter a maximum transcoding price per pixel, in wei (maxPricePerUnit / pixelsPerUnit).\n")
+	fmt.Printf("e.g. 1 wei / 10 pixels = 0.1 wei per pixel\n")
+	fmt.Printf("\n")
+	fmt.Printf("Enter the amount of pixels that make up a single unit (default: 1 pixel) - ")
+	pixelsPerUnit := w.readDefaultInt(1)
+	fmt.Printf("\n")
+	fmt.Printf("Enter the maximum price to pay for %d pixels in Wei (required) - ", pixelsPerUnit)
+	maxPricePerUnit := w.readDefaultInt(0)
 
 	opts := w.allTranscodingOptions()
 	if opts == nil {
@@ -69,7 +75,8 @@ func (w *wizard) setBroadcastConfig() {
 	}
 
 	val := url.Values{
-		"maxPricePerSegment": {fmt.Sprintf("%v", maxPricePerSegment.String())},
+		"pixelsPerUnit":      {strconv.Itoa(pixelsPerUnit)},
+		"maxPricePerUnit":    {strconv.Itoa(maxPricePerUnit)},
 		"transcodingOptions": {fmt.Sprintf("%v", transOpts)},
 	}
diff --git a/cmd/livepeer_cli/wizard_stats.go b/cmd/livepeer_cli/wizard_stats.go
index 15810f974..d2c36d150 100644
--- a/cmd/livepeer_cli/wizard_stats.go
+++ b/cmd/livepeer_cli/wizard_stats.go
@@ -360,7 +360,7 @@ func (w *wizard) getEthBalance() string {
 	return e
 }
 
-func (w *wizard) getBroadcastConfig() (*big.Int, string) {
+func (w *wizard) getBroadcastConfig() (*big.Rat, string) {
 	resp, err := http.Get(fmt.Sprintf("http://%v:%v/getBroadcastConfig", w.host, w.httpPort))
 	if err != nil {
 		glog.Errorf("Error getting broadcast config: %v", err)
@@ -375,7 +375,7 @@ func (w *wizard) getBroadcastConfig() (*big.Int, string) {
 	}
 
 	var config struct {
-		MaxPricePerSegment *big.Int
+		MaxPrice           *big.Rat
 		TranscodingOptions string
 	}
 	err = json.Unmarshal(result, &config)
@@ -384,7 +384,7 @@ func (w *wizard) getBroadcastConfig() (*big.Int, string) {
 		return nil, ""
 	}
 
-	return config.MaxPricePerSegment, config.TranscodingOptions
+	return config.MaxPrice, config.TranscodingOptions
 }
 
 func (w *wizard) getOrchestratorInfo() (lpTypes.Transcoder, error) {
diff --git a/discovery/db_discovery.go b/discovery/db_discovery.go
index 45467647b..5d7246389 100644
--- a/discovery/db_discovery.go
+++ b/discovery/db_discovery.go
@@ -1,6 +1,7 @@
 package discovery
 
 import (
+	"math/big"
 	"net/url"
 	"time"
 
@@ -8,6 +9,7 @@ import (
 	"github.com/livepeer/go-livepeer/core"
 	lpTypes "github.com/livepeer/go-livepeer/eth/types"
 	"github.com/livepeer/go-livepeer/net"
+	"github.com/livepeer/go-livepeer/server"
 
 	"github.com/golang/glog"
 )
@@ -69,7 +71,15 @@ func (dbo *DBOrchestratorPoolCache) GetOrchestrators(numOrchestrators int) ([]*n
 		uris = append(uris, uri)
 	}
 
-	orchPool := NewOrchestratorPool(dbo.node, uris)
+	pred := func(info *net.OrchestratorInfo) bool {
+		price := server.BroadcastCfg.MaxPrice()
+		if price != nil {
+			return big.NewRat(info.PriceInfo.PricePerUnit, info.PriceInfo.PixelsPerUnit).Cmp(price) <= 0
+		}
+		return true
+	}
+
+	orchPool := NewOrchestratorPoolWithPred(dbo.node, uris, pred)
 
 	orchInfos, err := orchPool.GetOrchestrators(numOrchestrators)
 	if err != nil || len(orchInfos) <= 0 {
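One caveat with the predicate above: it dereferences info.PriceInfo directly, so it assumes every OrchestratorInfo response carries pricing. A hardened variant could reject malformed responses instead of panicking inside big.NewRat; here is a self-contained sketch with stand-in types (an illustration, not part of this diff):

```go
package main

import (
	"fmt"
	"math/big"
)

// Minimal stand-ins for the net.PriceInfo / net.OrchestratorInfo types used above.
type PriceInfo struct {
	PricePerUnit  int64
	PixelsPerUnit int64
}

type OrchestratorInfo struct {
	PriceInfo *PriceInfo
}

// acceptable mirrors the predicate in db_discovery.go, adding a nil/zero guard
// so a response without valid pricing is rejected rather than causing a panic.
func acceptable(info *OrchestratorInfo, maxPrice *big.Rat) bool {
	if maxPrice == nil {
		// No ceiling configured: accept any price.
		return true
	}
	if info.PriceInfo == nil || info.PriceInfo.PixelsPerUnit == 0 {
		return false
	}
	return big.NewRat(info.PriceInfo.PricePerUnit, info.PriceInfo.PixelsPerUnit).Cmp(maxPrice) <= 0
}

func main() {
	max := big.NewRat(10, 1) // 10 wei per pixel
	fmt.Println(acceptable(&OrchestratorInfo{PriceInfo: &PriceInfo{PricePerUnit: 5, PixelsPerUnit: 1}}, max))   // true
	fmt.Println(acceptable(&OrchestratorInfo{PriceInfo: &PriceInfo{PricePerUnit: 999, PixelsPerUnit: 1}}, max)) // false
	fmt.Println(acceptable(&OrchestratorInfo{}, max))                                                           // false
}
```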
diff --git a/discovery/discovery.go b/discovery/discovery.go
index 971522eb1..690b894ff 100644
--- a/discovery/discovery.go
+++ b/discovery/discovery.go
@@ -24,6 +24,7 @@ var serverGetOrchInfo = server.GetOrchestratorInfo
 type orchestratorPool struct {
 	uris  []*url.URL
 	bcast server.Broadcaster
+	pred  func(info *net.OrchestratorInfo) bool
 }
 
 var perm = func(len int) []int { return rand.Perm(len) }
@@ -57,6 +58,17 @@ func NewOrchestratorPool(node *core.LivepeerNode, addresses []string) *orchestra
 	return &orchestratorPool{bcast: bcast, uris: randomizedUris}
 }
 
+func NewOrchestratorPoolWithPred(node *core.LivepeerNode, addresses []string, pred func(*net.OrchestratorInfo) bool) *orchestratorPool {
+	// if livepeer running in offchain mode, return nil
+	if node.Eth == nil {
+		glog.Error("Cannot create orchestrator pool with predicate: LivepeerNode Eth client is nil")
+		return nil
+	}
+	pool := NewOrchestratorPool(node, addresses)
+	pool.pred = pred
+	return pool
+}
+
 func NewOnchainOrchestratorPool(node *core.LivepeerNode) *orchestratorPool {
 	// if livepeer running in offchain mode, return nil
 	if node.Eth == nil {
@@ -97,7 +109,7 @@ func (o *orchestratorPool) GetOrchestrators(numOrchestrators int) ([]*net.Orches
 			respLock.Lock()
 			defer respLock.Unlock()
 			numResp++
-			if err == nil {
+			if err == nil && (o.pred == nil || o.pred(info)) {
 				orchInfos = append(orchInfos, info)
 				numSuccessResp++
 			} else if monitor.Enabled {
diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go
index 10f164de6..9d6536995 100644
--- a/discovery/discovery_test.go
+++ b/discovery/discovery_test.go
@@ -2,14 +2,17 @@ package discovery
 
 import (
 	"context"
+	"math/big"
 	"math/rand"
 	"net/url"
 	"runtime"
+	"strconv"
 	"sync"
 	"testing"
 	"time"
 
 	ethcommon "github.com/ethereum/go-ethereum/common"
+	"github.com/golang/glog"
 	"github.com/livepeer/go-livepeer/common"
 	"github.com/livepeer/go-livepeer/core"
 	"github.com/livepeer/go-livepeer/eth"
@@ -52,6 +55,33 @@ func StubOrchestrators(addresses []string) []*lpTypes.Transcoder {
 	return orchestrators
 }
 
+func TestNewOrchestratorPoolWithPred_NilEthClient_ReturnsNil_LogsError(t *testing.T) {
+	node, _ := core.NewLivepeerNode(nil, "", nil)
+	errorLogsBefore := glog.Stats.Error.Lines()
+	pool := NewOrchestratorPoolWithPred(node, nil, nil)
+	errorLogsAfter := glog.Stats.Error.Lines()
+	assert.NotZero(t, errorLogsAfter-errorLogsBefore)
+	assert.Nil(t, pool)
+}
+
+func TestNewOnchainOrchestratorPool_NilEthClient_ReturnsNil_LogsError(t *testing.T) {
+	node, _ := core.NewLivepeerNode(nil, "", nil)
+	errorLogsBefore := glog.Stats.Error.Lines()
+	pool := NewOnchainOrchestratorPool(node)
+	errorLogsAfter := glog.Stats.Error.Lines()
+	assert.NotZero(t, errorLogsAfter-errorLogsBefore)
+	assert.Nil(t, pool)
+}
+
+func TestNewDBOrchestratorPoolCache_NilEthClient_ReturnsNil_LogsError(t *testing.T) {
+	node, _ := core.NewLivepeerNode(nil, "", nil)
+	errorLogsBefore := glog.Stats.Error.Lines()
+	poolCache := NewDBOrchestratorPoolCache(node)
+	errorLogsAfter := glog.Stats.Error.Lines()
+	assert.NotZero(t, errorLogsAfter-errorLogsBefore)
+	assert.Nil(t, poolCache)
+}
+
 func TestDeadLock(t *testing.T) {
 	gmp := runtime.GOMAXPROCS(50)
 	defer runtime.GOMAXPROCS(gmp)
@@ -70,7 +100,7 @@ func TestDeadLock(t *testing.T) {
 	for i := 0; i < 50; i++ {
 		addresses = append(addresses, "https://127.0.0.1:8936")
 	}
 	assert := assert.New(t)
 	pool := NewOrchestratorPool(nil, addresses)
 	infos, err := pool.GetOrchestrators(1)
 
 	assert.Equal("transcoderfromtestserver", infos[0].Transcoder)
 }
@@ -79,6 +108,53 @@
+func TestDeadLock_NewOrchestratorPoolWithPred(t *testing.T) {
+	gmp := runtime.GOMAXPROCS(50)
+	defer runtime.GOMAXPROCS(gmp)
+	var mu sync.Mutex
+	first := true
+	serverGetOrchInfo = func(ctx context.Context, bcast server.Broadcaster, orchestratorServer *url.URL) (*net.OrchestratorInfo, error) {
+		mu.Lock()
+		if first {
+			time.Sleep(100 * time.Millisecond)
+			first = false
+		}
+		mu.Unlock()
+		return &net.OrchestratorInfo{
+			Transcoder: "transcoderfromtestserver",
+			PriceInfo: &net.PriceInfo{
+				PricePerUnit:  5,
+				PixelsPerUnit: 1,
+			},
+		}, nil
+	}
+	addresses := []string{}
+	for i := 0; i < 50; i++ {
+		addresses = append(addresses, "https://127.0.0.1:8936")
+	}
+
+	assert := assert.New(t)
+	pred := func(info *net.OrchestratorInfo) bool {
+		price := server.BroadcastCfg.MaxPrice()
+		if price != nil {
+			return big.NewRat(info.PriceInfo.PricePerUnit, info.PriceInfo.PixelsPerUnit).Cmp(price) <= 0
+		}
+		return true
+	}
+
+	orchestrators := StubOrchestrators(addresses)
+
+	node, _ := core.NewLivepeerNode(nil, "", nil)
+	node.Eth = &eth.StubClient{Orchestrators: orchestrators}
+
+	pool := NewOrchestratorPoolWithPred(node, addresses, pred)
+	infos, err := pool.GetOrchestrators(1)
+
+	assert.Nil(err, "Should not be error")
+	assert.Len(infos, 1, "Should return one orchestrator")
+	assert.Equal("transcoderfromtestserver", infos[0].Transcoder)
+}
+
 func TestPoolSize(t *testing.T) {
 	addresses := []string{"https://127.0.0.1:8936", "https://127.0.0.1:8937", "https://127.0.0.1:8938"}
 
@@ -86,9 +162,39 @@
 	pool := NewOrchestratorPool(nil, addresses)
 	assert.Equal(3, pool.Size())
 
+	// will result in len(uris) <= 0 -> logs an error
+	errorLogsBefore := glog.Stats.Error.Lines()
 	pool = NewOrchestratorPool(nil, nil)
+	errorLogsAfter := glog.Stats.Error.Lines()
 	assert.Equal(0, pool.Size())
+	assert.NotZero(errorLogsAfter - errorLogsBefore)
+}
+
+func TestDbOrchestratorPoolCacheSize(t *testing.T) {
+	dbh, dbraw, err := common.TempDB(t)
+	defer dbh.Close()
+	defer dbraw.Close()
+	require := require.New(t)
+	require.Nil(err)
+
+	node, _ := core.NewLivepeerNode(nil, "", nil)
+	node.Database = dbh
+	node.Eth = &eth.StubClient{}
+
+	errorLogsBefore := glog.Stats.Error.Lines()
+	// No orchestrators on eth.StubClient will result in no registered orchs found on chain
+	emptyPool := NewDBOrchestratorPoolCache(node)
+	errorLogsAfter := glog.Stats.Error.Lines()
+	require.NotNil(emptyPool)
+	assert.Equal(t, 0, emptyPool.Size())
+	assert.NotZero(t, errorLogsAfter-errorLogsBefore)
+
+	addresses := []string{"https://127.0.0.1:8936", "https://127.0.0.1:8937", "https://127.0.0.1:8938"}
+	orchestrators := StubOrchestrators(addresses)
+	node.Eth = &eth.StubClient{Orchestrators: orchestrators}
+	nonEmptyPool := NewDBOrchestratorPoolCache(node)
+	require.NotNil(nonEmptyPool)
+	assert.Equal(t, len(addresses), nonEmptyPool.Size())
 }
 
 func TestCacheRegisteredTranscoders_GivenListOfOrchs_CreatesPoolCacheCorrectly(t *testing.T) {
@@ -120,12 +226,6 @@ func TestNewDBOrchestratorPoolCache_GivenListOfOrchs_CreatesPoolCacheCorrectly(t
 	node, _ := core.NewLivepeerNode(nil, "", nil)
 	node.Database = dbh
 
-	// check size for empty db
-	node.Eth = &eth.StubClient{}
-	emptyPool := NewDBOrchestratorPoolCache(node)
-	require.NotNil(emptyPool)
-	assert.Equal(0, emptyPool.Size())
-
 	// adding orchestrators to DB
 	addresses := []string{"https://127.0.0.1:8936", "https://127.0.0.1:8937", "https://127.0.0.1:8938"}
 	orchestrators := StubOrchestrators(addresses)
@@ -186,11 +286,16 @@ func TestNewDBOrchestratorPoolCache_TestURLs_Empty(t *testing.T) {
 	node.Database = dbh
 
 	addresses := []string{}
+	// Addresses is empty slice -> No orchestrators
 	orchestrators := StubOrchestrators(addresses)
+	// Contains empty orchestrator slice -> No registered orchs found on chain error
 	node.Eth = &eth.StubClient{Orchestrators: orchestrators}
+	errorLogsBefore := glog.Stats.Error.Lines()
 	dbOrch := NewDBOrchestratorPoolCache(node)
+	errorLogsAfter := glog.Stats.Error.Lines()
 	require.NotNil(dbOrch)
 	assert.Equal(0, dbOrch.Size())
+	assert.NotZero(errorLogsAfter - errorLogsBefore)
 	urls := dbOrch.GetURLs()
 	assert.Len(urls, 0)
 }
@@ -222,3 +327,272 @@ func TestNewOrchestratorPoolCache_GivenListOfOrchs_CreatesPoolCacheCorrectly(t *
 		assert.Equal(uri.String(), expected[i])
 	}
 }
+
+func TestNewOrchestratorPoolWithPred_TestPredicate(t *testing.T) {
+	pred := func(info *net.OrchestratorInfo) bool {
+		price := server.BroadcastCfg.MaxPrice()
+		if price != nil {
+			return big.NewRat(info.PriceInfo.PricePerUnit, info.PriceInfo.PixelsPerUnit).Cmp(price) <= 0
+		}
+		return true
+	}
+
+	addresses := []string{}
+	for i := 0; i < 50; i++ {
+		addresses = append(addresses, "https://127.0.0.1:8936")
+	}
+	orchestrators := StubOrchestrators(addresses)
+
+	node, _ := core.NewLivepeerNode(nil, "", nil)
+	node.Eth = &eth.StubClient{Orchestrators: orchestrators}
+
+	pool := NewOrchestratorPoolWithPred(node, addresses, pred)
+
+	oInfo := &net.OrchestratorInfo{
+		PriceInfo: &net.PriceInfo{
+			PricePerUnit:  5,
+			PixelsPerUnit: 1,
+		},
+	}
+
+	// server.BroadcastCfg.maxPrice not yet set, predicate should return true
+	assert.True(t, pool.pred(oInfo))
+
+	// Set server.BroadcastCfg.maxPrice higher than PriceInfo, should return true
+	server.BroadcastCfg.SetMaxPrice(big.NewRat(10, 1))
+	assert.True(t, pool.pred(oInfo))
+
+	// Set server.BroadcastCfg.maxPrice lower than PriceInfo, should return false
+	server.BroadcastCfg.SetMaxPrice(big.NewRat(1, 1))
+	assert.False(t, pool.pred(oInfo))
+}
+
+func TestCachedPool_AllOrchestratorsTooExpensive_ReturnsEmptyList(t *testing.T) {
+	// Test setup
+	perm = func(len int) []int { return rand.Perm(50) }
+
+	server.BroadcastCfg.SetMaxPrice(big.NewRat(10, 1))
+	gmp := runtime.GOMAXPROCS(50)
+	defer runtime.GOMAXPROCS(gmp)
+	var mu sync.Mutex
+	first := true
+	serverGetOrchInfo = func(ctx context.Context, bcast server.Broadcaster, orchestratorServer *url.URL) (*net.OrchestratorInfo, error) {
+		mu.Lock()
+		if first {
+			time.Sleep(100 * time.Millisecond)
+			first = false
+		}
+		mu.Unlock()
+		return &net.OrchestratorInfo{
+			Transcoder: "transcoderfromtestserver",
+			PriceInfo: &net.PriceInfo{
+				PricePerUnit:  999,
+				PixelsPerUnit: 1,
+			},
+		}, nil
+	}
+	addresses := []string{}
+	for i := 0; i < 50; i++ {
+		addresses = append(addresses, "https://127.0.0.1:"+strconv.Itoa(8936+i))
+	}
+
+	assert := assert.New(t)
+
+	// Create Database
+	dbh, dbraw, err := common.TempDB(t)
+	defer dbh.Close()
+	defer dbraw.Close()
+	require := require.New(t)
+	require.Nil(err)
+
+	orchestrators := StubOrchestrators(addresses)
+
+	// Create node
+	node, _ := core.NewLivepeerNode(nil, "", nil)
+	node.Database = dbh
+	node.Eth = &eth.StubClient{Orchestrators: orchestrators}
+
+	// Add orchs to DB
+	cachedOrchs, err := cacheDBOrchs(node, orchestrators)
+	require.Nil(err)
+	assert.Len(cachedOrchs, 50)
+	assert.Equal(cachedOrchs[1].ServiceURI, addresses[1])
+
+	// ensuring orchs exist in DB
+	orchs, err := node.Database.SelectOrchs()
+	require.Nil(err)
+	assert.Len(orchs, 50)
+	assert.Equal(orchs[0].ServiceURI, addresses[0])
+
+	// creating new OrchestratorPoolCache
+	dbOrch := NewDBOrchestratorPoolCache(node)
+	require.NotNil(dbOrch)
+
+	// check size
+	assert.Equal(50, dbOrch.Size())
+
+	urls := dbOrch.GetURLs()
+	assert.Len(urls, 50)
+	infos, err := dbOrch.GetOrchestrators(len(addresses))
+
+	assert.Nil(err, "Should not be error")
+	assert.Len(infos, 0)
+}
+
+func TestCachedPool_GetOrchestrators_MaxBroadcastPriceNotSet(t *testing.T) {
+	// Test setup
+	server.BroadcastCfg.SetMaxPrice(nil)
+	perm = func(len int) []int { return rand.Perm(50) }
+
+	gmp := runtime.GOMAXPROCS(50)
+	defer runtime.GOMAXPROCS(gmp)
+	var mu sync.Mutex
+	first := true
+	serverGetOrchInfo = func(ctx context.Context, bcast server.Broadcaster, orchestratorServer *url.URL) (*net.OrchestratorInfo, error) {
+		mu.Lock()
+		if first {
+			
time.Sleep(100 * time.Millisecond) + first = false + } + mu.Unlock() + return &net.OrchestratorInfo{ + Transcoder: "transcoderfromtestserver", + PriceInfo: &net.PriceInfo{ + PricePerUnit: 999, + PixelsPerUnit: 1, + }, + }, nil + } + addresses := []string{} + for i := 0; i < 50; i++ { + addresses = append(addresses, "https://127.0.0.1:"+strconv.Itoa(8936+i)) + } + + assert := assert.New(t) + + // Create Database + dbh, dbraw, err := common.TempDB(t) + defer dbh.Close() + defer dbraw.Close() + require := require.New(t) + require.Nil(err) + + orchestrators := StubOrchestrators(addresses) + + // Create node + node, _ := core.NewLivepeerNode(nil, "", nil) + node.Database = dbh + node.Eth = ð.StubClient{Orchestrators: orchestrators} + + // Add orchs to DB + cachedOrchs, err := cacheDBOrchs(node, orchestrators) + require.Nil(err) + assert.Len(cachedOrchs, 50) + assert.Equal(cachedOrchs[1].ServiceURI, addresses[1]) + + // ensuring orchs exist in DB + orchs, err := node.Database.SelectOrchs() + require.Nil(err) + assert.Len(orchs, 50) + assert.Equal(orchs[0].ServiceURI, addresses[0]) + + // creating new OrchestratorPoolCache + dbOrch := NewDBOrchestratorPoolCache(node) + require.NotNil(dbOrch) + + // check size + assert.Equal(50, dbOrch.Size()) + + urls := dbOrch.GetURLs() + assert.Len(urls, 50) + infos, err := dbOrch.GetOrchestrators(50) + + assert.Nil(err, "Should not be error") + assert.Len(infos, 50) +} + +func TestCachedPool_N_OrchestratorsGoodPricing_ReturnsNOrchestrators(t *testing.T) { + // Test setup + perm = func(len int) []int { return rand.Perm(50) } + + server.BroadcastCfg.SetMaxPrice(big.NewRat(10, 1)) + gmp := runtime.GOMAXPROCS(50) + defer runtime.GOMAXPROCS(gmp) + var mu sync.Mutex + first := true + serverGetOrchInfo = func(ctx context.Context, bcast server.Broadcaster, orchestratorServer *url.URL) (*net.OrchestratorInfo, error) { + mu.Lock() + if first { + time.Sleep(100 * time.Millisecond) + first = false + } + mu.Unlock() + if i, _ := strconv.Atoi(orchestratorServer.Port()); i > 8960 { + // Return valid pricing + return &net.OrchestratorInfo{ + Transcoder: "goodPriceTranscoder", + PriceInfo: &net.PriceInfo{ + PricePerUnit: 1, + PixelsPerUnit: 1, + }, + }, nil + } + // Return invalid pricing + return &net.OrchestratorInfo{ + Transcoder: "badPriceTranscoder", + PriceInfo: &net.PriceInfo{ + PricePerUnit: 999, + PixelsPerUnit: 1, + }, + }, nil + } + addresses := []string{} + for i := 0; i < 50; i++ { + addresses = append(addresses, "https://127.0.0.1:"+strconv.Itoa(8936+i)) + } + + assert := assert.New(t) + + // Create Database + dbh, dbraw, err := common.TempDB(t) + defer dbh.Close() + defer dbraw.Close() + require := require.New(t) + require.Nil(err) + + orchestrators := StubOrchestrators(addresses) + + // Create node + node, _ := core.NewLivepeerNode(nil, "", nil) + node.Database = dbh + node.Eth = ð.StubClient{Orchestrators: orchestrators} + + // Add orchs to DB + cachedOrchs, err := cacheDBOrchs(node, orchestrators) + require.Nil(err) + assert.Len(cachedOrchs, 50) + assert.Equal(cachedOrchs[1].ServiceURI, addresses[1]) + + // ensuring orchs exist in DB + orchs, err := node.Database.SelectOrchs() + require.Nil(err) + assert.Len(orchs, 50) + assert.Equal(orchs[0].ServiceURI, addresses[0]) + + // creating new OrchestratorPoolCache + dbOrch := NewDBOrchestratorPoolCache(node) + require.NotNil(dbOrch) + + // check size + assert.Equal(50, dbOrch.Size()) + + urls := dbOrch.GetURLs() + assert.Len(urls, 50) + infos, err := dbOrch.GetOrchestrators(len(addresses)) + + assert.Nil(err, 
"Should not be error") + assert.Len(infos, 25) + for _, info := range infos { + assert.Equal(info.Transcoder, "goodPriceTranscoder") + } +} diff --git a/server/broadcast.go b/server/broadcast.go index 612859f0f..efcd7e5f3 100644 --- a/server/broadcast.go +++ b/server/broadcast.go @@ -23,6 +23,25 @@ import ( "github.com/livepeer/lpms/stream" ) +var BroadcastCfg = &BroadcastConfig{} + +type BroadcastConfig struct { + maxPrice *big.Rat + mu sync.RWMutex +} + +func (cfg *BroadcastConfig) MaxPrice() *big.Rat { + cfg.mu.RLock() + defer cfg.mu.RUnlock() + return cfg.maxPrice +} + +func (cfg *BroadcastConfig) SetMaxPrice(price *big.Rat) { + cfg.mu.RLock() + defer cfg.mu.RUnlock() + cfg.maxPrice = price +} + type BroadcastSessionsManager struct { // Accessing or changing any of the below requires ownership of this mutex sessLock *sync.Mutex diff --git a/server/mediaserver.go b/server/mediaserver.go index 30ed14559..2f2e8b8e3 100644 --- a/server/mediaserver.go +++ b/server/mediaserver.go @@ -55,7 +55,6 @@ const StreamKeyBytes = 6 const SegLen = 2 * time.Second const BroadcastRetry = 15 * time.Second -var BroadcastPrice = big.NewInt(1) var BroadcastJobVideoProfiles = []ffmpeg.VideoProfile{ffmpeg.P240p30fps4x3, ffmpeg.P360p30fps16x9} var AuthWebhookURL string diff --git a/server/webserver.go b/server/webserver.go index bf74e0ba3..611e6c027 100644 --- a/server/webserver.go +++ b/server/webserver.go @@ -76,17 +76,29 @@ func (s *LivepeerServer) cliWebServerHandlers(bindAddr string) *http.ServeMux { return } - priceStr := r.FormValue("maxPricePerSegment") - if priceStr == "" { - glog.Errorf("Need to provide max price per segment") + pricePerUnit := r.FormValue("maxPricePerUnit") + pr, err := strconv.ParseInt(pricePerUnit, 10, 64) + if err != nil { + glog.Errorf("Error converting string to int64: %v\n", err) return } - price, err := lpcommon.ParseBigInt(priceStr) + + pixelsPerUnit := r.FormValue("pixelsPerUnit") + px, err := strconv.ParseInt(pixelsPerUnit, 10, 64) if err != nil { - glog.Error(err) + glog.Errorf("Error converting string to int64: %v\n", err) + return + } + if px <= 0 { + glog.Errorf("pixels per unit must be greater than 0, provided %d\n", px) return } + var price *big.Rat + if pr > 0 { + price = big.NewRat(pr, px) + } + transcodingOptions := r.FormValue("transcodingOptions") if transcodingOptions == "" { glog.Errorf("Need to provide transcoding options") @@ -104,11 +116,14 @@ func (s *LivepeerServer) cliWebServerHandlers(bindAddr string) *http.ServeMux { glog.Errorf("Invalid transcoding options: %v", transcodingOptions) return } - - BroadcastPrice = price + BroadcastCfg.SetMaxPrice(price) BroadcastJobVideoProfiles = profiles - - glog.Infof("Transcode Job Price: %v, Transcode Job Type: %v", BroadcastPrice, BroadcastJobVideoProfiles) + if price != nil { + glog.Infof("Maximum transcoding price: %d per %q pixels\n", pr, px) + } else { + glog.Info("Maximum transcoding price per pixel not set, broadcaster is currently set to accept ANY price.\n") + } + glog.Infof("Transcode Job Type: %v", BroadcastJobVideoProfiles) }) mux.HandleFunc("/getBroadcastConfig", func(w http.ResponseWriter, r *http.Request) { @@ -117,10 +132,10 @@ func (s *LivepeerServer) cliWebServerHandlers(bindAddr string) *http.ServeMux { pNames = append(pNames, p.Name) } config := struct { - MaxPricePerSegment *big.Int + MaxPrice *big.Rat TranscodingOptions string }{ - BroadcastPrice, + BroadcastCfg.MaxPrice(), strings.Join(pNames, ","), } diff --git a/test_args.sh b/test_args.sh index 1fa66230e..62d690f13 100755 --- a/test_args.sh 
diff --git a/test_args.sh b/test_args.sh
index 1fa66230e..62d690f13 100755
--- a/test_args.sh
+++ b/test_args.sh
@@ -45,6 +45,11 @@ run_lp -broadcaster -network rinkeby $ETH_ARGS
 [ -d "$DEFAULT_DATADIR"/rinkeby ]
 kill $pid
 
+# Error if the flags that set MaxBroadcastPrice aren't provided correctly
+res=0
+./livepeer -broadcaster -network rinkeby $ETH_ARGS -maxPricePerUnit 0 -pixelsPerUnit -5 || res=$?
+[ $res -ne 0 ]
+
 run_lp -broadcaster -network mainnet $ETH_ARGS
 [ -d "$DEFAULT_DATADIR"/mainnet ]
 kill $pid