Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kraken Subscription Improvements #1516

Closed
wants to merge 71 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
b0bd2eb
Tests: TestFixtureToDataHandler preserve WS
gbjk Feb 19, 2024
38a9be6
Tests: Fix WsAuth turned off by config checking
gbjk Feb 12, 2024
bd03ab6
Websocket: Use ErrSubscribedAlready
gbjk Feb 19, 2024
9a8823d
Subscriptions: Replace Pair with Pairs
gbjk Feb 19, 2024
3a002d3
Docs: Update subscriptions in add new exch
gbjk Mar 3, 2024
bbb97b6
RPC: Update Subscription Pairs
gbjk Mar 3, 2024
4408610
Linter: Disable testifylint.Len
gbjk Feb 21, 2024
b5231b8
Websocket: Add suffix to state consts
gbjk Feb 22, 2024
337ee30
Binance: Subscription Pairs support
gbjk Mar 2, 2024
badeef6
Bitfinex: Subscription Pairs support
gbjk Mar 2, 2024
00843a2
Bithumb: Subscription Pairs support
gbjk Mar 2, 2024
76af13f
Bitmex: Subscription Pairs support
gbjk Mar 2, 2024
058c3bf
Bitstamp: Subscription Pairs support
gbjk Mar 2, 2024
99fd285
BTCMarkets: Subscription Pairs support
gbjk Mar 2, 2024
027364f
BTSE: Subscription Pairs support
gbjk Mar 2, 2024
27f0dc8
Coinbase: Subscription Pairs support
gbjk Mar 2, 2024
a793e55
Coinut: Subscription Pairs support
gbjk Mar 2, 2024
0e594c0
GateIO: Subscription Pairs support
gbjk Mar 2, 2024
005cbd0
Gemini: Subscription Pairs support and improvement
gbjk Mar 2, 2024
f1ac1e1
Hitbtc: Subscription Pairs support
gbjk Mar 2, 2024
08bac6d
Huboi: Subscription Pairs support
gbjk Mar 2, 2024
28b0648
Kucoin: Subscription Pairs support
gbjk Mar 2, 2024
834d03c
Okcoin: Subscription Pairs support
gbjk Mar 2, 2024
2334f9e
Poloniex: Subscription Pairs support
gbjk Mar 2, 2024
9ec3b64
Kraken: Add subscription Pairs support
gbjk Feb 19, 2024
626237e
Bybit: Subscription Pairs support
gbjk Feb 26, 2024
cac17a8
Okx: Subscription Pairs support
gbjk Mar 2, 2024
ecc0586
Bitmex: Subsription configuration
gbjk Mar 5, 2024
2fea726
CoinbasePro: Subscription Configuration
gbjk Mar 6, 2024
b86b835
Websocket: Log actual sent message when Verbose
gbjk Mar 7, 2024
bdf89da
fixup! Subscriptions: Replace Pair with Pairs
gbjk Mar 14, 2024
1a65170
fixup! RPC: Update Subscription Pairs
gbjk Mar 14, 2024
f231372
fixup! Okx: Subscription Pairs support
gbjk Mar 14, 2024
d9b510d
fixup! Subscriptions: Replace Pair with Pairs
gbjk Mar 14, 2024
32c2ad5
fixup! Subscriptions: Replace Pair with Pairs
gbjk Mar 14, 2024
910467a
fixup! Subscriptions: Replace Pair with Pairs
gbjk Mar 14, 2024
028055f
fixup! Subscriptions: Replace Pair with Pairs
gbjk Mar 17, 2024
4654ad6
fixup! Subscriptions: Replace Pair with Pairs
gbjk Mar 19, 2024
d7e5c4c
fixup! Subscriptions: Replace Pair with Pairs
gbjk Mar 19, 2024
d2b72e1
Subscriptions: Fix subscription.Clone bad .Key
gbjk Apr 1, 2024
09a6b13
Subscriptions: Improve clarity of which key is which in Match
gbjk Mar 28, 2024
0118315
Subscriptions: Lint fix for HugeParam
gbjk Mar 28, 2024
3207685
Subscriptions: Add AddPairs and move keys from test
gbjk Apr 8, 2024
0bef097
Subscriptions: Simplify subscription keys and add key types
gbjk Apr 8, 2024
bb6e77d
Subscriptions: Add List.GroupPairs Rename sub.AddPairs
gbjk Apr 8, 2024
c930884
Subscription: Fix ExactKey not matching 0 pairs
gbjk Apr 9, 2024
20a92be
Subscriptions: Remove unused IdentityKey and HasPairKey
gbjk Apr 9, 2024
6774e9d
Subscriptions: Fix GetKey test
gbjk Apr 9, 2024
bff6198
fixup! Subscription: Fix ExactKey not matching 0 pairs
gbjk Apr 9, 2024
733c8bd
fixup! Subscriptions: Replace Pair with Pairs
gbjk Apr 10, 2024
6ea7312
fixup! Subscriptions: Add List.GroupPairs Rename sub.AddPairs
gbjk Apr 11, 2024
b1242d2
fixup! Subscription: Fix ExactKey not matching 0 pairs
gbjk Apr 11, 2024
c36122e
Subscriptions: Test coverage improvements
gbjk Apr 11, 2024
2314350
fixup! Subscriptions: Fix subscription.Clone bad .Key
gbjk Apr 11, 2024
c448636
fixup! Subscriptions: Test coverage improvements
gbjk Apr 11, 2024
157f00a
fixup! fixup! Subscriptions: Fix subscription.Clone bad .Key
gbjk Apr 12, 2024
cdaa336
fixup! Subscription: Fix ExactKey not matching 0 pairs
gbjk Apr 14, 2024
bebe363
Websocket: Change State on Add/Remove
gbjk Apr 15, 2024
36f2202
Kraken: Fix wsCancelOrders not erroring order id
gbjk Dec 23, 2023
ebf94e2
Kraken: Fix URLMap ignored for websocket URLs
gbjk Dec 27, 2023
898916a
Kraken: Move SeedAssets from Setup to Bootstrap
gbjk Sep 21, 2023
1ff633a
Kraken: Sub Channel improvements
gbjk Sep 21, 2023
6792111
Kraken: Fixes post rebase
gbjk Mar 20, 2024
dc0bb85
Kraken: Simplify all WS handlers with reqId
gbjk Mar 22, 2024
f967b90
Kraken: Enforce Public channel names
gbjk Mar 23, 2024
3f24f80
Websocket: Make ErrNoMessageListener a public error
gbjk Mar 23, 2024
14fb8c4
Kraken: Test fixes
gbjk Mar 24, 2024
f879c7a
Kraken: Remove convert test
gbjk Mar 30, 2024
5a265c4
Convert: Fix TimeFromUnixTimestampDecimal using local
gbjk Mar 30, 2024
1ada0a5
Websocket: Add SendMessageReturnResponses
gbjk Apr 8, 2024
db0f863
Kraken: Generate N+ subs for pairs
gbjk Apr 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ linters-settings:
disable:
- require-error
- float-compare
# We deliberately use Equal over Len to avoid spamming the contents of large Slices
- len

