Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subscriptions: Encapsulate, replace Pair with Pairs and refactor; improve exchange support #1501

Merged
merged 46 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
845a9ae
Websocket: Use ErrSubscribedAlready
gbjk Feb 19, 2024
9f2a68e
Subscriptions: Replace Pair with Pairs
gbjk Feb 19, 2024
b7658a7
Docs: Update subscriptions in add new exch
gbjk Mar 3, 2024
3f34747
RPC: Update Subscription Pairs
gbjk Mar 3, 2024
3ea6158
Linter: Disable testifylint.Len
gbjk Feb 21, 2024
895469d
Websocket: Add suffix to state consts
gbjk Feb 22, 2024
22dad3c
Binance: Subscription Pairs support
gbjk Mar 2, 2024
e870c62
Bitfinex: Subscription Pairs support
gbjk Mar 2, 2024
d593b3d
Bithumb: Subscription Pairs support
gbjk Mar 2, 2024
6357919
Bitmex: Subscription Pairs support
gbjk Mar 2, 2024
88facd9
Bitstamp: Subscription Pairs support
gbjk Mar 2, 2024
8bec958
BTCMarkets: Subscription Pairs support
gbjk Mar 2, 2024
2e2748a
BTSE: Subscription Pairs support
gbjk Mar 2, 2024
f4a9be6
Coinbase: Subscription Pairs support
gbjk Mar 2, 2024
d1e2f78
Coinut: Subscription Pairs support
gbjk Mar 2, 2024
51b72d7
GateIO: Subscription Pairs support
gbjk Mar 2, 2024
57a9597
Gemini: Subscription Pairs support and improvement
gbjk Mar 2, 2024
d5f572c
Hitbtc: Subscription Pairs support
gbjk Mar 2, 2024
58976bc
Huboi: Subscription Pairs support
gbjk Mar 2, 2024
c5938d1
Kucoin: Subscription Pairs support
gbjk Mar 2, 2024
1098e0d
Okcoin: Subscription Pairs support
gbjk Mar 2, 2024
270647e
Poloniex: Subscription Pairs support
gbjk Mar 2, 2024
b75510a
Kraken: Add subscription Pairs support
gbjk Feb 19, 2024
2d93e88
Bybit: Subscription Pairs support
gbjk Feb 26, 2024
457a257
Okx: Subscription Pairs support
gbjk Mar 2, 2024
0998f74
Bitmex: Subsription configuration
gbjk Mar 5, 2024
a7e52dc
CoinbasePro: Subscription Configuration
gbjk Mar 6, 2024
073f4cb
Websocket: Log actual sent message when Verbose
gbjk Mar 7, 2024
618a185
Subscriptions: Improve clarity of which key is which in Match
gbjk Mar 28, 2024
3ba89cf
Subscriptions: Lint fix for HugeParam
gbjk Mar 28, 2024
258a0e5
Subscriptions: Add AddPairs and move keys from test
gbjk Apr 8, 2024
ddae9e8
Subscriptions: Simplify subscription keys and add key types
gbjk Apr 8, 2024
bc34c97
Subscriptions: Add List.GroupPairs Rename sub.AddPairs
gbjk Apr 8, 2024
d7121d2
Subscription: Fix ExactKey not matching 0 pairs
gbjk Apr 9, 2024
efb9d3e
Subscriptions: Remove unused IdentityKey and HasPairKey
gbjk Apr 9, 2024
4bcac0f
Subscriptions: Fix GetKey test
gbjk Apr 9, 2024
32e2007
Subscriptions: Test coverage improvements
gbjk Apr 11, 2024
8ebb7ce
Websocket: Change State on Add/Remove
gbjk Apr 15, 2024
2c1b481
Subscriptions: Improve error context
gbjk May 2, 2024
2559b20
Subscriptions: Fix Enable: false subs not ignored
gbjk May 16, 2024
b2dc59e
Bitfinex: Fix WsAuth test failing on DataHandler
gbjk May 28, 2024
66d79b5
Deribit: Subscription Pairs support
gbjk May 29, 2024
db50ee9
Websocket: Accept nil lists for checkSubscriptions
gbjk Jun 4, 2024
7c725dd
Websocket: Add context to NilPointer errors
gbjk Jun 4, 2024
9d1c4db
Subscriptions: Add context to nil errors
gbjk Jun 4, 2024
ccf98f4
Exchange: Fix error expectations in UnsubToWSChans
gbjk Jun 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ linters-settings:
disable:
- require-error
- float-compare
# We deliberately use Equal over Len to avoid spamming the contents of large Slices
- len

