Skip to content

Commit

Permalink
feat: tvwap calculations (#601)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamewozniak committed Mar 4, 2022
1 parent 380ec49 commit 84da2c5
Show file tree
Hide file tree
Showing 12 changed files with 516 additions and 65 deletions.
1 change: 1 addition & 0 deletions price-feeder/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
- [#569](https://github.com/umee-network/umee/pull/569) Update Huobi provider to use WebSocket.
- [#580](https://github.com/umee-network/umee/pull/580) Update Kraken provider to use WebSocket.
- [#592](https://github.com/umee-network/umee/pull/592) Add subscribe ticker function to the following providers: Binance, Huobi, Kraken, and Okx.
- [#601](https://github.com/umee-network/umee/pull/601) Use TVWAP formula for determining prices when available.

### Bug Fixes

Expand Down
64 changes: 49 additions & 15 deletions price-feeder/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,16 @@ func (o *Oracle) GetPrices() map[string]sdk.Dec {
return prices
}

// SetPrices retrieve all the prices from our set of providers as determined
// in the config, average them out, and update the oracle's current exchange
// rates.
// SetPrices retrieves all the prices and candles from our set of providers as
// determined in the config. If candles are available, uses TVWAP in order
// to determine prices. If candles are not available, uses the most recent prices
// with VWAP. Warns the the user of any missing prices, and filters out any faulty
// providers which do not report prices within 2𝜎 of the others.
func (o *Oracle) SetPrices(ctx context.Context, acceptList oracletypes.DenomList) error {
g := new(errgroup.Group)
mtx := new(sync.Mutex)
providerPrices := make(map[string]map[string]provider.TickerPrice)
providerCandles := make(map[string]map[string][]provider.CandlePrice)
requiredRates := make(map[string]struct{})

for providerName, currencyPairs := range o.providerPairs {
Expand Down Expand Up @@ -185,19 +188,36 @@ func (o *Oracle) SetPrices(ctx context.Context, acceptList oracletypes.DenomList
return err
}

candles, err := priceProvider.GetCandlePrices(acceptedPairs...)
if err != nil {
telemetry.IncrCounter(1, "failure", "provider")
return err
}

// flatten and collect prices based on the base currency per provider
//
// e.g.: {ProviderKraken: {"ATOM": <price, volume>, ...}}
mtx.Lock()
for _, cp := range acceptedPairs {
for _, pair := range acceptedPairs {
if _, ok := providerPrices[providerName]; !ok {
providerPrices[providerName] = make(map[string]provider.TickerPrice)
}
if tp, ok := prices[cp.String()]; ok {
providerPrices[providerName][cp.Base] = tp
} else {
if _, ok := providerCandles[providerName]; !ok {
providerCandles[providerName] = make(map[string][]provider.CandlePrice)
}

tp, pricesOk := prices[pair.String()]
cp, candlesOk := candles[pair.String()]
if pricesOk {
providerPrices[providerName][pair.Base] = tp
}
if candlesOk {
providerCandles[providerName][pair.Base] = cp
}

if !pricesOk && !candlesOk {
mtx.Unlock()
return fmt.Errorf("failed to find exchange rate in provider response")
return fmt.Errorf("failed to find any exchange rates in provider response")
}
}
mtx.Unlock()
Expand All @@ -219,6 +239,7 @@ func (o *Oracle) SetPrices(ctx context.Context, acceptList oracletypes.DenomList
}
}

// warn the user of any missing prices
if len(reportedRates) != len(requiredRates) {
return fmt.Errorf("unable to get prices for all exchange rates")
}
Expand All @@ -228,17 +249,29 @@ func (o *Oracle) SetPrices(ctx context.Context, acceptList oracletypes.DenomList
}
}

filteredProviderPrices, err := o.filterDeviations(providerPrices)
// attempt to use candles for tvwap calculations
tvwapPrices, err := ComputeTVWAP(providerCandles)
if err != nil {
return err
}

vwapPrices, err := ComputeVWAP(filteredProviderPrices)
if err != nil {
return err
}
// If TVWAP candles are not available or were filtered out due to staleness,
// use most recent prices & VWAP instead.
if len(tvwapPrices) == 0 {
filteredProviderPrices, err := o.filterDeviations(providerPrices)
if err != nil {
return err
}

o.prices = vwapPrices
vwapPrices, err := ComputeVWAP(filteredProviderPrices)
if err != nil {
return err
}

o.prices = vwapPrices
} else {
o.prices = tvwapPrices
}

return nil
}
Expand Down Expand Up @@ -323,7 +356,8 @@ func (o *Oracle) getOrSetProvider(ctx context.Context, providerName string) (pro
// all assets, and filter out any providers that are not within 2𝜎 of the mean.
func (o *Oracle) filterDeviations(
prices map[string]map[string]provider.TickerPrice) (
map[string]map[string]provider.TickerPrice, error) {
map[string]map[string]provider.TickerPrice, error,
) {
var (
filteredPrices = make(map[string]map[string]provider.TickerPrice)
threshold = sdk.MustNewDecFromStr("2")
Expand Down
18 changes: 18 additions & 0 deletions price-feeder/oracle/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ func (m mockProvider) GetTickerPrices(_ ...types.CurrencyPair) (map[string]provi
return m.prices, nil
}

func (m mockProvider) GetCandlePrices(_ ...types.CurrencyPair) (map[string][]provider.CandlePrice, error) {
candles := make(map[string][]provider.CandlePrice)
for pair, price := range m.prices {
candles[pair] = []provider.CandlePrice{
{
Price: price.Price,
TimeStamp: provider.PastUnixTime(1 * time.Minute),
Volume: price.Volume,
},
}
}
return candles, nil
}

type failingProvider struct {
prices map[string]provider.TickerPrice
}
Expand All @@ -35,6 +49,10 @@ func (m failingProvider) GetTickerPrices(_ ...types.CurrencyPair) (map[string]pr
return nil, fmt.Errorf("unable to get ticker prices")
}

func (m failingProvider) GetCandlePrices(_ ...types.CurrencyPair) (map[string][]provider.CandlePrice, error) {
return nil, fmt.Errorf("unable to get candle prices")
}

type OracleTestSuite struct {
suite.Suite

Expand Down
60 changes: 52 additions & 8 deletions price-feeder/oracle/provider/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ type (
wsURL url.URL
wsClient *websocket.Conn
logger zerolog.Logger
mtx sync.Mutex
mtx sync.RWMutex
tickers map[string]BinanceTicker // Symbol => BinanceTicker
candles map[string]BinanceCandle // Symbol => BinanceCandle
candles map[string][]BinanceCandle // Symbol => BinanceCandle
subscribedPairs map[string]types.CurrencyPair // Symbol => types.CurrencyPair
}

Expand Down Expand Up @@ -87,7 +87,7 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ
wsClient: wsConn,
logger: logger.With().Str("provider", "binance").Logger(),
tickers: map[string]BinanceTicker{},
candles: map[string]BinanceCandle{},
candles: map[string][]BinanceCandle{},
subscribedPairs: map[string]types.CurrencyPair{},
}

Expand Down Expand Up @@ -136,7 +136,26 @@ func (p *BinanceProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[stri
return tickerPrices, nil
}

// GetCandlePrices returns the candlePrices based on the provided pairs.
func (p *BinanceProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string][]CandlePrice, error) {
candlePrices := make(map[string][]CandlePrice, len(pairs))

for _, cp := range pairs {
key := cp.String()
prices, err := p.getCandlePrices(key)
if err != nil {
return nil, err
}
candlePrices[key] = prices
}

return candlePrices, nil
}

func (p *BinanceProvider) getTickerPrice(key string) (TickerPrice, error) {
p.mtx.RLock()
defer p.mtx.RUnlock()

ticker, ok := p.tickers[key]
if !ok {
return TickerPrice{}, fmt.Errorf("binance provider failed to get ticker price for %s", key)
Expand All @@ -145,13 +164,24 @@ func (p *BinanceProvider) getTickerPrice(key string) (TickerPrice, error) {
return ticker.toTickerPrice()
}

func (p *BinanceProvider) getTickerTrades(key string) (BinanceCandle, error) {
candle, ok := p.candles[key]
func (p *BinanceProvider) getCandlePrices(key string) ([]CandlePrice, error) {
p.mtx.RLock()
defer p.mtx.RUnlock()

candles, ok := p.candles[key]
if !ok {
return BinanceCandle{}, fmt.Errorf("failed to get ticker trades for %s", key)
return []CandlePrice{}, fmt.Errorf("failed to get candle prices for %s", key)
}

return candle, nil
candleList := []CandlePrice{}
for _, candle := range candles {
cp, err := candle.toCandlePrice()
if err != nil {
return []CandlePrice{}, err
}
candleList = append(candleList, cp)
}
return candleList, nil
}

func (p *BinanceProvider) messageReceived(messageType int, bz []byte) {
Expand Down Expand Up @@ -191,13 +221,27 @@ func (p *BinanceProvider) setTickerPair(ticker BinanceTicker) {
func (p *BinanceProvider) setCandlePair(candle BinanceCandle) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.candles[candle.Symbol] = candle
staleTime := PastUnixTime(providerCandlePeriod)
candleList := []BinanceCandle{}
candleList = append(candleList, candle)

for _, c := range p.candles[candle.Symbol] {
if staleTime < c.Metadata.TimeStamp {
candleList = append(candleList, c)
}
}
p.candles[candle.Symbol] = candleList
}

func (ticker BinanceTicker) toTickerPrice() (TickerPrice, error) {
return newTickerPrice("Binance", ticker.Symbol, ticker.LastPrice, ticker.Volume)
}

func (candle BinanceCandle) toCandlePrice() (CandlePrice, error) {
return newCandlePrice("Binance", candle.Symbol, candle.Metadata.Close, candle.Metadata.Volume,
candle.Metadata.TimeStamp)
}

func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {
reconnectTicker := time.NewTicker(defaultMaxConnectionTime)
defer reconnectTicker.Stop()
Expand Down
75 changes: 67 additions & 8 deletions price-feeder/oracle/provider/huobi.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ type (
wsURL url.URL
wsClient *websocket.Conn
logger zerolog.Logger
mtx sync.Mutex
mtx sync.RWMutex
tickers map[string]HuobiTicker // market.$symbol.ticker => HuobiTicker
candles map[string]HuobiCandle // market.$symbol.kline.$period => HuobiCandle
candles map[string][]HuobiCandle // market.$symbol.kline.$period => HuobiCandle
subscribedPairs map[string]types.CurrencyPair // Symbol => types.CurrencyPair
}

Expand All @@ -65,9 +65,9 @@ type (

// HuobiCandleTick defines the response type for the candle.
HuobiCandleTick struct {
Close float64 `json:"close"` // Closing price during this period
TimeStamp int64 `json:"id"` // TimeStamp for this as an ID
Volume float64 `json:"volume"` // Volume during this period
Close float64 `json:"close"` // Closing price during this period
TimeStamp int64 `json:"id"` // TimeStamp for this as an ID
Volume float64 `json:"vol"` // Volume during this period
}

// HuobiSubscriptionMsg Msg to subscribe to one ticker channel at time.
Expand All @@ -94,7 +94,7 @@ func NewHuobiProvider(ctx context.Context, logger zerolog.Logger, pairs ...types
wsClient: wsConn,
logger: logger.With().Str("provider", "huobi").Logger(),
tickers: map[string]HuobiTicker{},
candles: map[string]HuobiCandle{},
candles: map[string][]HuobiCandle{},
subscribedPairs: map[string]types.CurrencyPair{},
}

Expand Down Expand Up @@ -122,6 +122,21 @@ func (p *HuobiProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string
return tickerPrices, nil
}

// GetTickerPrices returns the tickerPrices based on the saved map.
func (p *HuobiProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string][]CandlePrice, error) {
candlePrices := make(map[string][]CandlePrice, len(pairs))

for _, cp := range pairs {
price, err := p.getCandlePrices(cp)
if err != nil {
return nil, err
}
candlePrices[cp.String()] = price
}

return candlePrices, nil
}

// SubscribeTickers subscribe all currency pairs into ticker and candle channels.
func (p *HuobiProvider) SubscribeTickers(cps ...types.CurrencyPair) error {
for _, cp := range cps {
Expand Down Expand Up @@ -252,7 +267,18 @@ func (p *HuobiProvider) setTickerPair(ticker HuobiTicker) {
func (p *HuobiProvider) setCandlePair(candle HuobiCandle) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.candles[candle.CH] = candle
// huobi time period comes in seconds
candle.Tick.TimeStamp = candle.Tick.TimeStamp * 1000
staleTime := PastUnixTime(providerCandlePeriod)
candleList := []HuobiCandle{}
candleList = append(candleList, candle)

for _, c := range p.candles[candle.CH] {
if staleTime < c.Tick.TimeStamp {
candleList = append(candleList, c)
}
}
p.candles[candle.CH] = candleList
}

// reconnect closes the last WS connection and create a new one.
Expand Down Expand Up @@ -291,14 +317,37 @@ func (p *HuobiProvider) subscribeCandlePair(cp types.CurrencyPair) error {
}

func (p *HuobiProvider) getTickerPrice(cp types.CurrencyPair) (TickerPrice, error) {
p.mtx.RLock()
defer p.mtx.RUnlock()

ticker, ok := p.tickers[currencyPairToHuobiTickerPair(cp)]
if !ok {
return TickerPrice{}, fmt.Errorf("huobi provider failed to get ticker price for %s", cp.String())
return TickerPrice{}, fmt.Errorf("failed to get ticker price for %s", cp.String())
}

return ticker.toTickerPrice()
}

func (p *HuobiProvider) getCandlePrices(cp types.CurrencyPair) ([]CandlePrice, error) {
p.mtx.RLock()
defer p.mtx.RUnlock()

candles, ok := p.candles[currencyPairToHuobiCandlePair(cp)]
if !ok {
return []CandlePrice{}, fmt.Errorf("failed to get candles price for %s", cp.String())
}

candleList := []CandlePrice{}
for _, candle := range candles {
cp, err := candle.toCandlePrice()
if err != nil {
return []CandlePrice{}, err
}
candleList = append(candleList, cp)
}
return candleList, nil
}

// setSubscribedPairs sets N currency pairs to the map of subscribed pairs.
func (p *HuobiProvider) setSubscribedPairs(cps ...types.CurrencyPair) {
p.mtx.Lock()
Expand Down Expand Up @@ -330,6 +379,16 @@ func (ticker HuobiTicker) toTickerPrice() (TickerPrice, error) {
)
}

func (candle HuobiCandle) toCandlePrice() (CandlePrice, error) {
return newCandlePrice(
"Huobi",
candle.CH,
strconv.FormatFloat(candle.Tick.Close, 'f', -1, 64),
strconv.FormatFloat(candle.Tick.Volume, 'f', -1, 64),
candle.Tick.TimeStamp,
)
}

// newHuobiTickerSubscriptionMsg returns a new ticker subscription Msg.
func newHuobiTickerSubscriptionMsg(cp types.CurrencyPair) HuobiSubscriptionMsg {
return HuobiSubscriptionMsg{
Expand Down

0 comments on commit 84da2c5

Please sign in to comment.