forked from spacecodewor/fmpcloud-go
/
websocket_client.go
143 lines (115 loc) · 3.19 KB
/
websocket_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package fmpcloud
import (
"github.com/gorilla/websocket"
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
"go.uber.org/zap"
)
// Websocket endpoints that can be assigned to WebsocketConfig.URL.
// Going by the constant names and hosts there is one endpoint per market
// (stock, crypto, forex).
const (
// WebsocketStock is the endpoint on the "websockets" host.
WebsocketStock WebsocketURL = "wss://websockets.financialmodelingprep.com"
// WebsocketCrypto is the endpoint on the "crypto" host.
WebsocketCrypto WebsocketURL = "wss://crypto.financialmodelingprep.com"
// WebsocketForex is the endpoint on the "forex" host.
WebsocketForex WebsocketURL = "wss://forex.financialmodelingprep.com"
)
// WebsocketURL is a string type holding the address of a websocket endpoint.
// It is the type of the Websocket* constants and of WebsocketConfig.URL.
type WebsocketURL string
// WebsocketConfig holds the settings NewWebsocketClient uses to build a
// WebsocketClient.
type WebsocketConfig struct {
// Logger is the zap logger the client will use. When nil,
// NewWebsocketClient creates one with createNewLogger.
Logger *zap.Logger
// APIKey is the key Login sends in its "login" event.
APIKey string
// URL is the websocket endpoint NewWebsocketClient connects to.
URL WebsocketURL
// Debug is copied onto the client; nothing in this file reads it.
Debug bool
}
// WebsocketClient wraps a single gorilla/websocket connection and exposes
// Login, Subscribe, Unsubscribe, RunReadLoop and Close on top of it.
// Create it with NewWebsocketClient.
type WebsocketClient struct {
// conn is the websocket connection stored by connect; every method
// writes to or reads from it.
conn *websocket.Conn
// apiKey is the key sent by Login.
apiKey string
// logger receives the error logged by RunReadLoop when a message cannot
// be unmarshalled.
logger *zap.Logger
// debug is copied from WebsocketConfig.Debug; nothing in this file reads it.
debug bool
}
// Event is the structure RunReadLoop unmarshals every incoming websocket
// message into before handing it to the caller's callback.
//
// NOTE(review): the meaning of the individual fields is not defined in this
// file; the notes below are inferred from the JSON keys and should be
// confirmed against the Financial Modeling Prep websocket documentation.
type Event struct {
// Event is the value of the "event" key.
Event string `json:"event"`
// Message is the value of the "message" key.
Message string `json:"message"`
// Status is the value of the "status" key.
Status int `json:"status"`
// Symbol is the value of the "s" key — presumably the ticker symbol.
Symbol string `json:"s"`
// Type is the value of the "type" key.
Type string `json:"type"`
// Time is the value of the "t" key — presumably a timestamp; unit not
// shown here, TODO confirm.
Time int64 `json:"t"`
// The remaining fields are pointers, so they stay nil when the key is
// missing from the message.
// Ap — presumably ask price; confirm.
Ap *float64 `json:"ap"`
// As — presumably ask size; confirm.
As *float64 `json:"as"`
// Bp — presumably bid price; confirm.
Bp *float64 `json:"bp"`
// Bs — presumably bid size; confirm.
Bs *float64 `json:"bs"`
// Lp — presumably last price; confirm.
Lp *float64 `json:"lp"`
// Ls — presumably last size; confirm.
Ls *float64 `json:"ls"`
}
// NewWebsocketClient creates a new websocket API client and connects it to
// cfg.URL.
//
// When cfg.Logger is nil a logger is created with createNewLogger. An error is
// returned (together with a nil client) when that logger cannot be created or
// when the websocket connection cannot be established.
func NewWebsocketClient(cfg WebsocketConfig) (*WebsocketClient, error) {
	websocketClient := &WebsocketClient{logger: cfg.Logger, debug: cfg.Debug, apiKey: cfg.APIKey}

	if websocketClient.logger == nil {
		logger, err := createNewLogger()
		if err != nil {
			return nil, errors.Wrap(err, "Error create new zap logger")
		}
		websocketClient.logger = logger
	}

	// Report a failed dial to the caller instead of handing back a client
	// whose conn is nil and whose methods would panic on first use.
	if err := websocketClient.connect(cfg.URL); err != nil {
		return nil, err
	}

	return websocketClient, nil
}

