forked from JonCooperWorks/go-tdameritrade
/
streaming.go
256 lines (218 loc) · 7.75 KB
/
streaming.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
package tdameritrade
import (
"encoding/json"
"errors"
"fmt"
"net/url"
"sync"
"time"
"github.com/gorilla/websocket"
)
// Command is the JSON envelope for requests sent to the streaming API.
// It serializes as an object with a single "requests" array and is the value
// that SendCommand marshals and sends.
type Command struct {
Requests []StreamRequest `json:"requests"`
}
// StreamRequest is one entry in a Command's "requests" array.
// No field uses omitempty, so every field (including Parameters) is always
// present in the serialized JSON, even when empty.
// Note that Requestid is a string here, while StreamAuthRequest declares it as an int.
type StreamRequest struct {
Service string `json:"service"`
Requestid string `json:"requestid"`
Command string `json:"command"`
Account string `json:"account"`
Source string `json:"source"`
Parameters StreamParams `json:"parameters"`
}
// StreamParams holds the "keys" and "fields" parameters of a StreamRequest.
// Both are sent as plain strings.
// NOTE(review): presumably comma-separated lists, as in TD Ameritrade's streaming
// documentation — confirm against callers.
type StreamParams struct {
Keys string `json:"keys"`
Fields string `json:"fields"`
}
// NewStreamAuthCommand builds the ADMIN/LOGIN command used to authenticate a streaming session
// from the streamer details carried by a TD Ameritrade UserPrincipal.
//
// accountID must match one of the accounts listed in the UserPrincipal; otherwise an error is
// returned and no command is built, so that obviously invalid logins are never sent.
// An error is also returned when the streamer token timestamp cannot be parsed.
func NewStreamAuthCommand(userPrincipal *UserPrincipal, accountID string) (*StreamAuthCommand, error) {
	// Reject accounts the user does not control before doing any other work.
	account, err := findAccount(userPrincipal, accountID)
	if err != nil {
		return nil, err
	}

	streamer := userPrincipal.StreamerInfo
	issuedAt, err := time.Parse("2006-01-02T15:04:05-0700", streamer.TokenTimestamp)
	if err != nil {
		return nil, err
	}

	// The credential is sent as a URL-encoded query string. Encode sorts by key, so the order
	// in which the entries are listed here has no effect on the result.
	credential := url.Values{
		"userid":      {account.AccountID},
		"token":       {streamer.Token},
		"company":     {account.Company},
		"segment":     {account.Segment},
		"cddomain":    {account.AccountCdDomainID},
		"usergroup":   {streamer.UserGroup},
		"accesslevel": {streamer.AccessLevel},
		"authorized":  {"Y"},
		"timestamp":   {fmt.Sprintf("%d", ConvertToEpoch(issuedAt))},
		"appid":       {streamer.AppID},
		"acl":         {streamer.ACL},
	}.Encode()

	// TD Ameritrade expects exactly one login request wrapped in a "requests" array.
	login := StreamAuthRequest{
		Service:   "ADMIN",
		Command:   "LOGIN",
		Requestid: 0,
		Account:   account.AccountID,
		Source:    streamer.AppID,
		Parameters: StreamAuthParams{
			Credential: credential,
			Token:      streamer.Token,
			Version:    "1.0",
		},
	}
	return &StreamAuthCommand{Requests: []StreamAuthRequest{login}}, nil
}
// StreamAuthCommand is the JSON envelope for the login request.
// NewStreamAuthCommand builds it with a single entry and Authenticate marshals and sends it.
type StreamAuthCommand struct {
Requests []StreamAuthRequest `json:"requests"`
}
// StreamAuthRequest is the single login entry inside a StreamAuthCommand.
// NewStreamAuthCommand sets Service to "ADMIN", Command to "LOGIN" and Requestid to 0,
// and fills Account, Source and Parameters from the UserPrincipal.
// Requestid is an int here, unlike StreamRequest where it is a string.
type StreamAuthRequest struct {
Service string `json:"service"`
Command string `json:"command"`
Requestid int `json:"requestid"`
Account string `json:"account"`
Source string `json:"source"`
Parameters StreamAuthParams `json:"parameters"`
}
// StreamAuthResponse is the shape into which NewAuthenticatedStreamingClient decodes the
// first message received after sending the login command.
// Response is empty when the decoded message has no "response" array.
type StreamAuthResponse struct {
Response []StreamAuthResponseBody `json:"response"`
}
// StreamAuthResponseBody is one entry of a StreamAuthResponse.
// Content carries the result code and message of the login attempt.
// NOTE(review): Timestamp is presumably an epoch value in milliseconds — confirm against
// TD Ameritrade's streaming documentation.
type StreamAuthResponseBody struct {
Service string `json:"service"`
Requestid string `json:"requestid"`
Command string `json:"command"`
Timestamp int64 `json:"timestamp"`
Content StreamAuthResponseContent `json:"content"`
}
// StreamAuthResponseContent is the outcome of a login attempt.
// A Code of 0 means authentication succeeded; for any other code
// NewAuthenticatedStreamingClient returns Msg as the error text.
type StreamAuthResponseContent struct {
Code int `json:"code"`
Msg string `json:"msg"`
}
// StreamAuthParams holds the parameters of a login request.
// As filled in by NewStreamAuthCommand, Credential is the URL-encoded credential string,
// Token is the streamer token from the UserPrincipal and Version is "1.0".
type StreamAuthParams struct {
Credential string `json:"credential"`
Token string `json:"token"`
Version string `json:"version"`
}
// StreamingClient provides real time updates from TD Ameritrade's streaming API.
// See https://developer.tdameritrade.com/content/streaming-data for more information.
//
// A StreamingClient owns one websocket connection and one reader goroutine that forwards
// everything read from that connection to the channels returned by ReceiveText.
// Call Close to release both.
type StreamingClient struct {
	connection *websocket.Conn
	messages   chan []byte
	errors     chan error

	// mu serializes writes to connection; see SendText.
	mu sync.Mutex

	// done is closed by Close to tell the reader goroutine to stop.
	done chan struct{}
	// readerDone is closed by the reader goroutine just before it exits.
	readerDone chan struct{}

	// closeOnce makes Close idempotent; closeErr is the result of the first call.
	closeOnce sync.Once
	closeErr  error
}

