/
service.go
94 lines (81 loc) · 2.23 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package websocket
import (
"encoding/json"
"fmt"
appCfg "github.com/cosmos/cosmos-sdk/server/config"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/zenchainprotocol/zenchain-node/x/stream/types"
"github.com/tendermint/tendermint/libs/log"
)
// Engine
type Engine struct {
url string
logger log.Logger
}
func NewEngine(url string, logger log.Logger, cfg *appCfg.StreamConfig) (types.IStreamEngine, error) {
engine := &Engine{url: url, logger: logger}
return engine, nil
}
func (engine *Engine) URL() string {
return engine.url
}
func (engine *Engine) NewEvent(channel string, data interface{}) (sdk.Event, error) {
eventData, err := json.Marshal(data)
if err != nil {
return sdk.Event{}, err
}
return sdk.NewEvent(
eventTypeBackend,
sdk.NewAttribute("channel", channel),
sdk.NewAttribute("data", string(eventData)),
), nil
}
func (engine *Engine) Write(data types.IStreamData, success *bool) {
defer func() {
if e := recover(); e != nil {
*success = false
engine.logger.Error("error: WebSocketEngine Write", "err", e)
}
}()
wsData := data.(*PushData)
engine.logger.Debug(fmt.Sprintf("error: WebSocketEngine Write data:%v", wsData.RedisBlock))
events := sdk.Events{}
// 1. collect account events
for key, value := range wsData.AccountsMap {
channel := fmt.Sprintf("%s:%s", DexSpotAccount, key)
event, err := engine.NewEvent(channel, value)
if err != nil {
panic(err)
}
events = append(events, event)
}
// 2. collect order events
for key, value := range wsData.OrdersMap {
channel := fmt.Sprintf("%s:%s", DexSpotOrder, key)
event, err := engine.NewEvent(channel, value)
if err != nil {
panic(err)
}
events = append(events, event)
}
// 3. collect matches events
for key, value := range wsData.MatchesMap {
channel := fmt.Sprintf("%s:%s", DexSpotMatch, key)
event, err := engine.NewEvent(channel, value)
if err != nil {
panic(err)
}
events = append(events, event)
}
// 4. collect depth_book events
for key, value := range wsData.DepthBooksMap {
channel := fmt.Sprintf("%s:%s", DexSpotDepthBook, key)
event, err := engine.NewEvent(channel, value)
if err != nil {
panic(err)
}
events = append(events, event)
}
wsData.eventMgr.EmitEvents(events)
*success = true
}