/
fill_executor.go
111 lines (95 loc) · 2.2 KB
/
fill_executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package worker
import (
"encoding/json"
"github.com/irononet/go-exchange/conf"
"github.com/irononet/go-exchange/entities"
"github.com/irononet/go-exchange/service"
"github.com/go-redis/redis"
lru "github.com/hashicorp/golang-lru"
"github.com/siddontang/go-log/log"
"time"
)
// FILL_WORKER_NUM is the number of worker channels (and worker goroutines)
// a FillExecutor owns. Fills are routed to a worker by OrderId modulo this
// value.
const FILL_WORKER_NUM = 10
// FillExecutor distributes fills across a fixed set of worker channels so
// they can be executed concurrently.
type FillExecutor struct{
// WorkerChs holds one channel per worker. Producers in this file pick the
// channel with fill.OrderId % FILL_WORKER_NUM, so every fill of a given
// order is delivered to the same worker.
WorkerChs [FILL_WORKER_NUM]chan *entities.Fill
}
// NewFillExecutor creates a FillExecutor and immediately starts
// FILL_WORKER_NUM worker goroutines, one per entry of WorkerChs.
//
// Each worker channel is buffered with room for 512 fills. A worker receives
// fills from its own channel, looks up the order the fill belongs to, and
// calls service.ExecuteFill unless the order is already cancelled or filled.
// Orders seen in one of those two states are remembered in a per-worker LRU
// cache (1000 entries) so later fills for them are dropped without another
// lookup.
//
// The function panics (inside the worker goroutine) if the LRU cache cannot
// be constructed. The workers run until their channel is closed; nothing in
// this file closes it.
func NewFillExecutor() *FillExecutor {
	f := &FillExecutor{
		WorkerChs: [FILL_WORKER_NUM]chan *entities.Fill{},
	}

	for i := 0; i < FILL_WORKER_NUM; i++ {
		f.WorkerChs[i] = make(chan *entities.Fill, 512)

		go func(idx int) {
			// The cache is private to this goroutine, so it needs no
			// additional synchronisation.
			settleOrderCache, err := lru.New(1000)
			if err != nil {
				panic(err)
			}

			for fill := range f.WorkerChs[idx] {
				if settleOrderCache.Contains(fill.OrderId) {
					continue
				}

				order, err := service.GetOrderById(fill.OrderId)
				if err != nil {
					// The lookup itself failed: skip this fill rather than
					// falling through and reporting a misleading
					// "order not found".
					log.Error(err)
					continue
				}
				if order == nil {
					log.Warnf("order not found: %v", fill.OrderId)
					continue
				}

				if order.Status == entities.OrderStatusCancelled || order.Status == entities.OrderStatusFilled {
					// Key by fill.OrderId, the exact value used for the
					// Contains lookup above, so the two can never disagree.
					settleOrderCache.Add(fill.OrderId, struct{}{})
					continue
				}

				if err := service.ExecuteFill(fill.OrderId); err != nil {
					log.Error(err)
				}
			}
		}(i)
	}

	return f
}
// Start launches the executor's two background loops, each in its own
// goroutine: the periodic inspector and the message-queue listener. It
// returns immediately.
func (s *FillExecutor) Start() {
	backgroundLoops := []func(){
		s.runInspector,
		s.runMqListener,
	}
	for _, loop := range backgroundLoops {
		go loop()
	}
}
// runMqListener consumes fills from the Redis list entities.TopicFill and
// hands each one to the worker channel selected by
// fill.OrderId % FILL_WORKER_NUM.
//
// It connects to Redis with the address and password from conf.GetConfig()
// (database 0) and then loops forever: block on BRPOP for up to 1000 seconds,
// JSON-decode the popped value into an entities.Fill, and enqueue it. The
// send blocks when the target worker channel is full. This method never
// returns.
func (s *FillExecutor) runMqListener() {
	config := conf.GetConfig()
	redisClient := redis.NewClient(&redis.Options{
		Addr:     config.Redis.Addr,
		Password: config.Redis.Password,
		DB:       0,
	})

	for {
		ret := redisClient.BRPop(time.Second*1000, entities.TopicFill)
		if err := ret.Err(); err != nil {
			if err == redis.Nil {
				// BRPOP timed out with nothing to pop; that is not a
				// failure, just poll again.
				continue
			}
			log.Error(err)
			// Back off briefly so a persistent failure (e.g. Redis being
			// unreachable) does not turn into a hot loop of error logs.
			time.Sleep(time.Second)
			continue
		}

		// A successful BRPOP reply is [key, value]; guard the index so a
		// malformed reply cannot panic the listener.
		reply := ret.Val()
		if len(reply) < 2 {
			log.Warnf("unexpected BRPOP reply: %v", reply)
			continue
		}

		// A fresh Fill per iteration: its address is handed to a worker and
		// must not be overwritten by the next message.
		var fill entities.Fill
		if err := json.Unmarshal([]byte(reply[1]), &fill); err != nil {
			log.Error(err)
			continue
		}

		s.WorkerChs[fill.OrderId%FILL_WORKER_NUM] <- &fill
	}
}
// runInspector is a safety net that runs alongside the queue listener. Once
// per second it asks service.GetUnsettledFills for up to 1000 fills and
// pushes each of them onto the worker channel chosen by
// fill.OrderId % FILL_WORKER_NUM. A failed query is logged and retried on the
// next round. This method never returns.
func (s *FillExecutor) runInspector() {
	const pollInterval = 1 * time.Second

	for {
		// Wait first, then poll; the pause also applies after a failed query.
		time.Sleep(pollInterval)

		pending, err := service.GetUnsettledFills(1000)
		if err != nil {
			log.Error(err)
			continue
		}

		for _, unsettled := range pending {
			workerCh := s.WorkerChs[unsettled.OrderId%FILL_WORKER_NUM]
			workerCh <- unsettled
		}
	}
}