// Close stops the reader goroutine, closes the underlying websocket connection and then
// closes the channels returned by ReceiveText. It returns the error from closing the connection.
//
// The channels are closed only after the reader goroutine has exited, so the reader can never
// send on a closed channel. Close may be called more than once; later calls return the result
// of the first.
func (s *StreamingClient) Close() error {
	s.closeOnce.Do(func() {
		// done and readerDone are nil only for a client that was not created by
		// NewUnauthenticatedStreamingClient and therefore has no reader goroutine.
		if s.done != nil {
			close(s.done)
		}
		// Closing the connection unblocks a reader that is waiting inside ReadMessage.
		s.closeErr = s.connection.Close()
		if s.readerDone != nil {
			<-s.readerDone
		}
		close(s.messages)
		close(s.errors)
	})
	return s.closeErr
}

// SendText sends a byte payload to TD Ameritrade's websocket as a text message.
// TD Ameritrade commands are JSON encoded payloads.
// You should generally be using SendCommand to send commands to TD Ameritrade.
//
// Writes are serialized with a mutex because a gorilla/websocket connection supports only one
// concurrent writer, so SendText may be called from several goroutines.
func (s *StreamingClient) SendText(payload []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.connection.WriteMessage(websocket.TextMessage, payload)
}

// ReceiveText returns read-only channels with the raw byte responses from TD Ameritrade and the
// errors generated while streaming.
// Callers should select over both of these channels to avoid blocking one.
// Callers are able to handle errors however they see fit; the errors are the ones returned by
// the websocket connection's ReadMessage.
//
// After an error has been delivered the reader stops, so no further messages arrive.
// Both channels are closed by Close; a receive that reports a closed channel means the client
// has been closed.
func (s *StreamingClient) ReceiveText() (<-chan []byte, <-chan error) {
	return s.messages, s.errors
}

// SendCommand serializes a Command struct to JSON and sends it to TD Ameritrade.
// It is a wrapper around SendText and returns either the marshalling or the write error.
func (s *StreamingClient) SendCommand(command Command) error {
	commandBytes, err := json.Marshal(command)
	if err != nil {
		return err
	}
	return s.SendText(commandBytes)
}

// Authenticate serializes authCmd to JSON and sends it over the websocket.
// It only sends the login request; it does not wait for or inspect the reply, which arrives
// on the messages channel like any other message.
func (s *StreamingClient) Authenticate(authCmd *StreamAuthCommand) error {
	jsonCmd, err := json.Marshal(authCmd)
	if err != nil {
		return err
	}
	return s.SendText(jsonCmd)
}

