-
Notifications
You must be signed in to change notification settings - Fork 17
/
service_context.go
132 lines (125 loc) · 4.36 KB
/
service_context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package svc
import (
"github.com/apache/pulsar-client-go/pulsar"
"github.com/luxun9527/gex/app/account/rpc/internal/config"
"github.com/luxun9527/gex/app/account/rpc/internal/dao/query"
"github.com/luxun9527/gex/common/pkg/confx"
pulsarConfig "github.com/luxun9527/gex/common/pkg/pulsar"
"github.com/luxun9527/gex/common/proto/define"
"github.com/luxun9527/gex/common/utils"
logger "github.com/luxun9527/zaplog"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/redis"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"gopkg.in/yaml.v3"
"sync"
)
type ServiceContext struct {
Config config.Config
Query *query.Query
MatchConsumerList []pulsar.Consumer
JwtClient *utils.JWT
RedisClient *redis.Redis
Coins *sync.Map
}
func NewServiceContext(c config.Config) *ServiceContext {
logger.InitZapLogger(&c.LoggerConfig)
logx.SetWriter(logger.NewZapWriter(logger.GetZapLogger()))
logx.DisableStat()
var symbolConfig sync.Map
// 从etcd中取出交易对配置。
confx.MustLoadFromEtcd(define.EtcdSymbolPrefix, c.SymbolEtcdConfig, &symbolConfig, confx.WithCustomInitLoadFunc(func(kvs []*mvccpb.KeyValue, target any) {
for _, v := range kvs {
var s define.SymbolInfo
if err := yaml.Unmarshal(v.Value, &s); err != nil {
logx.Severef("get symbol config failed symbolInfo = %v", define.EtcdSymbolPrefix)
}
s.QuoteCoinPrec.Store(s.QuoteCoinPrecValue)
s.BaseCoinPrec.Store(s.BaseCoinPrecValue)
symbolConfig.Store(s.SymbolName, &s)
}
}), confx.WithCustomWatchFunc(func(evs []*clientv3.Event, target any) {
for _, v := range evs {
switch v.Type {
case mvccpb.PUT: //修改或者新增
var s define.SymbolInfo
if err := yaml.Unmarshal(v.Kv.Value, &s); err != nil {
logx.Errorf("get symbol config failed symbolInfo =%v", s)
}
s.QuoteCoinPrec.Store(s.QuoteCoinPrecValue)
s.BaseCoinPrec.Store(s.BaseCoinPrecValue)
symbolConfig.Store(s.SymbolName, &s)
case mvccpb.DELETE: //删除
var s define.SymbolInfo
if err := yaml.Unmarshal(v.Kv.Value, &s); err != nil {
logx.Errorf("get symbol config failed symbolInfo =%v", s)
}
symbolConfig.Delete(s.SymbolName)
logx.Sloww("warn symbol config deleted")
}
}
}))
var coinConfig sync.Map
// 从etcd中取出币种配置。。
confx.MustLoadFromEtcd(define.EtcdCoinPrefix, c.SymbolEtcdConfig, &symbolConfig, confx.WithCustomInitLoadFunc(func(kvs []*mvccpb.KeyValue, target any) {
for _, v := range kvs {
var s define.CoinInfo
if err := yaml.Unmarshal(v.Value, &s); err != nil {
logx.Severef("get symbol config failed symbolInfo = %v", string(v.Value))
}
coinConfig.Store(s.CoinName, &s)
}
}), confx.WithCustomWatchFunc(func(evs []*clientv3.Event, target any) {
for _, v := range evs {
switch v.Type {
case mvccpb.PUT: //修改或者新增
var s define.CoinInfo
if err := yaml.Unmarshal(v.Kv.Value, &s); err != nil {
logx.Errorf("get symbol config failed symbolInfo =%v", s)
}
coinConfig.Store(s.CoinName, &s)
case mvccpb.DELETE: //删除
var s define.CoinInfo
if err := yaml.Unmarshal(v.Kv.Value, &s); err != nil {
logx.Errorf("get symbol config failed symbolInfo =%v", s)
}
coinConfig.Delete(s.CoinName)
logx.Sloww("warn coin config deleted")
}
}
}))
client, err := c.PulsarConfig.BuildClient()
if err != nil {
logx.Severef("init pulsar client failed %v", err)
}
consumers := make([]pulsar.Consumer, 0, 10)
symbolConfig.Range(func(key, value any) bool {
symbolInfo := value.(*define.SymbolInfo)
topic := pulsarConfig.Topic{
Tenant: pulsarConfig.PublicTenant,
Namespace: pulsarConfig.GexNamespace,
Topic: pulsarConfig.MatchResultTopic + "_" + symbolInfo.SymbolName,
}
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic.BuildTopic(),
SubscriptionName: pulsarConfig.MatchResultAccountSub,
Type: pulsar.Shared,
})
if err != nil {
logx.Severef("init pulsar consumer failed %v", err)
}
consumers = append(consumers, consumer)
return true
})
q := query.Use(c.GormConf.MustNewGormClient())
sc := &ServiceContext{
Config: c,
Query: q,
MatchConsumerList: consumers,
JwtClient: utils.NewJWT(),
RedisClient: redis.MustNewRedis(c.RedisConf),
Coins: &coinConfig,
}
return sc
}