Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: keep internal state for Connor #1545

Merged
merged 3 commits into from Sep 25, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
84 changes: 83 additions & 1 deletion connor/engine.go
Expand Up @@ -44,6 +44,7 @@ type engine struct {
ordersCreateChan chan *types.Corder
ordersResultsChan chan *types.Corder
orderCancelChan chan *types.CorderCancelTuple
state *state
}

var (
Expand All @@ -66,13 +67,25 @@ var (
Name: "sonm_orders_replaced",
Help: "Number of orders that were re-created on marker because of price deviation",
})

restoredOrdersCounter = prometheus.NewCounter(prometheus.CounterOpts{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inconsistent naming

Name: "sonm_orders_restored",
Help: "Number of orders registered in runtime",
})

restoredDealsCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "sonm_deals_restored",
Help: "Number of deals registered in runtime",
})
)

func init() {
prometheus.MustRegister(activeDealsGauge)
prometheus.MustRegister(activeOrdersGauge)
prometheus.MustRegister(createdOrdersCounter)
prometheus.MustRegister(replacedOrdersCounter)
prometheus.MustRegister(restoredOrdersCounter)
prometheus.MustRegister(restoredDealsCounter)
}

func New(ctx context.Context, cfg *Config, log *zap.Logger) (*engine, error) {
Expand Down Expand Up @@ -105,6 +118,7 @@ func New(ctx context.Context, cfg *Config, log *zap.Logger) (*engine, error) {
return &engine{
cfg: cfg,
log: log,
state: newState(log),
ethAddr: crypto.PubkeyToAddress(key.PublicKey),

priceProvider: cfg.backends().priceProvider,
Expand Down Expand Up @@ -153,14 +167,20 @@ func (e *engine) Serve(ctx context.Context) error {
return fmt.Errorf("failed to restore market state: %v", err)
}

wg.Go(func() error {
return e.waitForExternalUpdates(ctx)
})

return wg.Wait()
}

func (e *engine) CreateOrder(bid *types.Corder) {
e.state.addQueuedOrder(bid)
e.ordersCreateChan <- bid
}

func (e *engine) CancelOrder(order *types.Corder) {
e.state.addActiveOrder(order)
e.orderCancelChan <- types.NewCorderCancelTuple(order)
}

Expand All @@ -171,6 +191,7 @@ func (e *engine) RestoreOrder(order *types.Corder) {

func (e *engine) RestoreDeal(ctx context.Context, deal *sonm.Deal) {
e.log.Debug("restoring deal", zap.String("id", deal.GetId().Unwrap().String()))
e.state.addDeal(e.dealFactory.FromDeal(deal))
go e.processDeal(ctx, deal)
}

Expand Down Expand Up @@ -199,7 +220,9 @@ func (e *engine) processOrderCreate(ctx context.Context) {
}
e.log.Debug("order successfully created", zap.String("order_id", created.GetId().Unwrap().String()))
createdOrdersCounter.Inc()
e.ordersResultsChan <- e.corderFactory.FromOrder(created)
corder := e.corderFactory.FromOrder(created)
e.state.deleteCreatingOrder(corder)
e.ordersResultsChan <- corder
}
}

Expand All @@ -222,6 +245,7 @@ func (e *engine) processOrderCancel(ctx context.Context) {
continue
}

e.state.deleteActiveOrder(tuple.Corder.GetId().Unwrap().String())
e.log.Debug("order cancelled", zap.String("order_id", tuple.Corder.GetId().Unwrap().String()))
}
}
Expand Down Expand Up @@ -252,6 +276,7 @@ func (e *engine) getOrderByID(ctx context.Context, id string) (*sonm.Order, erro

func (e *engine) processOrderResult(ctx context.Context) {
for order := range e.ordersResultsChan {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like this should not be async

e.state.addActiveOrder(order)
go e.waitForDeal(ctx, order)
}
}
Expand Down Expand Up @@ -308,6 +333,8 @@ func (e *engine) checkOrderForDealOnce(ctx context.Context, log *zap.Logger, ord

if ord.GetOrderStatus() == sonm.OrderStatus_ORDER_INACTIVE {
activeOrdersGauge.Dec()
e.state.deleteActiveOrder(ord.GetId().Unwrap().String())
// TODO: (in a separate PR) check for `one_shot` parameter, do not re-create order.
log.Info("order becomes inactive, looking for related deal")

if ord.GetDealID().IsZero() {
Expand Down Expand Up @@ -342,6 +369,7 @@ func (e *engine) processDeal(ctx context.Context, deal *sonm.Deal) {

log.Debug("start deal processing")
defer log.Debug("stop deal processing")
defer e.state.deleteDeal(dealID)

e.antiFraud.DealOpened(deal)
defer e.antiFraud.FinishDeal(ctx, deal, antifraud.AllChecks)
Expand Down Expand Up @@ -822,6 +850,60 @@ func (e *engine) restoreMarketState(ctx context.Context) error {
return nil
}

func (e *engine) waitForExternalUpdates(ctx context.Context) error {
tk := util.NewImmediateTicker(time.Minute)
defer tk.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-tk.C:
e.restoreNewDeals(ctx)
e.restoreNewOrders(ctx)
e.state.dump()
}
}
}

func (e *engine) restoreNewDeals(ctx context.Context) {
reqCtx, cancel := context.WithTimeout(ctx, e.cfg.Engine.ConnectionTimeout)
defer cancel()

deals, err := e.deals.List(reqCtx, &sonm.Count{Count: 1000})
if err != nil {
e.log.Warn("failed to load deals from DWH", zap.Error(err))
return
}

for _, deal := range deals.GetDeal() {
id := deal.GetId().Unwrap().String()
if !e.state.hasDeal(id) {
restoredDealsCounter.Inc()
e.RestoreDeal(ctx, deal)
}
}
}

func (e *engine) restoreNewOrders(ctx context.Context) {
reqCtx, cancel := context.WithTimeout(ctx, e.cfg.Engine.ConnectionTimeout)
defer cancel()

orders, err := e.market.GetOrders(reqCtx, &sonm.Count{Count: 1000})
if err != nil {
e.log.Warn("failed to load orders from DWH", zap.Error(err))
return
}

for _, order := range orders.GetOrders() {
cord := e.corderFactory.FromOrder(order)
if !e.state.hasOrder(cord) {
restoredOrdersCounter.Inc()
e.RestoreOrder(cord)
}
}
}

func (e *engine) getTargetCorders() []*types.Corder {
v := make([]*types.Corder, 0)

Expand Down
148 changes: 148 additions & 0 deletions connor/state.go
@@ -0,0 +1,148 @@
package connor

import (
"encoding/json"
"io/ioutil"
"sync"

"github.com/sonm-io/core/connor/types"
"go.uber.org/zap"
)

type dealQuality struct {
Deal *types.Deal
ByLogs float64
ByPool float64
}

type state struct {
mu sync.Mutex
log *zap.Logger
activeOrders map[string]*types.Corder
queuedOrders map[string]*types.Corder
deals map[string]*dealQuality
}

func newState(log *zap.Logger) *state {
return &state{
log: log.Named("state"),
activeOrders: map[string]*types.Corder{},
queuedOrders: map[string]*types.Corder{},
deals: map[string]*dealQuality{},
}
}

func (s *state) hasOrder(order *types.Corder) bool {
s.mu.Lock()
defer s.mu.Unlock()

id := order.GetId().Unwrap().String()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before mu

hash := order.Hash()

_, ok := s.activeOrders[id]
if !ok {
_, ok = s.queuedOrders[hash]
}
return ok
}

func (s *state) hasDeal(id string) bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incostistent with hasOrder

s.mu.Lock()
defer s.mu.Unlock()

_, ok := s.deals[id]
return ok
}

func (s *state) addActiveOrder(ord *types.Corder) {
s.mu.Lock()
defer s.mu.Unlock()

s.activeOrders[ord.GetId().Unwrap().String()] = ord
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create string id before lock and use it here and in log

s.log.Debug("adding active order",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

either 'added' or move before lock

zap.String("order_id", ord.GetId().Unwrap().String()),
zap.Int("active", len(s.activeOrders)),
zap.Int("creating", len(s.queuedOrders)))
}

func (s *state) addQueuedOrder(ord *types.Corder) {
s.mu.Lock()
defer s.mu.Unlock()

hash := ord.Hash()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move before lock

s.queuedOrders[hash] = ord
s.log.Debug("adding queued order",
zap.String("order_hash", hash),
zap.Int("active", len(s.activeOrders)),
zap.Int("queued", len(s.queuedOrders)))
}

func (s *state) deleteActiveOrder(id string) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make all externally used methods public

s.mu.Lock()
defer s.mu.Unlock()

delete(s.activeOrders, id)
s.log.Debug("deleting active order",
zap.String("order_id", id),
zap.Int("active", len(s.activeOrders)),
zap.Int("queued", len(s.queuedOrders)))
}

func (s *state) deleteCreatingOrder(ord *types.Corder) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

queued?

s.mu.Lock()
defer s.mu.Unlock()

hash := ord.Hash()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

delete(s.queuedOrders, hash)
s.log.Debug("deleting queued order",
zap.String("order_hash", hash),
zap.Int("active", len(s.activeOrders)),
zap.Int("queued", len(s.queuedOrders)))
}

func (s *state) addDeal(deal *types.Deal) {
s.mu.Lock()
defer s.mu.Unlock()

s.deals[deal.GetId().Unwrap().String()] = &dealQuality{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

Deal: deal,
ByLogs: 1,
ByPool: 1,
}
s.log.Debug("adding deal",
zap.String("deal_id", deal.GetId().Unwrap().String()),
zap.Int("total", len(s.deals)))
}

func (s *state) deleteDeal(id string) {
s.mu.Lock()
defer s.mu.Unlock()

s.log.Debug("deleting deal", zap.String("deal_id", id), zap.Int("total", len(s.deals)))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'deleted' and move after delete call

delete(s.deals, id)
}

func (s *state) dump() {
s.mu.Lock()
defer s.mu.Unlock()

b, err := json.Marshal(struct {
ActiveOrders map[string]*types.Corder
QueuedOrders map[string]*types.Corder
Deals map[string]*dealQuality
}{
ActiveOrders: s.activeOrders,
QueuedOrders: s.queuedOrders,
Deals: s.deals,
})

if err != nil {
s.log.Warn("cannot marshal state", zap.Error(err))
return
}

s.log.Info("dumpling state to the disk")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

split in two methods, write can possibly block for a long time

if err := ioutil.WriteFile("/tmp/connor_state.json", b, 0600); err != nil {
s.log.Warn("cannot write state data ", zap.Error(err))
}
}
2 changes: 1 addition & 1 deletion connor/types/corder.go
Expand Up @@ -115,7 +115,7 @@ func (co *Corder) IsReplaceable(newPrice *big.Int, delta float64) bool {
return isOrderReplaceable(currentPrice, newFloatPrice, delta)
}

func (co *Corder) hash() string {
func (co *Corder) Hash() string {
s := struct {
Benchmarks []uint64
Counterparty common.Address
Expand Down
2 changes: 1 addition & 1 deletion connor/types/types.go
Expand Up @@ -34,7 +34,7 @@ func DivideOrdersSets(existingCorders, targetCorders []*Corder) *OrdersSets {

for _, ord := range targetCorders {
if ex, ok := existingByBenchmark[ord.GetHashrate()]; ok {
if ex.hash() == ord.hash() {
if ex.Hash() == ord.Hash() {
set.Restore = append(set.Restore, ex)
} else {
set.Cancel = append(set.Cancel, ex)
Expand Down