Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: keep internal state for Connor #1545

Merged
merged 3 commits into from Sep 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
83 changes: 82 additions & 1 deletion connor/engine.go
Expand Up @@ -44,6 +44,7 @@ type engine struct {
ordersCreateChan chan *types.Corder
ordersResultsChan chan *types.Corder
orderCancelChan chan *types.CorderCancelTuple
state *state
}

var (
Expand All @@ -66,13 +67,25 @@ var (
Name: "sonm_orders_replaced",
Help: "Number of orders that were re-created on marker because of price deviation",
})

adoptedOrdersCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "sonm_orders_adopted",
Help: "Number of orders restored in runtime",
})

adoptedDealsCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "sonm_deals_adopted",
Help: "Number of deals restored in runtime",
})
)

func init() {
prometheus.MustRegister(activeDealsGauge)
prometheus.MustRegister(activeOrdersGauge)
prometheus.MustRegister(createdOrdersCounter)
prometheus.MustRegister(replacedOrdersCounter)
prometheus.MustRegister(adoptedOrdersCounter)
prometheus.MustRegister(adoptedDealsCounter)
}

func New(ctx context.Context, cfg *Config, log *zap.Logger) (*engine, error) {
Expand Down Expand Up @@ -105,6 +118,7 @@ func New(ctx context.Context, cfg *Config, log *zap.Logger) (*engine, error) {
return &engine{
cfg: cfg,
log: log,
state: NewState(log),
ethAddr: crypto.PubkeyToAddress(key.PublicKey),

priceProvider: cfg.backends().priceProvider,
Expand Down Expand Up @@ -153,14 +167,20 @@ func (e *engine) Serve(ctx context.Context) error {
return fmt.Errorf("failed to restore market state: %v", err)
}

wg.Go(func() error {
return e.waitForExternalUpdates(ctx)
})

return wg.Wait()
}

func (e *engine) CreateOrder(bid *types.Corder) {
e.state.AddQueuedOrder(bid)
e.ordersCreateChan <- bid
}

func (e *engine) CancelOrder(order *types.Corder) {
e.state.AddActiveOrder(order)
e.orderCancelChan <- types.NewCorderCancelTuple(order)
}

Expand All @@ -171,6 +191,7 @@ func (e *engine) RestoreOrder(order *types.Corder) {

func (e *engine) RestoreDeal(ctx context.Context, deal *sonm.Deal) {
e.log.Debug("restoring deal", zap.String("id", deal.GetId().Unwrap().String()))
e.state.AddDeal(e.dealFactory.FromDeal(deal))
go e.processDeal(ctx, deal)
}

Expand Down Expand Up @@ -199,7 +220,9 @@ func (e *engine) processOrderCreate(ctx context.Context) {
}
e.log.Debug("order successfully created", zap.String("order_id", created.GetId().Unwrap().String()))
createdOrdersCounter.Inc()
e.ordersResultsChan <- e.corderFactory.FromOrder(created)
corder := e.corderFactory.FromOrder(created)
e.state.DeleteQueuedOrder(corder)
e.ordersResultsChan <- corder
}
}

Expand All @@ -222,6 +245,7 @@ func (e *engine) processOrderCancel(ctx context.Context) {
continue
}

e.state.DeleteActiveOrder(tuple.Corder)
e.log.Debug("order cancelled", zap.String("order_id", tuple.Corder.GetId().Unwrap().String()))
}
}
Expand Down Expand Up @@ -252,6 +276,7 @@ func (e *engine) getOrderByID(ctx context.Context, id string) (*sonm.Order, erro

func (e *engine) processOrderResult(ctx context.Context) {
for order := range e.ordersResultsChan {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like this should not be async

e.state.AddActiveOrder(order)
go e.waitForDeal(ctx, order)
}
}
Expand Down Expand Up @@ -308,6 +333,8 @@ func (e *engine) checkOrderForDealOnce(ctx context.Context, log *zap.Logger, ord

if ord.GetOrderStatus() == sonm.OrderStatus_ORDER_INACTIVE {
activeOrdersGauge.Dec()
e.state.DeleteActiveOrder(e.corderFactory.FromOrder(ord))
// TODO: (in a separate PR) check for `one_shot` parameter, do not re-create order.
log.Info("order becomes inactive, looking for related deal")

if ord.GetDealID().IsZero() {
Expand Down Expand Up @@ -342,6 +369,7 @@ func (e *engine) processDeal(ctx context.Context, deal *sonm.Deal) {

log.Debug("start deal processing")
defer log.Debug("stop deal processing")
defer e.state.DeleteDeal(e.dealFactory.FromDeal(deal))

e.antiFraud.DealOpened(deal)
defer e.antiFraud.FinishDeal(ctx, deal, antifraud.AllChecks)
Expand Down Expand Up @@ -822,6 +850,59 @@ func (e *engine) restoreMarketState(ctx context.Context) error {
return nil
}

func (e *engine) waitForExternalUpdates(ctx context.Context) error {
tk := util.NewImmediateTicker(time.Minute)
defer tk.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-tk.C:
e.adoptExternalDeals(ctx)
e.adoptExternalOrders(ctx)
e.state.DumpToFile()
}
}
}

func (e *engine) adoptExternalDeals(ctx context.Context) {
reqCtx, cancel := context.WithTimeout(ctx, e.cfg.Engine.ConnectionTimeout)
defer cancel()

deals, err := e.deals.List(reqCtx, &sonm.Count{Count: 1000})
if err != nil {
e.log.Warn("failed to load deals from DWH", zap.Error(err))
return
}

for _, deal := range deals.GetDeal() {
if !e.state.HasDeal(e.dealFactory.FromDeal(deal)) {
adoptedDealsCounter.Inc()
e.RestoreDeal(ctx, deal)
}
}
}

func (e *engine) adoptExternalOrders(ctx context.Context) {
reqCtx, cancel := context.WithTimeout(ctx, e.cfg.Engine.ConnectionTimeout)
defer cancel()

orders, err := e.market.GetOrders(reqCtx, &sonm.Count{Count: 1000})
if err != nil {
e.log.Warn("failed to load orders from DWH", zap.Error(err))
return
}

for _, order := range orders.GetOrders() {
cord := e.corderFactory.FromOrder(order)
if !e.state.HasOrder(cord) {
adoptedOrdersCounter.Inc()
e.RestoreOrder(cord)
}
}
}

func (e *engine) getTargetCorders() []*types.Corder {
v := make([]*types.Corder, 0)

Expand Down
158 changes: 158 additions & 0 deletions connor/state.go
@@ -0,0 +1,158 @@
package connor

import (
"encoding/json"
"fmt"
"io/ioutil"
"sync"

"github.com/sonm-io/core/connor/types"
"go.uber.org/zap"
)

type dealQuality struct {
Deal *types.Deal `json:"deal"`
ByLogs float64 `json:"by_logs"`
ByPool float64 `json:"by_pool"`
}

type state struct {
mu sync.Mutex
log *zap.Logger
activeOrders map[string]*types.Corder
queuedOrders map[string]*types.Corder
deals map[string]*dealQuality
}

func NewState(log *zap.Logger) *state {
return &state{
log: log.Named("state"),
activeOrders: map[string]*types.Corder{},
queuedOrders: map[string]*types.Corder{},
deals: map[string]*dealQuality{},
}
}

func (s *state) HasOrder(order *types.Corder) bool {
id := order.GetId().Unwrap().String()
hash := order.Hash()

s.mu.Lock()
defer s.mu.Unlock()

_, ok := s.activeOrders[id]
if !ok {
_, ok = s.queuedOrders[hash]
}
return ok
}

func (s *state) HasDeal(deal *types.Deal) bool {
id := deal.GetId().Unwrap().String()

s.mu.Lock()
defer s.mu.Unlock()

_, ok := s.deals[id]
return ok
}

func (s *state) AddActiveOrder(ord *types.Corder) {
id := ord.GetId().Unwrap().String()

s.mu.Lock()
defer s.mu.Unlock()

s.activeOrders[id] = ord
s.log.Debug("active order added",
zap.String("order_id", id),
zap.Int("active", len(s.activeOrders)),
zap.Int("creating", len(s.queuedOrders)))
}

func (s *state) AddQueuedOrder(ord *types.Corder) {
hash := ord.Hash()

s.mu.Lock()
defer s.mu.Unlock()

s.queuedOrders[hash] = ord
s.log.Debug("queued order added",
zap.String("order_hash", hash),
zap.Int("active", len(s.activeOrders)),
zap.Int("queued", len(s.queuedOrders)))
}

func (s *state) DeleteActiveOrder(ord *types.Corder) {
id := ord.GetId().Unwrap().String()

s.mu.Lock()
defer s.mu.Unlock()

delete(s.activeOrders, id)
s.log.Debug("active order deleted",
zap.String("order_id", id),
zap.Int("active", len(s.activeOrders)),
zap.Int("queued", len(s.queuedOrders)))
}

func (s *state) DeleteQueuedOrder(ord *types.Corder) {
hash := ord.Hash()

s.mu.Lock()
defer s.mu.Unlock()

delete(s.queuedOrders, hash)
s.log.Debug("queued order deleted",
zap.String("order_hash", hash),
zap.Int("active", len(s.activeOrders)),
zap.Int("queued", len(s.queuedOrders)))
}

func (s *state) AddDeal(deal *types.Deal) {
id := deal.GetId().Unwrap().String()

s.mu.Lock()
defer s.mu.Unlock()

s.deals[id] = &dealQuality{
Deal: deal,
ByLogs: 1,
ByPool: 1,
}
s.log.Debug("deal added", zap.String("deal_id", id), zap.Int("total", len(s.deals)))
}

func (s *state) DeleteDeal(deal *types.Deal) {
id := deal.GetId().Unwrap().String()

s.mu.Lock()
defer s.mu.Unlock()

delete(s.deals, id)
s.log.Debug("deal deleted", zap.String("deal_id", id), zap.Int("total", len(s.deals)))
}

func (s *state) dump() ([]byte, error) {
s.mu.Lock()
defer s.mu.Unlock()

return json.Marshal(struct {
ActiveOrders map[string]*types.Corder
QueuedOrders map[string]*types.Corder
Deals map[string]*dealQuality
}{
ActiveOrders: s.activeOrders,
QueuedOrders: s.queuedOrders,
Deals: s.deals,
})
}

func (s *state) DumpToFile() error {
data, err := s.dump()
if err != nil {
return fmt.Errorf("failed to maarshal storage state: %v", err)
}

return ioutil.WriteFile("/tmp/connor_state.json", data, 0600)
}
2 changes: 1 addition & 1 deletion connor/types/corder.go
Expand Up @@ -115,7 +115,7 @@ func (co *Corder) IsReplaceable(newPrice *big.Int, delta float64) bool {
return isOrderReplaceable(currentPrice, newFloatPrice, delta)
}

func (co *Corder) hash() string {
func (co *Corder) Hash() string {
s := struct {
Benchmarks []uint64
Counterparty common.Address
Expand Down
2 changes: 1 addition & 1 deletion connor/types/types.go
Expand Up @@ -34,7 +34,7 @@ func DivideOrdersSets(existingCorders, targetCorders []*Corder) *OrdersSets {

for _, ord := range targetCorders {
if ex, ok := existingByBenchmark[ord.GetHashrate()]; ok {
if ex.hash() == ord.hash() {
if ex.Hash() == ord.Hash() {
set.Restore = append(set.Restore, ex)
} else {
set.Cancel = append(set.Cancel, ex)
Expand Down