// Close closes the underlying websocket connection and returns the error
// reported by it. It returns nil when no connection was ever established.
func (w *WebsocketClient) Close() error {
	if w.conn == nil {
		return nil
	}
	return w.conn.Close()
}

// connect dials url with the default gorilla dialer and stores the resulting
// connection on the client. It does nothing when a connection is already set.
// The dial error, if any, is wrapped and returned.
func (w *WebsocketClient) connect(url WebsocketURL) error {
	if w.conn != nil {
		return nil
	}

	ws, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
	if err != nil {
		return errors.Wrapf(err, "can't connect to websocket server %q", url.String())
	}

	w.conn = ws
	return nil
}
// Login sends a "login" event carrying the client's API key over the
// websocket connection.
//
// It returns an error when the client has no connection, when the key cannot
// be encoded, or when writing the message fails.
func (w *WebsocketClient) Login() error {
	if w.conn == nil {
		return errors.New("can't login in websocket server: connection is not established")
	}

	// Encode the key as a JSON string literal (quotes included) so that quotes,
	// backslashes or control characters in it cannot corrupt the payload.
	apiKey, err := jsoniter.Marshal(w.apiKey)
	if err != nil {
		return errors.Wrap(err, "can't encode api key")
	}

	sub := `{"event": "login", "data": {"apiKey": ` + string(apiKey) + ` }}`
	if err := w.conn.WriteMessage(websocket.TextMessage, []byte(sub)); err != nil {
		return errors.Wrap(err, "can't login in websocket server")
	}
	return nil
}
// Subscribe sends a "subscribe" event for the given ticker over the websocket
// connection.
//
// It returns an error when the client has no connection, when the ticker
// cannot be encoded, or when writing the message fails.
func (w *WebsocketClient) Subscribe(tiker string) error {
	if w.conn == nil {
		return errors.New("can't subscribe to event: connection is not established")
	}

	// Encode the ticker as a JSON string literal (quotes included) so that
	// quotes, backslashes or control characters in it cannot corrupt the payload.
	ticker, err := jsoniter.Marshal(tiker)
	if err != nil {
		return errors.Wrap(err, "can't encode ticker")
	}

	sub := `{"event": "subscribe", "data": {"ticker": ` + string(ticker) + ` }}`
	if err := w.conn.WriteMessage(websocket.TextMessage, []byte(sub)); err != nil {
		return errors.Wrap(err, "can't subscribe to event")
	}
	return nil
}
// Unsubscribe sends an "unsubscribe" event for the given ticker over the
// websocket connection.
//
// It returns an error when the client has no connection, when the ticker
// cannot be encoded, or when writing the message fails.
func (w *WebsocketClient) Unsubscribe(tiker string) error {
	if w.conn == nil {
		return errors.New("can't unsubscribe from event: connection is not established")
	}

	// Encode the ticker as a JSON string literal (quotes included) so that
	// quotes, backslashes or control characters in it cannot corrupt the payload.
	ticker, err := jsoniter.Marshal(tiker)
	if err != nil {
		return errors.Wrap(err, "can't encode ticker")
	}

	sub := `{"event": "unsubscribe", "data": {"ticker": ` + string(ticker) + ` }}`
	if err := w.conn.WriteMessage(websocket.TextMessage, []byte(sub)); err != nil {
		return errors.Wrap(err, "can't unsubscribe from event")
	}
	return nil
}
// RunReadLoop blocks reading messages from the websocket connection, decodes
// each one into an Event and passes it to fn.
//
// A message that cannot be unmarshalled is logged and skipped. The loop ends
// when reading from the connection fails (the error is returned wrapped) or
// when fn returns a non-nil error (returned unchanged). An error is returned
// immediately when the client has no connection or fn is nil.
func (w *WebsocketClient) RunReadLoop(fn func(event Event) error) error {
	if w.conn == nil {
		return errors.New("can't read message: connection is not established")
	}
	if fn == nil {
		return errors.New("can't read message: event handler is nil")
	}

	for {
		_, msg, err := w.conn.ReadMessage()
		if err != nil {
			return errors.Wrap(err, "can't read message")
		}

		var event Event
		if err := jsoniter.Unmarshal(msg, &event); err != nil {
			// A single malformed message must not stop the stream.
			w.logger.Error(
				"Can't unmarshal event",
				zap.Error(err),
				zap.String("message", string(msg)),
			)
			continue
		}

		if err := fn(event); err != nil {
			return err
		}
	}
}
// String returns the URL as a plain string, satisfying fmt.Stringer.
func (u WebsocketURL) String() string {
	raw := string(u)
	return raw
}