Skip to content

Commit

Permalink
Merge pull request #3752 from vegaprotocol/feature/3709
Browse files Browse the repository at this point in the history
Updated oracle engine to send events at the end of the block
  • Loading branch information
ze97286 committed Jul 15, 2021
2 parents 876a406 + 99cf693 commit 1a4e455
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 37 deletions.
2 changes: 1 addition & 1 deletion cmd/vega/node/node_pre.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func (l *NodeCommand) preRun(_ []string) (err error) {
}

l.collateral = collateral.New(l.Log, l.conf.Collateral, l.broker, now)
l.oracle = oracles.NewEngine(l.Log, l.conf.Oracles, now, l.broker)
l.oracle = oracles.NewEngine(l.Log, l.conf.Oracles, now, l.broker, l.timeService)
l.timeService.NotifyOnTick(l.oracle.UpdateCurrentTime)
l.oracleAdaptors = oracleAdaptors.New()

Expand Down
11 changes: 9 additions & 2 deletions execution/market_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

bmock "code.vegaprotocol.io/vega/broker/mocks"
emock "code.vegaprotocol.io/vega/execution/mocks"

"code.vegaprotocol.io/vega/collateral"
"code.vegaprotocol.io/vega/events"
"code.vegaprotocol.io/vega/execution"
Expand Down Expand Up @@ -87,6 +89,7 @@ type testMarket struct {
ctrl *gomock.Controller
collateralEngine *collateral.Engine
broker *bmock.MockBroker
timeService *emock.MockTimeService
now time.Time
asset string
mas *monitor.AuctionState
Expand All @@ -111,6 +114,8 @@ func newTestMarket(t *testing.T, now time.Time) *testMarket {

// Setup Mocking Expectations
tm.broker = bmock.NewMockBroker(ctrl)
tm.timeService = emock.NewMockTimeService(ctrl)
tm.timeService.EXPECT().NotifyOnTick(gomock.Any()).Times(1)

// eventFn records and count events and orderEvents
eventFn := func(evt events.Event) {
Expand Down Expand Up @@ -153,7 +158,7 @@ func (tm *testMarket) Run(ctx context.Context, mktCfg types.Market) *testMarket
liquidityConfig = liquidity.NewDefaultConfig()
)

oracleEngine := oracles.NewEngine(tm.log, oracles.NewDefaultConfig(), tm.now, tm.broker)
oracleEngine := oracles.NewEngine(tm.log, oracles.NewDefaultConfig(), tm.now, tm.broker, tm.timeService)

mas := monitor.NewAuctionState(&mktCfg, tm.now)
monitor.NewAuctionState(&mktCfg, tm.now)
Expand Down Expand Up @@ -241,6 +246,8 @@ func getTestMarket2(
feeConfig := fee.NewDefaultConfig()
liquidityConfig := liquidity.NewDefaultConfig()
broker := bmock.NewMockBroker(ctrl)
timeService := emock.NewMockTimeService(ctrl)
timeService.EXPECT().NotifyOnTick(gomock.Any()).Times(1)

tm := &testMarket{
log: log,
Expand Down Expand Up @@ -280,7 +287,7 @@ func getTestMarket2(
},
})

oracleEngine := oracles.NewEngine(log, oracles.NewDefaultConfig(), now, broker)
oracleEngine := oracles.NewEngine(log, oracles.NewDefaultConfig(), now, broker, timeService)

// add the token asset
tokAsset := types.Asset{
Expand Down
2 changes: 1 addition & 1 deletion integration/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func newExecutionTestSetup() *executionTestSetup {
execsetup.log, collateral.NewDefaultConfig(), execsetup.broker, currentTime,
)
execsetup.oracleEngine = oracles.NewEngine(
execsetup.log, oracles.NewDefaultConfig(), currentTime, execsetup.broker,
execsetup.log, oracles.NewDefaultConfig(), currentTime, execsetup.broker, execsetup.timeService,
)
execsetup.executionEngine = execution.NewEngine(
execsetup.log,
Expand Down
7 changes: 7 additions & 0 deletions markets/instrument_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"time"

"code.vegaprotocol.io/vega/broker/mocks"
emock "code.vegaprotocol.io/vega/execution/mocks"
"code.vegaprotocol.io/vega/logging"
"code.vegaprotocol.io/vega/markets"
"code.vegaprotocol.io/vega/oracles"

"code.vegaprotocol.io/vega/products"
oraclesv1 "code.vegaprotocol.io/vega/proto/oracles/v1"
"code.vegaprotocol.io/vega/types"
Expand Down Expand Up @@ -113,11 +115,16 @@ func newOracleEngine(t *testing.T) products.OracleEngine {
ctrl := gomock.NewController(t)
broker := mocks.NewMockBroker(ctrl)
broker.EXPECT().Send(gomock.Any())

ts := emock.NewMockTimeService(ctrl)
ts.EXPECT().NotifyOnTick(gomock.Any()).Times(1)

return oracles.NewEngine(
logging.NewTestLogger(),
oracles.NewDefaultConfig(),
time.Now(),
broker,
ts,
)
}

Expand Down
52 changes: 43 additions & 9 deletions oracles/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package oracles
import (
"context"
"fmt"
"sync"
"time"

"code.vegaprotocol.io/vega/events"
Expand All @@ -16,14 +17,21 @@ type Broker interface {
SendBatch(events []events.Event)
}

// TimeService ...
//go:generate go run github.com/golang/mock/mockgen -destination mocks/time_service_mock.go -package mocks code.vegaprotocol.io/vega/oracles TimeService
type TimeService interface {
NotifyOnTick(f func(context.Context, time.Time))
}

// Engine is responsible of broadcasting the OracleData to products and risk
// models interested in it.
type Engine struct {
log *logging.Logger
broker Broker
CurrentTime time.Time

log *logging.Logger
broker Broker
CurrentTime time.Time
buffer []OracleData
subscriptions specSubscriptions
lock sync.Mutex
}

// NewEngine creates a new oracle Engine.
Expand All @@ -32,31 +40,48 @@ func NewEngine(
conf Config,
currentTime time.Time,
broker Broker,
ts TimeService,
) *Engine {
log = log.Named(namedLogger)
log.SetLevel(conf.Level.Get())

return &Engine{
e := &Engine{
log: log,
broker: broker,
CurrentTime: currentTime,
subscriptions: newSpecSubscriptions(),
}

ts.NotifyOnTick(e.UpdateCurrentTime)
return e
}

// UpdateCurrentTime listens to update of the current Vega time.
func (e *Engine) UpdateCurrentTime(_ context.Context, ts time.Time) {
func (e *Engine) UpdateCurrentTime(ctx context.Context, ts time.Time) {
e.CurrentTime = ts

e.lock.Lock()
for _, data := range e.buffer {
err := e.sendOracleUpdate(ctx, data)
if err != nil {
e.log.Debug("failed to send oracle update",
logging.Error(err),
)
}
}
e.buffer = nil
e.lock.Unlock()
}

// BroadcastData broadcasts the OracleData to products and risk models that are
// interested in it. If no one is listening to this OracleData, it is discarded.
func (e *Engine) BroadcastData(ctx context.Context, data OracleData) error {
func (e *Engine) sendOracleUpdate(ctx context.Context, data OracleData) error {
result, err := e.subscriptions.filterSubscribers(func(spec OracleSpec) (bool, error) {
return spec.MatchData(data)
})

if err != nil {
e.log.Debug("error in filtering subscribers",
logging.Error(err),
)
return err
}

Expand All @@ -74,6 +99,15 @@ func (e *Engine) BroadcastData(ctx context.Context, data OracleData) error {
return nil
}

// BroadcastData appends the OracleData to the buffer and is broadcast on chain time event
// to products and risk models that are interested in it. If no one is listening to this OracleData, it is discarded.
func (e *Engine) BroadcastData(ctx context.Context, data OracleData) error {
e.lock.Lock()
e.buffer = append(e.buffer, data)
e.lock.Unlock()
return nil
}

// Subscribe registers a callback for a given OracleSpec that is call when an
// OracleData matches the spec.
// It returns a SubscriptionID that is used to Unsubscribe.
Expand Down
48 changes: 24 additions & 24 deletions oracles/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"testing"
"time"

"code.vegaprotocol.io/vega/broker/mocks"
bmok "code.vegaprotocol.io/vega/broker/mocks"
"code.vegaprotocol.io/vega/events"
"code.vegaprotocol.io/vega/execution/mocks"
"code.vegaprotocol.io/vega/logging"
"code.vegaprotocol.io/vega/oracles"
oraclespb "code.vegaprotocol.io/vega/proto/oracles/v1"
Expand All @@ -21,7 +22,6 @@ func TestOracleEngine(t *testing.T) {
t.Run("Subscribing to oracle engine succeeds", testOracleEngineSubscribingSucceeds)
t.Run("Subscribing to oracle engine with without callback fails", testOracleEngineSubscribingWithoutCallbackFails)
t.Run("Broadcasting to right callback with correct data succeeds", testOracleEngineBroadcastingCorrectDataSucceeds)
t.Run("Broadcasting to right callback with incorrect data fails", testOracleEngineBroadcastingIncorrectDataFails)
t.Run("Unsubscribing known ID from oracle engine succeeds", testOracleEngineUnsubscribingKnownIDSucceeds)
t.Run("Unsubscribing unknown ID from oracle engine panics", testOracleEngineUnsubscribingUnknownIDPanics)
t.Run("Updating current time succeeds", testOracleEngineUpdatingCurrentTimeSucceeds)
Expand Down Expand Up @@ -100,33 +100,14 @@ func testOracleEngineBroadcastingCorrectDataSucceeds(t *testing.T) {

// then
require.NoError(t, errB)
engine.UpdateCurrentTime(ctx, currentTime)
assert.Equal(t, &dataBTC42.data, btcEquals42.subscriber.ReceivedData)
assert.Equal(t, &dataBTC42.data, btcGreater21.subscriber.ReceivedData)
assert.Nil(t, ethEquals42.subscriber.ReceivedData)
assert.Nil(t, ethLess84.subscriber.ReceivedData)
assert.Nil(t, btcGreater100.subscriber.ReceivedData)
}

func testOracleEngineBroadcastingIncorrectDataFails(t *testing.T) {
// given
btcEquals42 := spec("BTC", oraclespb.Condition_OPERATOR_EQUALS, "42")
dataBTC42 := dataWithPrice("BTC", "hello")

// setup
ctx := context.Background()
currentTime := time.Now()
engine := newEngine(ctx, t, currentTime)
engine.broker.mockNewOracleSpecSubscription(currentTime, btcEquals42.spec.Proto)

// when
_ = engine.Subscribe(ctx, btcEquals42.spec, btcEquals42.subscriber.Cb)
errB := engine.BroadcastData(ctx, dataBTC42.data)

// then
assert.Error(t, errB)
assert.Nil(t, btcEquals42.subscriber.ReceivedData)
}

func testOracleEngineUnsubscribingUnknownIDPanics(t *testing.T) {
// setup
ctx := context.Background()
Expand Down Expand Up @@ -168,6 +149,7 @@ func testOracleEngineUnsubscribingKnownIDSucceeds(t *testing.T) {
errB2 := engine.BroadcastData(context.Background(), dataBTC42.data)

// then
engine.UpdateCurrentTime(ctx, currentTime)
require.NoError(t, errB1)
require.NoError(t, errB2)
assert.Equal(t, &dataETH42.data, ethEquals42.subscriber.ReceivedData)
Expand Down Expand Up @@ -195,12 +177,17 @@ type testEngine struct {

func newEngine(ctx context.Context, t *testing.T, currentTime time.Time) *testEngine {
broker := newBroker(ctx, t)
ts := newTimeService(ctx, t)

ts.EXPECT().NotifyOnTick(gomock.Any()).Times(1)

return &testEngine{
Engine: oracles.NewEngine(
logging.NewTestLogger(),
oracles.NewDefaultConfig(),
currentTime,
broker,
ts,
),
broker: broker,
}
Expand Down Expand Up @@ -276,18 +263,31 @@ func (d *dummySubscriber) Cb(_ context.Context, data oracles.OracleData) error {
}

type testBroker struct {
*mocks.MockBroker
*bmok.MockBroker
ctx context.Context
}

type testTimeService struct {
*mocks.MockTimeService
ctx context.Context
}

func newBroker(ctx context.Context, t *testing.T) *testBroker {
ctrl := gomock.NewController(t)
return &testBroker{
MockBroker: mocks.NewMockBroker(ctrl),
MockBroker: bmok.NewMockBroker(ctrl),
ctx: ctx,
}
}

func newTimeService(ctx context.Context, t *testing.T) *testTimeService {
ctrl := gomock.NewController(t)
return &testTimeService{
MockTimeService: mocks.NewMockTimeService(ctrl),
ctx: ctx,
}
}

func (b *testBroker) mockNewOracleSpecSubscription(currentTime time.Time, spec oraclespb.OracleSpec) {
spec.CreatedAt = currentTime.UnixNano()
spec.Status = oraclespb.OracleSpec_STATUS_ACTIVE
Expand Down

0 comments on commit 1a4e455

Please sign in to comment.