// NewUnauthenticatedStreamingClient returns an unauthenticated streaming client that has a connection to the TD Ameritrade websocket.
// You can get an authenticated streaming client with NewAuthenticatedStreamingClient.
// To authenticate manually, send a JSON serialized StreamAuthCommand message with the StreamingClient's Authenticate method.
// You'll need to Close a streaming client to free up the underlying resources.
func NewUnauthenticatedStreamingClient(userPrincipal *UserPrincipal) (*StreamingClient, error) {
	streamURL := url.URL{
		Scheme: "wss",
		Host:   userPrincipal.StreamerInfo.StreamerSocketURL,
		Path:   "/ws",
	}

	conn, _, err := websocket.DefaultDialer.Dial(streamURL.String(), nil)
	if err != nil {
		return nil, err
	}

	streamingClient := &StreamingClient{
		connection: conn,
		messages:   make(chan []byte),
		errors:     make(chan error),
		done:       make(chan struct{}),
		readerDone: make(chan struct{}),
	}

	// Pass messages and errors down the respective channels until the first read error or Close.
	go streamingClient.readLoop()

	return streamingClient, nil
}

// readLoop forwards every message read from the connection to s.messages and the first read
// error to s.errors, then exits. Every send also watches s.done so that the goroutine cannot
// stay blocked on a channel nobody is receiving from once Close has been called.
func (s *StreamingClient) readLoop() {
	defer close(s.readerDone)

	for {
		_, message, err := s.connection.ReadMessage()
		if err != nil {
			// A read error seen after Close was requested is just the consequence of the
			// connection being closed, so it is not reported.
			select {
			case <-s.done:
				return
			default:
			}
			select {
			case s.errors <- err:
			case <-s.done:
			}
			return
		}

		select {
		case s.messages <- message:
		case <-s.done:
			return
		}
	}
}

// NewAuthenticatedStreamingClient returns a client that will pull live updates for a TD Ameritrade account.
// It sends an initial authentication message to TD Ameritrade and waits for a response before returning.
// Use NewUnauthenticatedStreamingClient if you want to handle authentication yourself.
// You'll need to Close a StreamingClient to free up the underlying resources.
//
// When an error is returned no client is returned, and any connection opened along the way has
// already been closed.
func NewAuthenticatedStreamingClient(userPrincipal *UserPrincipal, accountID string) (*StreamingClient, error) {
	// Build the login command before dialing: an unknown account ID is then rejected without
	// opening a connection at all.
	authCmd, err := NewStreamAuthCommand(userPrincipal, accountID)
	if err != nil {
		return nil, err
	}

	streamingClient, err := NewUnauthenticatedStreamingClient(userPrincipal)
	if err != nil {
		return nil, err
	}

	if err := streamingClient.awaitLogin(authCmd); err != nil {
		// Best-effort cleanup; the login failure is the error worth reporting.
		_ = streamingClient.Close()
		return nil, err
	}
	return streamingClient, nil
}

// awaitLogin sends authCmd and blocks until the first message or read error arrives.
// It returns nil only when that message decodes to a login response whose code is 0.
func (s *StreamingClient) awaitLogin(authCmd *StreamAuthCommand) error {
	if err := s.Authenticate(authCmd); err != nil {
		return err
	}

	// Wait on a response from TD Ameritrade.
	select {
	case message := <-s.messages:
		var authResponse StreamAuthResponse
		if err := json.Unmarshal(message, &authResponse); err != nil {
			return err
		}
		// A message without a "response" array is not a login reply; indexing it would panic.
		if len(authResponse.Response) == 0 {
			return fmt.Errorf("unexpected login response: %s", message)
		}
		// Response with a code 0 means authentication succeeded.
		if content := authResponse.Response[0].Content; content.Code != 0 {
			return errors.New(content.Msg)
		}
		return nil
	case err := <-s.errors:
		return err
	}
}
// findAccount looks up accountID among the accounts listed in userPrincipal.
// It returns a pointer to a copy of the matching account, or an error naming the
// account ID when none of the accounts has that ID.
func findAccount(userPrincipal *UserPrincipal, accountID string) (*UserAccountInfo, error) {
	for _, candidate := range userPrincipal.Accounts {
		if candidate.AccountID != accountID {
			continue
		}
		match := candidate
		return &match, nil
	}
	return nil, fmt.Errorf("account '%s' not found", accountID)
}