forked from timpalpant/go-tradier
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
144 lines (126 loc) · 3.55 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package tradier
import (
"bufio"
"encoding/json"
"io"
)
// StreamEvent is used to unmarshal stream events before they are demuxed.
// Message contains the remainder of the type-specific message.
//
// StreamEvents can be demuxed into type-specific events using
// the StreamDemuxer.
type StreamEvent struct {
Type string
Symbol string
Message json.RawMessage
Error error
}
func UnmarshalStreamEvent(buf []byte, se *StreamEvent) error {
se.Message = make([]byte, len(buf))
copy(se.Message, buf)
se.Error = json.Unmarshal(buf, se)
return se.Error
}
type QuoteEvent struct {
Symbol string
Bid float64
BidSize int64 `json:"bidsz"`
BidExchange string `json:"bidexch"`
BidDateMs int64 `json:"biddate,string"`
Ask float64
AskSize int64 `json:"asksz"`
AskExchange string `json:"askexch"`
AskDateMs int64 `json:"askdate,string"`
}
type TimeSaleEvent struct {
Type string `json:"type"`
Symbol string
Exchange string `json:"exch"`
Bid float64 `json:",string"`
Ask float64 `json:",string"`
Last float64 `json:",string"`
Size int64 `json:",string"`
DateMs int64 `json:"date,string"`
Seq int64
Flag string
Cancel bool
Correction bool
Session string
}
type TradeEvent struct {
Symbol string
Type string `json:"type"`
Exchange string `json:"exch"`
Price float64 `json:",string"`
Last float64 `json:",string"`
Size int64 `json:",string"`
CumulativeVolume int64 `json:"cvol,string"`
DateMs int64 `json:"date,string"`
}
type SummaryEvent struct {
Symbol string
Open float64 `json:",string"`
High float64 `json:",string"`
Low float64 `json:",string"`
PreviousClose float64 `json:"prevClose,string"`
}
// MarketEventStream scans the newline-delimited market stream
// sent by Tradier and decodes each event into a StreamEvent.
type MarketEventStream struct {
// A message on this channel indicates to the http consumer to shutdown the stream.
// All channels will be closed by the goroutine that owns this stream.
closeChan chan struct{}
}
func NewMarketEventStream(input io.ReadCloser, output chan *StreamEvent) *MarketEventStream {
mes := &MarketEventStream{
closeChan: make(chan struct{}),
}
go mes.consumeEvents(input, output)
return mes
}
func (mes *MarketEventStream) Stop() {
close(mes.closeChan)
}
func (mes *MarketEventStream) consumeEvents(
input io.ReadCloser,
output chan *StreamEvent) {
scanner := bufio.NewScanner(input)
defer input.Close()
defer close(output)
for scanner.Scan() {
event := &StreamEvent{}
if err := UnmarshalStreamEvent(scanner.Bytes(), event); err != nil {
Logger.Println(err)
}
select {
case output <- event:
case <-mes.closeChan:
return
default:
Logger.Println("stream output channel is full, dropping stream event")
}
}
if err := scanner.Err(); err != nil {
Logger.Println(err)
}
}
func DecodeQuote(e *StreamEvent) (*QuoteEvent, error) {
q := &QuoteEvent{Symbol: e.Symbol}
err := json.Unmarshal(e.Message, q)
return q, err
}
func DecodeTrade(e *StreamEvent) (*TradeEvent, error) {
t := &TradeEvent{Symbol: e.Symbol}
err := json.Unmarshal(e.Message, t)
return t, err
}
func DecodeTimeSale(e *StreamEvent) (*TimeSaleEvent, error) {
ts := &TimeSaleEvent{Symbol: e.Symbol}
err := json.Unmarshal(e.Message, ts)
return ts, err
}
func DecodeSummary(e *StreamEvent) (*SummaryEvent, error) {
s := &SummaryEvent{Symbol: e.Symbol}
err := json.Unmarshal(e.Message, s)
return s, err
}