From b265ae64777ee04045bd6213c0d54a8fa576e301 Mon Sep 17 00:00:00 2001 From: Zohar Etzioni Date: Wed, 14 Jul 2021 19:34:03 +0300 Subject: [PATCH 1/4] Updated oracle engine to send events at the end of the block --- cmd/vega/node/node_pre.go | 2 +- execution/market_test.go | 11 +++++-- integration/setup_test.go | 2 +- markets/instrument_test.go | 7 ++++ oracles/engine.go | 52 ++++++++++++++++++++++++------ oracles/engine_test.go | 34 ++++++++++++++++--- oracles/mocks/time_service_mock.go | 48 +++++++++++++++++++++++++++ 7 files changed, 139 insertions(+), 17 deletions(-) create mode 100644 oracles/mocks/time_service_mock.go diff --git a/cmd/vega/node/node_pre.go b/cmd/vega/node/node_pre.go index c8be1a9642..d74fe33583 100644 --- a/cmd/vega/node/node_pre.go +++ b/cmd/vega/node/node_pre.go @@ -454,7 +454,7 @@ func (l *NodeCommand) preRun(_ []string) (err error) { } l.collateral = collateral.New(l.Log, l.conf.Collateral, l.broker, now) - l.oracle = oracles.NewEngine(l.Log, l.conf.Oracles, now, l.broker) + l.oracle = oracles.NewEngine(l.Log, l.conf.Oracles, now, l.broker, l.timeService) l.timeService.NotifyOnTick(l.oracle.UpdateCurrentTime) l.oracleAdaptors = oracleAdaptors.New() diff --git a/execution/market_test.go b/execution/market_test.go index d5f91488b6..c26d580df7 100644 --- a/execution/market_test.go +++ b/execution/market_test.go @@ -8,6 +8,8 @@ import ( "time" bmock "code.vegaprotocol.io/vega/broker/mocks" + omock "code.vegaprotocol.io/vega/oracles/mocks" + "code.vegaprotocol.io/vega/collateral" "code.vegaprotocol.io/vega/events" "code.vegaprotocol.io/vega/execution" @@ -87,6 +89,7 @@ type testMarket struct { ctrl *gomock.Controller collateralEngine *collateral.Engine broker *bmock.MockBroker + timeService *omock.MockTimeService now time.Time asset string mas *monitor.AuctionState @@ -111,6 +114,8 @@ func newTestMarket(t *testing.T, now time.Time) *testMarket { // Setup Mocking Expectations tm.broker = bmock.NewMockBroker(ctrl) + tm.timeService = omock.NewMockTimeService(ctrl) + tm.timeService.EXPECT().NotifyOnTick(gomock.Any()).Times(1) // eventFn records and count events and orderEvents eventFn := func(evt events.Event) { @@ -153,7 +158,7 @@ func (tm *testMarket) Run(ctx context.Context, mktCfg types.Market) *testMarket liquidityConfig = liquidity.NewDefaultConfig() ) - oracleEngine := oracles.NewEngine(tm.log, oracles.NewDefaultConfig(), tm.now, tm.broker) + oracleEngine := oracles.NewEngine(tm.log, oracles.NewDefaultConfig(), tm.now, tm.broker, tm.timeService) mas := monitor.NewAuctionState(&mktCfg, tm.now) monitor.NewAuctionState(&mktCfg, tm.now) @@ -241,6 +246,8 @@ func getTestMarket2( feeConfig := fee.NewDefaultConfig() liquidityConfig := liquidity.NewDefaultConfig() broker := bmock.NewMockBroker(ctrl) + timeService := omock.NewMockTimeService(ctrl) + timeService.EXPECT().NotifyOnTick(gomock.Any()).Times(1) tm := &testMarket{ log: log, @@ -280,7 +287,7 @@ func getTestMarket2( }, }) - oracleEngine := oracles.NewEngine(log, oracles.NewDefaultConfig(), now, broker) + oracleEngine := oracles.NewEngine(log, oracles.NewDefaultConfig(), now, broker, timeService) // add the token asset tokAsset := types.Asset{ diff --git a/integration/setup_test.go b/integration/setup_test.go index ee76f4307f..5090e7b725 100644 --- a/integration/setup_test.go +++ b/integration/setup_test.go @@ -80,7 +80,7 @@ func newExecutionTestSetup() *executionTestSetup { execsetup.log, collateral.NewDefaultConfig(), execsetup.broker, currentTime, ) execsetup.oracleEngine = oracles.NewEngine( - execsetup.log, oracles.NewDefaultConfig(), currentTime, execsetup.broker, + execsetup.log, oracles.NewDefaultConfig(), currentTime, execsetup.broker, execsetup.timeService, ) execsetup.executionEngine = execution.NewEngine( execsetup.log, diff --git a/markets/instrument_test.go b/markets/instrument_test.go index 3dc7042be4..7a3eb31f4c 100644 --- a/markets/instrument_test.go +++ b/markets/instrument_test.go @@ -9,6 +9,8 @@ import ( "code.vegaprotocol.io/vega/logging" "code.vegaprotocol.io/vega/markets" "code.vegaprotocol.io/vega/oracles" + omock "code.vegaprotocol.io/vega/oracles/mocks" + "code.vegaprotocol.io/vega/products" oraclesv1 "code.vegaprotocol.io/vega/proto/oracles/v1" "code.vegaprotocol.io/vega/types" @@ -113,11 +115,16 @@ func newOracleEngine(t *testing.T) products.OracleEngine { ctrl := gomock.NewController(t) broker := mocks.NewMockBroker(ctrl) broker.EXPECT().Send(gomock.Any()) + + ts := omock.NewMockTimeService(ctrl) + ts.EXPECT().NotifyOnTick(gomock.Any()).Times(1) + return oracles.NewEngine( logging.NewTestLogger(), oracles.NewDefaultConfig(), time.Now(), broker, + ts, ) } diff --git a/oracles/engine.go b/oracles/engine.go index afdaff37dc..4e2af5eeb8 100644 --- a/oracles/engine.go +++ b/oracles/engine.go @@ -3,6 +3,7 @@ package oracles import ( "context" "fmt" + "sync" "time" "code.vegaprotocol.io/vega/events" @@ -16,14 +17,21 @@ type Broker interface { SendBatch(events []events.Event) } +// TimeService ... +//go:generate go run github.com/golang/mock/mockgen -destination mocks/time_service_mock.go -package mocks code.vegaprotocol.io/vega/oracles TimeService +type TimeService interface { + NotifyOnTick(f func(context.Context, time.Time)) +} + // Engine is responsible of broadcasting the OracleData to products and risk // models interested in it. type Engine struct { - log *logging.Logger - broker Broker - CurrentTime time.Time - + log *logging.Logger + broker Broker + CurrentTime time.Time + buffer []OracleData subscriptions specSubscriptions + lock sync.Mutex } // NewEngine creates a new oracle Engine. @@ -32,31 +40,48 @@ func NewEngine( conf Config, currentTime time.Time, broker Broker, + ts TimeService, ) *Engine { log = log.Named(namedLogger) log.SetLevel(conf.Level.Get()) - return &Engine{ + e := &Engine{ log: log, broker: broker, CurrentTime: currentTime, subscriptions: newSpecSubscriptions(), } + + ts.NotifyOnTick(e.UpdateCurrentTime) + return e } // UpdateCurrentTime listens to update of the current Vega time. -func (e *Engine) UpdateCurrentTime(_ context.Context, ts time.Time) { +func (e *Engine) UpdateCurrentTime(ctx context.Context, ts time.Time) { e.CurrentTime = ts + + e.lock.Lock() + for _, data := range e.buffer { + err := e.sendOracleUpdate(ctx, data) + if err != nil { + e.log.Debug("failed to send oracle update", + logging.Error(err), + ) + } + } + e.buffer = nil + e.lock.Unlock() } -// BroadcastData broadcasts the OracleData to products and risk models that are -// interested in it. If no one is listening to this OracleData, it is discarded. -func (e *Engine) BroadcastData(ctx context.Context, data OracleData) error { +func (e *Engine) sendOracleUpdate(ctx context.Context, data OracleData) error { result, err := e.subscriptions.filterSubscribers(func(spec OracleSpec) (bool, error) { return spec.MatchData(data) }) if err != nil { + e.log.Debug("error in filtering subscribers", + logging.Error(err), + ) return err } @@ -74,6 +99,15 @@ func (e *Engine) BroadcastData(ctx context.Context, data OracleData) error { return nil } +// BroadcastData appends the OracleData to the buffer and is broadcast on chain time event +// to products and risk models that are interested in it. If no one is listening to this OracleData, it is discarded. +func (e *Engine) BroadcastData(ctx context.Context, data OracleData) error { + e.lock.Lock() + e.buffer = append(e.buffer, data) + e.lock.Unlock() + return nil +} + // Subscribe registers a callback for a given OracleSpec that is call when an // OracleData matches the spec. // It returns a SubscriptionID that is used to Unsubscribe. diff --git a/oracles/engine_test.go b/oracles/engine_test.go index 1b1486b252..22fea420c8 100644 --- a/oracles/engine_test.go +++ b/oracles/engine_test.go @@ -6,10 +6,11 @@ import ( "testing" "time" - "code.vegaprotocol.io/vega/broker/mocks" + bmok "code.vegaprotocol.io/vega/broker/mocks" "code.vegaprotocol.io/vega/events" "code.vegaprotocol.io/vega/logging" "code.vegaprotocol.io/vega/oracles" + "code.vegaprotocol.io/vega/oracles/mocks" oraclespb "code.vegaprotocol.io/vega/proto/oracles/v1" "github.com/golang/mock/gomock" @@ -21,7 +22,8 @@ func TestOracleEngine(t *testing.T) { t.Run("Subscribing to oracle engine succeeds", testOracleEngineSubscribingSucceeds) t.Run("Subscribing to oracle engine with without callback fails", testOracleEngineSubscribingWithoutCallbackFails) t.Run("Broadcasting to right callback with correct data succeeds", testOracleEngineBroadcastingCorrectDataSucceeds) - t.Run("Broadcasting to right callback with incorrect data fails", testOracleEngineBroadcastingIncorrectDataFails) + // I'm keeping this for the purpose of the review - this is not relevant anymore because there is no feedback on errors + //t.Run("Broadcasting to right callback with incorrect data fails", testOracleEngineBroadcastingIncorrectDataFails) t.Run("Unsubscribing known ID from oracle engine succeeds", testOracleEngineUnsubscribingKnownIDSucceeds) t.Run("Unsubscribing unknown ID from oracle engine panics", testOracleEngineUnsubscribingUnknownIDPanics) t.Run("Updating current time succeeds", testOracleEngineUpdatingCurrentTimeSucceeds) @@ -100,6 +102,7 @@ func testOracleEngineBroadcastingCorrectDataSucceeds(t *testing.T) { // then require.NoError(t, errB) + engine.UpdateCurrentTime(ctx, currentTime) assert.Equal(t, &dataBTC42.data, btcEquals42.subscriber.ReceivedData) assert.Equal(t, &dataBTC42.data, btcGreater21.subscriber.ReceivedData) assert.Nil(t, ethEquals42.subscriber.ReceivedData) @@ -168,6 +171,7 @@ func testOracleEngineUnsubscribingKnownIDSucceeds(t *testing.T) { errB2 := engine.BroadcastData(context.Background(), dataBTC42.data) // then + engine.UpdateCurrentTime(ctx, currentTime) require.NoError(t, errB1) require.NoError(t, errB2) assert.Equal(t, &dataETH42.data, ethEquals42.subscriber.ReceivedData) @@ -195,12 +199,17 @@ type testEngine struct { func newEngine(ctx context.Context, t *testing.T, currentTime time.Time) *testEngine { broker := newBroker(ctx, t) + ts := newTimeService(ctx, t) + + ts.EXPECT().NotifyOnTick(gomock.Any()).Times(1) + return &testEngine{ Engine: oracles.NewEngine( logging.NewTestLogger(), oracles.NewDefaultConfig(), currentTime, broker, + ts, ), broker: broker, } @@ -276,18 +285,35 @@ func (d *dummySubscriber) Cb(_ context.Context, data oracles.OracleData) error { } type testBroker struct { - *mocks.MockBroker + *bmok.MockBroker + ctx context.Context +} + +type testTimeService struct { + *mocks.MockTimeService ctx context.Context } func newBroker(ctx context.Context, t *testing.T) *testBroker { ctrl := gomock.NewController(t) return &testBroker{ - MockBroker: mocks.NewMockBroker(ctrl), + MockBroker: bmok.NewMockBroker(ctrl), ctx: ctx, } } +func newTimeService(ctx context.Context, t *testing.T) *testTimeService { + ctrl := gomock.NewController(t) + return &testTimeService{ + MockTimeService: mocks.NewMockTimeService(ctrl), + ctx: ctx, + } +} + +func (ts *testTimeService) mockNotifyOnTick(f interface{}) { + ts.EXPECT().NotifyOnTick(gomock.Any()).Times(1) +} + func (b *testBroker) mockNewOracleSpecSubscription(currentTime time.Time, spec oraclespb.OracleSpec) { spec.CreatedAt = currentTime.UnixNano() spec.Status = oraclespb.OracleSpec_STATUS_ACTIVE diff --git a/oracles/mocks/time_service_mock.go b/oracles/mocks/time_service_mock.go new file mode 100644 index 0000000000..67c77c764d --- /dev/null +++ b/oracles/mocks/time_service_mock.go @@ -0,0 +1,48 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: code.vegaprotocol.io/vega/oracle (interfaces: TimeService) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + time "time" + + gomock "github.com/golang/mock/gomock" +) + +// MockTimeService is a mock of TimeService interface. +type MockTimeService struct { + ctrl *gomock.Controller + recorder *MockTimeServiceMockRecorder +} + +// MockTimeServiceMockRecorder is the mock recorder for MockTimeService. +type MockTimeServiceMockRecorder struct { + mock *MockTimeService +} + +// NewMockTimeService creates a new mock instance. +func NewMockTimeService(ctrl *gomock.Controller) *MockTimeService { + mock := &MockTimeService{ctrl: ctrl} + mock.recorder = &MockTimeServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTimeService) EXPECT() *MockTimeServiceMockRecorder { + return m.recorder +} + +// NotifyOnTick mocks base method. +func (m *MockTimeService) NotifyOnTick(f func(context.Context, time.Time)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "NotifyOnTick", f) +} + +// NotifyOnTick indicates an expected call of NotifyOnTick. +func (mr *MockTimeServiceMockRecorder) NotifyOnTick(f interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyOnTick", reflect.TypeOf((*MockTimeService)(nil).NotifyOnTick), f) +} From 9af2283d6bdb9464972a5085170ee917d03048a2 Mon Sep 17 00:00:00 2001 From: Zohar Etzioni Date: Wed, 14 Jul 2021 21:14:19 +0300 Subject: [PATCH 2/4] fixed static test --- oracles/engine_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/oracles/engine_test.go b/oracles/engine_test.go index 22fea420c8..a47a121c15 100644 --- a/oracles/engine_test.go +++ b/oracles/engine_test.go @@ -310,10 +310,6 @@ func newTimeService(ctx context.Context, t *testing.T) *testTimeService { } } -func (ts *testTimeService) mockNotifyOnTick(f interface{}) { - ts.EXPECT().NotifyOnTick(gomock.Any()).Times(1) -} - func (b *testBroker) mockNewOracleSpecSubscription(currentTime time.Time, spec oraclespb.OracleSpec) { spec.CreatedAt = currentTime.UnixNano() spec.Status = oraclespb.OracleSpec_STATUS_ACTIVE From d377d7aacaa31ce3d6ffaa8e22dd868ba032b280 Mon Sep 17 00:00:00 2001 From: Zohar Etzioni Date: Wed, 14 Jul 2021 21:33:10 +0300 Subject: [PATCH 3/4] remove unused test - it became unused because the broadcast can no longer return an error --- oracles/engine_test.go | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/oracles/engine_test.go b/oracles/engine_test.go index a47a121c15..302ca75c2c 100644 --- a/oracles/engine_test.go +++ b/oracles/engine_test.go @@ -22,8 +22,6 @@ func TestOracleEngine(t *testing.T) { t.Run("Subscribing to oracle engine succeeds", testOracleEngineSubscribingSucceeds) t.Run("Subscribing to oracle engine with without callback fails", testOracleEngineSubscribingWithoutCallbackFails) t.Run("Broadcasting to right callback with correct data succeeds", testOracleEngineBroadcastingCorrectDataSucceeds) - // I'm keeping this for the purpose of the review - this is not relevant anymore because there is no feedback on errors - //t.Run("Broadcasting to right callback with incorrect data fails", testOracleEngineBroadcastingIncorrectDataFails) t.Run("Unsubscribing known ID from oracle engine succeeds", testOracleEngineUnsubscribingKnownIDSucceeds) t.Run("Unsubscribing unknown ID from oracle engine panics", testOracleEngineUnsubscribingUnknownIDPanics) t.Run("Updating current time succeeds", testOracleEngineUpdatingCurrentTimeSucceeds) @@ -110,26 +108,6 @@ func testOracleEngineBroadcastingCorrectDataSucceeds(t *testing.T) { assert.Nil(t, btcGreater100.subscriber.ReceivedData) } -func testOracleEngineBroadcastingIncorrectDataFails(t *testing.T) { - // given - btcEquals42 := spec("BTC", oraclespb.Condition_OPERATOR_EQUALS, "42") - dataBTC42 := dataWithPrice("BTC", "hello") - - // setup - ctx := context.Background() - currentTime := time.Now() - engine := newEngine(ctx, t, currentTime) - engine.broker.mockNewOracleSpecSubscription(currentTime, btcEquals42.spec.Proto) - - // when - _ = engine.Subscribe(ctx, btcEquals42.spec, btcEquals42.subscriber.Cb) - errB := engine.BroadcastData(ctx, dataBTC42.data) - - // then - assert.Error(t, errB) - assert.Nil(t, btcEquals42.subscriber.ReceivedData) -} - func testOracleEngineUnsubscribingUnknownIDPanics(t *testing.T) { // setup ctx := context.Background() From 99cf693c7bba504789245698d83487bd443237b9 Mon Sep 17 00:00:00 2001 From: Zohar Etzioni Date: Wed, 14 Jul 2021 22:04:39 +0300 Subject: [PATCH 4/4] get rid of oracle mocks --- execution/market_test.go | 8 ++--- markets/instrument_test.go | 4 +-- oracles/engine_test.go | 2 +- oracles/mocks/time_service_mock.go | 48 ------------------------------ 4 files changed, 7 insertions(+), 55 deletions(-) delete mode 100644 oracles/mocks/time_service_mock.go diff --git a/execution/market_test.go b/execution/market_test.go index c26d580df7..34ae1f1f08 100644 --- a/execution/market_test.go +++ b/execution/market_test.go @@ -8,7 +8,7 @@ import ( "time" bmock "code.vegaprotocol.io/vega/broker/mocks" - omock "code.vegaprotocol.io/vega/oracles/mocks" + emock "code.vegaprotocol.io/vega/execution/mocks" "code.vegaprotocol.io/vega/collateral" "code.vegaprotocol.io/vega/events" @@ -89,7 +89,7 @@ type testMarket struct { ctrl *gomock.Controller collateralEngine *collateral.Engine broker *bmock.MockBroker - timeService *omock.MockTimeService + timeService *emock.MockTimeService now time.Time asset string mas *monitor.AuctionState @@ -114,7 +114,7 @@ func newTestMarket(t *testing.T, now time.Time) *testMarket { // Setup Mocking Expectations tm.broker = bmock.NewMockBroker(ctrl) - tm.timeService = omock.NewMockTimeService(ctrl) + tm.timeService = emock.NewMockTimeService(ctrl) tm.timeService.EXPECT().NotifyOnTick(gomock.Any()).Times(1) // eventFn records and count events and orderEvents @@ -246,7 +246,7 @@ func getTestMarket2( feeConfig := fee.NewDefaultConfig() liquidityConfig := liquidity.NewDefaultConfig() broker := bmock.NewMockBroker(ctrl) - timeService := omock.NewMockTimeService(ctrl) + timeService := emock.NewMockTimeService(ctrl) timeService.EXPECT().NotifyOnTick(gomock.Any()).Times(1) tm := &testMarket{ diff --git a/markets/instrument_test.go b/markets/instrument_test.go index 7a3eb31f4c..e731964474 100644 --- a/markets/instrument_test.go +++ b/markets/instrument_test.go @@ -6,10 +6,10 @@ import ( "time" "code.vegaprotocol.io/vega/broker/mocks" + emock "code.vegaprotocol.io/vega/execution/mocks" "code.vegaprotocol.io/vega/logging" "code.vegaprotocol.io/vega/markets" "code.vegaprotocol.io/vega/oracles" - omock "code.vegaprotocol.io/vega/oracles/mocks" "code.vegaprotocol.io/vega/products" oraclesv1 "code.vegaprotocol.io/vega/proto/oracles/v1" @@ -116,7 +116,7 @@ func newOracleEngine(t *testing.T) products.OracleEngine { broker := mocks.NewMockBroker(ctrl) broker.EXPECT().Send(gomock.Any()) - ts := omock.NewMockTimeService(ctrl) + ts := emock.NewMockTimeService(ctrl) ts.EXPECT().NotifyOnTick(gomock.Any()).Times(1) return oracles.NewEngine( diff --git a/oracles/engine_test.go b/oracles/engine_test.go index 302ca75c2c..1de0ae6e47 100644 --- a/oracles/engine_test.go +++ b/oracles/engine_test.go @@ -8,9 +8,9 @@ import ( bmok "code.vegaprotocol.io/vega/broker/mocks" "code.vegaprotocol.io/vega/events" + "code.vegaprotocol.io/vega/execution/mocks" "code.vegaprotocol.io/vega/logging" "code.vegaprotocol.io/vega/oracles" - "code.vegaprotocol.io/vega/oracles/mocks" oraclespb "code.vegaprotocol.io/vega/proto/oracles/v1" "github.com/golang/mock/gomock" diff --git a/oracles/mocks/time_service_mock.go b/oracles/mocks/time_service_mock.go deleted file mode 100644 index 67c77c764d..0000000000 --- a/oracles/mocks/time_service_mock.go +++ /dev/null @@ -1,48 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: code.vegaprotocol.io/vega/oracle (interfaces: TimeService) - -// Package mocks is a generated GoMock package. -package mocks - -import ( - context "context" - reflect "reflect" - time "time" - - gomock "github.com/golang/mock/gomock" -) - -// MockTimeService is a mock of TimeService interface. -type MockTimeService struct { - ctrl *gomock.Controller - recorder *MockTimeServiceMockRecorder -} - -// MockTimeServiceMockRecorder is the mock recorder for MockTimeService. -type MockTimeServiceMockRecorder struct { - mock *MockTimeService -} - -// NewMockTimeService creates a new mock instance. -func NewMockTimeService(ctrl *gomock.Controller) *MockTimeService { - mock := &MockTimeService{ctrl: ctrl} - mock.recorder = &MockTimeServiceMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockTimeService) EXPECT() *MockTimeServiceMockRecorder { - return m.recorder -} - -// NotifyOnTick mocks base method. -func (m *MockTimeService) NotifyOnTick(f func(context.Context, time.Time)) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "NotifyOnTick", f) -} - -// NotifyOnTick indicates an expected call of NotifyOnTick. -func (mr *MockTimeServiceMockRecorder) NotifyOnTick(f interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyOnTick", reflect.TypeOf((*MockTimeService)(nil).NotifyOnTick), f) -}