Skip to content

Commit

Permalink
refactor: split subscribe ticker and candle channels (#610)
Browse files Browse the repository at this point in the history
  • Loading branch information
RafilxTenfen committed Mar 7, 2022
1 parent 06fc763 commit d75ab49
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 108 deletions.
1 change: 1 addition & 0 deletions price-feeder/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
### Refactor

- [#587](https://github.com/umee-network/umee/pull/587) Clean up logs from price feeder providers.
- [#610](https://github.com/umee-network/umee/pull/610) Split subscribtion of ticker and candle channels.

## [v0.1.0](https://github.com/umee-network/umee/releases/tag/price-feeder%2Fv0.1.0) - 2022-02-07

Expand Down
83 changes: 52 additions & 31 deletions price-feeder/oracle/provider/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ
subscribedPairs: map[string]types.CurrencyPair{},
}

if err := provider.SubscribeTickers(pairs...); err != nil {
if err := provider.SubscribeCurrencyPairs(pairs...); err != nil {
return nil, err
}

Expand All @@ -100,26 +100,6 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ
return provider, nil
}

// SubscribeTickers subscribe all currency pairs into ticker and candle channels.
func (p *BinanceProvider) SubscribeTickers(cps ...types.CurrencyPair) error {
pairs := make([]string, len(cps)*2)

iterator := 0
for _, cp := range cps {
pairs[iterator] = currencyPairToBinanceTickerPair(cp)
iterator++
pairs[iterator] = currencyPairToBinanceCandlePair(cp)
iterator++
}

if err := p.subscribePairs(pairs...); err != nil {
return err
}

p.setSubscribedPairs(cps...)
return nil
}

// GetTickerPrices returns the tickerPrices based on the provided pairs.
func (p *BinanceProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]TickerPrice, error) {
tickerPrices := make(map[string]TickerPrice, len(pairs))
Expand Down Expand Up @@ -152,6 +132,55 @@ func (p *BinanceProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[stri
return candlePrices, nil
}

// SubscribeCurrencyPairs subscribe all currency pairs into ticker and candle channels.
func (p *BinanceProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) error {
if err := p.subscribeChannels(cps...); err != nil {
return err
}

p.setSubscribedPairs(cps...)
return nil
}

// subscribeChannels subscribe to the ticker and candle channels for all currency pairs.
func (p *BinanceProvider) subscribeChannels(cps ...types.CurrencyPair) error {
if err := p.subscribeTickers(cps...); err != nil {
return err
}

return p.subscribeCandles(cps...)
}

// subscribeTickers subscribe to the ticker channel for all currency pairs.
func (p *BinanceProvider) subscribeTickers(cps ...types.CurrencyPair) error {
pairs := make([]string, len(cps))

for i, cp := range cps {
pairs[i] = currencyPairToBinanceTickerPair(cp)
}

return p.subscribePairs(pairs...)
}

// subscribeCandles subscribe to the candle channel for all currency pairs.
func (p *BinanceProvider) subscribeCandles(cps ...types.CurrencyPair) error {
pairs := make([]string, len(cps))

for i, cp := range cps {
pairs[i] = currencyPairToBinanceCandlePair(cp)
}

return p.subscribePairs(pairs...)
}

// subscribedPairsToSlice returns the map of subscribed pairs as a slice.
func (p *BinanceProvider) subscribedPairsToSlice() []types.CurrencyPair {
p.mtx.RLock()
defer p.mtx.RUnlock()

return mapPairsToSlice(p.subscribedPairs)
}

func (p *BinanceProvider) getTickerPrice(key string) (TickerPrice, error) {
p.mtx.RLock()
defer p.mtx.RUnlock()
Expand Down Expand Up @@ -289,16 +318,8 @@ func (p *BinanceProvider) reconnect() error {
}
p.wsClient = wsConn

pairs := make([]string, len(p.subscribedPairs)*2)
iterator := 0
for _, cp := range p.subscribedPairs {
pairs[iterator] = currencyPairToBinanceTickerPair(cp)
iterator++
pairs[iterator] = currencyPairToBinanceCandlePair(cp)
iterator++
}

return p.subscribePairs(pairs...)
currencyPairs := p.subscribedPairsToSlice()
return p.subscribeChannels(currencyPairs...)
}

// keepReconnecting keeps trying to reconnect if an error occurs in reconnect.
Expand Down
54 changes: 40 additions & 14 deletions price-feeder/oracle/provider/huobi.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewHuobiProvider(ctx context.Context, logger zerolog.Logger, pairs ...types
subscribedPairs: map[string]types.CurrencyPair{},
}

if err := provider.SubscribeTickers(pairs...); err != nil {
if err := provider.SubscribeCurrencyPairs(pairs...); err != nil {
return nil, err
}

Expand Down Expand Up @@ -137,21 +137,55 @@ func (p *HuobiProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string
return candlePrices, nil
}

// SubscribeTickers subscribe all currency pairs into ticker and candle channels.
func (p *HuobiProvider) SubscribeTickers(cps ...types.CurrencyPair) error {
// SubscribeCurrencyPairs subscribe all currency pairs into ticker and candle channels.
func (p *HuobiProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) error {
if err := p.subscribeChannels(cps...); err != nil {
return err
}

p.setSubscribedPairs(cps...)
return nil
}

// subscribeChannels subscribe all currency pairs into ticker and candle channels.
func (p *HuobiProvider) subscribeChannels(cps ...types.CurrencyPair) error {
if err := p.subscribeTickers(cps...); err != nil {
return err
}

return p.subscribeCandles(cps...)
}

// subscribeTickers subscribe all currency pairs into ticker channel.
func (p *HuobiProvider) subscribeTickers(cps ...types.CurrencyPair) error {
for _, cp := range cps {
if err := p.subscribeTickerPair(cp); err != nil {
return err
}
}

return nil
}

// subscribeCandles subscribe all currency pairs into candle channel.
func (p *HuobiProvider) subscribeCandles(cps ...types.CurrencyPair) error {
for _, cp := range cps {
if err := p.subscribeCandlePair(cp); err != nil {
return err
}
}

p.setSubscribedPairs(cps...)
return nil
}

// subscribedPairsToSlice returns the map of subscribed pairs as slice
func (p *HuobiProvider) subscribedPairsToSlice() []types.CurrencyPair {
p.mtx.RLock()
defer p.mtx.RUnlock()

return mapPairsToSlice(p.subscribedPairs)
}

func (p *HuobiProvider) handleWebSocketMsgs(ctx context.Context) {
reconnectTicker := time.NewTicker(huobiReconnectTime)
for {
Expand Down Expand Up @@ -292,16 +326,8 @@ func (p *HuobiProvider) reconnect() error {
}
p.wsClient = wsConn

for _, cp := range p.subscribedPairs {
if err := p.subscribeTickerPair(cp); err != nil {
return err
}
if err := p.subscribeCandlePair(cp); err != nil {
return err
}
}

return nil
currencyPairs := p.subscribedPairsToSlice()
return p.subscribeChannels(currencyPairs...)
}

// subscribeTickerPair write the subscription ticker msg to the provider.
Expand Down
82 changes: 47 additions & 35 deletions price-feeder/oracle/provider/kraken.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func NewKrakenProvider(ctx context.Context, logger zerolog.Logger, pairs ...type
subscribedPairs: map[string]types.CurrencyPair{},
}

if err := provider.SubscribeTickers(pairs...); err != nil {
if err := provider.SubscribeCurrencyPairs(pairs...); err != nil {
return nil, err
}

Expand Down Expand Up @@ -149,6 +149,39 @@ func (p *KrakenProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[strin
return candlePrices, nil
}

// SubscribeCurrencyPairs subscribe all currency pairs into ticker and candle channels.
func (p *KrakenProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) error {
if err := p.subscribeChannels(cps...); err != nil {
return err
}

p.setSubscribedPairs(cps...)
return nil
}

// subscribeChannels subscribe all currency pairs into ticker and candle channels.
func (p *KrakenProvider) subscribeChannels(cps ...types.CurrencyPair) error {
pairs := make([]string, len(cps))

for i, cp := range cps {
pairs[i] = currencyPairToKrakenPair(cp)
}

if err := p.subscribeTickers(pairs...); err != nil {
return err
}

return p.subscribeCandles(pairs...)
}

// subscribedPairsToSlice returns the map of subscribed pairs as slice
func (p *KrakenProvider) subscribedPairsToSlice() []types.CurrencyPair {
p.mtx.RLock()
defer p.mtx.RUnlock()

return mapPairsToSlice(p.subscribedPairs)
}

func (candle KrakenCandle) toCandlePrice() (CandlePrice, error) {
return newCandlePrice(
"Kraken",
Expand Down Expand Up @@ -179,25 +212,6 @@ func (p *KrakenProvider) getCandlePrices(key string) ([]CandlePrice, error) {
return candleList, nil
}

// SubscribeTickers subscribe all currency pairs into ticker and candle channels.
func (p *KrakenProvider) SubscribeTickers(cps ...types.CurrencyPair) error {
pairs := make([]string, len(cps))

for i, cp := range cps {
pairs[i] = currencyPairToKrakenPair(cp)
}

if err := p.subscribeTickerPairs(pairs...); err != nil {
return err
}
if err := p.subscribeCandlePairs(pairs...); err != nil {
return err
}

p.setSubscribedPairs(cps...)
return nil
}

// handleWebSocketMsgs receive all the messages from the provider and controls the
// reconnect function to the web socket.
func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) {
Expand All @@ -211,6 +225,12 @@ func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) {
case <-time.After(defaultReadNewWSMessage):
messageType, bz, err := p.wsClient.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) {
p.logger.Err(err).Msg("WebSocket closed unexpectedly")
p.keepReconnecting()
continue
}

// if some error occurs continue to try to read the next message.
p.logger.Err(err).Msg("could not read message")
if err := p.ping(); err != nil {
Expand Down Expand Up @@ -395,24 +415,16 @@ func (p *KrakenProvider) messageReceivedCandle(bz []byte) error {
// reconnect closes the last WS connection and create a new one.
func (p *KrakenProvider) reconnect() error {
p.wsClient.Close()
p.logger.Debug().Msg("trying to reconnect")

wsConn, _, err := websocket.DefaultDialer.Dial(p.wsURL.String(), nil)
if err != nil {
return fmt.Errorf("error connecting to Kraken websocket: %w", err)
}
p.wsClient = wsConn

pairs := make([]string, len(p.subscribedPairs))
iterator := 0
for _, cp := range p.subscribedPairs {
pairs[iterator] = currencyPairToKrakenPair(cp)
iterator++
}

if err := p.subscribeTickerPairs(pairs...); err != nil {
return err
}
return p.subscribeCandlePairs(pairs...)
currencyPairs := p.subscribedPairsToSlice()
return p.subscribeChannels(currencyPairs...)
}

// keepReconnecting keeps trying to reconnect if an error occurs in recconnect.
Expand Down Expand Up @@ -501,14 +513,14 @@ func (p *KrakenProvider) ping() error {
return p.wsClient.WriteMessage(websocket.PingMessage, ping)
}

// subscribeTickerPairs write the subscription msg to the provider.
func (p *KrakenProvider) subscribeTickerPairs(pairs ...string) error {
// subscribeTickers write the subscription msg to the provider.
func (p *KrakenProvider) subscribeTickers(pairs ...string) error {
subsMsg := newKrakenTickerSubscriptionMsg(pairs...)
return p.wsClient.WriteJSON(subsMsg)
}

// subscribeCandlePairs write the subscription msg to the provider.
func (p *KrakenProvider) subscribeCandlePairs(pairs ...string) error {
// subscribeCandles write the subscription msg to the provider.
func (p *KrakenProvider) subscribeCandles(pairs ...string) error {
subsMsg := newKrakenCandleSubscriptionMsg(pairs...)
return p.wsClient.WriteJSON(subsMsg)
}
Expand Down

0 comments on commit d75ab49

Please sign in to comment.