Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: price prediction for suppliers #1594

Merged
merged 3 commits into from
Oct 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions insonmnia/benchmarks/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ func (m *loader) Load(ctx context.Context) (Mapping, error) {

maxID := benchmarkList.Max()
if maxID <= arrayMappingThreshold {
return newArrayMapping(benchmarkList, maxID), nil
return NewArrayMapping(benchmarkList, maxID), nil
}

return newMapMapping(benchmarkList), nil
return NewMapMapping(benchmarkList), nil
}

func newArrayMapping(benchmarks BenchList, maxID uint64) Mapping {
func NewArrayMapping(benchmarks BenchList, maxID uint64) Mapping {
deviceTypes := make([]sonm.DeviceType, maxID+1)
splittingAlgorithms := make([]sonm.SplittingAlgorithm, maxID+1)

Expand All @@ -52,7 +52,7 @@ func newArrayMapping(benchmarks BenchList, maxID uint64) Mapping {
}
}

func newMapMapping(benchmarks BenchList) Mapping {
func NewMapMapping(benchmarks BenchList) Mapping {
deviceTypes := map[uint64]sonm.DeviceType{}
splittingAlgorithms := map[uint64]sonm.SplittingAlgorithm{}
for _, benchmarks := range benchmarks.MapByDeviceType() {
Expand Down
2 changes: 1 addition & 1 deletion insonmnia/node/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func newServices(options *remoteOptions) *services {
blacklist: newBlacklistAPI(options),
profile: newProfileAPI(options),
monitoring: newMonitoringAPI(options),
orderPredictor: optimus.NewPredictorService(options.cfg.Predictor, options.benchList, options.log),
orderPredictor: optimus.NewPredictorService(options.cfg.Predictor, options.eth.Market(), options.benchList, options.dwh, options.log),
}
}

Expand Down
14 changes: 14 additions & 0 deletions optimus/blacklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,17 @@ func (m *multiBlacklist) Update(ctx context.Context) error {

return nil
}

type emptyBlacklist struct{}

func newEmptyBlacklist() *emptyBlacklist {
return &emptyBlacklist{}
}

func (emptyBlacklist) Update(ctx context.Context) error {
return nil
}

func (emptyBlacklist) IsAllowed(addr common.Address) bool {
return true
}
1 change: 1 addition & 0 deletions optimus/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type workerConfig struct {
PreludeTimeout time.Duration `yaml:"prelude_timeout" default:"30s"`
Optimization OptimizationConfig `yaml:"optimization"`
Simulation *simulationConfig `yaml:"simulation"`
VerboseLog bool `yaml:"verbose"`
}

func (m *workerConfig) Validate() error {
Expand Down
51 changes: 49 additions & 2 deletions optimus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ type workerEngine struct {
tagger *Tagger
}

func newWorkerEngine(cfg *workerConfig, addr, masterAddr common.Address, blacklist Blacklist, worker sonm.WorkerManagementClient, market blockchain.MarketAPI, marketCache MarketScanner, benchmarkMapping benchmarks.Mapping, tagger *Tagger, log *zap.SugaredLogger) (*workerEngine, error) {
func newWorkerEngine(cfg *workerConfig, addr, masterAddr common.Address, blacklist Blacklist, worker WorkerManagementClientAPI, market blockchain.MarketAPI, marketCache MarketScanner, benchmarkMapping benchmarks.Mapping, tagger *Tagger, log *zap.SugaredLogger) (*workerEngine, error) {
if cfg.DryRun {
log.Infof("activated dry-run mode for this worker")
worker = NewReadOnlyWorker(worker)
Expand Down Expand Up @@ -624,7 +624,9 @@ func (m *workerEngine) matchingOrders(deviceManager *DeviceManager, devices *son

for _, order := range orders {
if err := filter.Filter(order.GetOrder()); err != nil {
log.Debugf("exclude order %s from matching: %v", order.GetOrder().GetId(), err)
if m.cfg.VerboseLog {
log.Debugf("exclude order %s from matching: %v", order.GetOrder().GetId(), err)
}
continue
}

Expand Down Expand Up @@ -702,6 +704,51 @@ type OptimizationMethodFactory interface {
Create(orders, matchedOrders []*MarketOrder, log *zap.SugaredLogger) OptimizationMethod
}

type defaultPredictionOptimizationMethodFactory struct{}

func (m *defaultPredictionOptimizationMethodFactory) Config() interface{} {
return m
}

func (m *defaultPredictionOptimizationMethodFactory) Create(orders, matchedOrders []*MarketOrder, log *zap.SugaredLogger) OptimizationMethod {
if len(matchedOrders) < 64 {
return &BranchBoundModel{
Log: log.With(zap.String("model", "BBM")),
}
}

return &BatchModel{
Methods: []OptimizationMethod{
&GreedyLinearRegressionModel{
orders: orders,
regression: &regressionClassifier{
model: &SCAKKTModel{
MaxIterations: 1e7,
Log: log,
},
},
exhaustionLimit: 128,
log: log.With(zap.String("model", "LLS")),
},
&GeneticModel{
NewGenomeLab: NewPackedOrdersNewGenome,
PopulationSize: 256,
MaxGenerations: 32,
MaxAge: 5 * time.Minute,
Log: log.With(zap.String("model", "GMP")),
},
&GeneticModel{
NewGenomeLab: NewDecisionOrdersNewGenome,
PopulationSize: 512,
MaxGenerations: 24,
MaxAge: 5 * time.Minute,
Log: log.With(zap.String("model", "GMD")),
},
},
Log: log,
}
}

type defaultOptimizationMethodFactory struct{}

func (m *defaultOptimizationMethodFactory) Config() interface{} {
Expand Down
59 changes: 54 additions & 5 deletions optimus/predictor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,49 @@ package optimus
import (
"context"
"fmt"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/sonm-io/core/blockchain"
"github.com/sonm-io/core/insonmnia/benchmarks"
"github.com/sonm-io/core/insonmnia/dwh"
"github.com/sonm-io/core/proto"
"github.com/sonm-io/core/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

type engineFactory func(worker WorkerManagementClientAPI) (*workerEngine, error)

func predictionEngineConfig(cfg marketplaceConfig) *workerConfig {
priceThresholdValue := &RelativePriceThreshold{
Int: big.NewInt(int64(5.0 * 1000)),
}

return &workerConfig{
PrivateKey: cfg.PrivateKey,
Epoch: 60 * time.Second,
OrderPolicy: 0,
DryRun: false,
Identity: sonm.IdentityLevel_ANONYMOUS,
PriceThreshold: priceThreshold{
PriceThreshold: priceThresholdValue,
},
StaleThreshold: 5 * time.Minute,
PreludeTimeout: 30 * time.Second,
Optimization: OptimizationConfig{
Model: optimizationMethodFactory{
OptimizationMethodFactory: &defaultPredictionOptimizationMethodFactory{},
},
},
}
}

type PredictorConfig struct {
Blockchain *blockchain.Config
DWH *dwh.DWHConfig
Marketplace marketplaceConfig
}

Expand All @@ -21,14 +54,17 @@ type PredictorService struct {
log *zap.SugaredLogger

mu sync.RWMutex
market blockchain.MarketAPI
marketCache *MarketCache
benchmarks benchmarks.BenchList
regression *regressionClassifier
classification *OrderClassification
engineFactory engineFactory
}

// NewPredictorService constructs a new order price predictor service.
// Returns nil when nil "cfg" is passed.
func NewPredictorService(cfg *PredictorConfig, benchmarks benchmarks.BenchList, log *zap.SugaredLogger) *PredictorService {
func NewPredictorService(cfg *PredictorConfig, market blockchain.MarketAPI, benchmarkList benchmarks.BenchList, dwh sonm.DWHClient, log *zap.SugaredLogger) *PredictorService {
if cfg == nil {
return nil
}
Expand All @@ -40,11 +76,24 @@ func NewPredictorService(cfg *PredictorConfig, benchmarks benchmarks.BenchList,
},
}

engineConfig := predictionEngineConfig(cfg.Marketplace)
blacklist := newEmptyBlacklist()
marketCache := newMarketCache(newMarketScanner(cfg.Marketplace, dwh), cfg.Marketplace.Interval)
benchmarkMapping := benchmarks.NewArrayMapping(benchmarkList, benchmarkList.Max())
tagger := newTagger("predictor")

engineFactory := func(worker WorkerManagementClientAPI) (*workerEngine, error) {
return newWorkerEngine(engineConfig, common.Address{}, common.Address{}, blacklist, worker, market, marketCache, benchmarkMapping, tagger, log)
}

m := &PredictorService{
cfg: cfg,
log: log,
benchmarks: benchmarks,
regression: regression,
cfg: cfg,
log: log,
market: market,
marketCache: marketCache,
benchmarks: benchmarkList,
regression: regression,
engineFactory: engineFactory,
}

return m
Expand Down
19 changes: 17 additions & 2 deletions optimus/predictor_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ func (m *PredictorService) Predict(ctx context.Context, request *sonm.BidResourc
benchmarksValues[bench.GetID()] = value
}

benchmarks, err := sonm.NewBenchmarks(benchmarksValues)
orderBenchmarks, err := sonm.NewBenchmarks(benchmarksValues)
if err != nil {
return nil, fmt.Errorf("could not parse benchmark values: %s", err)
}

order := &sonm.Order{
Benchmarks: benchmarks,
Benchmarks: orderBenchmarks,
}

price, err := classification.Predictor.PredictPrice(&MarketOrder{
Expand All @@ -58,3 +58,18 @@ func (m *PredictorService) Predict(ctx context.Context, request *sonm.BidResourc
PerSecond: sonm.NewBigInt(priceBigI),
}, nil
}

func (m *PredictorService) PredictSupplier(ctx context.Context, request *sonm.PredictSupplierRequest) (*sonm.PredictSupplierReply, error) {
request.Normalize()

worker := newMockWorker(request.GetDevices())
engine, err := m.engineFactory(worker)
if err != nil {
return nil, err
}
if err := engine.execute(ctx); err != nil {
return nil, err
}

return &sonm.PredictSupplierReply{Price: sonm.SumPrice(worker.Result)}, nil
}
53 changes: 47 additions & 6 deletions optimus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"sync"
"time"

Expand Down Expand Up @@ -58,15 +59,23 @@ func (m *namedErrorGroup) ErrorOrNil() error {
return m
}

type WorkerManagementClientAPI interface {
Devices(ctx context.Context, request *sonm.Empty, opts ...grpc.CallOption) (*sonm.DevicesReply, error)
AskPlans(ctx context.Context, request *sonm.Empty, opts ...grpc.CallOption) (*sonm.AskPlansReply, error)
CreateAskPlan(ctx context.Context, request *sonm.AskPlan, opts ...grpc.CallOption) (*sonm.ID, error)
RemoveAskPlan(ctx context.Context, request *sonm.ID, opts ...grpc.CallOption) (*sonm.Empty, error)
NextMaintenance(ctx context.Context, request *sonm.Empty, opts ...grpc.CallOption) (*sonm.Timestamp, error)
}

// WorkerManagementClientExt extends default "WorkerManagementClient" with an
// ability to remove multiple ask-plans.
type WorkerManagementClientExt interface {
sonm.WorkerManagementClient
WorkerManagementClientAPI
RemoveAskPlans(ctx context.Context, ids []string) error
}

type workerManagementClientExt struct {
sonm.WorkerManagementClient
WorkerManagementClientAPI
}

func (m *workerManagementClientExt) RemoveAskPlans(ctx context.Context, ids []string) error {
Expand Down Expand Up @@ -128,15 +137,15 @@ func (m *workerManagementClientExt) RemoveAskPlans(ctx context.Context, ids []st
// immutable operations. It returns some default response for operations that
// mutates something.
type ReadOnlyWorker struct {
sonm.WorkerManagementClient
WorkerManagementClientAPI

mu sync.Mutex
removedPlans map[string]struct{}
}

func NewReadOnlyWorker(worker sonm.WorkerManagementClient) *ReadOnlyWorker {
func NewReadOnlyWorker(worker WorkerManagementClientAPI) *ReadOnlyWorker {
return &ReadOnlyWorker{
WorkerManagementClient: worker,
WorkerManagementClientAPI: worker,

removedPlans: map[string]struct{}{},
}
Expand All @@ -147,7 +156,7 @@ func (m *ReadOnlyWorker) CreateAskPlan(ctx context.Context, in *sonm.AskPlan, op
}

func (m *ReadOnlyWorker) AskPlans(ctx context.Context, in *sonm.Empty, opts ...grpc.CallOption) (*sonm.AskPlansReply, error) {
plans, err := m.WorkerManagementClient.AskPlans(ctx, in, opts...)
plans, err := m.WorkerManagementClientAPI.AskPlans(ctx, in, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -192,3 +201,35 @@ func (m *ReadOnlyWorker) RemoveBenchmark(ctx context.Context, in *sonm.NumericID
func (m *ReadOnlyWorker) PurgeBenchmarks(ctx context.Context, in *sonm.Empty, opts ...grpc.CallOption) (*sonm.Empty, error) {
return &sonm.Empty{}, nil
}

type mockWorker struct {
PredefinedDevices *sonm.DevicesReply
Result []*sonm.AskPlan
}

func newMockWorker(devices *sonm.DevicesReply) *mockWorker {
return &mockWorker{
PredefinedDevices: devices,
}
}

func (m *mockWorker) Devices(ctx context.Context, request *sonm.Empty, opts ...grpc.CallOption) (*sonm.DevicesReply, error) {
return m.PredefinedDevices, nil
}

func (m *mockWorker) AskPlans(ctx context.Context, request *sonm.Empty, opts ...grpc.CallOption) (*sonm.AskPlansReply, error) {
return &sonm.AskPlansReply{}, nil
}

func (m *mockWorker) CreateAskPlan(ctx context.Context, request *sonm.AskPlan, opts ...grpc.CallOption) (*sonm.ID, error) {
m.Result = append(m.Result, request)
return &sonm.ID{Id: "00000000-0000-0000-0000-000000000000"}, nil
}

func (m *mockWorker) RemoveAskPlan(ctx context.Context, request *sonm.ID, opts ...grpc.CallOption) (*sonm.Empty, error) {
return &sonm.Empty{}, nil
}

func (m *mockWorker) NextMaintenance(ctx context.Context, request *sonm.Empty, opts ...grpc.CallOption) (*sonm.Timestamp, error) {
return &sonm.Timestamp{Seconds: math.MaxInt32}, nil
}
2 changes: 2 additions & 0 deletions proto/ask_plan.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions proto/optimus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package sonm

func (m *PredictSupplierRequest) Normalize() {
for _, dev := range m.GetDevices().GetGPUs() {
dev.GetDevice().FillHashID()
}
}
Loading