Skip to content

Commit

Permalink
Completed adding european options public stream handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
samuael committed Apr 2, 2024
1 parent 2c19072 commit 2c6a1e9
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 32 deletions.
76 changes: 76 additions & 0 deletions exchanges/binance/binance_eoptions_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package binance

import (
"encoding/json"

"github.com/thrasher-corp/gocryptotrader/common/convert"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/types"
Expand Down Expand Up @@ -477,3 +479,77 @@ type WsOptionsKlineData struct {
TakerTradeAmount types.Number `json:"Q"`
} `json:"k"`
}

// WsOptionIncomingResp used by wsHandleEOptionsData
type WsOptionIncomingResp struct {
EventType string `json:"e"`
Result json.RawMessage `json:"result"`
ID int64 `json:"id"`
Stream string `json:"stream"`
Data json.RawMessage `json:"data"`
}

// WsOptionIncomingResps list of WsOptionIncomingResp
type WsOptionIncomingResps struct {
Instances []WsOptionIncomingResp

// To record the information about whther the incoming data was a slice or sing object instance.

Check failure on line 496 in exchanges/binance/binance_eoptions_types.go

View workflow job for this annotation

GitHub Actions / lint

`whther` is a misspelling of `whether` (misspell)

Check failure on line 496 in exchanges/binance/binance_eoptions_types.go

View workflow job for this annotation

GitHub Actions / Spell checker

whther ==> whether
// Reason: Some slices may have a single element, which creates uncertainity about whether the incoming data is slice or object instance.

Check failure on line 497 in exchanges/binance/binance_eoptions_types.go

View workflow job for this annotation

GitHub Actions / lint

`uncertainity` is a misspelling of `uncertainty` (misspell)

Check failure on line 497 in exchanges/binance/binance_eoptions_types.go

View workflow job for this annotation

GitHub Actions / Spell checker

uncertainity ==> uncertainty
IsSlice bool
}

// UnmarshalJSON deserializes incoming object or slice into WsOptionIncomingResps([]WsOptionIncomingResp) instance.
func (a *WsOptionIncomingResps) UnmarshalJSON(data []byte) error {
var resp []WsOptionIncomingResp
isSlice := true
err := json.Unmarshal(data, &resp)
if err != nil {
isSlice = false
var newResp WsOptionIncomingResp
err = json.Unmarshal(data, &newResp)
if err != nil {
return err
}
resp = append(resp, newResp)
}
a.Instances = resp
a.IsSlice = isSlice
return nil
}

// WsOpenInterest represents a single open interest instance.
type WsOpenInterest struct {
EventType string `json:"e"`
EventTime convert.ExchangeTime `json:"E"`
Symbol string `json:"s"`
OpenInterestInContract string `json:"o"` // Base
OpenInterestInUSDT string `json:"h"`
}

// WsOptionsNewPair represents a new options pair update information
type WsOptionsNewPair struct {
EventType string `json:"e"`
EventTime convert.ExchangeTime `json:"E"`
ID int64 `json:"id"`
UnderlyingAssetID int64 `json:"cid"`
UnderlyingIndexOfContract string `json:"u"`
QuotationAsset string `json:"qa"`
TradingPairName string `json:"s"`
Unit int `json:"unit"` // Conversion ratio, the quantity of the underlying asset represented by a single contract
MinimumTradeVolume string `json:"mq"` // Minimum trade volume of the underlying asset
OptionType string `json:"d"`
StrikePrice string `json:"sp"`
ExpirationTime convert.ExchangeTime `json:"ed"`
}

