-
Notifications
You must be signed in to change notification settings - Fork 790
/
trade.go
304 lines (281 loc) · 8.23 KB
/
trade.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
package trade
import (
"errors"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/database"
tradesql "github.com/thrasher-corp/gocryptotrader/database/repository/trade"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/log"
)
// setup sets the processor's buffer processing interval to
// BufferProcessorIntervalTime and then launches Run on its own goroutine,
// handing it the supplied wait group.
func (p *Processor) setup(wg *sync.WaitGroup) {
	func() {
		p.mutex.Lock()
		defer p.mutex.Unlock()
		p.bufferProcessorInterval = BufferProcessorIntervalTime
	}()
	go p.Run(wg)
}
// Setup stores the exchange name, the data handler channel and the trade feed
// flag on the Trade. Update reads these fields to decide where trade data is
// sent.
func (t *Trade) Setup(exchangeName string, tradeFeedEnabled bool, c chan interface{}) {
	t.tradeFeedEnabled = tradeFeedEnabled
	t.dataHandler = c
	t.exchangeName = exchangeName
}
// Update handles a batch of trade data. When the trade feed is enabled the
// batch is sent on the data handler channel; when save is true the batch is
// additionally passed to AddTradesToBuffer and any error from that call is
// returned. An empty batch is a no-op.
func (t *Trade) Update(save bool, data ...Data) error {
	if len(data) == 0 {
		return nil
	}
	if t.tradeFeedEnabled {
		// the batch is forwarded as one []Data value, not element by element
		t.dataHandler <- data
	}
	if !save {
		return nil
	}
	return AddTradesToBuffer(t.exchangeName, data...)
}
// AddTradesToBuffer validates and normalises the supplied trades and appends
// the valid ones to the processor buffer, which Run saves to the database on
// its ticker. If the processor is not marked as started it is set up first.
//
// Nothing is buffered and nil is returned when no trades are supplied, when
// there is no database instance, or when the database config is missing or
// disabled.
//
// A trade is rejected when its price or amount is zero, its currency pair is
// empty, its exchange is empty or its timestamp is zero. Negative prices and
// amounts are made positive and the side is set to Sell; Bid and Ask sides are
// rewritten to Buy and Sell. The elements of data are modified in place.
//
// The returned error accumulates one entry per rejected trade and per failed
// UUID generation; valid trades are still buffered when it is non-nil.
func AddTradesToBuffer(exchangeName string, data ...Data) error {
	if len(data) == 0 {
		return nil
	}
	// The nil check must come before any method call on the database handle.
	if database.DB == nil {
		return nil
	}
	if cfg := database.DB.GetConfig(); cfg == nil || !cfg.Enabled {
		return nil
	}
	if atomic.LoadInt32(&processor.started) == 0 {
		// setup launches Run, which calls wg.Done as its first statement
		var wg sync.WaitGroup
		wg.Add(1)
		processor.setup(&wg)
		wg.Wait()
	}
	validDatas := make([]Data, 0, len(data))
	var errs error
	for i := range data {
		if data[i].Price == 0 ||
			data[i].Amount == 0 ||
			data[i].CurrencyPair.IsEmpty() ||
			data[i].Exchange == "" ||
			data[i].Timestamp.IsZero() {
			errs = common.AppendError(errs, fmt.Errorf("%v received invalid trade data: %+v", exchangeName, data[i]))
			continue
		}
		if data[i].Price < 0 {
			data[i].Price *= -1
			data[i].Side = order.Sell
		}
		if data[i].Amount < 0 {
			data[i].Amount *= -1
			data[i].Side = order.Sell
		}
		if data[i].Side == order.Bid {
			data[i].Side = order.Buy
		}
		if data[i].Side == order.Ask {
			data[i].Side = order.Sell
		}
		uu, err := uuid.NewV4()
		if err != nil {
			// the failure is reported, but the trade is still buffered with
			// whatever ID value was returned
			errs = common.AppendError(errs, fmt.Errorf("%s uuid failed to generate for trade: %+v", exchangeName, data[i]))
		}
		data[i].ID = uu
		validDatas = append(validDatas, data[i])
	}
	processor.mutex.Lock()
	processor.buffer = append(processor.buffer, validDatas...)
	processor.mutex.Unlock()
	return errs
}
// Run signals the wait group, marks the processor as started and then, on each
// tick of the buffer processing interval, takes everything currently in the
// buffer and saves it to the database. Save errors are logged and the loop
// continues. Run returns once a tick finds the buffer empty; the ticker is
// stopped and the started flag is cleared on the way out. If the processor is
// already marked as started, an error is logged and Run returns immediately.
func (p *Processor) Run(wg *sync.WaitGroup) {
	wg.Done()
	if !atomic.CompareAndSwapInt32(&p.started, 0, 1) {
		log.Errorln(log.Trade, "trade processor already started")
		return
	}
	// deferred calls run last-in first-out: the ticker is stopped before the
	// started flag is reset
	defer atomic.CompareAndSwapInt32(&p.started, 1, 0)
	p.mutex.Lock()
	ticker := time.NewTicker(p.bufferProcessorInterval)
	p.mutex.Unlock()
	defer ticker.Stop()
	for range ticker.C {
		p.mutex.Lock()
		pending := make([]Data, len(p.buffer))
		copy(pending, p.buffer)
		p.buffer = nil
		p.mutex.Unlock()
		if len(pending) == 0 {
			return
		}
		if err := SaveTradesToDatabase(pending...); err != nil {
			log.Errorln(log.Trade, err)
		}
	}
}
// SaveTradesToDatabase converts the trades to their database representation
// and inserts them. A conversion error is returned without attempting the
// insert.
func SaveTradesToDatabase(trades ...Data) error {
	rows, err := tradeToSQLData(trades...)
	if err == nil {
		err = tradesql.Insert(rows...)
	}
	return err
}
// GetTradesInRange fetches stored trades for the exchange, asset and currency
// pair between startDate and endDate and converts them to Data. It exists so
// callers do not need to use the tradesql package directly. An error is
// returned when any string argument is empty, either date is zero, or the
// database is not connected.
func GetTradesInRange(exchangeName, assetType, base, quote string, startDate, endDate time.Time) ([]Data, error) {
	switch {
	case exchangeName == "", assetType == "", base == "", quote == "",
		startDate.IsZero(), endDate.IsZero():
		return nil, errors.New("invalid arguments received")
	case !database.DB.IsConnected():
		return nil, fmt.Errorf("cannot process trades in range %s-%s as %w", startDate, endDate, database.ErrDatabaseNotConnected)
	}
	dbTrades, err := tradesql.GetInRange(exchangeName, assetType, base, quote, startDate, endDate)
	if err != nil {
		return nil, err
	}
	return SQLDataToTrade(dbTrades...)
}
// HasTradesInRanges rejects empty string arguments and otherwise delegates to
// tradesql.VerifyTradeInIntervals with the supplied range holder, returning
// its error.
func HasTradesInRanges(exchangeName, assetType, base, quote string, rangeHolder *kline.IntervalRangeHolder) error {
	for _, arg := range [...]string{exchangeName, assetType, base, quote} {
		if arg == "" {
			return errors.New("invalid arguments received")
		}
	}
	return tradesql.VerifyTradeInIntervals(exchangeName, assetType, base, quote, rangeHolder)
}
// tradeToSQLData sorts the trades in place using ByDate and converts each one
// into a tradesql.Data row. Every row receives a newly generated UUID as its
// ID; the ID already present on the trade is not used. If UUID generation
// fails, nil and the error are returned.
func tradeToSQLData(trades ...Data) ([]tradesql.Data, error) {
	sort.Sort(ByDate(trades))
	rows := make([]tradesql.Data, 0, len(trades))
	for i := range trades {
		rowID, err := uuid.NewV4()
		if err != nil {
			return nil, err
		}
		src := &trades[i]
		rows = append(rows, tradesql.Data{
			ID:        rowID.String(),
			Timestamp: src.Timestamp,
			Exchange:  src.Exchange,
			Base:      src.CurrencyPair.Base.String(),
			Quote:     src.CurrencyPair.Quote.String(),
			AssetType: src.AssetType.String(),
			Price:     src.Price,
			Amount:    src.Amount,
			Side:      src.Side.String(),
			TID:       src.TID,
		})
	}
	return rows, nil
}
// SQLDataToTrade converts database rows into trade Data. The base and quote
// strings are parsed into a currency pair (upper-cased), the asset type and
// side strings are parsed into their typed forms, and the timestamp is
// converted to UTC. The first parse failure aborts the conversion and returns
// nil with that error. The TID field is not carried over.
func SQLDataToTrade(dbTrades ...tradesql.Data) ([]Data, error) {
	converted := make([]Data, 0, len(dbTrades))
	for i := range dbTrades {
		row := &dbTrades[i]
		pair, err := currency.NewPairFromStrings(row.Base, row.Quote)
		if err != nil {
			return nil, err
		}
		assetType, err := asset.New(row.AssetType)
		if err != nil {
			return nil, err
		}
		side, err := order.StringToOrderSide(row.Side)
		if err != nil {
			return nil, err
		}
		converted = append(converted, Data{
			ID:           uuid.FromStringOrNil(row.ID),
			Timestamp:    row.Timestamp.UTC(),
			Exchange:     row.Exchange,
			CurrencyPair: pair.Upper(),
			AssetType:    assetType,
			Price:        row.Price,
			Amount:       row.Amount,
			Side:         side,
		})
	}
	return converted, nil
}
// ConvertTradesToCandles groups the trades into buckets of the given interval
// and builds one candle per bucket. The exchange, pair and asset of the
// returned kline.Item are taken from the first trade supplied.
//
// Candles are returned in ascending time order. ErrNoTradesSupplied is
// returned when no trades are given.
func ConvertTradesToCandles(interval kline.Interval, trades ...Data) (*kline.Item, error) {
	if len(trades) == 0 {
		return nil, ErrNoTradesSupplied
	}
	groupedData := groupTradesToInterval(interval, trades...)

	// Map iteration order is not defined, so collect and sort the bucket
	// start times to emit candles chronologically and deterministically.
	bucketStarts := make([]int64, 0, len(groupedData))
	for k := range groupedData {
		bucketStarts = append(bucketStarts, k)
	}
	sort.Slice(bucketStarts, func(i, j int) bool {
		return bucketStarts[i] < bucketStarts[j]
	})

	candles := kline.Item{
		Exchange: trades[0].Exchange,
		Pair:     trades[0].CurrencyPair,
		Asset:    trades[0].AssetType,
		Interval: interval,
		Candles:  make([]kline.Candle, 0, len(bucketStarts)),
	}
	for _, start := range bucketStarts {
		candles.Candles = append(candles.Candles, classifyOHLCV(time.Unix(start, 0), groupedData[start]...))
	}
	return &candles, nil
}
// groupTradesToInterval buckets the trades by the value getNearestInterval
// returns for each trade's timestamp, and returns the buckets keyed by that
// value.
func groupTradesToInterval(interval kline.Interval, times ...Data) map[int64][]Data {
	buckets := make(map[int64][]Data)
	for i := range times {
		key := getNearestInterval(times[i].Timestamp, interval)
		buckets[key] = append(buckets[key], times[i])
	}
	return buckets
}
// getNearestInterval rounds t down to a multiple of the interval's duration
// and returns the result as a Unix timestamp in seconds.
func getNearestInterval(t time.Time, interval kline.Interval) int64 {
	truncated := t.Truncate(interval.Duration())
	return truncated.UTC().Unix()
}
// classifyOHLCV builds a single candle stamped with t from the supplied
// trades. The trades are sorted in place using ByDate; negative prices and
// amounts are made positive in place before they are used. Open and Close are
// the prices of the first and last trade after sorting, High and Low the
// largest and smallest price, and Volume the sum of all amounts.
//
// With no trades, a candle carrying only the time is returned.
func classifyOHLCV(t time.Time, datas ...Data) (c kline.Candle) {
	c.Time = t
	if len(datas) == 0 {
		return c
	}
	sort.Sort(ByDate(datas))
	for i := range datas {
		if datas[i].Price < 0 {
			datas[i].Price *= -1
		}
		if datas[i].Amount < 0 {
			datas[i].Amount *= -1
		}
		price := datas[i].Price
		if i == 0 {
			// Seed from the first normalised price rather than treating zero
			// as "unset", so a genuine zero price is handled correctly.
			c.Open, c.High, c.Low = price, price, price
		} else {
			if price < c.Low {
				c.Low = price
			}
			if price > c.High {
				c.High = price
			}
		}
		c.Volume += datas[i].Amount
	}
	// Read after the loop so Close reflects the normalised price.
	c.Close = datas[len(datas)-1].Price
	return c
}
// FilterTradesByTime returns the trades whose timestamp is strictly after
// startTime and strictly before endTime; trades exactly on either boundary
// are dropped. If either boundary is zero the input slice is returned
// unchanged. The result is nil when no trade matches.
func FilterTradesByTime(trades []Data, startTime, endTime time.Time) []Data {
	if startTime.IsZero() || endTime.IsZero() {
		// no usable window to filter against
		return trades
	}
	var kept []Data
	for i := range trades {
		ts := trades[i].Timestamp
		if !ts.After(startTime) || !ts.Before(endTime) {
			continue
		}
		kept = append(kept, trades[i])
	}
	return kept
}