issues:
max-issues-per-linter: 0
Expand Down
6 changes: 3 additions & 3 deletions config/config_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ type FeaturesEnabledConfig struct {

// FeaturesConfig stores the exchanges supported and enabled features
type FeaturesConfig struct {
Supports FeaturesSupportedConfig `json:"supports"`
Enabled FeaturesEnabledConfig `json:"enabled"`
Subscriptions []*subscription.Subscription `json:"subscriptions,omitempty"`
Supports FeaturesSupportedConfig `json:"supports"`
Enabled FeaturesEnabledConfig `json:"enabled"`
Subscriptions subscription.List `json:"subscriptions,omitempty"`
}

// APIEndpointsConfig stores the API endpoint addresses
Expand Down
39 changes: 31 additions & 8 deletions currency/pairs.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ func (p Pairs) Lower() Pairs {
return newSlice
}

// Contains checks to see if a specified pair exists inside a currency pair
// array
// Contains checks to see if a specified pair exists inside a currency pair array
func (p Pairs) Contains(check Pair, exact bool) bool {
for i := range p {
if (exact && p[i].Equal(check)) ||
Expand All @@ -121,15 +120,13 @@ func (p Pairs) Contains(check Pair, exact bool) bool {
return false
}

// ContainsAll checks to see if all pairs supplied are contained within the
// original pairs list.
// ContainsAll checks to see if all pairs supplied are contained within the original pairs list
func (p Pairs) ContainsAll(check Pairs, exact bool) error {
if len(check) == 0 {
return ErrCurrencyPairsEmpty
}

comparative := make(Pairs, len(p))
copy(comparative, p)
comparative := slices.Clone(p)
list:
for x := range check {
for y := range comparative {
Expand Down Expand Up @@ -204,8 +201,7 @@ func (p Pairs) GetPairsByCurrencies(currencies Currencies) Pairs {

// Remove removes the specified pair from the list of pairs if it exists
func (p Pairs) Remove(pair Pair) (Pairs, error) {
pairs := make(Pairs, len(p))
copy(pairs, p)
pairs := slices.Clone(p)
for x := range p {
if p[x].Equal(pair) {
return append(pairs[:x], pairs[x+1:]...), nil
Expand Down Expand Up @@ -481,3 +477,30 @@ func (p Pairs) GetPairsByBase(baseTerm Code) (Pairs, error) {
}
return pairs, nil
}

// equalKey is a small key for testing pair equality without delimiter
type equalKey struct {
Base *Item
Quote *Item
}

// Equal checks to see if two lists of pairs contain only the same pairs, ignoring delimiter and case
// Does not check for inverted/reciprocal pairs
func (p Pairs) Equal(b Pairs) bool {
gbjk marked this conversation as resolved.
Show resolved Hide resolved
if len(p) != len(b) {
return false
}
if len(p) == 0 {
return true
}
m := map[equalKey]struct{}{}
for i := range p {
m[equalKey{Base: p[i].Base.Item, Quote: p[i].Quote.Item}] = struct{}{}
}
for i := range b {
if _, ok := m[equalKey{Base: b[i].Base.Item, Quote: b[i].Quote.Item}]; !ok {
return false
}
}
return true
}
9 changes: 9 additions & 0 deletions currency/pairs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,3 +870,12 @@ func TestGetPairsByBase(t *testing.T) {
t.Fatalf("received: '%v' but expected '%v'", len(got), 3)
}
}

// TestPairsEqual exercises Pairs.Equal
func TestPairsEqual(t *testing.T) {
t.Parallel()
orig := Pairs{NewPairWithDelimiter("USDT", "BTC", "-"), NewPair(DAI, XRP), NewPair(DAI, BTC)}
gloriousCode marked this conversation as resolved.
Show resolved Hide resolved
assert.True(t, orig.Equal(Pairs{NewPair(DAI, XRP), NewPair(DAI, BTC), NewPair(USDT, BTC)}), "Equal Pairs should return true")
assert.Equal(t, "USDT-BTC", orig[0].String(), "Equal Pairs should not effect original order or format")
assert.False(t, orig.Equal(Pairs{NewPair(DAI, XRP), NewPair(DAI, BTC), NewPair(USD, LTC)}), "UnEqual Pairs should return false")
}
46 changes: 15 additions & 31 deletions docs/ADD_NEW_EXCHANGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ func (f *FTX) WsConnect() error {
}
}
// Generates the default subscription set, based off enabled pairs.
subs, err := f.GenerateDefaultSubscriptions()
subs, err := f.generateSubscriptions()
if err != nil {
return err
}
Expand All @@ -733,10 +733,10 @@ func (f *FTX) WsConnect() error {
- Create function to generate default subscriptions:

```go
// GenerateDefaultSubscriptions generates default subscription
func (f *FTX) GenerateDefaultSubscriptions() ([]subscription.Subscription, error) {
var subscriptions []subscription.Subscription
subscriptions = append(subscriptions, subscription.Subscription{
// generateSubscriptions generates default subscription
func (f *FTX) generateSubscriptions() (subscription.List, error) {
var subscriptions subscription.List
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: wsMarkets,
})
// Ranges over available channels, pairs and asset types to produce a full
Expand All @@ -754,9 +754,9 @@ func (f *FTX) GenerateDefaultSubscriptions() ([]subscription.Subscription, error
"-")
for x := range channels {
subscriptions = append(subscriptions,
subscription.Subscription{
&subscription.Subscription{
Channel: channels[x],
Pair: newPair,
Pair: currency.Pairs{newPair},
Asset: assets[a],
})
}
Expand All @@ -766,9 +766,7 @@ func (f *FTX) GenerateDefaultSubscriptions() ([]subscription.Subscription, error
if f.IsWebsocketAuthenticationSupported() {
var authchan = []string{wsOrders, wsFills}
for x := range authchan {
subscriptions = append(subscriptions, subscription.Subscription{
Channel: authchan[x],
})
subscriptions = append(subscriptions, &subscription.Subscription{Channel: authchan[x]})
}
}
return subscriptions, nil
Expand Down Expand Up @@ -809,7 +807,7 @@ type WsSub struct {

```go
// Subscribe sends a websocket message to receive data from the channel
func (f *FTX) Subscribe(channelsToSubscribe []subscription.Subscription) error {
func (f *FTX) Subscribe(channelsToSubscribe subscription.List) error {
// For subscriptions we try to batch as much as possible to limit the amount
// of connection usage but sometimes this is not supported on the exchange
// API.
Expand All @@ -825,13 +823,8 @@ channels:
case wsFills, wsOrders, wsMarkets:
// Authenticated wsFills && wsOrders or wsMarkets which is a channel subscription for the full set of tradable markets do not need a currency pair association.
default:
a, err := f.GetPairAssetType(channelsToSubscribe[i].Pair)
if err != nil {
errs = append(errs, err)
continue channels
}
// Ensures our outbound currency pair is formatted correctly, sometimes our configuration format is different from what our request format needs to be.
formattedPair, err := f.FormatExchangeCurrency(channelsToSubscribe[i].Pair, a)
formattedPair, err := f.FormatExchangeCurrency(channelsToSubscribe[i].Pair, channelsToSubscribe[i].Asset)
if err != nil {
errs = append(errs, err)
continue channels
Expand All @@ -846,10 +839,7 @@ channels:
// When we have a successful subscription, we can alert our internal management system of the success.
f.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
}
if errs != nil {
return errs
}
return nil
return errs
}
```

Expand Down Expand Up @@ -1063,7 +1053,7 @@ func (f *FTX) WsAuth(ctx context.Context) error {

```go
// Unsubscribe sends a websocket message to stop receiving data from the channel
func (f *FTX) Unsubscribe(channelsToUnsubscribe []subscription.Subscription) error {
func (f *FTX) Unsubscribe(channelsToUnsubscribe subscription.List) error {
// As with subscribing we want to batch as much as possible, but sometimes this cannot be achieved due to API shortfalls.
var errs common.Errors
channels:
Expand All @@ -1074,13 +1064,7 @@ channels:
switch channelsToUnsubscribe[i].Channel {
case wsFills, wsOrders, wsMarkets:
default:
a, err := f.GetPairAssetType(channelsToUnsubscribe[i].Pair)
if err != nil {
errs = append(errs, err)
continue channels
}

formattedPair, err := f.FormatExchangeCurrency(channelsToUnsubscribe[i].Pair, a)
formattedPair, err := f.FormatExchangeCurrency(channelsToUnsubscribe[i].Pair, channelsToUnsubscribe[i].Asset)
if err != nil {
errs = append(errs, err)
continue channels
Expand Down Expand Up @@ -1135,8 +1119,8 @@ func (f *FTX) Setup(exch *config.Exchange) error {
Subscriber: f.Subscribe,
// Unsubscriber function outlined above.
UnSubscriber: f.Unsubscribe,
// GenerateDefaultSubscriptions function outlined above.
GenerateSubscriptions: f.GenerateDefaultSubscriptions,
// GenerateSubscriptions function outlined above.
GenerateSubscriptions: f.generateSubscriptions,
// Defines the capabilities of the websocket outlined in supported
// features struct. This allows the websocket connection to be flushed
// appropriately if we have a pair/asset enable/disable change. This is
Expand Down
2 changes: 1 addition & 1 deletion engine/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3121,7 +3121,7 @@ func (s *RPCServer) WebsocketGetSubscriptions(_ context.Context, r *gctrpc.Webso
payload.Subscriptions = append(payload.Subscriptions,
&gctrpc.WebsocketSubscription{
Channel: subs[i].Channel,
Pair: subs[i].Pair.String(),
Pairs: subs[i].Pairs.Join(),
Asset: subs[i].Asset.String(),
Params: string(params),
})
Expand Down
15 changes: 8 additions & 7 deletions exchanges/asset/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ const (
Options
OptionCombo
FutureCombo

// Added to represent a USDT and USDC based linear derivatives(futures/perpetual) assets in Bybit V5.
LinearContract
LinearContract // Added to represent a USDT and USDC based linear derivatives(futures/perpetual) assets in Bybit V5
All

optionsFlag = OptionCombo | Options
futuresFlag = PerpetualContract | PerpetualSwap | Futures | DeliveryFutures | UpsideProfitContract | DownsideProfitContract | CoinMarginedFutures | USDTMarginedFutures | USDCMarginedFutures | LinearContract | FutureCombo
Expand All @@ -70,6 +69,7 @@ const (
options = "options"
optionCombo = "option_combo"
futureCombo = "future_combo"
all = "all"
)

var (
Expand Down Expand Up @@ -120,6 +120,8 @@ func (a Item) String() string {
return optionCombo
case FutureCombo:
return futureCombo
case All:
return all
gbjk marked this conversation as resolved.
Show resolved Hide resolved
default:
return ""
}
Expand Down Expand Up @@ -225,11 +227,10 @@ func New(input string) (Item, error) {
return OptionCombo, nil
case futureCombo:
return FutureCombo, nil
case all:
gloriousCode marked this conversation as resolved.
Show resolved Hide resolved
return All, nil
default:
return 0, fmt.Errorf("%w '%v', only supports %s",
ErrNotSupported,
input,
supportedList)
return 0, fmt.Errorf("%w '%v', only supports %s", ErrNotSupported, input, supportedList)
}
}

Expand Down
14 changes: 7 additions & 7 deletions exchanges/binance/binance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1462,7 +1462,7 @@ func TestGetHistoricTrades(t *testing.T) {
if mockTests {
expected = 1002
}
assert.Equal(t, expected, len(result), "GetHistoricTrades should return correct number of entries") //nolint:testifylint // assert.Len doesn't produce clear messages on result
assert.Equal(t, expected, len(result), "GetHistoricTrades should return correct number of entries")
for _, r := range result {
if !assert.WithinRange(t, r.Timestamp, start, end, "All trades should be within time range") {
break
Expand Down Expand Up @@ -1982,7 +1982,7 @@ func BenchmarkWsHandleData(bb *testing.B) {
func TestSubscribe(t *testing.T) {
t.Parallel()
b := b
channels := []subscription.Subscription{
channels := subscription.List{
{Channel: "btcusdt@ticker"},
{Channel: "btcusdt@trade"},
}
Expand All @@ -2008,7 +2008,7 @@ func TestSubscribe(t *testing.T) {

func TestSubscribeBadResp(t *testing.T) {
t.Parallel()
channels := []subscription.Subscription{
channels := subscription.List{
{Channel: "moons@ticker"},
}
mock := func(msg []byte, w *websocket.Conn) error {
Expand Down Expand Up @@ -2434,19 +2434,19 @@ func TestSeedLocalCache(t *testing.T) {

func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()
expected := []subscription.Subscription{}
expected := subscription.List{}
pairs, err := b.GetEnabledPairs(asset.Spot)
assert.NoError(t, err, "GetEnabledPairs should not error")
for _, p := range pairs {
for _, c := range []string{"kline_1m", "depth@100ms", "ticker", "trade"} {
expected = append(expected, subscription.Subscription{
expected = append(expected, &subscription.Subscription{
Channel: p.Format(currency.PairFormat{Delimiter: "", Uppercase: false}).String() + "@" + c,
Pair: p,
Pairs: currency.Pairs{p},
Asset: asset.Spot,
})
}
}
subs, err := b.GenerateSubscriptions()
subs, err := b.generateSubscriptions()
assert.NoError(t, err, "GenerateSubscriptions should not error")
if assert.Len(t, subs, len(expected), "Should have the correct number of subs") {
assert.ElementsMatch(t, subs, expected, "Should get the correct subscriptions")
Expand Down