// WsOptionsOrderbook represents a partial orderbook websocket stream data
type WsOptionsOrderbook struct {
EventType string `json:"e"`
EventTime convert.ExchangeTime `json:"E"`
TransactionTime convert.ExchangeTime `json:"T"`
OptionSymbol string `json:"symbol"`
UpdateID int64 `json:"u"` // update id in event
PUpdateID int64 `json:"pu"` // same as update id in event
Bids [][2]types.Number `json:"b"` // 0: Price 1: Quantity
Asks [][2]types.Number `json:"a"`
}
135 changes: 103 additions & 32 deletions exchanges/binance/binance_eoptions_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
Expand Down Expand Up @@ -44,8 +45,8 @@ const (

// defaultEOptionsSubscriptions list of default subscription channels
var defaultEOptionsSubscriptions = []string{
cnlTicker,
cnlKline,
// cnlTicker,
// cnlKline,
cnlDepth,
}

Expand All @@ -58,7 +59,7 @@ func (b *Binance) WsOptionsConnect() error {
var dialer websocket.Dialer
dialer.HandshakeTimeout = b.Config.HTTPTimeout
dialer.Proxy = http.ProxyFromEnvironment
wsURL := eoptionsWebsocketURL
wsURL := eoptionsWebsocketURL + "stream"
err = b.Websocket.SetWebsocketURL(wsURL, false, false)
if err != nil {
b.Websocket.SetCanUseAuthenticatedEndpoints(false)
Expand All @@ -79,7 +80,6 @@ func (b *Binance) WsOptionsConnect() error {
}
}
}
wsURL += "ws/ETH-240628-800-C@ticker"
err = b.Websocket.SetWebsocketURL(wsURL, false, false)
if err != nil {
return err
Expand All @@ -100,9 +100,7 @@ func (b *Binance) WsOptionsConnect() error {
if err != nil {
return err
}
println("Length of Subscriptions: ", len(subscriptions))
return b.OptionSubscribe(subscriptions)
// return nil
}

func (b *Binance) handleEOptionsSubscriptions(operation string, subscs []subscription.Subscription) error {
Expand Down Expand Up @@ -145,7 +143,7 @@ func (b *Binance) handleEOptionsSubscriptions(operation string, subscs []subscri
level, okay := subscs[s].Params["level"].(string)
if !okay {
// deefault level set to 50
level = "50"
level = "10"
}
var intervalString string
if subscs[s].Interval != kline.Interval(0) {
Expand Down Expand Up @@ -190,7 +188,7 @@ func (b *Binance) OptionUnsubscribe(subscs []subscription.Subscription) error {

// GenerateEOptionsDefaultSubscriptions generates the default subscription set
func (b *Binance) GenerateEOptionsDefaultSubscriptions() ([]subscription.Subscription, error) {
var channels = defaultEOptionsSubscriptions
channels := defaultEOptionsSubscriptions
var subscriptions []subscription.Subscription
pairs, err := b.FetchTradablePairs(context.Background(), asset.Options)
if err != nil {
Expand Down Expand Up @@ -301,46 +299,117 @@ func (b *Binance) wsEOptionsFuturesReadData() {
}

func (b *Binance) wsHandleEOptionsData(respRaw []byte) error {
println("incoming: ", string(respRaw))
result := struct {
EventType string `json:"e"`
Result json.RawMessage `json:"result"`
ID int64 `json:"id"`
Stream string `json:"stream"`
Data json.RawMessage `json:"data"`
}{}
var result WsOptionIncomingResps
err := json.Unmarshal(respRaw, &result)
if err != nil {
return err
}
if result.EventType == "" || (result.ID != 0 && result.Result != nil) {
if !b.Websocket.Match.IncomingWithData(result.ID, respRaw) {
if result.Instances[0].EventType == "" || (result.Instances[0].ID != 0 && result.Instances[0].Result != nil) {
if !b.Websocket.Match.IncomingWithData(result.Instances[0].ID, respRaw) {
return errors.New("Unhandled data: " + string(respRaw))
}
return nil
}
var stream string
switch result.Stream {
switch result.Instances[0].Stream {
case cnlTrade:
return b.processOptionsTradeStream(respRaw)
case cnlIndex:
return b.processOptionsIndexPrice(respRaw)
case "24hrTicker":
// TODO: slice and single object ticker are supposed to be handled
return b.processOptionsTicker(respRaw)
return b.processOptionsTicker(respRaw, result.IsSlice)
case "markPrice":
return b.processOptionsMarkPrices(respRaw)
case "kline":
return b.processOptionsKline(respRaw)
case "openInterest":
return b.processOptionsOpenInterest(respRaw)
case "option_pair":
return b.processOptionsPair(respRaw)
case "depth":
return b.processOptionsOrderbook(respRaw)
default:
stream = extractStreamInfo(result.Stream)
b.Websocket.DataHandler <- stream.UnhandledMessageWarning{
Message: string(respRaw),
}
return fmt.Errorf("unhandled stream data %s", string(respRaw))
}
}

// orderbookSnapshotLoadedPairsMap used for validation of whether the symbol has snapshot orderbook data in the buffer or not.
var orderbookSnapshotLoadedPairsMap = map[string]bool{}

func (b *Binance) processOptionsOrderbook(data []byte) error {
var resp WsOptionsOrderbook
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
pair, err := currency.NewPairFromString(resp.OptionSymbol)
if err != nil {
return err
}
asks := make([]orderbook.Item, len(resp.Asks))
for a := range resp.Asks {
asks[a].Price = resp.Asks[a][0].Float64()
asks[a].Amount = resp.Asks[a][1].Float64()
}
bids := make([]orderbook.Item, len(resp.Bids))
for b := range resp.Bids {
bids[b].Price = resp.Bids[b][0].Float64()
bids[b].Amount = resp.Bids[b][1].Float64()
}
if len(asks) == 0 && len(bids) == 0 {
return nil
}
okay := orderbookSnapshotLoadedPairsMap[resp.OptionSymbol]
if !okay {
err = b.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{
Pair: pair,
Exchange: b.Name,
Asset: asset.Options,
LastUpdated: resp.TransactionTime.Time(),
LastUpdateID: resp.UpdateID,
Asks: asks,
Bids: bids,
})
if err != nil {
return err
}
} else {
err = b.Websocket.Orderbook.Update(&orderbook.Update{
Pair: pair,
Asks: asks,
Bids: bids,
Asset: asset.Options,
UpdateID: resp.UpdateID,
UpdateTime: resp.TransactionTime.Time(),
})
if err != nil {
return err
}
}
return nil
}

// processOptionsPair new symbol listing stream
func (b *Binance) processOptionsPair(data []byte) error {
var resp WsOptionsNewPair
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
switch stream {
case "ticker":
case "index":
b.Websocket.DataHandler <- resp
return nil
}

func (b *Binance) processOptionsOpenInterest(data []byte) error {
var resp []WsOpenInterest
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
return fmt.Errorf("unhandled stream data %s", string(respRaw))
b.Websocket.DataHandler <- resp
return nil
}

func (b *Binance) processOptionsKline(data []byte) error {
Expand Down Expand Up @@ -390,18 +459,20 @@ func (b *Binance) processOptionsIndexPrice(data []byte) error {
return nil
}

func (b *Binance) processOptionsTicker(data []byte) error {
println("Handling Ticker")
func (b *Binance) processOptionsTicker(data []byte, isSlice bool) error {
var resp []OptionsTicker24Hr
err := json.Unmarshal(data, &resp)
if err != nil {
if isSlice {
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
} else {
respSingle := OptionsTicker24Hr{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
resp = append(resp, respSingle)
return err
}
for a := range resp {
pair, err := currency.NewPairFromString(resp[a].Symbol)
Expand Down

0 comments on commit 2c6a1e9

Please sign in to comment.