Skip to content

Commit

Permalink
Add ctx to logging for selection algorithm (#3080)
Browse files Browse the repository at this point in the history
* Add ctx to logging for selection algorithm

* Reorg imports

* Fix unit tests
  • Loading branch information
leszko committed Jun 14, 2024
1 parent df4c106 commit b62c60c
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 14 deletions.
2 changes: 1 addition & 1 deletion common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type OrchestratorPool interface {
}

type SelectionAlgorithm interface {
Select(addrs []ethcommon.Address, stakes map[ethcommon.Address]int64, maxPrice *big.Rat, prices map[ethcommon.Address]*big.Rat, perfScores map[ethcommon.Address]float64) ethcommon.Address
Select(ctx context.Context, addrs []ethcommon.Address, stakes map[ethcommon.Address]int64, maxPrice *big.Rat, prices map[ethcommon.Address]*big.Rat, perfScores map[ethcommon.Address]float64) ethcommon.Address
}

type PerfScore struct {
Expand Down
2 changes: 1 addition & 1 deletion server/selection.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (s *MinLSSelector) selectUnknownSession(ctx context.Context) *BroadcastSess
s.perfScore.Mu.Unlock()
}

selected := s.selectionAlgorithm.Select(addrs, stakes, maxPrice, prices, perfScores)
selected := s.selectionAlgorithm.Select(ctx, addrs, stakes, maxPrice, prices, perfScores)

for i, sess := range s.unknownSessions {
if sess.OrchestratorInfo.GetTicketParams() == nil {
Expand Down
21 changes: 11 additions & 10 deletions server/selection_algorithm.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package server

import (
"context"
"math"
"math/big"
"math/rand"
"time"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/golang/glog"
"github.com/livepeer/go-livepeer/clog"
)

var random = rand.New(rand.NewSource(time.Now().UnixNano()))
Expand All @@ -22,18 +23,18 @@ type ProbabilitySelectionAlgorithm struct {
PriceExpFactor float64
}

func (sa ProbabilitySelectionAlgorithm) Select(addrs []ethcommon.Address, stakes map[ethcommon.Address]int64, maxPrice *big.Rat, prices map[ethcommon.Address]*big.Rat, perfScores map[ethcommon.Address]float64) ethcommon.Address {
filtered := sa.filter(addrs, maxPrice, prices, perfScores)
func (sa ProbabilitySelectionAlgorithm) Select(ctx context.Context, addrs []ethcommon.Address, stakes map[ethcommon.Address]int64, maxPrice *big.Rat, prices map[ethcommon.Address]*big.Rat, perfScores map[ethcommon.Address]float64) ethcommon.Address {
filtered := sa.filter(ctx, addrs, maxPrice, prices, perfScores)
probabilities := sa.calculateProbabilities(filtered, stakes, prices)
return selectBy(probabilities)
}

func (sa ProbabilitySelectionAlgorithm) filter(addrs []ethcommon.Address, maxPrice *big.Rat, prices map[ethcommon.Address]*big.Rat, perfScores map[ethcommon.Address]float64) []ethcommon.Address {
filteredByPerfScore := sa.filterByPerfScore(addrs, perfScores)
return sa.filterByMaxPrice(filteredByPerfScore, maxPrice, prices)
func (sa ProbabilitySelectionAlgorithm) filter(ctx context.Context, addrs []ethcommon.Address, maxPrice *big.Rat, prices map[ethcommon.Address]*big.Rat, perfScores map[ethcommon.Address]float64) []ethcommon.Address {
filteredByPerfScore := sa.filterByPerfScore(ctx, addrs, perfScores)
return sa.filterByMaxPrice(ctx, filteredByPerfScore, maxPrice, prices)
}

func (sa ProbabilitySelectionAlgorithm) filterByPerfScore(addrs []ethcommon.Address, scores map[ethcommon.Address]float64) []ethcommon.Address {
func (sa ProbabilitySelectionAlgorithm) filterByPerfScore(ctx context.Context, addrs []ethcommon.Address, scores map[ethcommon.Address]float64) []ethcommon.Address {
if sa.MinPerfScore <= 0 || len(scores) == 0 {
// Performance Score filter not defined, return all Orchestrators
return addrs
Expand All @@ -49,13 +50,13 @@ func (sa ProbabilitySelectionAlgorithm) filterByPerfScore(addrs []ethcommon.Addr
if len(res) == 0 {
// If no orchestrators pass the perf filter, return all Orchestrators.
// That may mean some issues with the PerfScore service.
glog.Warningf("No Orchestrators passed min performance score filter, not using the filter, numAddrs=%d, minPerfScore=%v, scores=%v, addrs=%v", len(addrs), sa.MinPerfScore, scores, addrs)
clog.Warningf(ctx, "No Orchestrators passed min performance score filter, not using the filter, numAddrs=%d, minPerfScore=%v, scores=%v, addrs=%v", len(addrs), sa.MinPerfScore, scores, addrs)
return addrs
}
return res
}

func (sa ProbabilitySelectionAlgorithm) filterByMaxPrice(addrs []ethcommon.Address, maxPrice *big.Rat, prices map[ethcommon.Address]*big.Rat) []ethcommon.Address {
func (sa ProbabilitySelectionAlgorithm) filterByMaxPrice(ctx context.Context, addrs []ethcommon.Address, maxPrice *big.Rat, prices map[ethcommon.Address]*big.Rat) []ethcommon.Address {
if maxPrice == nil || len(prices) == 0 {
// Max price filter not defined, return all Orchestrators
return addrs
Expand All @@ -72,7 +73,7 @@ func (sa ProbabilitySelectionAlgorithm) filterByMaxPrice(addrs []ethcommon.Addre
if len(res) == 0 {
// If no orchestrators pass the filter, return all Orchestrators
// It means that no orchestrators are below the max price
glog.Warningf("No Orchestrators passed max price filter, not using the filter, numAddrs=%d, maxPrice=%v, prices=%v, addrs=%v", len(addrs), maxPrice, prices, addrs)
clog.Warningf(ctx, "No Orchestrators passed max price filter, not using the filter, numAddrs=%d, maxPrice=%v, prices=%v, addrs=%v", len(addrs), maxPrice, prices, addrs)
return addrs
}
return res
Expand Down
3 changes: 2 additions & 1 deletion server/selection_algorithm_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"context"
"math/big"
"testing"

Expand Down Expand Up @@ -208,7 +209,7 @@ func TestFilter(t *testing.T) {
MinPerfScore: tt.orchMinPerfScore,
}

res := sa.filter(addrs, maxPrice, prices, perfScores)
res := sa.filter(context.Background(), addrs, maxPrice, prices, perfScores)

var exp []ethcommon.Address
for _, o := range tt.want {
Expand Down
2 changes: 1 addition & 1 deletion server/selection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (r *stubStakeReader) SetStakes(stakes map[ethcommon.Address]int64) {

type stubSelectionAlgorithm struct{}

func (sa stubSelectionAlgorithm) Select(addrs []ethcommon.Address, stakes map[ethcommon.Address]int64, maxPrice *big.Rat, prices map[ethcommon.Address]*big.Rat, perfScores map[ethcommon.Address]float64) ethcommon.Address {
func (sa stubSelectionAlgorithm) Select(ctx context.Context, addrs []ethcommon.Address, stakes map[ethcommon.Address]int64, maxPrice *big.Rat, prices map[ethcommon.Address]*big.Rat, perfScores map[ethcommon.Address]float64) ethcommon.Address {
if len(addrs) == 0 {
return ethcommon.Address{}
}
Expand Down

0 comments on commit b62c60c

Please sign in to comment.