Skip to content

Commit

Permalink
Kraken: Generate N+ subs for pairs
Browse files Browse the repository at this point in the history
If we generate one sub for all pairs, but then fan it out in the
responses, we end up with a mis-match between the sub store and
GenerateSubs, and when we do FlushChannels it will try to resub
everything again.

Kraken: Generate N+ subs for pairs

If we generate one sub for all pairs, but then fan it out in the
responses, we end up with a mis-match between the sub store and
GenerateSubs, and when we do FlushChannels it will try to resub
everything again.
  • Loading branch information
gbjk committed Apr 15, 2024
1 parent 009f02c commit d45f86a
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 206 deletions.
3 changes: 1 addition & 2 deletions currency/pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ func NewBTCUSD() Pair {
return NewPair(BTC, USD)
}

// NewPairDelimiter splits the desired currency string at delimiter, the returns
// a Pair struct
// NewPairDelimiter splits the desired currency string at delimiter, the returns a Pair struct
func NewPairDelimiter(currencyPair, delimiter string) (Pair, error) {
if !strings.Contains(currencyPair, delimiter) {
return EMPTYPAIR,
Expand Down
2 changes: 0 additions & 2 deletions exchanges/kraken/kraken.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/thrasher-corp/gocryptotrader/common"
Expand All @@ -34,7 +33,6 @@ const (
// Kraken is the overarching type across the kraken package
type Kraken struct {
exchange.Base
wsRequestMtx sync.Mutex
}

// GetCurrentServerTime returns current server time
Expand Down
64 changes: 36 additions & 28 deletions exchanges/kraken/kraken_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
Expand Down Expand Up @@ -939,57 +938,66 @@ func TestWsSubscribe(t *testing.T) {

err := k.Subscribe(subscription.List{{Channel: subscription.TickerChannel, Pairs: currency.Pairs{xbtusdPair}}})
assert.NoError(t, err, "Simple subscription should not error")
assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should add 1 Subscription")
subs := k.Websocket.GetSubscriptions()
require.Len(t, subs, 1, "Should add 1 Subscription")
assert.Equal(t, subscription.SubscribedState, subs[0].State(), "Subscription should be subscribed state")

err = k.Subscribe(subscription.List{{Channel: subscription.TickerChannel, Pairs: currency.Pairs{xbtusdPair}}})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Resubscribing to the same channel should error with SubFailure")
assert.ErrorIs(t, err, subscription.ErrDuplicate, "Resubscribing to the same channel should error with SubscribedAlready")
assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should not add a subscription on error")
subs = k.Websocket.GetSubscriptions()
require.Len(t, subs, 1, "Should not add a subscription on error")
assert.Equal(t, subscription.SubscribedState, subs[0].State(), "Existing subscription state should not change")

err = k.Subscribe(subscription.List{{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}}})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Simple error subscription should error")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/HOBBIT", "Subscribing to an invalid pair should error correctly")
assert.ErrorContains(t, err, "Currency pair not supported; Channel: ticker Pairs: DWARF/HOBBIT", "Subscribing to an invalid pair should error correctly")
require.Len(t, k.Websocket.GetSubscriptions(), 1, "Should not add a subscription on error")

// Mix success and failure
err = k.Subscribe(subscription.List{
{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("ETH", "USD", "/")}},
{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}},
{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "ELF", "/")}},
})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Mixed error subscription should error")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/ELF", "Subscribing to an invalid pair should error correctly")
assert.Len(t, k.Websocket.GetSubscriptions(), 2, "Should have 2 subscriptions after mixed success/failures")
assert.ErrorContains(t, err, "Currency pair not supported; Channel: ticker", "Subscribing to an invalid pair should error correctly")
assert.ErrorContains(t, err, "DWARF/ELF", "Subscribing to an invalid pair should error correctly")
assert.ErrorContains(t, err, "DWARF/HOBBIT", "Subscribing to an invalid pair should error correctly")
require.Len(t, k.Websocket.GetSubscriptions(), 2, "Should have 2 subscriptions after mixed success/failures")

// Just failures
err = k.Subscribe(subscription.List{
{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}},
{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "GOBLIN", "/")}},
})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Only failing subscriptions should error")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/GOBLIN", "Subscribing to an invalid pair should error correctly")
assert.ErrorContains(t, err, "Currency pair not supported; Channel: ticker", "Subscribing to an invalid pair should error correctly")
assert.ErrorContains(t, err, "DWARF/GOBLIN", "Subscribing to an invalid pair should error correctly")
assert.ErrorContains(t, err, "DWARF/HOBBIT", "Subscribing to an invalid pair should error correctly")
require.Len(t, k.Websocket.GetSubscriptions(), 2, "Should have 2 subscriptions after mixed success/failures")

// Just success
err = k.Subscribe(subscription.List{
{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("ETH", "XBT", "/")}},
{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("LTC", "ETH", "/")}},
})
assert.NoError(t, err, "Multiple successful subscriptions should not error")

subs := k.Websocket.GetSubscriptions()
subs = k.Websocket.GetSubscriptions()
assert.Len(t, subs, 4, "Should have correct number of subscriptions")

err = k.Unsubscribe(subs[:1])
assert.NoError(t, err, "Simple Unsubscribe should succeed")
assert.Len(t, k.Websocket.GetSubscriptions(), 3, "Should have removed 1 channel")

