-
Notifications
You must be signed in to change notification settings - Fork 126
/
price_cache.go
130 lines (108 loc) · 3.38 KB
/
price_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package quotes
import (
"time"
"github.com/shopspring/decimal"
"github.com/layer-3/clearsync/pkg/safe"
)
// priceCacheTrade is a single trade record kept in the price cache for one
// market.
type priceCacheTrade struct {
// Price is the trade price; GetIndexPrice averages it using Weight.
Price decimal.Decimal
// Volume is the trade volume as passed to AddTrade.
Volume decimal.Decimal
// Weight is the source's configured weight divided by the sum of the
// configured weights of all trades cached for the market. AddTrade
// recomputes it for every cached trade each time a trade is added.
Weight decimal.Decimal
// Source is the DriverType passed to AddTrade; AddTrade keeps at most one
// trade per source for a market.
Source DriverType
// Timestamp is the trade time; AddTrade compares its age against the
// cache's bufferTime to drop stale trades.
Timestamp time.Time
}
// marketKey is the map key used for per-market cache entries. It is built
// from a Market's baseUnit and quoteUnit only.
type marketKey struct {
baseUnit string
quoteUnit string
}
// PriceCache holds, per market, the trades used to compute an index price
// and the last recorded price.
type PriceCache struct {
// weights maps a driver type to the weight given to its trades.
weights safe.Map[DriverType, decimal.Decimal]
// market maps a market to its currently cached trades.
market safe.Map[marketKey, []priceCacheTrade]
// lastPrice maps a market to the price stored by setLastPrice.
lastPrice safe.Map[marketKey, decimal.Decimal]
// nTrades is the value passed to newPriceCache.
// NOTE(review): no method visible in this part of the file reads it —
// confirm whether it is still used.
nTrades int
// bufferTime is the maximum age a cached trade may have before AddTrade
// drops it.
bufferTime time.Duration
}
// newPriceCache builds a PriceCache with empty trade and last-price maps.
//
// driversWeights seeds the per-driver weights, nTrades is stored as given,
// and bufferTime is the maximum age of a trade that AddTrade keeps cached.
func newPriceCache(driversWeights map[DriverType]decimal.Decimal, nTrades int, bufferTime time.Duration) *PriceCache {
	return &PriceCache{
		market:     safe.NewMap[marketKey, []priceCacheTrade](),
		weights:    safe.NewMapWithData(driversWeights),
		lastPrice:  safe.NewMap[marketKey, decimal.Decimal](),
		nTrades:    nTrades,
		bufferTime: bufferTime,
	}
}
// AddTrade records a new trade for market and recomputes the normalized
// weights of every trade cached for that market.
//
// The new trade replaces any cached trade from the same source. Cached trades
// from other sources are kept only while their age does not exceed the
// cache's bufferTime. Trades whose source has no configured weight are
// dropped. Each remaining trade's Weight is set to its source weight divided
// by the sum of the weights of all remaining trades.
//
// If no trade remains, or the weights sum to zero, the market's trade list is
// cleared instead of dividing by zero.
func (p *PriceCache) AddTrade(market Market, price, volume decimal.Decimal, timestamp time.Time, source DriverType) {
	key := marketKey{baseUnit: market.baseUnit, quoteUnit: market.quoteUnit}

	p.market.UpdateInTx(func(m map[marketKey][]priceCacheTrade) {
		existing := m[key] // nil for an unknown market; ranging over nil is a no-op

		// The incoming trade always goes first and supersedes the previous
		// trade from the same source.
		candidates := make([]priceCacheTrade, 0, len(existing)+1)
		candidates = append(candidates, priceCacheTrade{
			Price:     price,
			Volume:    volume,
			Weight:    decimal.Zero,
			Timestamp: timestamp,
			Source:    source,
		})

		// Use one reference time so every cached trade is judged against the
		// same cutoff.
		now := time.Now()
		for _, t := range existing {
			if t.Source != source && now.Sub(t.Timestamp) <= p.bufferTime {
				candidates = append(candidates, t)
			}
		}

		// Keep only trades from sources with a configured weight, remembering
		// the raw weight so it can be normalized once the total is known.
		total := decimal.Zero
		kept := make([]priceCacheTrade, 0, len(candidates))
		for _, t := range candidates {
			w, ok := p.weights.Load(t.Source)
			if !ok {
				continue
			}
			t.Weight = w
			total = total.Add(w)
			kept = append(kept, t)
		}

		// Compare by value: `!=` on decimal.Decimal compares internal
		// pointers, so a computed zero sum would slip through and make Div
		// panic.
		if total.IsZero() {
			m[key] = nil
			return
		}

		for i := range kept {
			kept[i].Weight = kept[i].Weight.Div(total)
		}
		m[key] = kept
	})
}
// setLastPrice stores newPrice as the last price for the market identified by
// market's base and quote units, overwriting any previous value.
func (p *PriceCache) setLastPrice(market Market, newPrice decimal.Decimal) {
	p.lastPrice.UpdateInTx(func(prices map[marketKey]decimal.Decimal) {
		prices[marketKey{baseUnit: market.baseUnit, quoteUnit: market.quoteUnit}] = newPrice
	})
}
// getLastPrice returns the price most recently stored by setLastPrice for the
// market's base/quote pair, or decimal.Zero when none has been stored.
func (p *PriceCache) getLastPrice(market Market) decimal.Decimal {
	key := marketKey{baseUnit: market.baseUnit, quoteUnit: market.quoteUnit}
	if price, found := p.lastPrice.Load(key); found {
		return price
	}
	return decimal.Zero
}
// GetIndexPrice computes the weighted average price of the trades cached for
// event's market.
//
// When the market has a convertTo unit, the result is multiplied by the
// weighted average price of the quoteUnit/convertTo market.
//
// It returns:
//   - (price, true) on success;
//   - (event.Price, false) when no trades are cached for event's market;
//   - (decimal.Zero, false) when the cached weights sum to zero, or when a
//     conversion is requested but no price is available for the conversion
//     market.
//
// The event is only read; its Market is not modified.
func (p *PriceCache) GetIndexPrice(event *TradeEvent) (decimal.Decimal, bool) {
	// Work on a copy so the conversion lookup below cannot overwrite the
	// caller's event.Market.
	market := event.Market

	trades, ok := p.market.Load(marketKey{baseUnit: market.baseUnit, quoteUnit: market.quoteUnit})
	if !ok || len(trades) == 0 {
		return event.Price, false
	}

	price, ok := weightedTradePrice(trades)
	if !ok {
		return decimal.Zero, false
	}

	if market.convertTo == "" {
		return price, true
	}

	// Convert the price into the target unit using the quoteUnit/convertTo
	// market; any failure to price that market fails the whole lookup.
	convTrades, ok := p.market.Load(marketKey{baseUnit: market.quoteUnit, quoteUnit: market.convertTo})
	if !ok || len(convTrades) == 0 {
		return decimal.Zero, false
	}

	quotePrice, ok := weightedTradePrice(convTrades)
	if !ok {
		return decimal.Zero, false
	}

	return price.Mul(quotePrice), true
}

// weightedTradePrice returns sum(Price*Weight)/sum(Weight) over trades. It
// reports false when the weights sum to zero.
func weightedTradePrice(trades []priceCacheTrade) (decimal.Decimal, bool) {
	top := decimal.Zero
	bottom := decimal.Zero
	for _, t := range trades {
		top = top.Add(t.Price.Mul(t.Weight))
		bottom = bottom.Add(t.Weight)
	}

	if bottom.Equal(decimal.Zero) {
		return decimal.Zero, false
	}

	return top.Div(bottom), true
}