Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated oracle engine to send events at the end of the block #3752

Merged
merged 4 commits into from
Jul 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/vega/node/node_pre.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func (l *NodeCommand) preRun(_ []string) (err error) {
}

l.collateral = collateral.New(l.Log, l.conf.Collateral, l.broker, now)
l.oracle = oracles.NewEngine(l.Log, l.conf.Oracles, now, l.broker)
l.oracle = oracles.NewEngine(l.Log, l.conf.Oracles, now, l.broker, l.timeService)
l.timeService.NotifyOnTick(l.oracle.UpdateCurrentTime)
l.oracleAdaptors = oracleAdaptors.New()

Expand Down
11 changes: 9 additions & 2 deletions execution/market_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

bmock "code.vegaprotocol.io/vega/broker/mocks"
emock "code.vegaprotocol.io/vega/execution/mocks"

"code.vegaprotocol.io/vega/collateral"
"code.vegaprotocol.io/vega/events"
"code.vegaprotocol.io/vega/execution"
Expand Down Expand Up @@ -87,6 +89,7 @@ type testMarket struct {
ctrl *gomock.Controller
collateralEngine *collateral.Engine
broker *bmock.MockBroker
timeService *emock.MockTimeService
now time.Time
asset string
mas *monitor.AuctionState
Expand All @@ -111,6 +114,8 @@ func newTestMarket(t *testing.T, now time.Time) *testMarket {

// Setup Mocking Expectations
tm.broker = bmock.NewMockBroker(ctrl)
tm.timeService = emock.NewMockTimeService(ctrl)
tm.timeService.EXPECT().NotifyOnTick(gomock.Any()).Times(1)

// eventFn records and count events and orderEvents
eventFn := func(evt events.Event) {
Expand Down Expand Up @@ -153,7 +158,7 @@ func (tm *testMarket) Run(ctx context.Context, mktCfg types.Market) *testMarket
liquidityConfig = liquidity.NewDefaultConfig()
)

oracleEngine := oracles.NewEngine(tm.log, oracles.NewDefaultConfig(), tm.now, tm.broker)
oracleEngine := oracles.NewEngine(tm.log, oracles.NewDefaultConfig(), tm.now, tm.broker, tm.timeService)

mas := monitor.NewAuctionState(&mktCfg, tm.now)
monitor.NewAuctionState(&mktCfg, tm.now)
Expand Down Expand Up @@ -241,6 +246,8 @@ func getTestMarket2(
feeConfig := fee.NewDefaultConfig()
liquidityConfig := liquidity.NewDefaultConfig()
broker := bmock.NewMockBroker(ctrl)
timeService := emock.NewMockTimeService(ctrl)
timeService.EXPECT().NotifyOnTick(gomock.Any()).Times(1)

tm := &testMarket{
log: log,
Expand Down Expand Up @@ -280,7 +287,7 @@ func getTestMarket2(
},
})

oracleEngine := oracles.NewEngine(log, oracles.NewDefaultConfig(), now, broker)
oracleEngine := oracles.NewEngine(log, oracles.NewDefaultConfig(), now, broker, timeService)

// add the token asset
tokAsset := types.Asset{
Expand Down
2 changes: 1 addition & 1 deletion integration/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func newExecutionTestSetup() *executionTestSetup {
execsetup.log, collateral.NewDefaultConfig(), execsetup.broker, currentTime,
)
execsetup.oracleEngine = oracles.NewEngine(
execsetup.log, oracles.NewDefaultConfig(), currentTime, execsetup.broker,
execsetup.log, oracles.NewDefaultConfig(), currentTime, execsetup.broker, execsetup.timeService,
)
execsetup.executionEngine = execution.NewEngine(
execsetup.log,
Expand Down
7 changes: 7 additions & 0 deletions markets/instrument_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"time"

"code.vegaprotocol.io/vega/broker/mocks"
emock "code.vegaprotocol.io/vega/execution/mocks"
"code.vegaprotocol.io/vega/logging"
"code.vegaprotocol.io/vega/markets"
"code.vegaprotocol.io/vega/oracles"

"code.vegaprotocol.io/vega/products"
oraclesv1 "code.vegaprotocol.io/vega/proto/oracles/v1"
"code.vegaprotocol.io/vega/types"
Expand Down Expand Up @@ -113,11 +115,16 @@ func newOracleEngine(t *testing.T) products.OracleEngine {
ctrl := gomock.NewController(t)
broker := mocks.NewMockBroker(ctrl)
broker.EXPECT().Send(gomock.Any())

ts := emock.NewMockTimeService(ctrl)
ts.EXPECT().NotifyOnTick(gomock.Any()).Times(1)

return oracles.NewEngine(
logging.NewTestLogger(),
oracles.NewDefaultConfig(),
time.Now(),
broker,
ts,
)
}

Expand Down
52 changes: 43 additions & 9 deletions oracles/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package oracles
import (
"context"
"fmt"
"sync"
"time"

"code.vegaprotocol.io/vega/events"
Expand All @@ -16,14 +17,21 @@ type Broker interface {
SendBatch(events []events.Event)
}

// TimeService ...
//go:generate go run github.com/golang/mock/mockgen -destination mocks/time_service_mock.go -package mocks code.vegaprotocol.io/vega/oracles TimeService
type TimeService interface {
NotifyOnTick(f func(context.Context, time.Time))
}

// Engine is responsible of broadcasting the OracleData to products and risk
// models interested in it.
type Engine struct {
log *logging.Logger
broker Broker
CurrentTime time.Time

log *logging.Logger
broker Broker
CurrentTime time.Time
buffer []OracleData
subscriptions specSubscriptions
lock sync.Mutex
}

// NewEngine creates a new oracle Engine.
Expand All @@ -32,31 +40,48 @@ func NewEngine(
conf Config,
currentTime time.Time,
broker Broker,
ts TimeService,
) *Engine {
log = log.Named(namedLogger)
log.SetLevel(conf.Level.Get())

return &Engine{
e := &Engine{
log: log,
broker: broker,
CurrentTime: currentTime,
subscriptions: newSpecSubscriptions(),
}

ts.NotifyOnTick(e.UpdateCurrentTime)
return e
}

// UpdateCurrentTime listens to update of the current Vega time.
func (e *Engine) UpdateCurrentTime(_ context.Context, ts time.Time) {
func (e *Engine) UpdateCurrentTime(ctx context.Context, ts time.Time) {
e.CurrentTime = ts

e.lock.Lock()
for _, data := range e.buffer {
err := e.sendOracleUpdate(ctx, data)
if err != nil {
e.log.Debug("failed to send oracle update",
logging.Error(err),
)
}
}
e.buffer = nil
e.lock.Unlock()
}

// BroadcastData broadcasts the OracleData to products and risk models that are
// interested in it. If no one is listening to this OracleData, it is discarded.
func (e *Engine) BroadcastData(ctx context.Context, data OracleData) error {
func (e *Engine) sendOracleUpdate(ctx context.Context, data OracleData) error {
result, err := e.subscriptions.filterSubscribers(func(spec OracleSpec) (bool, error) {
return spec.MatchData(data)
})

if err != nil {
e.log.Debug("error in filtering subscribers",
logging.Error(err),
)
return err
}

Expand All @@ -74,6 +99,15 @@ func (e *Engine) BroadcastData(ctx context.Context, data OracleData) error {
return nil
}

// BroadcastData appends the OracleData to the buffer and is broadcast on chain time event
// to products and risk models that are interested in it. If no one is listening to this OracleData, it is discarded.
func (e *Engine) BroadcastData(ctx context.Context, data OracleData) error {
e.lock.Lock()
e.buffer = append(e.buffer, data)
e.lock.Unlock()
return nil
}

// Subscribe registers a callback for a given OracleSpec that is call when an
// OracleData matches the spec.
// It returns a SubscriptionID that is used to Unsubscribe.
Expand Down
48 changes: 24 additions & 24 deletions oracles/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"testing"
"time"

"code.vegaprotocol.io/vega/broker/mocks"
bmok "code.vegaprotocol.io/vega/broker/mocks"
"code.vegaprotocol.io/vega/events"
"code.vegaprotocol.io/vega/execution/mocks"
"code.vegaprotocol.io/vega/logging"
"code.vegaprotocol.io/vega/oracles"
oraclespb "code.vegaprotocol.io/vega/proto/oracles/v1"
Expand All @@ -21,7 +22,6 @@ func TestOracleEngine(t *testing.T) {
t.Run("Subscribing to oracle engine succeeds", testOracleEngineSubscribingSucceeds)
t.Run("Subscribing to oracle engine with without callback fails", testOracleEngineSubscribingWithoutCallbackFails)
t.Run("Broadcasting to right callback with correct data succeeds", testOracleEngineBroadcastingCorrectDataSucceeds)
t.Run("Broadcasting to right callback with incorrect data fails", testOracleEngineBroadcastingIncorrectDataFails)
t.Run("Unsubscribing known ID from oracle engine succeeds", testOracleEngineUnsubscribingKnownIDSucceeds)
t.Run("Unsubscribing unknown ID from oracle engine panics", testOracleEngineUnsubscribingUnknownIDPanics)
t.Run("Updating current time succeeds", testOracleEngineUpdatingCurrentTimeSucceeds)
Expand Down Expand Up @@ -100,33 +100,14 @@ func testOracleEngineBroadcastingCorrectDataSucceeds(t *testing.T) {

// then
require.NoError(t, errB)
engine.UpdateCurrentTime(ctx, currentTime)
assert.Equal(t, &dataBTC42.data, btcEquals42.subscriber.ReceivedData)
assert.Equal(t, &dataBTC42.data, btcGreater21.subscriber.ReceivedData)
assert.Nil(t, ethEquals42.subscriber.ReceivedData)
assert.Nil(t, ethLess84.subscriber.ReceivedData)
assert.Nil(t, btcGreater100.subscriber.ReceivedData)
}

func testOracleEngineBroadcastingIncorrectDataFails(t *testing.T) {
// given
btcEquals42 := spec("BTC", oraclespb.Condition_OPERATOR_EQUALS, "42")
dataBTC42 := dataWithPrice("BTC", "hello")

// setup
ctx := context.Background()
currentTime := time.Now()
engine := newEngine(ctx, t, currentTime)
engine.broker.mockNewOracleSpecSubscription(currentTime, btcEquals42.spec.Proto)

// when
_ = engine.Subscribe(ctx, btcEquals42.spec, btcEquals42.subscriber.Cb)
errB := engine.BroadcastData(ctx, dataBTC42.data)

// then
assert.Error(t, errB)
assert.Nil(t, btcEquals42.subscriber.ReceivedData)
}

func testOracleEngineUnsubscribingUnknownIDPanics(t *testing.T) {
// setup
ctx := context.Background()
Expand Down Expand Up @@ -168,6 +149,7 @@ func testOracleEngineUnsubscribingKnownIDSucceeds(t *testing.T) {
errB2 := engine.BroadcastData(context.Background(), dataBTC42.data)

// then
engine.UpdateCurrentTime(ctx, currentTime)
require.NoError(t, errB1)
require.NoError(t, errB2)
assert.Equal(t, &dataETH42.data, ethEquals42.subscriber.ReceivedData)
Expand Down Expand Up @@ -195,12 +177,17 @@ type testEngine struct {

func newEngine(ctx context.Context, t *testing.T, currentTime time.Time) *testEngine {
broker := newBroker(ctx, t)
ts := newTimeService(ctx, t)

ts.EXPECT().NotifyOnTick(gomock.Any()).Times(1)

return &testEngine{
Engine: oracles.NewEngine(
logging.NewTestLogger(),
oracles.NewDefaultConfig(),
currentTime,
broker,
ts,
),
broker: broker,
}
Expand Down Expand Up @@ -276,18 +263,31 @@ func (d *dummySubscriber) Cb(_ context.Context, data oracles.OracleData) error {
}

type testBroker struct {
*mocks.MockBroker
*bmok.MockBroker
ctx context.Context
}

type testTimeService struct {
*mocks.MockTimeService
ctx context.Context
}

func newBroker(ctx context.Context, t *testing.T) *testBroker {
ctrl := gomock.NewController(t)
return &testBroker{
MockBroker: mocks.NewMockBroker(ctrl),
MockBroker: bmok.NewMockBroker(ctrl),
ctx: ctx,
}
}

func newTimeService(ctx context.Context, t *testing.T) *testTimeService {
ctrl := gomock.NewController(t)
return &testTimeService{
MockTimeService: mocks.NewMockTimeService(ctrl),
ctx: ctx,
}
}

func (b *testBroker) mockNewOracleSpecSubscription(currentTime time.Time, spec oraclespb.OracleSpec) {
spec.CreatedAt = currentTime.UnixNano()
spec.Status = oraclespb.OracleSpec_STATUS_ACTIVE
Expand Down