err = k.Unsubscribe(subscription.List{{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "WIZARD", "/")}, Key: 1337}})
assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Simple failing Unsubscribe should error UnsubFail")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/WIZARD", "Unsubscribing from an invalid pair should error correctly")
assert.ErrorIs(t, err, subscription.ErrNotFound, "Simple failing Unsubscribe should error NotFound")
assert.ErrorContains(t, err, "DWARF/WIZARD", "Unsubscribing from an invalid pair should error correctly")
assert.Len(t, k.Websocket.GetSubscriptions(), 3, "Should not have removed any channels")

err = k.Unsubscribe(subscription.List{
subs[1],
{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "EAGLE", "/")}, Key: 1338},
})
assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Mixed failing Unsubscribe should error UnsubFail")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/EAGLE", "Unsubscribing from an invalid pair should error correctly")
assert.ErrorIs(t, err, subscription.ErrNotFound, "Mixed failing Unsubscribe should error NotFound")
assert.ErrorContains(t, err, "Channel: ticker Pairs: DWARF/EAGLE", "Unsubscribing from an invalid pair should error correctly")

subs = k.Websocket.GetSubscriptions()
assert.Len(t, subs, 2, "Should have removed only 1 more channel")
Expand Down Expand Up @@ -1035,7 +1043,6 @@ func TestWsOrderbookSub(t *testing.T) {
Pairs: currency.Pairs{xbtusdPair},
Levels: 42,
}})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Bad subscription should error")
assert.ErrorContains(t, err, "Subscription depth not supported", "Bad subscription should error about depth")
}

Expand Down Expand Up @@ -1066,7 +1073,6 @@ func TestWsCandlesSub(t *testing.T) {
Pairs: currency.Pairs{xbtusdPair},
Interval: kline.Interval(time.Minute * time.Duration(127)),
}})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Bad subscription should error")
assert.ErrorContains(t, err, "Subscription ohlc interval not supported", "Bad subscription should error about interval")
}

Expand Down Expand Up @@ -1211,7 +1217,7 @@ func TestWsOpenOrders(t *testing.T) {
assert.Equal(t, order.Pending, v.Status, "order status")
assert.Equal(t, 0.0, v.Price, "price")
assert.Equal(t, 0.0001, v.Amount, "amount")
assert.Equal(t, time.UnixMicro(1692851641361371), v.Date, "Date")
assert.Equal(t, time.UnixMicro(1692851641361371).UTC(), v.Date, "Date")
case 3:
assert.Equal(t, "OKB55A-UEMMN-YUXM2A", v.OrderID, "OrderID")
assert.Equal(t, order.Open, v.Status, "order status")
Expand All @@ -1228,7 +1234,7 @@ func TestWsOpenOrders(t *testing.T) {
assert.Equal(t, 0.0001, v.ExecutedAmount, "ExecutedAmount")
assert.Equal(t, 26425.2, v.AverageExecutedPrice, "AverageExecutedPrice")
assert.Equal(t, 0.00687, v.Fee, "Fee")
assert.Equal(t, time.UnixMicro(1692851641361447), v.LastUpdated, "LastUpdated")
assert.Equal(t, time.UnixMicro(1692851641361447).UTC(), v.LastUpdated, "LastUpdated")
case 6:
assert.Equal(t, "OGTT3Y-C6I3P-XRI6HR", v.OrderID, "OrderID")
assert.Equal(t, order.UnknownStatus, v.Status, "order status")
Expand All @@ -1238,7 +1244,7 @@ func TestWsOpenOrders(t *testing.T) {
case 7:
assert.Equal(t, "OGTT3Y-C6I3P-XRI6HR", v.OrderID, "OrderID")
assert.Equal(t, order.Closed, v.Status, "order status")
assert.Equal(t, time.UnixMicro(1692675961789052), v.LastUpdated, "LastUpdated")
assert.Equal(t, time.UnixMicro(1692675961789052).UTC(), v.LastUpdated, "LastUpdated")
assert.Equal(t, 10.00345345, v.ExecutedAmount, "ExecutedAmount")
assert.Equal(t, 0.001, v.Fee, "Fee")
assert.Equal(t, 34.5, v.AverageExecutedPrice, "AverageExecutedPrice")
Expand Down Expand Up @@ -1428,13 +1434,15 @@ func TestWsOrderbookMax10Depth(t *testing.T) {
currency.NewPairWithDelimiter("LUNA", "EUR", "/"),
currency.NewPairWithDelimiter("GST", "EUR", "/"),
}
err := k.Websocket.AddSuccessfulSubscriptions(&subscription.Subscription{
Channel: subscription.OrderbookChannel,
Pairs: pairs,
Asset: asset.Spot,
Levels: 10,
})
require.NoError(t, err, "AddSuccessfulSubscriptions must not error")
for _, p := range pairs {
err := k.Websocket.AddSuccessfulSubscriptions(&subscription.Subscription{
Channel: subscription.OrderbookChannel,
Pairs: currency.Pairs{p},
Asset: asset.Spot,
Levels: 10,
})
require.NoError(t, err, "AddSuccessfulSubscriptions must not error")
}

for x := range websocketXDGUSDOrderbookUpdates {
err := k.wsHandleData([]byte(websocketXDGUSDOrderbookUpdates[x]))
Expand Down
Loading

0 comments on commit d45f86a

Please sign in to comment.