forked from thrasher-corp/gocryptotrader
/
orderbook.go
271 lines (232 loc) · 6.8 KB
/
orderbook.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
package orderbook
import (
"errors"
"fmt"
"sort"
"strings"
"time"
"github.com/gofrs/uuid"
"github.com/nbltrust/gocryptotrader/currency"
"github.com/nbltrust/gocryptotrader/dispatch"
"github.com/nbltrust/gocryptotrader/exchanges/asset"
)
// Get checks and returns the orderbook given an exchange name and currency pair
// if it exists
func Get(exchange string, p currency.Pair, a asset.Item) (*Base, error) {
o, err := service.Retrieve(exchange, p, a)
if err != nil {
return nil, err
}
return o, nil
}
// SubscribeOrderbook subcribes to an orderbook and returns a communication
// channel to stream orderbook data updates
func SubscribeOrderbook(exchange string, p currency.Pair, a asset.Item) (dispatch.Pipe, error) {
exchange = strings.ToLower(exchange)
service.RLock()
defer service.RUnlock()
book, ok := service.Books[exchange][p.Base.Item][p.Quote.Item][a]
if !ok {
return dispatch.Pipe{}, fmt.Errorf("orderbook item not found for %s %s %s",
exchange,
p,
a)
}
return service.mux.Subscribe(book.Main)
}
// SubscribeToExchangeOrderbooks subcribes to all orderbooks on an exchange
func SubscribeToExchangeOrderbooks(exchange string) (dispatch.Pipe, error) {
exchange = strings.ToLower(exchange)
service.RLock()
defer service.RUnlock()
id, ok := service.Exchange[exchange]
if !ok {
return dispatch.Pipe{}, fmt.Errorf("%s exchange orderbooks not found",
exchange)
}
return service.mux.Subscribe(id)
}
// Update stores orderbook data
func (s *Service) Update(b *Base) error {
var ids []uuid.UUID
s.Lock()
switch {
case s.Books[b.ExchangeName] == nil:
s.Books[b.ExchangeName] = make(map[*currency.Item]map[*currency.Item]map[asset.Item]*Book)
s.Books[b.ExchangeName][b.Pair.Base.Item] = make(map[*currency.Item]map[asset.Item]*Book)
s.Books[b.ExchangeName][b.Pair.Base.Item][b.Pair.Quote.Item] = make(map[asset.Item]*Book)
err := s.SetNewData(b)
if err != nil {
s.Unlock()
return err
}
case s.Books[b.ExchangeName][b.Pair.Base.Item] == nil:
s.Books[b.ExchangeName][b.Pair.Base.Item] = make(map[*currency.Item]map[asset.Item]*Book)
s.Books[b.ExchangeName][b.Pair.Base.Item][b.Pair.Quote.Item] = make(map[asset.Item]*Book)
err := s.SetNewData(b)
if err != nil {
s.Unlock()
return err
}
case s.Books[b.ExchangeName][b.Pair.Base.Item][b.Pair.Quote.Item] == nil:
s.Books[b.ExchangeName][b.Pair.Base.Item][b.Pair.Quote.Item] = make(map[asset.Item]*Book)
err := s.SetNewData(b)
if err != nil {
s.Unlock()
return err
}
case s.Books[b.ExchangeName][b.Pair.Base.Item][b.Pair.Quote.Item][b.AssetType] == nil:
err := s.SetNewData(b)
if err != nil {
s.Unlock()
return err
}
default:
book := s.Books[b.ExchangeName][b.Pair.Base.Item][b.Pair.Quote.Item][b.AssetType]
book.b.Bids = b.Bids
book.b.Asks = b.Asks
book.b.LastUpdated = b.LastUpdated
ids = book.Assoc
ids = append(ids, book.Main)
}
s.Unlock()
return s.mux.Publish(ids, b)
}
// SetNewData sets new data
func (s *Service) SetNewData(b *Base) error {
ids, err := s.GetAssociations(b)
if err != nil {
return err
}
singleID, err := s.mux.GetID()
if err != nil {
return err
}
// Below instigates orderbook item separation so we can ensure, in the event
// of a simultaneous update via websocket/rest/fix, we don't affect package
// scoped orderbook data which could result in a potential panic
cpyBook := *b
cpyBook.Bids = make([]Item, len(b.Bids))
copy(cpyBook.Bids, b.Bids)
cpyBook.Asks = make([]Item, len(b.Asks))
copy(cpyBook.Asks, b.Asks)
s.Books[b.ExchangeName][b.Pair.Base.Item][b.Pair.Quote.Item][b.AssetType] = &Book{
b: &cpyBook,
Main: singleID,
Assoc: ids}
return nil
}
// GetAssociations links a singular book with it's dispatch associations
func (s *Service) GetAssociations(b *Base) ([]uuid.UUID, error) {
if b == nil {
return nil, errors.New("orderbook is nil")
}
var ids []uuid.UUID
exchangeID, ok := s.Exchange[b.ExchangeName]
if !ok {
var err error
exchangeID, err = s.mux.GetID()
if err != nil {
return nil, err
}
s.Exchange[b.ExchangeName] = exchangeID
}
ids = append(ids, exchangeID)
return ids, nil
}
// Retrieve gets orderbook data from the slice
func (s *Service) Retrieve(exchange string, p currency.Pair, a asset.Item) (*Base, error) {
exchange = strings.ToLower(exchange)
s.RLock()
defer s.RUnlock()
if s.Books[exchange] == nil {
return nil, fmt.Errorf("no orderbooks for %s exchange", exchange)
}
if s.Books[exchange][p.Base.Item] == nil {
return nil, fmt.Errorf("no orderbooks associated with base currency %s",
p.Base)
}
if s.Books[exchange][p.Base.Item][p.Quote.Item] == nil {
return nil, fmt.Errorf("no orderbooks associated with quote currency %s",
p.Quote)
}
if s.Books[exchange][p.Base.Item][p.Quote.Item][a] == nil {
return nil, fmt.Errorf("no orderbooks associated with asset type %s",
a)
}
return s.Books[exchange][p.Base.Item][p.Quote.Item][a].b, nil
}
// TotalBidsAmount returns the total amount of bids and the total orderbook
// bids value
func (b *Base) TotalBidsAmount() (amountCollated, total float64) {
for x := range b.Bids {
amountCollated += b.Bids[x].Amount
total += b.Bids[x].Amount * b.Bids[x].Price
}
return amountCollated, total
}
// TotalAsksAmount returns the total amount of asks and the total orderbook
// asks value
func (b *Base) TotalAsksAmount() (amountCollated, total float64) {
for y := range b.Asks {
amountCollated += b.Asks[y].Amount
total += b.Asks[y].Amount * b.Asks[y].Price
}
return amountCollated, total
}
// Update updates the bids and asks
func (b *Base) Update(bids, asks []Item) {
b.Bids = bids
b.Asks = asks
b.LastUpdated = time.Now()
}
// Verify ensures that the orderbook items are correctly sorted
// Bids should always go from a high price to a low price and
// asks should always go from a low price to a higher price
func (b *Base) Verify() {
var lastPrice float64
var sortBids, sortAsks bool
for x := range b.Bids {
if lastPrice != 0 && b.Bids[x].Price >= lastPrice {
sortBids = true
break
}
lastPrice = b.Bids[x].Price
}
lastPrice = 0
for x := range b.Asks {
if lastPrice != 0 && b.Asks[x].Price <= lastPrice {
sortAsks = true
break
}
lastPrice = b.Asks[x].Price
}
if sortBids {
sort.Sort(sort.Reverse(byOBPrice(b.Bids)))
}
if sortAsks {
sort.Sort((byOBPrice(b.Asks)))
}
}
// Process processes incoming orderbooks, creating or updating the orderbook
// list
func (b *Base) Process() error {
if b.ExchangeName == "" {
return errors.New(errExchangeNameUnset)
}
b.ExchangeName = strings.ToLower(b.ExchangeName)
if b.Pair.IsEmpty() {
return errors.New(errPairNotSet)
}
if b.AssetType.String() == "" {
return errors.New(errAssetTypeNotSet)
}
if len(b.Asks) == 0 && len(b.Bids) == 0 {
return errors.New(errNoOrderbook)
}
if b.LastUpdated.IsZero() {
b.LastUpdated = time.Now()
}
b.Verify()
return service.Update(b)
}