issues:
max-issues-per-linter: 0
Expand Down
5 changes: 2 additions & 3 deletions common/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,10 @@ func TimeFromUnixTimestampFloat(raw interface{}) (time.Time, error) {
return time.UnixMilli(int64(ts)), nil
}

// TimeFromUnixTimestampDecimal converts a unix timestamp in decimal form to
// a time.Time
// TimeFromUnixTimestampDecimal converts a unix timestamp in decimal form to a time.Time
func TimeFromUnixTimestampDecimal(input float64) time.Time {
i, f := math.Modf(input)
return time.Unix(int64(i), int64(f*(1e9)))
return time.Unix(int64(i), int64(f*(1e9))).UTC()
}

// UnixTimestampToTime returns time.time
Expand Down
24 changes: 12 additions & 12 deletions common/convert/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
)

func TestFloatFromString(t *testing.T) {
Expand Down Expand Up @@ -98,18 +99,17 @@ func TestTimeFromUnixTimestampFloat(t *testing.T) {
}

func TestTimeFromUnixTimestampDecimal(t *testing.T) {
r := TimeFromUnixTimestampDecimal(1590633982.5714)
if r.Year() != 2020 ||
r.Month().String() != "May" ||
r.Day() != 28 {
t.Error("unexpected result")
}

r = TimeFromUnixTimestampDecimal(1560516023.070651)
if r.Year() != 2019 ||
r.Month().String() != "June" ||
r.Day() != 14 {
t.Error("unexpected result")
for in, exp := range map[float64]time.Time{
1590633982.5714: time.Date(2020, 5, 28, 2, 46, 22, 571400000, time.UTC),
1560516023.070651: time.Date(2019, 6, 14, 12, 40, 23, 70651000, time.UTC),
// Examples from Kraken
1373750306.9819: time.Date(2013, 7, 13, 21, 18, 26, 981900000, time.UTC),
1534614098.345543: time.Date(2018, 8, 18, 17, 41, 38, 345543000, time.UTC),
} {
got := TimeFromUnixTimestampDecimal(in)
z, _ := got.Zone()
assert.Equal(t, "UTC", z, "TimeFromUnixTimestampDecimal should return a UTC time")
assert.WithinRangef(t, got, exp.Add(-time.Microsecond), exp.Add(time.Microsecond), "TimeFromUnixTimestampDecimal(%f) should parse a unix timestamp correctly", in)
}
}

Expand Down
6 changes: 3 additions & 3 deletions config/config_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ type FeaturesEnabledConfig struct {

// FeaturesConfig stores the exchanges supported and enabled features
type FeaturesConfig struct {
Supports FeaturesSupportedConfig `json:"supports"`
Enabled FeaturesEnabledConfig `json:"enabled"`
Subscriptions []*subscription.Subscription `json:"subscriptions,omitempty"`
Supports FeaturesSupportedConfig `json:"supports"`
Enabled FeaturesEnabledConfig `json:"enabled"`
Subscriptions subscription.List `json:"subscriptions,omitempty"`
}

// APIEndpointsConfig stores the API endpoint addresses
Expand Down
6 changes: 2 additions & 4 deletions currency/pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ func NewBTCUSD() Pair {
return NewPair(BTC, USD)
}

// NewPairDelimiter splits the desired currency string at delimiter, the returns
// a Pair struct
// NewPairDelimiter splits the desired currency string at delimiter, the returns a Pair struct
func NewPairDelimiter(currencyPair, delimiter string) (Pair, error) {
if !strings.Contains(currencyPair, delimiter) {
return EMPTYPAIR,
Expand Down Expand Up @@ -77,8 +76,7 @@ func NewPairWithDelimiter(base, quote, delimiter string) Pair {
}
}

// NewPairFromIndex returns a CurrencyPair via a currency string and specific
// index
// NewPairFromIndex returns a CurrencyPair via a currency string and specific index
func NewPairFromIndex(currencyPair, index string) (Pair, error) {
i := strings.Index(currencyPair, index)
if i == -1 {
Expand Down
35 changes: 25 additions & 10 deletions currency/pairs.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package currency

import (
"cmp"
"encoding/json"
"errors"
"fmt"
"math/rand"
"slices"
"strings"

"github.com/thrasher-corp/gocryptotrader/log"
Expand Down Expand Up @@ -59,8 +61,7 @@ func (p Pairs) Join() string {

// Format formats the pair list to the exchange format configuration
func (p Pairs) Format(pairFmt PairFormat) Pairs {
pairs := make(Pairs, len(p))
copy(pairs, p)
pairs := slices.Clone(p)

var err error
for x := range pairs {
Expand Down Expand Up @@ -120,8 +121,7 @@ func (p Pairs) Lower() Pairs {
return newSlice
}

// Contains checks to see if a specified pair exists inside a currency pair
// array
// Contains checks to see if a specified pair exists inside a currency pair array
func (p Pairs) Contains(check Pair, exact bool) bool {
for i := range p {
if (exact && p[i].Equal(check)) ||
Expand All @@ -132,15 +132,13 @@ func (p Pairs) Contains(check Pair, exact bool) bool {
return false
}

// ContainsAll checks to see if all pairs supplied are contained within the
// original pairs list.
// ContainsAll checks to see if all pairs supplied are contained within the original pairs list
func (p Pairs) ContainsAll(check Pairs, exact bool) error {
if len(check) == 0 {
return ErrCurrencyPairsEmpty
}

comparative := make(Pairs, len(p))
copy(comparative, p)
comparative := slices.Clone(p)
list:
for x := range check {
for y := range comparative {
Expand Down Expand Up @@ -215,8 +213,7 @@ func (p Pairs) GetPairsByCurrencies(currencies Currencies) Pairs {

// Remove removes the specified pair from the list of pairs if it exists
func (p Pairs) Remove(pair Pair) (Pairs, error) {
pairs := make(Pairs, len(p))
copy(pairs, p)
pairs := slices.Clone(p)
for x := range p {
if p[x].Equal(pair) {
return append(pairs[:x], pairs[x+1:]...), nil
Expand Down Expand Up @@ -492,3 +489,21 @@ func (p Pairs) GetPairsByBase(baseTerm Code) (Pairs, error) {
}
return pairs, nil
}

// Sort sorts the Pairs in place by String comparison
func (p Pairs) Sort() {
slices.SortFunc(p, func(a, b Pair) int {
return cmp.Compare(a.String(), b.String())
})
}

// Equal checks to see if two lists of pairs contain the only the same pairs, ignoring delimiter and case
// Does not check for inverted/reciprocal pairs
func (p Pairs) Equal(b Pairs) bool {
pFmt := PairFormat{Uppercase: true, Delimiter: ""}
p = p.Format(pFmt)
b = b.Format(pFmt)
p.Sort()
b.Sort()
return slices.Equal(p, b)
}
18 changes: 18 additions & 0 deletions currency/pairs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
"encoding/json"
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

func TestPairsUpper(t *testing.T) {
Expand Down Expand Up @@ -898,3 +900,19 @@
t.Fatalf("received: '%v' but expected '%v'", len(got), 3)
}
}

// TestPairsSort exercises Pairs.Sort
func TestPairsSort(t *testing.T) {
p := Pairs{NewPair(USDT, BTC), NewPair(DAI, XRP), NewPair(DAI, BTC)}
p.Sort()
assert.Equal(t, []string{"DAIBTC", "DAIXRP", "USDTBTC"}, p.Strings(), "Pairs should be sorted")
}

// TestPairsEqual exercises Pairs.Equal
func TestPairsEqual(t *testing.T) {

Check failure on line 912 in currency/pairs_test.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary leading newline (whitespace)

orig := Pairs{NewPairWithDelimiter("USDT", "BTC", "-"), NewPair(DAI, XRP), NewPair(DAI, BTC)}
assert.True(t, orig.Equal(Pairs{NewPair(DAI, XRP), NewPair(DAI, BTC), NewPair(USDT, BTC)}), "Equal Pairs should return true")
assert.Equal(t, "USDT-BTC", orig[0].String(), "Equal Pairs should not effect original order or format")
assert.False(t, orig.Equal(Pairs{NewPair(DAI, XRP), NewPair(DAI, BTC), NewPair(USD, LTC)}), "UnEqual Pairs should return false")
}
46 changes: 15 additions & 31 deletions docs/ADD_NEW_EXCHANGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ func (f *FTX) WsConnect() error {
}
}
// Generates the default subscription set, based off enabled pairs.
subs, err := f.GenerateDefaultSubscriptions()
subs, err := f.generateSubscriptions()
if err != nil {
return err
}
Expand All @@ -731,10 +731,10 @@ func (f *FTX) WsConnect() error {
- Create function to generate default subscriptions:

```go
// GenerateDefaultSubscriptions generates default subscription
func (f *FTX) GenerateDefaultSubscriptions() ([]subscription.Subscription, error) {
var subscriptions []subscription.Subscription
subscriptions = append(subscriptions, subscription.Subscription{
// generateSubscriptions generates default subscription
func (f *FTX) generateSubscriptions() (subscription.List, error) {
var subscriptions subscription.List
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: wsMarkets,
})
// Ranges over available channels, pairs and asset types to produce a full
Expand All @@ -752,9 +752,9 @@ func (f *FTX) GenerateDefaultSubscriptions() ([]subscription.Subscription, error
"-")
for x := range channels {
subscriptions = append(subscriptions,
subscription.Subscription{
&subscription.Subscription{
Channel: channels[x],
Pair: newPair,
Pair: currency.Pairs{newPair},
Asset: assets[a],
})
}
Expand All @@ -764,9 +764,7 @@ func (f *FTX) GenerateDefaultSubscriptions() ([]subscription.Subscription, error
if f.IsWebsocketAuthenticationSupported() {
var authchan = []string{wsOrders, wsFills}
for x := range authchan {
subscriptions = append(subscriptions, subscription.Subscription{
Channel: authchan[x],
})
subscriptions = append(subscriptions, &subscription.Subscription{Channel: authchan[x]})
}
}
return subscriptions, nil
Expand Down Expand Up @@ -807,7 +805,7 @@ type WsSub struct {

```go
// Subscribe sends a websocket message to receive data from the channel
func (f *FTX) Subscribe(channelsToSubscribe []subscription.Subscription) error {
func (f *FTX) Subscribe(channelsToSubscribe subscription.List) error {
// For subscriptions we try to batch as much as possible to limit the amount
// of connection usage but sometimes this is not supported on the exchange
// API.
Expand All @@ -823,13 +821,8 @@ channels:
case wsFills, wsOrders, wsMarkets:
// Authenticated wsFills && wsOrders or wsMarkets which is a channel subscription for the full set of tradable markets do not need a currency pair association.
default:
a, err := f.GetPairAssetType(channelsToSubscribe[i].Pair)
if err != nil {
errs = append(errs, err)
continue channels
}
// Ensures our outbound currency pair is formatted correctly, sometimes our configuration format is different from what our request format needs to be.
formattedPair, err := f.FormatExchangeCurrency(channelsToSubscribe[i].Pair, a)
formattedPair, err := f.FormatExchangeCurrency(channelsToSubscribe[i].Pair, channelsToSubscribe[i].Asset)
if err != nil {
errs = append(errs, err)
continue channels
Expand All @@ -844,10 +837,7 @@ channels:
// When we have a successful subscription, we can alert our internal management system of the success.
f.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
}
if errs != nil {
return errs
}
return nil
return errs
}
```

Expand Down Expand Up @@ -1061,7 +1051,7 @@ func (f *FTX) WsAuth(ctx context.Context) error {

```go
// Unsubscribe sends a websocket message to stop receiving data from the channel
func (f *FTX) Unsubscribe(channelsToUnsubscribe []subscription.Subscription) error {
func (f *FTX) Unsubscribe(channelsToUnsubscribe subscription.List) error {
// As with subscribing we want to batch as much as possible, but sometimes this cannot be achieved due to API shortfalls.
var errs common.Errors
channels:
Expand All @@ -1072,13 +1062,7 @@ channels:
switch channelsToUnsubscribe[i].Channel {
case wsFills, wsOrders, wsMarkets:
default:
a, err := f.GetPairAssetType(channelsToUnsubscribe[i].Pair)
if err != nil {
errs = append(errs, err)
continue channels
}

formattedPair, err := f.FormatExchangeCurrency(channelsToUnsubscribe[i].Pair, a)
formattedPair, err := f.FormatExchangeCurrency(channelsToUnsubscribe[i].Pair, channelsToUnsubscribe[i].Asset)
if err != nil {
errs = append(errs, err)
continue channels
Expand Down Expand Up @@ -1133,8 +1117,8 @@ func (f *FTX) Setup(exch *config.Exchange) error {
Subscriber: f.Subscribe,
// Unsubscriber function outlined above.
UnSubscriber: f.Unsubscribe,
// GenerateDefaultSubscriptions function outlined above.
GenerateSubscriptions: f.GenerateDefaultSubscriptions,
// GenerateSubscriptions function outlined above.
GenerateSubscriptions: f.generateSubscriptions,
// Defines the capabilities of the websocket outlined in supported
// features struct. This allows the websocket connection to be flushed
// appropriately if we have a pair/asset enable/disable change. This is
Expand Down
2 changes: 1 addition & 1 deletion engine/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3121,7 +3121,7 @@ func (s *RPCServer) WebsocketGetSubscriptions(_ context.Context, r *gctrpc.Webso
payload.Subscriptions = append(payload.Subscriptions,
&gctrpc.WebsocketSubscription{
Channel: subs[i].Channel,
Pair: subs[i].Pair.String(),
Pairs: subs[i].Pairs.Join(),
Asset: subs[i].Asset.String(),
Params: string(params),
})
Expand Down
15 changes: 8 additions & 7 deletions exchanges/asset/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ const (
USDTMarginedFutures
USDCMarginedFutures
Options

// Added to represent a USDT and USDC based linear derivatives(futures/perpetual) assets in Bybit V5.
LinearContract
LinearContract // Added to represent a USDT and USDC based linear derivatives(futures/perpetual) assets in Bybit V5.
All

futuresFlag = PerpetualContract | PerpetualSwap | Futures | DeliveryFutures | UpsideProfitContract | DownsideProfitContract | CoinMarginedFutures | USDTMarginedFutures | USDCMarginedFutures | LinearContract
supportedFlag = Spot | Margin | CrossMargin | MarginFunding | Index | Binary | PerpetualContract | PerpetualSwap | Futures | DeliveryFutures | UpsideProfitContract | DownsideProfitContract | CoinMarginedFutures | USDTMarginedFutures | USDCMarginedFutures | Options | LinearContract
Expand All @@ -65,6 +64,7 @@ const (
usdtMarginedFutures = "usdtmarginedfutures"
usdcMarginedFutures = "usdcmarginedfutures"
options = "options"
all = "all"
)

var (
Expand Down Expand Up @@ -111,6 +111,8 @@ func (a Item) String() string {
return usdcMarginedFutures
case Options:
return options
case All:
return all
default:
return ""
}
Expand Down Expand Up @@ -212,11 +214,10 @@ func New(input string) (Item, error) {
return USDCMarginedFutures, nil
case options, "option":
return Options, nil
case all:
return All, nil
default:
return 0, fmt.Errorf("%w '%v', only supports %s",
ErrNotSupported,
input,
supportedList)
return 0, fmt.Errorf("%w '%v', only supports %s", ErrNotSupported, input, supportedList)
}
}

Expand Down
Loading
Loading