-
Notifications
You must be signed in to change notification settings - Fork 0
/
board.go
152 lines (126 loc) · 3.76 KB
/
board.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package bitflyerspider
import (
"fmt"
"github.com/mitsutoshi/bitflyergo"
"github.com/mitsutoshi/bitflyerspider/lib/bq"
"math"
"sort"
"sync"
"time"
)
type BoardCollector struct {
Asks map[float64]float64
Bids map[float64]float64
AskPrices []float64
BidPrices []float64
BoardMutex *sync.Mutex
SummaryPerSec []*bq.BqBoard
AggInterval time.Duration
}
func NewCollector() *BoardCollector {
return &BoardCollector{
Asks: map[float64]float64{},
Bids: map[float64]float64{},
BoardMutex: &sync.Mutex{},
AggInterval: 1 * time.Second,
}
}
func (c *BoardCollector) BestAskPrice() float64 {
if len(c.AskPrices) > 0 {
return c.AskPrices[0]
}
return 0
}
func (c *BoardCollector) BestBidPrice() float64 {
if len(c.BidPrices) > 0 {
return c.BidPrices[0]
}
return 0
}
func (c *BoardCollector) Spread() float64 {
return c.BestAskPrice() - c.BestBidPrice()
}
func (c *BoardCollector) MidPrice() float64 {
return c.BestBidPrice() + math.Round(c.Spread()/2)
}
// Update board
func (c *BoardCollector) UpdateBoard(newBoard *bitflyergo.Board, refresh bool) {
c.BoardMutex.Lock()
defer c.BoardMutex.Unlock()
// refreshが指定された場合は既存の板を破棄して作り直す
if refresh {
c.Asks = map[float64]float64{}
c.Bids = map[float64]float64{}
}
// Update Asks
for price, size := range newBoard.Asks {
if size > 0 {
// Add or Update
c.Asks[price] = size
} else if _, ok := c.Asks[price]; ok {
// Delete
delete(c.Asks, price)
}
}
// Update Bids
for price, size := range newBoard.Bids {
if size > 0 {
// Add or Update
c.Bids[price] = size
} else if _, ok := c.Bids[price]; ok {
// Delete
delete(c.Bids, price)
}
}
// Update best ask, best bid, mid price and spread.
if len(c.Asks) > 0 {
c.sortAsks()
}
if len(c.Bids) > 0 {
c.sortBids()
}
}
func (c *BoardCollector) sortAsks() {
c.AskPrices = make([]float64, 0, len(c.Asks))
for k := range c.Asks {
c.AskPrices = append(c.AskPrices, k)
}
sort.Sort(sort.Float64Slice(c.AskPrices))
}
func (c *BoardCollector) sortBids() {
c.BidPrices = make([]float64, 0, len(c.Bids))
for k := range c.Bids {
c.BidPrices = append(c.BidPrices, k)
}
sort.Sort(sort.Reverse(sort.Float64Slice(c.BidPrices)))
}
func (c *BoardCollector) Agg() {
const interval = 200 * time.Millisecond
nextTime := time.Now()
for {
time.Sleep(interval)
// 集計時刻を過ぎたら集計して出力
if time.Now().After(nextTime) {
// BigQueryレコードの型に変換
c.BoardMutex.Lock()
askPrice := c.BestAskPrice()
bidPrice := c.BestBidPrice()
if askPrice > 0 && bidPrice > 0 {
c.SummaryPerSec = append(c.SummaryPerSec, &bq.BqBoard{
Time: nextTime,
MidPrice: int(c.MidPrice()),
BestAskPrice: int(askPrice),
BestBidPrice: int(bidPrice),
BestAskSize: c.Asks[askPrice],
BestBidSize: c.Bids[bidPrice],
Spread: int(c.Spread()),
})
}
c.BoardMutex.Unlock()
fmt.Printf("%v -> mid: %v, ask: %v, bid: %v, spread: %v\n",
nextTime.Format("2006-01-02 15:04:05"), c.MidPrice(), askPrice, bidPrice, c.Spread())
// 次の集計時刻を計算
nextTime = nextTime.Add(c.AggInterval)
}
}
}